🌞

Flannel源码分析

Flannel源码分析

前言

flannel作为kubernetes的一种网络解决方案,在社区是比较活跃的。支持多种backend。

flannel源码地址在:https://github.com/coreos/flannel

从官网上把flannel的代码clone下来,目录结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
$ ll -ah
total 222K
drwxr-xr-x 1 user 197121    0 8月  30 18:37 ./
drwxr-xr-x 1 user 197121    0 8月  30 18:31 ../
-rw-r--r-- 1 user 197121  351 8月  30 18:33 .appveyor.yml
-rw-r--r-- 1 user 197121   56 8月  30 18:33 .dockerignore
drwxr-xr-x 1 user 197121    0 9月  17 09:34 .git/
drwxr-xr-x 1 user 197121    0 8月  30 18:33 .github/
-rw-r--r-- 1 user 197121  147 8月  30 18:33 .gitignore
drwxr-xr-x 1 user 197121    0 9月  17 11:22 .idea/
-rw-r--r-- 1 user 197121  411 8月  30 18:33 .travis.yml
drwxr-xr-x 1 user 197121    0 8月  30 18:33 backend/
-rw-r--r-- 1 user 197121 3.7K 8月  30 18:33 bill-of-materials.json
-rw-r--r-- 1 user 197121   98 8月  30 18:33 bill-of-materials.override.json
-rw-r--r-- 1 user 197121 3.1K 8月  30 18:33 code-of-conduct.md
-rw-r--r-- 1 user 197121 2.5K 8月  30 18:33 CONTRIBUTING.md
-rw-r--r-- 1 user 197121 1.5K 8月  30 18:33 DCO
drwxr-xr-x 1 user 197121    0 8月  30 18:33 dist/
-rw-r--r-- 1 user 197121  427 8月  30 18:33 Dockerfile.amd64
-rw-r--r-- 1 user 197121  504 8月  30 18:33 Dockerfile.arm
-rw-r--r-- 1 user 197121  494 8月  30 18:33 Dockerfile.arm64
-rw-r--r-- 1 user 197121  508 8月  30 18:33 Dockerfile.ppc64le
-rw-r--r-- 1 user 197121  504 8月  30 18:33 Dockerfile.s390x
drwxr-xr-x 1 user 197121    0 8月  30 18:33 Documentation/
-rw-r--r-- 1 user 197121  11K 8月  30 18:33 glide.lock
-rw-r--r-- 1 user 197121 1.6K 8月  30 18:33 glide.yaml
-rwxr-xr-x 1 user 197121  298 8月  30 18:33 header-check.sh*
drwxr-xr-x 1 user 197121    0 8月  30 18:33 images/
-rw-r--r-- 1 user 197121  12K 8月  30 18:33 LICENSE
drwxr-xr-x 1 user 197121    0 8月  30 18:33 logos/
-rw-r--r-- 1 user 197121  21K 8月  30 18:33 main.go
-rw-r--r-- 1 user 197121   98 8月  30 18:33 MAINTAINERS
-rw-r--r-- 1 user 197121  12K 8月  30 18:33 Makefile
drwxr-xr-x 1 user 197121    0 8月  30 18:33 network/
-rw-r--r-- 1 user 197121  131 8月  30 18:33 NOTICE
-rw-r--r-- 1 user 197121  212 8月  30 18:33 OWNERS
-rw-r--r-- 1 user 197121  75K 8月  30 18:33 packet-01.png
drwxr-xr-x 1 user 197121    0 8月  30 18:33 pkg/
-rw-r--r-- 1 user 197121 4.3K 8月  30 18:33 README.md
drwxr-xr-x 1 user 197121    0 8月  30 18:33 subnet/
drwxr-xr-x 1 user 197121    0 8月  30 18:33 vendor/
drwxr-xr-x 1 user 197121    0 8月  30 18:33 version/

这里介绍几个主要目录的用途:

  • backend,flannel支持多种backend,比如host-gw、vxlan等
  • dist,一些脚本和dockerfile文件
  • network,对主机的iptables进行操作
  • pkg,外部可引用的包
  • subnet,flannel中每个host对应一个subnet
  • ./main.go,flannel程序的入口

经过上述分析,分析flannel的源代码就从./main.go文件入手。

1.参数解析

./main.go中结构体CmdLineOpts包含了所有命令行传入的参数解析,具体如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// CmdLineOpts collects every option flanneld accepts on the command line.
// NOTE(review): default values mentioned below are quoted from the article;
// the flag registration (init) is not part of this excerpt — confirm there.
type CmdLineOpts struct {
	etcdEndpoints          string
	etcdPrefix             string
	etcdKeyfile            string
	etcdCertfile           string
	etcdCAFile             string
	etcdUsername           string
	etcdPassword           string
	// The etcd* fields above are the connection parameters required when etcd
	// is used as the backing store for subnets.
	help                   bool
	version                bool
	// Show help / print the version.
	kubeSubnetMgr          bool
	// When kubeSubnetMgr is true, kube-apiserver is used as the subnet store
	// (per the article, each node's subnet info is kept in that node's annotations).
	kubeApiUrl             string
	kubeAnnotationPrefix   string
	kubeConfigFile         string
	// The three kube* fields above are what is needed to reach kube-apiserver.
	iface                  flagSlice
	ifaceRegex             flagSlice
	// Interface(s) to use for inter-node traffic; when none is given flannel
	// falls back to the default interface and its IP.
	ipMasq                 bool
	// When set, main installs the masquerade iptables rules (network.MasqRules)
	// and keeps them in sync.
	subnetFile             string
	// File the subnet information is written to (article: defaults to
	// /run/flannel/subnet.env and holds the cluster-wide pod network, this
	// host's pod subnet, etc. — TODO confirm).
	subnetDir              string
	publicIP               string
	// IP address used for inter-node communication; similar in purpose to iface.
	subnetLeaseRenewMargin int
	// Subnets are held as leases that expire; this is how long before expiry
	// the lease is renewed, in minutes (article: default 60 — TODO confirm).
	healthzIP              string
	healthzPort            int
	// IP and port of the health-check endpoint.
	charonExecutablePath   string
	charonViciUri          string
	iptablesResyncSeconds  int
	// Interval, in seconds, at which iptables rules are re-synced
	// (article: default 5 — TODO confirm).
	iptablesForwardRules   bool
	// add default accept rules to FORWARD chain in iptables
	netConfPath            string
	// networkConfiguration PATH
}

main.go中首先是init函数对命令行传入参数进行赋值,没有赋值的采用默认值。然后进到main()函数中。

2.main函数结构

首先看一下main函数的主要结构,然后分步分析:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// main is the flanneld entry point: it validates the flags, selects the
// external interface, creates the subnet manager and the backend, starts the
// iptables/lease helpers and finally blocks until every goroutine tracked by
// the WaitGroup has returned.
// NOTE(review): this listing is abridged — the statement that defines `config`
// and the function's closing brace are not shown in this excerpt.
func main() {
	// Print the version and exit when that is all that was asked for.
	if opts.version {
		fmt.Fprintln(os.Stderr, version.Version)
		os.Exit(0)
	}

	flagutil.SetFlagsFromEnv(flannelFlags, "FLANNELD")

	// Validate flags
	// The renew margin is rejected when it is >= 24*60 or <= 0.
	// NOTE(review): the article explains the upper bound by a default lease
	// lifetime of 24*60 minutes; that value is not visible here — confirm.
	if opts.subnetLeaseRenewMargin >= 24*60 || opts.subnetLeaseRenewMargin <= 0 {
		log.Error("Invalid subnet-lease-renew-margin option, out of acceptable range")
		os.Exit(1)
	}
    
    // Work out which interface to use
	// If interfaces (or regexes) were given on the command line they are tried
	// in order; otherwise LookupExtIface("", "") is asked for the default one.
	// NOTE(review): per the article LookupExtIface relies on
	// ip.GetDefaultGatewayIface() from pkg/, which can also be reused in other
	// programs to find a Linux host's default interface — not shown here.
	var extIface *backend.ExternalInterface
	var err error
	// Check the default interface only if no interfaces are specified
	if len(opts.iface) == 0 && len(opts.ifaceRegex) == 0 {
		extIface, err = LookupExtIface("", "")
		if err != nil {
			log.Error("Failed to find any valid interface to use: ", err)
			os.Exit(1)
		}
	} else {
		// Check explicitly specified interfaces
		for _, iface := range opts.iface {
			extIface, err = LookupExtIface(iface, "")
			if err != nil {
				log.Infof("Could not find valid interface matching %s: %s", iface, err)
			}

			if extIface != nil {
				break
			}
		}

		// Check interfaces that match any specified regexes
		if extIface == nil {
			for _, ifaceRegex := range opts.ifaceRegex {
				extIface, err = LookupExtIface("", ifaceRegex)
				if err != nil {
					log.Infof("Could not find valid interface matching %s: %s", ifaceRegex, err)
				}

				if extIface != nil {
					break
				}
			}
		}

		if extIface == nil {
			// Exit if any of the specified interfaces do not match
			log.Error("Failed to find interface to use that matches the interfaces and/or regexes provided")
			os.Exit(1)
		}
	}
    
	// Step 1: create the SubnetManager, which is responsible for managing subnets.
    sm, err := newSubnetManager()
	if err != nil {
		log.Error("Failed to create SubnetManager: ", err)
		os.Exit(1)
	}
	log.Infof("Created subnet manager: %s", sm.Name())
    
	// The code below arranges a graceful shutdown. It first creates sigs, a
	// channel of os.Signal with a buffer of 1.
	// Two Go facilities are used here: context and sync.WaitGroup.
    // Register for SIGINT and SIGTERM
	log.Info("Installing signal handlers")
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

	// This is the main context that everything should run in.
	// All spawned goroutines should exit when cancel is called on this context.
	// Go routines spawned from main.go coordinate using a WaitGroup. This provides a mechanism to allow the shutdownHandler goroutine
	// to block until all the goroutines return . If those goroutines spawn other goroutines then they are responsible for
	// blocking and returning only when cancel() is called.
	// Create the top-level ctx.
    ctx, cancel := context.WithCancel(context.Background())
	
	// WaitGroup tracking the goroutines started from main.
    wg := sync.WaitGroup{}

	// Account for the shutdown-handler goroutine.
	wg.Add(1)
	go func() {
		// shutdownHandler receives sigs and cancel; per the article it calls
		// cancel once a signal arrives, which stops everything running under ctx.
		shutdownHandler(ctx, sigs, cancel)
		wg.Done()
	}()
    
	// When a healthz port is configured, start the health-check server in the background.
    if opts.healthzPort > 0 {
		// It's not super easy to shutdown the HTTP server so don't attempt to stop it cleanly
		go mustRunHealthz()
	}
    
	// Step 2: create the backend and register the subnet through it.
    // Create a backend manager then use it to create the backend and register the network with it.
	bm := backend.NewManager(ctx, sm, extIface)
	be, err := bm.GetBackend(config.BackendType)
	if err != nil {
		log.Errorf("Error fetching backend: %s", err)
		cancel()
		wg.Wait()
		os.Exit(1)
	}

	// Register the network (subnet) with the backend.
	bn, err := be.RegisterNetwork(ctx, wg, config)
	if err != nil {
		log.Errorf("Error registering network: %s", err)
		cancel()
		wg.Wait()
		os.Exit(1)
	}
    
	// Step 3: when ipMasq is set, install the masquerade rules and keep them in sync.
    // Set up ipMasq if needed
	if opts.ipMasq {
		if err = recycleIPTables(config.Network, bn.Lease()); err != nil {
			log.Errorf("Failed to recycle IPTables rules, %v", err)
			cancel()
			wg.Wait()
			os.Exit(1)
		}
		log.Infof("Setting up masking rules")
		go network.SetupAndEnsureIPTables(network.MasqRules(config.Network, bn.Lease()), opts.iptablesResyncSeconds)
	}
    
	// Step 4: when iptablesForwardRules is set, keep the FORWARD rules in sync as well.
    // Always enables forwarding rules. This is needed for Docker versions >1.13 (https://docs.docker.com/engine/userguide/networking/default_network/container-communication/#container-communication-between-hosts)
	// In Docker 1.12 and earlier, the default FORWARD chain policy was ACCEPT.
	// In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP.
	if opts.iptablesForwardRules {
		log.Infof("Changing default FORWARD chain policy to ACCEPT")
		go network.SetupAndEnsureIPTables(network.ForwardRules(config.Network.String()), opts.iptablesResyncSeconds)
	}
    
	// Step 5: write the subnet file; a failure is only logged.
    if err := WriteSubnetFile(opts.subnetFile, config.Network, opts.ipMasq, bn); err != nil {
		// Continue, even though it failed.
		log.Warningf("Failed to write subnet file: %s", err)
	} else {
		log.Infof("Wrote subnet file to %s", opts.subnetFile)
	}
    
	// Step 6: run the backend network.
    // Start "Running" the backend network. This will block until the context is done so run in another goroutine.
	log.Info("Running backend.")
	wg.Add(1)
	go func() {
		bn.Run(ctx)
		wg.Done()
	}()
    
	// Step 7: monitor the subnet lease — only when the kube subnet manager is NOT in use.
    // Kube subnet mgr doesn't lease the subnet for this node - it just uses the podCidr that's already assigned.
	if !opts.kubeSubnetMgr {
		err = MonitorLease(ctx, sm, bn, &wg)
		if err == errInterrupted {
			// The lease was "revoked" - shut everything down
			cancel()
		}
	}

    
    log.Info("Waiting for all goroutines to exit")
	// Block waiting for all the goroutines to finish.
	wg.Wait()
	log.Info("Exiting cleanly...")
	os.Exit(0)
    

上述main函数中列出了7个主要的步骤,下面依次分析。

3.第一步:创建SubnetManager

main.go中的newSubnetManager函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// newSubnetManager builds the subnet.Manager selected by the command line
// options: the etcd v2 local manager unless kubeSubnetMgr is set, in which
// case the Kubernetes-backed manager is returned instead.
func newSubnetManager() (subnet.Manager, error) {
	if !opts.kubeSubnetMgr {
		etcdCfg := etcdv2.EtcdConfig{
			Endpoints: strings.Split(opts.etcdEndpoints, ","),
			Keyfile:   opts.etcdKeyfile,
			Certfile:  opts.etcdCertfile,
			CAFile:    opts.etcdCAFile,
			Prefix:    opts.etcdPrefix,
			Username:  opts.etcdUsername,
			Password:  opts.etcdPassword,
		}

		// The subnet recorded in the subnet file (if any) is handed to the
		// manager so that it can attempt to renew that lease.
		previous := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_SUBNET")

		return etcdv2.NewLocalManager(&etcdCfg, previous)
	}

	return kube.NewSubnetManager(opts.kubeApiUrl, opts.kubeConfigFile, opts.kubeAnnotationPrefix, opts.netConfPath)
}

如果使用了kubeSubnetMgr,则调用kube.NewSubnetManager,否则调用etcdv2.NewLocalManager。这一点需要说明的是etcd发展到现在支持不同的版本,v2和v3,且v3版本具有很多优势;而flannel仓库中的etcd后端只有v2版本的实现,相关代码的提交是2 years之前。因此在kubernetes环境中,通常都是开启kubeSubnetMgr,以kube-apiserver作为后端存储,下文也只分析这一种方式。

./flannel/subnet/kube/kube.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// kubeSubnetManager is the subnet manager backed by kube-apiserver.
type kubeSubnetManager struct {
	// Annotations written onto the Node object.
	annotations    annotations
	// Client used to talk to kube-apiserver.
	client         clientset.Interface
	// Name of the node this process runs on.
	nodeName       string
	// Node lister.
	nodeStore      listers.NodeLister
	// Node controller.
	nodeController cache.Controller
	// Parsed network configuration.
	subnetConf     *subnet.Config
	// Channel of subnet events.
	// NOTE(review): presumably filled by the node event handlers — confirm.
	events         chan subnet.Event
}

下面进入flannel/subnet/kube/kube.go里的NewSubnetManager函数中,看看都干了啥:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// NewSubnetManager creates the Kubernetes-backed subnet manager, starts it and
// waits for its node controller to report that it has synced.
//
// apiUrl is the kube-apiserver address, kubeconfig the path of a kubeconfig
// file, prefix the prefix used for the node annotations and netConfPath the
// path of the network configuration file (per the article, mounted from a
// ConfigMap at /etc/kube-flannel/net-conf.json in a typical install).
func NewSubnetManager(apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Manager, error) {
	// Try to build kubernetes config from a master url or a kubeconfig filepath. If neither masterUrl
	// or kubeconfigPath are passed in we fall back to inClusterConfig. If inClusterConfig fails,
	// we fallback to the default config.
	restCfg, err := clientcmd.BuildConfigFromFlags(apiUrl, kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("fail to create kubernetes config: %v", err)
	}

	// Clientset used for every request to kube-apiserver.
	kc, err := clientset.NewForConfig(restCfg)
	if err != nil {
		return nil, fmt.Errorf("unable to initialize client: %v", err)
	}

	// The kube subnet mgr needs to know the k8s node name that it's running on so it can annotate it.
	// If we're running as a pod then the POD_NAME and POD_NAMESPACE will be populated and can be used to find the node
	// name. Otherwise, the environment variable NODE_NAME can be passed in.
	nodeName := os.Getenv("NODE_NAME")
	if nodeName == "" {
		name, namespace := os.Getenv("POD_NAME"), os.Getenv("POD_NAMESPACE")
		if name == "" || namespace == "" {
			return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
		}

		pod, err := kc.Pods(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", namespace, name, err)
		}
		if nodeName = pod.Spec.NodeName; nodeName == "" {
			return nil, fmt.Errorf("node name not present in pod spec '%s/%s'", namespace, name)
		}
	}

	// Load the network configuration file ...
	rawConf, err := ioutil.ReadFile(netConfPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read net conf: %v", err)
	}

	// ... and parse it.
	subnetCfg, err := subnet.ParseConfig(string(rawConf))
	if err != nil {
		return nil, fmt.Errorf("error parsing subnet config: %s", err)
	}

	// Build the manager from the clientset, the parsed configuration, the
	// node name and the annotation prefix, then run it in the background.
	mgr, err := newKubeSubnetManager(kc, subnetCfg, nodeName, prefix)
	if err != nil {
		return nil, fmt.Errorf("error creating network manager: %s", err)
	}
	go mgr.Run(context.Background())

	// Poll once a second until the node controller has synced or the timeout expires.
	glog.Infof("Waiting %s for node controller to sync", nodeControllerSyncTimeout)
	hasSynced := func() (bool, error) {
		return mgr.nodeController.HasSynced(), nil
	}
	if err = wait.Poll(time.Second, nodeControllerSyncTimeout, hasSynced); err != nil {
		return nil, fmt.Errorf("error waiting for nodeController to sync state: %v", err)
	}
	glog.Infof("Node controller sync successful")

	return mgr, nil
}

下面去看看如何newKubeSubnetManager和让其Run起来的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// newKubeSubnetManager assembles a kubeSubnetManager for nodeName and wires a
// Node informer whose add/update/delete callbacks are routed to the manager's
// lease event handlers. It fails only when the annotation set cannot be
// built from prefix.
func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName, prefix string) (*kubeSubnetManager, error) {
	annos, err := newAnnotations(prefix)
	if err != nil {
		return nil, err
	}

	ksm := &kubeSubnetManager{
		annotations: annos,
		client:      c,
		nodeName:    nodeName,
		subnetConf:  sc,
		events:      make(chan subnet.Event, 5000),
	}

	// List/watch every Node through the manager's client.
	nodeListWatch := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return ksm.client.CoreV1().Nodes().List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return ksm.client.CoreV1().Nodes().Watch(options)
		},
	}

	onAdd := func(obj interface{}) {
		ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
	}

	onDelete := func(obj interface{}) {
		// We can get DeletedFinalStateUnknown instead of *api.Node here and we need to handle that correctly.
		if _, isNode := obj.(*v1.Node); !isNode {
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				glog.Infof("Error received unexpected object: %v", obj)
				return
			}
			node, ok := tombstone.Obj.(*v1.Node)
			if !ok {
				glog.Infof("Error deletedFinalStateUnknown contained non-Node object: %v", tombstone.Obj)
				return
			}
			// Hand the Node wrapped in the tombstone to the handler.
			obj = node
		}
		ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
	}

	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    onAdd,
		UpdateFunc: ksm.handleUpdateLeaseEvent,
		DeleteFunc: onDelete,
	}

	indexer, controller := cache.NewIndexerInformer(
		nodeListWatch,
		&v1.Node{},
		resyncPeriod,
		handlers,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	ksm.nodeStore = listers.NewNodeLister(indexer)
	ksm.nodeController = controller

	return ksm, nil
}

总结一下上述函数都干了啥:

  • 初始化kubeSubnetManager
  • ksm主要是对集群中node进行监听,因为flannel是根据node来划分网段的
  • 根据监听到的node的事件,放入到ksm的events channel中

此外,ksm.Run函数的作用就是把上面创建的nodeController运行起来,开始监听node的事件。

4.第二步:创建backend

首先看backend/manager.go中的NewManager函数

1
2
3
4
5
6
7
8
// NewManager returns a backend Manager bound to ctx, the subnet manager sm and
// the external interface extIface. It starts with an empty set of active
// backends.
func NewManager(ctx context.Context, sm subnet.Manager, extIface *ExternalInterface) Manager {
	m := new(manager)
	m.ctx = ctx
	m.sm = sm
	m.extIface = extIface
	m.active = make(map[string]Backend)
	return m
}

只是返回了一个manager对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// manager creates backends on demand and remembers the ones it has created.
type manager struct {
   // Context handed to NewManager; GetBackend waits on it before dropping a backend.
   ctx      context.Context
   // The subnet manager created earlier (the ksm in the kube case).
   sm       subnet.Manager
   // Interface used to communicate with the outside world.
   extIface *ExternalInterface
   // Mutex held by GetBackend while it reads or modifies active.
   mux      sync.Mutex
   // Backends created so far, keyed by lower-cased backend type.
   active   map[string]Backend
   // Counts the cleanup goroutines started by GetBackend.
   wg       sync.WaitGroup
}

下面看bm.GetBackend(config.BackendType)函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// GetBackend returns the backend registered under backendType (matched
// case-insensitively), creating it on first use. An unknown type or a failing
// constructor yields an error.
//
// Per the article, backendType comes from the configuration delivered to the
// flannel pod via a ConfigMap, so it is a fixed value such as "vxlan".
func (bm *manager) GetBackend(backendType string) (Backend, error) {
	bm.mux.Lock()
	defer bm.mux.Unlock()

	key := strings.ToLower(backendType)

	// see if one is already running
	if running, found := bm.active[key]; found {
		return running, nil
	}

	// first request, need to create and run it
	construct, known := constructors[key]
	if !known {
		return nil, fmt.Errorf("unknown backend type: %v", key)
	}

	created, err := construct(bm.sm, bm.extIface)
	if err != nil {
		return nil, err
	}
	bm.active[key] = created

	bm.wg.Add(1)
	go func() {
		// Deferred calls run last-in first-out: the mutex is released before
		// the WaitGroup is signalled.
		defer bm.wg.Done()

		<-bm.ctx.Done()

		// TODO(eyakubovich): this obviosly introduces a race.
		// GetBackend() could get called while we are here.
		// Currently though, all backends' Run exit only
		// on shutdown

		bm.mux.Lock()
		defer bm.mux.Unlock()
		delete(bm.active, key)
	}()

	return created, nil
}

main.go的289行bn, err := be.RegisterNetwork(ctx, wg, config),实际上调用的是具体的backend的Register,比如配置了vxlan就调用vxlan的register:

flannel/backend/vxlan/vxlan.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// RegisterNetwork decodes the VXLAN backend settings from config, creates the
// vxlan device on top of the external interface, acquires a subnet lease that
// advertises the device's MAC address, gives the device a /32 address taken
// from the leased subnet and returns the resulting backend.Network.
//
// wg is not used by this implementation.
func (be *VXLANBackend) RegisterNetwork(ctx context.Context, wg sync.WaitGroup, config *subnet.Config) (backend.Network, error) {
	// Backend settings; only VNI has a non-zero default.
	var cfg struct {
		VNI           int
		Port          int
		GBP           bool
		Learning      bool
		DirectRouting bool
	}
	cfg.VNI = defaultVNI

	if len(config.Backend) > 0 {
		if err := json.Unmarshal(config.Backend, &cfg); err != nil {
			return nil, fmt.Errorf("error decoding VXLAN backend config: %v", err)
		}
	}
	log.Infof("VXLAN config: VNI=%d Port=%d GBP=%v Learning=%v DirectRouting=%v", cfg.VNI, cfg.Port, cfg.GBP, cfg.Learning, cfg.DirectRouting)

	// The device is named after its VNI and bound to the external interface.
	dev, err := newVXLANDevice(&vxlanDeviceAttrs{
		vni:       uint32(cfg.VNI),
		name:      fmt.Sprintf("flannel.%v", cfg.VNI),
		vtepIndex: be.extIface.Iface.Index,
		vtepAddr:  be.extIface.IfaceAddr,
		vtepPort:  cfg.Port,
		gbp:       cfg.GBP,
		learning:  cfg.Learning,
	})
	if err != nil {
		return nil, err
	}
	dev.directRouting = cfg.DirectRouting

	attrs, err := newSubnetAttrs(be.extIface.ExtAddr, dev.MACAddr())
	if err != nil {
		return nil, err
	}

	lease, err := be.subnetMgr.AcquireLease(ctx, attrs)
	if err == context.Canceled || err == context.DeadlineExceeded {
		// Context errors are passed through untouched.
		return nil, err
	}
	if err != nil {
		return nil, fmt.Errorf("failed to acquire lease: %v", err)
	}

	// Ensure that the device has a /32 address so that no broadcast routes are created.
	// This IP is just used as a source address for host to workload traffic (so
	// the return path for the traffic has an address on the flannel network to use as the destination)
	hostAddr := ip.IP4Net{IP: lease.Subnet.IP, PrefixLen: 32}
	if err := dev.Configure(hostAddr); err != nil {
		return nil, fmt.Errorf("failed to configure interface %s: %s", dev.link.Attrs().Name, err)
	}

	return newNetwork(be.subnetMgr, be.extIface, dev, ip.IP4Net{}, lease)
}

该函数主要是创建vxlan设备,向SubnetManager申请本机的subnet租约(lease),并给vxlan设备配置一个/32的IP地址。

updated 2019-11-24