calico cni pluginのADD処理を追ってみる

calicoのCNIが何をしているのかを理解するために、コードリーディングする。
前提としてk8sで動作させている場合の挙動のみを追うようにする。

calicoの提供するCNIプラグイン

calicoの提供するCNIプラグインはcalico-cniとcalico-ipamの2つがある。
本記事で紹介するのはcalico-cniの方である。

calico-cni

reference pluginの区分でいうMainに分類されるCNIプラグイン
バイナリファイルの名前はcalico

calico-ipam

reference pluginの区分でいうIPAMに分類されるCNIプラグイン
バイナリファイルの名前はcalico-ipam

CNIコンフィグ

kubernetesでcalicoを動かす際のデフォルトのCNIコンフィグは以下になっている。

{
  "name": "k8s-pod-network",
  "cniVersion": "0.3.1",
  "plugins": [
    {
      "type": "calico",
      "log_level": "info",
      "log_file_path": "/var/log/calico/cni/cni.log",
      "datastore_type": "kubernetes",
      "nodename": "__KUBERNETES_NODE_NAME__",
      "mtu": __CNI_MTU__,
      "ipam": {
          "type": "calico-ipam"
      },
      "policy": {
          "type": "k8s"
      },
      "kubernetes": {
          "kubeconfig": "__KUBECONFIG_FILEPATH__"
      }
    },
    {
      "type": "portmap",
      "snat": true,
      "capabilities": {"portMappings": true}
    },
    {
      "type": "bandwidth",
      "capabilities": {"bandwidth": true}
    }
  ]
}

type: calicoの部分に注目すると、calicoの中でipamとpolicyという記述がある。
それぞれcalico-ipamとKubernetesのNetworkPolicyを指している。

ipamの設定に関するDoc

docs.tigera.io

policyの設定に関するDoc

docs.tigera.io

ADD処理の中身

ここからは上述したCNIコンフィグを使った際のcalicoのADD処理についてその動作を追っていく。

CNIコンフィグの読み込み

標準入力、つまりはCNIコンフィグをNetConf構造体に読みこむ。

type NetConf struct {
    CNIVersion     string `json:"cniVersion,omitempty"`
    Name           string `json:"name"`
    Type           string `json:"type"`
    Mode           string `json:"mode"`
    VXLANMacPrefix string `json:"vxlan_mac_prefix"`
    VXLANVNI       uint64 `json:"vxlan_vni"`
    IPAM           struct {
        Name       string
        Type       string   `json:"type"`
        Subnet     string   `json:"subnet"`
        AssignIpv4 *string  `json:"assign_ipv4"`
        AssignIpv6 *string  `json:"assign_ipv6"`
        IPv4Pools  []string `json:"ipv4_pools,omitempty"`
        IPv6Pools  []string `json:"ipv6_pools,omitempty"`
    } `json:"ipam,omitempty"`
    Args                 Args                   `json:"args"`
    MTU                  int                    `json:"mtu"`
    NumQueues            int                    `json:"num_queues"`
    Nodename             string                 `json:"nodename"`
    NodenameFile         string                 `json:"nodename_file"`
    IPAMLockFile         string                 `json:"ipam_lock_file"`
    NodenameFileOptional bool                   `json:"nodename_file_optional"`
    DatastoreType        string                 `json:"datastore_type"`
    EtcdEndpoints        string                 `json:"etcd_endpoints"`
    EtcdDiscoverySrv     string                 `json:"etcd_discovery_srv"`
    LogLevel             string                 `json:"log_level"`
    LogFilePath          string                 `json:"log_file_path"`
    LogFileMaxSize       int                    `json:"log_file_max_size"`
    LogFileMaxAge        int                    `json:"log_file_max_age"`
    LogFileMaxCount      int                    `json:"log_file_max_count"`
    Policy               Policy                 `json:"policy"`
    Kubernetes           Kubernetes             `json:"kubernetes"`
    FeatureControl       FeatureControl         `json:"feature_control"`
    EtcdScheme           string                 `json:"etcd_scheme"`
    EtcdKeyFile          string                 `json:"etcd_key_file"`
    EtcdCertFile         string                 `json:"etcd_cert_file"`
    EtcdCaCertFile       string                 `json:"etcd_ca_cert_file"`
    ContainerSettings    ContainerSettings      `json:"container_settings,omitempty"`
    IncludeDefaultRoutes bool                   `json:"include_default_routes,omitempty"`
    DataplaneOptions     map[string]interface{} `json:"dataplane_options,omitempty"`

    // Windows-specific configuration.
    // WindowsPodDeletionTimestampTimeout defines number of seconds before a pod deletion timestamp timeout and
    // should be removed from registry. Default: 600 seconds
    WindowsPodDeletionTimestampTimeout int `json:"windows_pod_deletion_timestamp_timeout,omitempty"`
    // WindowsUseSingleNetwork disables the use of multiple IPAM blocks on a single host and forces
    // a static HNS network name.
    WindowsUseSingleNetwork bool `json:"windows_use_single_network,omitempty"`
    // WindowsDisableDefaultBlockAllPolicy disables the default "block all traffic" policy on the pod endpoint.
    // By default, WindowsDisableDefaultBlockAllPolicy = false, as the default "block all traffic" policy is placed at
    // the time of creating the pod network.
    // If WindowsDisableDefaultBlockAllPolicy = true, then the default policy is disabled and pod network
    // is created without "block all traffic" policy.
    WindowsDisableDefaultDenyAllPolicy bool `json:"windows_disable_default_deny_all_policy"`
    // WindowsLoopbackDSR indicates if the running platform supports loopback DSR.
    WindowsLoopbackDSR bool `json:"windows_loopback_DSR,omitempty"`

    RuntimeConfig RuntimeConfig

    // The CNI plugin waits until all the endpoints specified in ReadinessGates are ready
    ReadinessGates []string `json:"readiness_gates"`

    // Options below here are deprecated.
    EtcdAuthority string `json:"etcd_authority"`
    Hostname      string `json:"hostname"`
}

CNIバージョンのチェック

CNIのバージョンが1.0.0より新しいものであるかチェックする。

   if version.Compare(conf.CNIVersion, "1.0.0", ">") {
        return fmt.Errorf("unsupported CNI version %s", conf.CNIVersion)
    }

WEPIDsの作成

WEPとはCalicoで定義されている言葉でworkloadEndpointの略である。 WEPはpodの接続情報をまとめたものになっている。
WEPリソース自体はpod作成直後に、calico-kube-controllerが作成をしてくれている。(この時点ではPodへのIP等各種フィールドは空になっている。)

docs.tigera.io

WEPIDsはWEPの識別情報をまとめたもので以下の構造体で表現される。 calico-cniはCmdArgsからWEPIDsを作成する。

type WEPIdentifiers struct {
    Namespace string
    WEPName   string
    names.WorkloadEndpointIdentifiers
}

## names.WorkloadEndpointIdentifiers
type WorkloadEndpointIdentifiers struct {
    Node         string
    Orchestrator string
    Endpoint     string
    Workload     string
    Pod          string
    ContainerID  string
}

calico clientの作成

CNIコンフィグでdatastore_typeをkubernetesに指定しているので、kubernetesのクライアントが作成される。
kuberntesクライアントが使用するkubeconfigはCNIコンフィグで指定されたものになる。

   calicoClient, err := utils.CreateClient(conf)
    if err != nil {
        err = fmt.Errorf("error creating calico client: %v", err)
        return
    }

WEPIDsの更新

WEPリソースと先程作成したWEPIDsを突き合わせて、WEPIDsのEndpointWEPNameフィールドをアップデートする。

k8sの固有のADD処理

ここからいよいよADD処理の具体的な実装をみていく。

   if wepIDs.Orchestrator == api.OrchestratorKubernetes {
        if result, err = k8s.CmdAddK8s(ctx, args, conf, *wepIDs, calicoClient, endpoint); err != nil {
            return
        }

CmdAddK8sというのが、その処理の呼び出しになっている。

dataplaneの特定

dataplaneはCNIコンフィグで指定されたものを使用する。
今回のようにCNIコンフィグで特に指定がされていない場合はlinux dataplane(iproute2)を利用する。

   d, err := dataplane.GetDataplane(conf, logger)

IPAM pluginの呼び出し

IPAMプラグインを呼び出してIPアドレスを取得する。
ここでcalico-ipamというCNIプラグインが呼び出されるが、本記事ではcalico-ipamの説明は省略。

       result, err = utils.AddIPAM(conf, args, logger)
        if err != nil {
            return nil, err
        }

WorkloadEndpointインスタンスの更新

podから得られた各種情報やIPAMプラグインの呼び出しによって得られたIPアドレスでWorkloadEndpointインスタンスを更新する。

   endpoint.Name = epIDs.WEPName
    endpoint.Namespace = epIDs.Namespace
    endpoint.Labels = labels
    endpoint.GenerateName = generateName
    endpoint.Spec.Endpoint = epIDs.Endpoint
    endpoint.Spec.Node = epIDs.Node
    endpoint.Spec.Orchestrator = epIDs.Orchestrator
    endpoint.Spec.Pod = epIDs.Pod
    endpoint.Spec.Ports = ports
    endpoint.Spec.IPNetworks = []string{}
    endpoint.Spec.ServiceAccountName = serviceAccount

    // Set the profileID according to whether Kubernetes policy is required.
    // If it's not, then just use the network name (which is the normal behavior)
    // otherwise use one based on the Kubernetes pod's profile(s).
    if conf.Policy.PolicyType == "k8s" {
        endpoint.Spec.Profiles = profiles
    } else {
        endpoint.Spec.Profiles = []string{conf.Name}
    }

    // Populate the endpoint with the output from the IPAM plugin.
    if err = utils.PopulateEndpointNets(endpoint, result); err != nil {
        // Cleanup IP allocation and return the error.
        utils.ReleaseIPAllocation(logger, conf, args)
        return nil, err
    }
    logger.WithField("endpoint", endpoint).Info("Populated endpoint")
    logger.Infof("Calico CNI using IPs: %s", endpoint.Spec.IPNetworks)

vethの名前の決定

host側に設定する方のvethの名前を決定します。

   desiredVethName := k8sconversion.NewConverter().VethNameForWorkload(epIDs.Namespace, epIDs.Pod)

VethNameForWorkloadの処理の中身は以下のようになっていて、vethの名前はprefixとハッシュの組み合わせになっている。
prefixはデフォルトではcaliが指定されるが、FELIX_INTERFACEPREFIX経由で変更可能である。

   h := sha1.New()
    h.Write([]byte(fmt.Sprintf("%s.%s", namespace, podname)))
    prefix := os.Getenv("FELIX_INTERFACEPREFIX")
    if prefix == "" {
        // Prefix is not set. Default to "cali"
        prefix = "cali"
    } else {
        // Prefix is set - use the first value in the list.
        splits := strings.Split(prefix, ",")
        prefix = splits[0]
    }
    log.WithField("prefix", prefix).Debugf("Using prefix to create a WorkloadEndpoint veth name")
    return fmt.Sprintf("%s%s", prefix, hex.EncodeToString(h.Sum(nil))[:11])

interfaceの作成と設定

DoNetworkingというのがインターフェースの各種設定をやる関数。

   hostVethName, contVethMac, err := d.DoNetworking(
        ctx, calicoClient, args, result, desiredVethName, routes, endpoint, annot)
    if err != nil {
        logger.WithError(err).Error("Error setting up networking")
        releaseIPAM()
        return nil, err
    }

以降はDoNeworkingの中身を解説する。
※calicoはnetworkの操作にvishvananda/netlinkを使っているが、使用しているバージョンは本記事執筆時点のlatestである1.1.0でなくbeta版のv1.2.1-beta.2である。

既存インターフェースのクリア

もし、同名のインターフェースが既に存在していたら、それを削除する。

   if oldHostVeth, err := hostNlHandle.LinkByName(hostVethName); err == nil {
        if err = hostNlHandle.LinkDel(oldHostVeth); err != nil {
            return "", "", fmt.Errorf("failed to delete old hostVeth %v: %v", hostVethName, err)
        }
        d.logger.Infof("Cleaning old hostVeth: %v", hostVethName)
    }
vethの作成
       la := netlink.NewLinkAttrs()
        la.Name = contVethName
        la.MTU = d.mtu
        la.NumTxQueues = d.queues
        la.NumRxQueues = d.queues
        veth := &netlink.Veth{
            LinkAttrs:     la,
            PeerName:      hostVethName,
            PeerNamespace: netlink.NsFd(int(hostNS.Fd())),
        }

        if err := netlink.LinkAdd(veth); err != nil {
            d.logger.Errorf("Error adding veth %+v: %s", veth, err)
            return err
        }
host側vethへのMACの設定
       if mac, err := net.ParseMAC("EE:EE:EE:EE:EE:EE"); err != nil {
            d.logger.Infof("failed to parse MAC Address: %v. Using kernel generated MAC.", err)
        } else {
            // Set the MAC address on the host side interface so the kernel does not
            // have to generate a persistent address which fails some times.
            if err = hostNlHandle.LinkSetHardwareAddr(hostVeth, mac); err != nil {
                d.logger.Warnf("failed to Set MAC of %q: %v. Using kernel generated MAC.", hostVethName, err)
            }
        }

上記コードを見てわかる通り、calicoではhost側のvethのMACは常にEE:EE:EE:EE:EE:EEになる。 calico的には使わないんだから良いだろということらしい。

docs.tigera.io

ホスト側vethへのカーネルパラメータの設定

ループバックへのルーティングを有効化する。

       err := writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/route_localnet", hostVethName), "1")
        if err != nil {
            return fmt.Errorf("failed to set net.ipv4.conf.%s.route_localnet=1: %s", hostVethName, err)
        }

ProxyARPのDelayを0にする。 (ProxyARPを有効にするNICでdelayを0にするのは定石)

       if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/neigh/%s/proxy_delay", hostVethName), "0"); err != nil {
            d.logger.Warnf("failed to set net.ipv4.neigh.%s.proxy_delay=0: %s", hostVethName, err)
        }

ProxyARPのEnableにする。(ProxyARPの設定の意味は後述する。)

       if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/proxy_arp", hostVethName), "1"); err != nil {
            return fmt.Errorf("failed to set net.ipv4.conf.%s.proxy_arp=1: %s", hostVethName, err)
        }

ip forwardingをEnableにする。

       if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/forwarding", hostVethName), "1"); err != nil {
            return fmt.Errorf("failed to set net.ipv4.conf.%s.forwarding=1: %s", hostVethName, err)
        }
vethのリンクアップ

ホスト側

       if err = hostNlHandle.LinkSetUp(hostVeth); err != nil {
            return fmt.Errorf("failed to set %q up: %w", hostVethName, err)
        }

コンテナNamespace側

       if err = netlink.LinkSetUp(contVeth); err != nil {
            return fmt.Errorf("failed to set %q up: %w", contVethName, err)
        }

コンテナNamespace側のvethにMACを設定

annotationで明示的に指定されていればそれを使いますが、指定されていないければカーネル側で自動で設定されるものをそのまま使う。

       if requestedContVethMac, found := annotations["cni.projectcalico.org/hwAddr"]; found {
            tmpContVethMAC, err := net.ParseMAC(requestedContVethMac)
            if err != nil {
                return fmt.Errorf("failed to parse MAC address %v provided via cni.projectcalico.org/hwAddr: %v",
                    requestedContVethMac, err)
            }

            err = netlink.LinkSetHardwareAddr(contVeth, tmpContVethMAC)
            if err != nil {
                return fmt.Errorf("failed to set container veth MAC to %v as requested via cni.projectcalico.org/hwAddr: %v",
                    requestedContVethMac, err)
            }

            contVethMAC = tmpContVethMAC.String()
            d.logger.Infof("successfully configured container veth MAC to %v as requested via cni.projectcalico.org/hwAddr",
                contVethMAC)
        } else {
            contVethMAC = contVeth.Attrs().HardwareAddr.String()
        }

コンテナNamespace側のルートの設定

まず、169.254.1.1/32というLINKLOCALなネットワークへの宛先を登録する。

           gw := net.IPv4(169, 254, 1, 1)
            gwNet := &net.IPNet{IP: gw, Mask: net.CIDRMask(32, 32)}
            err := netlink.RouteAdd(
                &netlink.Route{
                    LinkIndex: contVeth.Attrs().Index,
                    Scope:     netlink.SCOPE_LINK,
                    Dst:       gwNet,
                },
            )

            if err != nil {
                return fmt.Errorf("failed to add route inside the container: %v", err)
            }

これによってルーティングテーブルに169.254.1.1 dev eth0 scope linkというルートが登録される。

つづいて、デフォルトルートをインストールする。

           for _, r := range routes {
                if r.IP.To4() == nil {
                    d.logger.WithField("route", r).Debug("Skipping non-IPv4 route")
                    continue
                }
                d.logger.WithField("route", r).Debug("Adding IPv4 route")
                if err = ip.AddRoute(r, gw, contVeth); err != nil {
                    return fmt.Errorf("failed to add IPv4 route for %v via %v: %v", r, gw, err)
                }
            }

上述のコードでroutesとなっているが、今回のケースではroutesには0.0.0.0/0だけが入っている。 これによって、ルーティングテーブルにはdefault via 169.254.1.1 dev eth0がインストールされる。

コンテナNamespace側のアドレスの設定
       for _, addr := range result.IPs {
            if err = netlink.AddrAdd(contVeth, &netlink.Addr{IPNet: &addr.Address}); err != nil {
                return fmt.Errorf("failed to add IP addr to %q: %v", contVeth, err)
            }
        }

ホスト側のルートの設定

       route := netlink.Route{
            LinkIndex: hostVeth.Attrs().Index,
            Scope:     netlink.SCOPE_LINK,
            Dst:       &ipAddr.Address,
        }
        err := hostNlHandle.RouteAdd(&route)

上述のコードでipAddrに入っているのはpodのIPになる。 これによって、ルーティングテーブルにはPOD_IP dev caliXXXXX scope linkがインストールされます。

これでDoNetworkingは終わり。

workloadEndpointの更新

最後にここまでで得られたPodへの接続情報(IPアドレス等の情報)を使ってworkloadEndpointを更新して終了。

   if _, err := utils.CreateOrUpdate(ctxPatchCNI, calicoClient, endpoint); err != nil {
        logger.WithError(err).Error("Error creating/updating endpoint in datastore.")
        releaseIPAM()
        return nil, err
    }
    logger.Info("Wrote updated endpoint to datastore")

なぜProxyARPが必要なのか

コンテナNamespaceにおけるrouting tableは次のようになっている。

default via 169.254.1.1 dev eth0
169.254.1.1 dev eth0 scope link

つまり、コンテナからのIP通信はeth0を通じた169.254.1.1を経由して送ろうとする。
そのため、コンテナはethernet headerのdstに入れるMACを169.254.1.1へのARPリクエストをで解決しようとする。
ここでホスト側のvethでProxyARPが有効になっていると、コンテナNamespace側のvethが送ってきたARPリクエストに対してホスト側のvethは自身のMACARPリプライをする。 こうして、コンテナはethernet headerのdstにホスト側vethのMACEE:EE:EE:EE:EE:EE)を入れてパケットを送信する。

あるPod(10.244.0.65)から別のPod(10.244.0.194)へpingを送る際のパケットキャプチャは以下になる。169.254.1.1を解決するためのARPリクエストを送ってEE:EE:EE:EE:EE:EEARP解決されていることがわかる。

workloadEndpointの更新について

CNIのコード中にあるworkloadEndpointへのCreateやUpdateについて、コード中では次のようにしており、いかにもworkloadEndpointに対してCreateOrUpdateを実行しているようだが、コードの中身を追っていくと、この操作はpodのannotationを変更しているだけである。

   ctxPatchCNI := k8sresources.ContextWithPatchMode(ctx, k8sresources.PatchModeCNI)
    if _, err := utils.CreateOrUpdate(ctxPatchCNI, calicoClient, endpoint); err != nil {
        logger.WithError(err).Error("Error creating/updating endpoint in datastore.")
        releaseIPAM()
        return nil, err
    }
    logger.Info("Wrote updated endpoint to datastore")

実際はcalico-kube-controllersがpodのannotationの変更を検知してそれを受けてworkloadEndpointの更新を行っている。 docs.tigera.io