Informer 机制简介
在Kubernetes系统中,组件之间通过HTTP协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等,Kubernetes则通过使用cient-go的Informer机制达到这一效果。其他组件也是通过Informer
机制与Kubernetes API Server进行通信的。
Informer
是负责与Kubernetes APIServer进行watch操作,watch的资源,可以是k8s内置资源对象,也可以是CRD。
Informer
是一个带有本地缓存以及索引机制的核心工具包,当请求为查询操作的时候,会优先从本地缓存中去查找数据,而创建、更新、删除,这类操作,则会更加事件通知写入到队列DeltaFIFO中,同时对应的事件处理过后,更新本地缓存,使本地缓存与ETCD的数据保持一致。
Informer抽象出来的这个缓存层,将查询相关的压力接收下来,这样就不用每次调用APIServer的接口,减轻了APIServer的数据交互压力。
Informer机制架构设计
Informer运行原理图:
Informer
代码示例:
Informers Example代码示例如下:
package main
import (
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"log"
"time"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "etc/kube.conf")
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
informer := sharedInformers.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("New Pod Added to Store:%s", mObj.GetName())
},
UpdateFunc: func(oldObj, newObj interface{}) {
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
},
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("Pod Deleted from Store:%s", mObj.GetName())
},
})
informer.Run(stopCh)
}
首先通过kubernetes.NewForConfig创建clientset对象,Informer需要通过ClientSet与Kubernetes API Server进行交互。另外,创建stopCh对象,该对象用于在程序进程退出之前通知Informer提前退出,因为Informer是一个持久运行的goroutine。informers.NewSharedInformerFactory函数实例化了SharedInformer对象,它接收两个参数:第1个参数clientset是用于与Kubernetes API Server交互的客户端,第2个参数time.Minute用于设置多久进行一次resync(重新同步),resync会周期性地执行List操作,将所有的资源存放在Informer Store中,如果该参数为0,则禁用resync功能。
在Informers Example代码示例中,通过sharedInformers.Core().V1().Pods().Informer可以得到具体Pod资源的informer对象。通过informer.AddEventHandler函数可以为Pod资源添加资源事件回调方法,支持3种资源事件回调方法,分别介绍如下。
● AddFunc:当创建Pod资源对象时触发的事件回调方法。
● UpdateFunc:当更新Pod资源对象时触发的事件回调方法。
● DeleteFunc:当删除Pod资源对象时触发的事件回调方法。在正常的情况下,Kubernetes的其他组件在使用Informer机制时触发资源事件回调方法,将资源对象推送到WorkQueue或其他队列中(实际过程中大都是这样的),在InformersExample代码示例中,我们直接输出触发的资源事件。最后通过informer.Run函数运行当前的Informer,内部为Pod资源类型创建Informer。
每一个Kubernetes资源上都实现了Informer机制。每一个Informer上都会实现Informer和Lister方法,例如PodInformer,代码示例如下
// PodInformer provides access to a shared informer and lister for
// Pods.
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
用不同资源的Informer,代码示例如下:
podInformer := sharedInformers.Core().V1().Pods().Informer()
deploymentInformer := sharedInformers.Apps().V1().Deployments().Informer()
定义不同资源的Informer,允许监控不同资源的资源事件,例如,监听deployment资源对象,当Kubernetes集群中deployment发生变化时,client-go能够及时收到资源对象的变更信息。
shared informer
可以认为 informer都是 shared informer
Informer也被称为Shared Informer,它是可以共享使用的。在用client-go编写代码程序时,若同一资源的Informer被实例化了多次,每个Informer使用一个Reflector,那么会运行过多相同的ListAndWatch,太多重复的序列化和反序列化操作会导致Kubernetes API Server负载过重。Shared Informer可以使同一类资源Informer共享一个Reflector,这样可以节约很多资源。通过map数据结构实现共享的Informer机制。SharedInformer定义了一个map数据结构,用于存放所有Informer的字段,代码示例如下
package main
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"log"
)
type PodHandler struct{}
func (this *PodHandler) OnAdd(obj interface{}) {
fmt.Println(obj.(*v1.Pod).Name)
}
func (this *PodHandler) OnUpdate(oldObj, newObj interface{}) {
}
func (this *PodHandler) OnDelete(obj interface{}) {
fmt.Println("Delete: ", obj.(*v1.Pod).Name)
}
func main() {
configPath := "etc/kube.conf"
config, err := clientcmd.BuildConfigFromFlags("", configPath)
if err != nil {
log.Fatal(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
sharedInformer := informers.NewSharedInformerFactory(clientset, 0)
podInformer := sharedInformer.Core().V1().Pods().Informer()
podInformer.AddEventHandler(&PodHandler{})
podInformer.Run(stopCh)
}