calicoのCNIが何をしているのかを理解するために、コードリーディングする。
前提としてk8sで動作させている場合の挙動のみを追うようにする。
calicoの提供するCNIプラグイン
calicoの提供するCNIプラグインはcalico-cniとcalico-ipamの2つがある。
本記事で紹介するのはcalico-cniの方である。
calico-cni
reference pluginの区分でいうMain
に分類されるCNIプラグイン
バイナリファイルの名前はcalico
calico-ipam
reference pluginの区分でいうIPAM
に分類されるCNIプラグイン
バイナリファイルの名前はcalico-ipam
CNIコンフィグ
kubernetesでcalicoを動かす際のデフォルトのCNIコンフィグは以下になっている。
{ "name": "k8s-pod-network", "cniVersion": "0.3.1", "plugins": [ { "type": "calico", "log_level": "info", "log_file_path": "/var/log/calico/cni/cni.log", "datastore_type": "kubernetes", "nodename": "__KUBERNETES_NODE_NAME__", "mtu": __CNI_MTU__, "ipam": { "type": "calico-ipam" }, "policy": { "type": "k8s" }, "kubernetes": { "kubeconfig": "__KUBECONFIG_FILEPATH__" } }, { "type": "portmap", "snat": true, "capabilities": {"portMappings": true} }, { "type": "bandwidth", "capabilities": {"bandwidth": true} } ] }
type: calico
の部分に注目すると、calicoの中でipamとpolicyという記述がある。
それぞれcalico-ipamとKubernetesのNetworkPolicyを指している。
ipamの設定に関するDoc
policyの設定に関するDoc
ADD処理の中身
ここからは上述したCNIコンフィグを使った際のcalicoのADD処理についてその動作を追っていく。
CNIコンフィグの読み込み
標準入力、つまりはCNIコンフィグをNetConf構造体に読みこむ。
type NetConf struct { CNIVersion string `json:"cniVersion,omitempty"` Name string `json:"name"` Type string `json:"type"` Mode string `json:"mode"` VXLANMacPrefix string `json:"vxlan_mac_prefix"` VXLANVNI uint64 `json:"vxlan_vni"` IPAM struct { Name string Type string `json:"type"` Subnet string `json:"subnet"` AssignIpv4 *string `json:"assign_ipv4"` AssignIpv6 *string `json:"assign_ipv6"` IPv4Pools []string `json:"ipv4_pools,omitempty"` IPv6Pools []string `json:"ipv6_pools,omitempty"` } `json:"ipam,omitempty"` Args Args `json:"args"` MTU int `json:"mtu"` NumQueues int `json:"num_queues"` Nodename string `json:"nodename"` NodenameFile string `json:"nodename_file"` IPAMLockFile string `json:"ipam_lock_file"` NodenameFileOptional bool `json:"nodename_file_optional"` DatastoreType string `json:"datastore_type"` EtcdEndpoints string `json:"etcd_endpoints"` EtcdDiscoverySrv string `json:"etcd_discovery_srv"` LogLevel string `json:"log_level"` LogFilePath string `json:"log_file_path"` LogFileMaxSize int `json:"log_file_max_size"` LogFileMaxAge int `json:"log_file_max_age"` LogFileMaxCount int `json:"log_file_max_count"` Policy Policy `json:"policy"` Kubernetes Kubernetes `json:"kubernetes"` FeatureControl FeatureControl `json:"feature_control"` EtcdScheme string `json:"etcd_scheme"` EtcdKeyFile string `json:"etcd_key_file"` EtcdCertFile string `json:"etcd_cert_file"` EtcdCaCertFile string `json:"etcd_ca_cert_file"` ContainerSettings ContainerSettings `json:"container_settings,omitempty"` IncludeDefaultRoutes bool `json:"include_default_routes,omitempty"` DataplaneOptions map[string]interface{} `json:"dataplane_options,omitempty"` // Windows-specific configuration. // WindowsPodDeletionTimestampTimeout defines number of seconds before a pod deletion timestamp timeout and // should be removed from registry. Default: 600 seconds WindowsPodDeletionTimestampTimeout int `json:"windows_pod_deletion_timestamp_timeout,omitempty"` // WindowsUseSingleNetwork disables the use of multiple IPAM blocks on a single host and forces // a static HNS network name. WindowsUseSingleNetwork bool `json:"windows_use_single_network,omitempty"` // WindowsDisableDefaultBlockAllPolicy disables the default "block all traffic" policy on the pod endpoint. // By default, WindowsDisableDefaultBlockAllPolicy = false, as the default "block all traffic" policy is placed at // the time of creating the pod network. // If WindowsDisableDefaultBlockAllPolicy = true, then the default policy is disabled and pod network // is created without "block all traffic" policy. WindowsDisableDefaultDenyAllPolicy bool `json:"windows_disable_default_deny_all_policy"` // WindowsLoopbackDSR indicates if the running platform supports loopback DSR. WindowsLoopbackDSR bool `json:"windows_loopback_DSR,omitempty"` RuntimeConfig RuntimeConfig // The CNI plugin waits until all the endpoints specified in ReadinessGates are ready ReadinessGates []string `json:"readiness_gates"` // Options below here are deprecated. EtcdAuthority string `json:"etcd_authority"` Hostname string `json:"hostname"` }
CNIバージョンのチェック
CNIのバージョンが1.0.0より新しいものであるかチェックする。
if version.Compare(conf.CNIVersion, "1.0.0", ">") { return fmt.Errorf("unsupported CNI version %s", conf.CNIVersion) }
WEPIDsの作成
WEPとはCalicoで定義されている言葉でworkloadEndpointの略である。
WEPはpodの接続情報をまとめたものになっている。
WEPリソース自体はpod作成直後に、calico-kube-controllerが作成をしてくれている。(この時点ではPodへのIP等各種フィールドは空になっている。)
WEPIDsはWEPの識別情報をまとめたもので以下の構造体で表現される。 calico-cniはCmdArgsからWEPIDsを作成する。
type WEPIdentifiers struct { Namespace string WEPName string names.WorkloadEndpointIdentifiers } ## names.WorkloadEndpointIdentifiers type WorkloadEndpointIdentifiers struct { Node string Orchestrator string Endpoint string Workload string Pod string ContainerID string }
calico clientの作成
CNIコンフィグでdatastore_typeをkubernetesに指定しているので、kubernetesのクライアントが作成される。
kuberntesクライアントが使用するkubeconfigはCNIコンフィグで指定されたものになる。
calicoClient, err := utils.CreateClient(conf) if err != nil { err = fmt.Errorf("error creating calico client: %v", err) return }
WEPIDsの更新
WEPリソースと先程作成したWEPIDsを突き合わせて、WEPIDsのEndpoint
とWEPName
フィールドをアップデートする。
k8sの固有のADD処理
ここからいよいよADD処理の具体的な実装をみていく。
if wepIDs.Orchestrator == api.OrchestratorKubernetes { if result, err = k8s.CmdAddK8s(ctx, args, conf, *wepIDs, calicoClient, endpoint); err != nil { return }
CmdAddK8sというのが、その処理の呼び出しになっている。
dataplaneの特定
dataplaneはCNIコンフィグで指定されたものを使用する。
今回のようにCNIコンフィグで特に指定がされていない場合はlinux dataplane(iproute2)を利用する。
d, err := dataplane.GetDataplane(conf, logger)
IPAM pluginの呼び出し
IPAMプラグインを呼び出してIPアドレスを取得する。
ここでcalico-ipamというCNIプラグインが呼び出されるが、本記事ではcalico-ipamの説明は省略。
result, err = utils.AddIPAM(conf, args, logger) if err != nil { return nil, err }
WorkloadEndpointインスタンスの更新
podから得られた各種情報やIPAMプラグインの呼び出しによって得られたIPアドレスでWorkloadEndpointインスタンスを更新する。
endpoint.Name = epIDs.WEPName endpoint.Namespace = epIDs.Namespace endpoint.Labels = labels endpoint.GenerateName = generateName endpoint.Spec.Endpoint = epIDs.Endpoint endpoint.Spec.Node = epIDs.Node endpoint.Spec.Orchestrator = epIDs.Orchestrator endpoint.Spec.Pod = epIDs.Pod endpoint.Spec.Ports = ports endpoint.Spec.IPNetworks = []string{} endpoint.Spec.ServiceAccountName = serviceAccount // Set the profileID according to whether Kubernetes policy is required. // If it's not, then just use the network name (which is the normal behavior) // otherwise use one based on the Kubernetes pod's profile(s). if conf.Policy.PolicyType == "k8s" { endpoint.Spec.Profiles = profiles } else { endpoint.Spec.Profiles = []string{conf.Name} } // Populate the endpoint with the output from the IPAM plugin. if err = utils.PopulateEndpointNets(endpoint, result); err != nil { // Cleanup IP allocation and return the error. utils.ReleaseIPAllocation(logger, conf, args) return nil, err } logger.WithField("endpoint", endpoint).Info("Populated endpoint") logger.Infof("Calico CNI using IPs: %s", endpoint.Spec.IPNetworks)
vethの名前の決定
host側に設定する方のvethの名前を決定します。
desiredVethName := k8sconversion.NewConverter().VethNameForWorkload(epIDs.Namespace, epIDs.Pod)
VethNameForWorkload
の処理の中身は以下のようになっていて、vethの名前はprefixとハッシュの組み合わせになっている。
prefixはデフォルトではcali
が指定されるが、FELIX_INTERFACEPREFIX
経由で変更可能である。
h := sha1.New() h.Write([]byte(fmt.Sprintf("%s.%s", namespace, podname))) prefix := os.Getenv("FELIX_INTERFACEPREFIX") if prefix == "" { // Prefix is not set. Default to "cali" prefix = "cali" } else { // Prefix is set - use the first value in the list. splits := strings.Split(prefix, ",") prefix = splits[0] } log.WithField("prefix", prefix).Debugf("Using prefix to create a WorkloadEndpoint veth name") return fmt.Sprintf("%s%s", prefix, hex.EncodeToString(h.Sum(nil))[:11])
interfaceの作成と設定
DoNetworkingというのがインターフェースの各種設定をやる関数。
hostVethName, contVethMac, err := d.DoNetworking( ctx, calicoClient, args, result, desiredVethName, routes, endpoint, annot) if err != nil { logger.WithError(err).Error("Error setting up networking") releaseIPAM() return nil, err }
以降はDoNeworkingの中身を解説する。
※calicoはnetworkの操作にvishvananda/netlink
を使っているが、使用しているバージョンは本記事執筆時点のlatestである1.1.0でなくbeta版のv1.2.1-beta.2
である。
既存インターフェースのクリア
もし、同名のインターフェースが既に存在していたら、それを削除する。
if oldHostVeth, err := hostNlHandle.LinkByName(hostVethName); err == nil { if err = hostNlHandle.LinkDel(oldHostVeth); err != nil { return "", "", fmt.Errorf("failed to delete old hostVeth %v: %v", hostVethName, err) } d.logger.Infof("Cleaning old hostVeth: %v", hostVethName) }
vethの作成
la := netlink.NewLinkAttrs() la.Name = contVethName la.MTU = d.mtu la.NumTxQueues = d.queues la.NumRxQueues = d.queues veth := &netlink.Veth{ LinkAttrs: la, PeerName: hostVethName, PeerNamespace: netlink.NsFd(int(hostNS.Fd())), } if err := netlink.LinkAdd(veth); err != nil { d.logger.Errorf("Error adding veth %+v: %s", veth, err) return err }
host側vethへのMACの設定
if mac, err := net.ParseMAC("EE:EE:EE:EE:EE:EE"); err != nil { d.logger.Infof("failed to parse MAC Address: %v. Using kernel generated MAC.", err) } else { // Set the MAC address on the host side interface so the kernel does not // have to generate a persistent address which fails some times. if err = hostNlHandle.LinkSetHardwareAddr(hostVeth, mac); err != nil { d.logger.Warnf("failed to Set MAC of %q: %v. Using kernel generated MAC.", hostVethName, err) } }
上記コードを見てわかる通り、calicoではhost側のvethのMACは常にEE:EE:EE:EE:EE:EE
になる。
calico的には使わないんだから良いだろということらしい。
ホスト側vethへのカーネルパラメータの設定
ループバックへのルーティングを有効化する。
err := writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/route_localnet", hostVethName), "1") if err != nil { return fmt.Errorf("failed to set net.ipv4.conf.%s.route_localnet=1: %s", hostVethName, err) }
ProxyARPのDelayを0にする。 (ProxyARPを有効にするNICでdelayを0にするのは定石)
if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/neigh/%s/proxy_delay", hostVethName), "0"); err != nil { d.logger.Warnf("failed to set net.ipv4.neigh.%s.proxy_delay=0: %s", hostVethName, err) }
ProxyARPのEnableにする。(ProxyARPの設定の意味は後述する。)
if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/proxy_arp", hostVethName), "1"); err != nil { return fmt.Errorf("failed to set net.ipv4.conf.%s.proxy_arp=1: %s", hostVethName, err) }
ip forwardingをEnableにする。
if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/forwarding", hostVethName), "1"); err != nil { return fmt.Errorf("failed to set net.ipv4.conf.%s.forwarding=1: %s", hostVethName, err) }
vethのリンクアップ
ホスト側
if err = hostNlHandle.LinkSetUp(hostVeth); err != nil { return fmt.Errorf("failed to set %q up: %w", hostVethName, err) }
コンテナNamespace側
if err = netlink.LinkSetUp(contVeth); err != nil { return fmt.Errorf("failed to set %q up: %w", contVethName, err) }
コンテナNamespace側のvethにMACを設定
annotationで明示的に指定されていればそれを使いますが、指定されていないければカーネル側で自動で設定されるものをそのまま使う。
if requestedContVethMac, found := annotations["cni.projectcalico.org/hwAddr"]; found { tmpContVethMAC, err := net.ParseMAC(requestedContVethMac) if err != nil { return fmt.Errorf("failed to parse MAC address %v provided via cni.projectcalico.org/hwAddr: %v", requestedContVethMac, err) } err = netlink.LinkSetHardwareAddr(contVeth, tmpContVethMAC) if err != nil { return fmt.Errorf("failed to set container veth MAC to %v as requested via cni.projectcalico.org/hwAddr: %v", requestedContVethMac, err) } contVethMAC = tmpContVethMAC.String() d.logger.Infof("successfully configured container veth MAC to %v as requested via cni.projectcalico.org/hwAddr", contVethMAC) } else { contVethMAC = contVeth.Attrs().HardwareAddr.String() }
コンテナNamespace側のルートの設定
まず、169.254.1.1/32
というLINKLOCALなネットワークへの宛先を登録する。
gw := net.IPv4(169, 254, 1, 1) gwNet := &net.IPNet{IP: gw, Mask: net.CIDRMask(32, 32)} err := netlink.RouteAdd( &netlink.Route{ LinkIndex: contVeth.Attrs().Index, Scope: netlink.SCOPE_LINK, Dst: gwNet, }, ) if err != nil { return fmt.Errorf("failed to add route inside the container: %v", err) }
これによってルーティングテーブルに169.254.1.1 dev eth0 scope link
というルートが登録される。
つづいて、デフォルトルートをインストールする。
for _, r := range routes { if r.IP.To4() == nil { d.logger.WithField("route", r).Debug("Skipping non-IPv4 route") continue } d.logger.WithField("route", r).Debug("Adding IPv4 route") if err = ip.AddRoute(r, gw, contVeth); err != nil { return fmt.Errorf("failed to add IPv4 route for %v via %v: %v", r, gw, err) } }
上述のコードでroutesとなっているが、今回のケースではroutesには0.0.0.0/0
だけが入っている。
これによって、ルーティングテーブルにはdefault via 169.254.1.1 dev eth0
がインストールされる。
コンテナNamespace側のアドレスの設定
for _, addr := range result.IPs { if err = netlink.AddrAdd(contVeth, &netlink.Addr{IPNet: &addr.Address}); err != nil { return fmt.Errorf("failed to add IP addr to %q: %v", contVeth, err) } }
ホスト側のルートの設定
route := netlink.Route{ LinkIndex: hostVeth.Attrs().Index, Scope: netlink.SCOPE_LINK, Dst: &ipAddr.Address, } err := hostNlHandle.RouteAdd(&route)
上述のコードでipAddrに入っているのはpodのIPになる。
これによって、ルーティングテーブルにはPOD_IP dev caliXXXXX scope link
がインストールされます。
これでDoNetworkingは終わり。
workloadEndpointの更新
最後にここまでで得られたPodへの接続情報(IPアドレス等の情報)を使ってworkloadEndpointを更新して終了。
if _, err := utils.CreateOrUpdate(ctxPatchCNI, calicoClient, endpoint); err != nil { logger.WithError(err).Error("Error creating/updating endpoint in datastore.") releaseIPAM() return nil, err } logger.Info("Wrote updated endpoint to datastore")
なぜProxyARPが必要なのか
コンテナNamespaceにおけるrouting tableは次のようになっている。
default via 169.254.1.1 dev eth0 169.254.1.1 dev eth0 scope link
つまり、コンテナからのIP通信はeth0を通じた169.254.1.1を経由して送ろうとする。
そのため、コンテナはethernet headerのdstに入れるMACを169.254.1.1へのARPリクエストをで解決しようとする。
ここでホスト側のvethでProxyARPが有効になっていると、コンテナNamespace側のvethが送ってきたARPリクエストに対してホスト側のvethは自身のMACでARPリプライをする。
こうして、コンテナはethernet headerのdstにホスト側vethのMAC(EE:EE:EE:EE:EE:EE
)を入れてパケットを送信する。
あるPod(10.244.0.65)から別のPod(10.244.0.194)へpingを送る際のパケットキャプチャは以下になる。169.254.1.1を解決するためのARPリクエストを送ってEE:EE:EE:EE:EE:EE
でARP解決されていることがわかる。
workloadEndpointの更新について
CNIのコード中にあるworkloadEndpointへのCreateやUpdateについて、コード中では次のようにしており、いかにもworkloadEndpointに対してCreateOrUpdateを実行しているようだが、コードの中身を追っていくと、この操作はpodのannotationを変更しているだけである。
ctxPatchCNI := k8sresources.ContextWithPatchMode(ctx, k8sresources.PatchModeCNI) if _, err := utils.CreateOrUpdate(ctxPatchCNI, calicoClient, endpoint); err != nil { logger.WithError(err).Error("Error creating/updating endpoint in datastore.") releaseIPAM() return nil, err } logger.Info("Wrote updated endpoint to datastore")
実際はcalico-kube-controllersがpodのannotationの変更を検知してそれを受けてworkloadEndpointの更新を行っている。 docs.tigera.io