上一篇介绍了flannel服务的启动。下面看看flanneld这个进程是怎样运行的。涉及到backend的部分以vxlan为例。
每个节点的网段分配是通过上一篇说的RegisterNetwork完成的,但具体是怎么做的呢?节点是怎样确定自己的网段的呢?

// Build the lease attributes from the external interface address and the
// MAC address reported by dev.
subnetAttrs, err := newSubnetAttrs(be.extIface.ExtAddr, dev.MACAddr())
    if err != nil {
        return nil, err
    }

    // Ask the subnet manager for a lease carrying those attributes.
    lease, err := be.subnetMgr.AcquireLease(ctx, subnetAttrs)
    switch err {
    case nil:
        // Success: fall out of the switch and keep going with the lease.

    case context.Canceled, context.DeadlineExceeded:
        // Cancellation and deadline errors are handed back unwrapped.
        return nil, err

    default:
        // Any other failure is returned with an "acquire lease" prefix.
        return nil, fmt.Errorf("failed to acquire lease: %v", err)
    }

通过AcquireLease获取

// try to reuse a subnet if there's one that matches our IP
    if l := findLeaseByIP(leases, extIaddr); l != nil {
        // make sure the existing subnet is still within the configured network
        if isSubnetConfigCompat(config, l.Subnet) {
            log.Infof("Found lease (%v) for current IP (%v), reusing", l.Subnet, extIaddr)

            // A zero Expiration marks a reservation: keep ttl at 0 for it,
            // otherwise refresh the lease with the standard subnetTTL.
            ttl := time.Duration(0)
            if !l.Expiration.IsZero() {
                // Not a reservation
                ttl = subnetTTL
            }
            // Write the new attributes (and ttl) for the existing subnet to
            // the registry; the call returns the resulting expiration.
            exp, err := m.registry.updateSubnet(ctx, l.Subnet, attrs, ttl, 0)
            if err != nil {
                return nil, err
            }

            // Mirror what was just written into the lease we hand back.
            l.Attrs = *attrs
            l.Expiration = exp
            return l, nil
        } else {
            // The old subnet no longer fits the current config: remove it
            // from the registry and fall through to allocate a fresh one.
            log.Infof("Found lease (%v) for current IP (%v) but not compatible with current config, deleting", l.Subnet, extIaddr)
            if err := m.registry.deleteSubnet(ctx, l.Subnet); err != nil {
                return nil, err
            }
        }
    }

    // no existing match, grab a new one
    sn, err := m.allocateSubnet(config, leases)
    if err != nil {
        return nil, err
    }

先是看看这个publicip是否已经分配了网段,如果已经分配就沿用之前分配的,这样服务重启过后仍然能保持原有网段;如果没有(即第一次启动),则调用allocateSubnet分配一个新的网段。

// allocateSubnet picks a subnet of length config.SubnetLen between
// config.SubnetMin and config.SubnetMax that does not overlap any of the
// given leases.
//
// It walks the range in order, collects up to 100 free candidates and then
// returns one of them chosen via randInt. An "out of subnets" error is
// returned when no free candidate exists.
func (m *LocalManager) allocateSubnet(config *Config, leases []Lease) (ip.IP4Net, error) {
    log.Infof("Picking subnet in range %s ... %s", config.SubnetMin, config.SubnetMax)

    var candidates []ip.IP4
    cur := ip.IP4Net{IP: config.SubnetMin, PrefixLen: config.SubnetLen}

    for ; cur.IP <= config.SubnetMax && len(candidates) < 100; cur = cur.Next() {
        // A candidate is usable only if it overlaps none of the known leases.
        taken := false
        for _, l := range leases {
            if cur.Overlaps(l.Subnet) {
                taken = true
                break
            }
        }
        if !taken {
            candidates = append(candidates, cur.IP)
        }
    }

    if len(candidates) == 0 {
        return ip.IP4Net{}, errors.New("out of subnets")
    }

    pick := randInt(0, len(candidates))
    return ip.IP4Net{IP: candidates[pick], PrefixLen: config.SubnetLen}, nil
}

这个方法产生一个不冲突的IP段。这样把这个注册到etcd
在上一篇结尾的时候有个MonitorLease,那么这个具体是怎么续租网络的呢?

// MonitorLease keeps the lease of bn renewed until the context ends or the
// lease is removed.
//
// A background subnet.WatchLease goroutine reports events for the lease's
// subnet. The lease is renewed through sm.RenewLease once the time left until
// its expiration, minus the configured renew margin, has elapsed; a failed
// renewal is retried after one minute.
//
// It returns errInterrupted when a removal event is received and errCanceled
// when ctx is done.
func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network) error {
    // Use the subnet manager to start watching leases.
    leaseEvents := make(chan subnet.Event)
    go subnet.WatchLease(ctx, sm, bn.Lease().Subnet, leaseEvents)

    margin := time.Duration(opts.subnetLeaseRenewMargin) * time.Minute

    // untilRenewal reports how long to wait before the next renewal attempt,
    // based on the lease's current expiration.
    untilRenewal := func() time.Duration {
        return time.Until(bn.Lease().Expiration) - margin
    }

    wait := untilRenewal()
    for {
        select {
        case <-ctx.Done():
            log.Infof("Stopped monitoring lease")
            return errCanceled

        case ev := <-leaseEvents:
            switch ev.Type {
            case subnet.EventRemoved:
                log.Error("Lease has been revoked. Shutting down daemon.")
                return errInterrupted

            case subnet.EventAdded:
                // Adopt the expiration carried by the event and reschedule.
                bn.Lease().Expiration = ev.Lease.Expiration
                wait = untilRenewal()
                log.Infof("Waiting for %s to renew lease", wait)
            }

        case <-time.After(wait):
            if err := sm.RenewLease(ctx, bn.Lease()); err != nil {
                log.Error("Error renewing lease (trying again in 1 min): ", err)
                wait = time.Minute
                continue
            }

            log.Info("Lease renewed, new expiration: ", bn.Lease().Expiration)
            wait = untilRenewal()
        }
    }
}

通过WatchLease监听subnet/etcdv2/registry.go,

// watchSubnet waits for the next etcd change on the key of subnet sn that
// happened after index since.
//
// It returns the parsed event together with the ModifiedIndex of the changed
// node. If the watch itself fails, a zero Event and index 0 are returned with
// the error; a parse error is returned alongside the node's index.
func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, since uint64, sn ip.IP4Net) (Event, uint64, error) {
    subnetKey := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
    watcher := esr.client().Watcher(subnetKey, &etcd.WatcherOptions{
        AfterIndex: since,
    })

    resp, err := watcher.Next(ctx)
    if err != nil {
        return Event{}, 0, err
    }

    parsed, err := parseSubnetWatchResponse(resp)
    return parsed, resp.Node.ModifiedIndex, err
}

就是watch subnet这个网段对应的节点。如果是EventAdded,说明是添加网络,就更新租约时间;如果是删除网络事件,就把flanneld进程停止。如果是正常的定时器超时,程序则会调用RenewLease

// RenewLease refreshes the registry entry for lease.Subnet with the lease's
// current attributes and subnetTTL. On success the new expiration is stored
// in lease.Expiration; on failure the lease is left untouched and the
// registry error is returned.
func (m *LocalManager) RenewLease(ctx context.Context, lease *Lease) error {
    newExp, err := m.registry.updateSubnet(ctx, lease.Subnet, &lease.Attrs, subnetTTL, 0)
    if err == nil {
        lease.Expiration = newExp
    }
    return err
}

这里不得不补充一个etcd节点的ttl时效,可以通过接口查询

curl http://10.39.0.6:2379/v2/keys/flannel/network/subnets/192.168.69.0-24|python -m json.tool
{
    "action": "get",
    "node": {
        "createdIndex": 2933637,
        "expiration": "2017-04-26T04:15:51.715778571Z",
        "key": "/flannel/network/subnets/192.168.69.0-24",
        "modifiedIndex": 2933637,
        "ttl": 64256,
        "value": "{\"PublicIP\":\"10.39.0.53\",\"BackendType\":\"vxlan\",\"BackendData\":{\"VtepMAC\":\"e6:7a:17:a1:9d:f1\"}}"
    }
}

每当续租定时器超时后,flannel就会重新更新ttl时间。这样flannel就可以维护自己的子网段了。至此,每个主机上的容器拥有自己独立网段的问题就解决了。上一篇的be.Run就是启动了vxlan backend

// Run drives the vxlan network until ctx is done.
//
// It starts a goroutine feeding netlink neighbor misses into a buffered
// channel and a second goroutine (tracked by a WaitGroup) that delivers
// batches of subnet lease events. The first batch is applied with
// handleInitialSubnetEvents, retrying once per second until it succeeds;
// afterwards misses and event batches are dispatched to handleMiss and
// handleSubnetEvents.
//
// Run returns when ctx is done, after waiting for the lease-watching
// goroutine to exit.
func (nw *network) Run(ctx context.Context) {
    log.V(0).Info("Watching for L3 misses")
    misses := make(chan *netlink.Neigh, 100)
    // Unfortunately MonitorMisses does not take a cancel channel
    // as there's no way to interrupt netlink socket recv
    go nw.dev.MonitorMisses(misses)

    wg := sync.WaitGroup{}

    log.V(0).Info("Watching for new subnet leases")
    events := make(chan []subnet.Event)
    wg.Add(1)
    go func() {
        subnet.WatchLeases(ctx, nw.subnetMgr, nw.SubnetLease, events)
        log.V(1).Info("WatchLeases exited")
        wg.Done()
    }()

    // Registered before any return path so every exit waits for the watcher.
    defer wg.Wait()

    select {
    case initialEventsBatch := <-events:
        for {
            err := nw.handleInitialSubnetEvents(initialEventsBatch)
            if err == nil {
                break
            }
            log.Error(err, " About to retry")

            // Back off for a second, but give up immediately on
            // cancellation instead of retrying forever during shutdown.
            select {
            case <-time.After(time.Second):
            case <-ctx.Done():
                return
            }
        }

    case <-ctx.Done():
        return
    }

    for {
        select {
        case miss := <-misses:
            nw.handleMiss(miss)

        case evtBatch := <-events:
            nw.handleSubnetEvents(evtBatch)

        case <-ctx.Done():
            return
        }
    }
}

先是注册miss

// MonitorMisses subscribes to netlink RTNLGRP_NEIGH notifications and passes
// every received message, together with the misses channel, to
// dev.processNeighMsg.
//
// If the subscription fails the error is logged and the function returns.
// Otherwise it loops forever; a failed receive is logged and retried after
// one second.
func (dev *vxlanDevice) MonitorMisses(misses chan *netlink.Neigh) {
    nlsock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
    if err != nil {
        // Include the underlying error so the failure cause is not lost.
        log.Error("Failed to subscribe to netlink RTNLGRP_NEIGH messages: ", err)
        return
    }

    for {
        msgs, err := nlsock.Receive()
        if err != nil {
            log.Errorf("Failed to receive from netlink: %v ", err)

            time.Sleep(1 * time.Second)
            continue
        }

        for _, msg := range msgs {
            dev.processNeighMsg(msg, misses)
        }
    }
}

然后watch子网/flannel/network/subnets

// WatchLeases repeatedly calls sm.WatchLeases and forwards the resulting
// event batches to receiver until ctx is done.
//
// Each result advances the cursor passed to the next call. When the result
// carries events they are run through the lease watcher's update; otherwise
// its snapshot is used to reset the watcher. Empty batches are not sent.
//
// The function returns when sm.WatchLeases reports context.Canceled or
// context.DeadlineExceeded, or when ctx is done while it is backing off after
// an error or waiting for the receiver to accept a batch.
func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan []Event) {
    lw := &leaseWatcher{
        ownLease: ownLease,
    }
    var cursor interface{}

    for {
        res, err := sm.WatchLeases(ctx, cursor)
        if err != nil {
            if err == context.Canceled || err == context.DeadlineExceeded {
                return
            }

            log.Errorf("Watch subnets: %v", err)

            // Back off for a second, but stop right away on cancellation.
            select {
            case <-time.After(time.Second):
            case <-ctx.Done():
                return
            }
            continue
        }

        cursor = res.Cursor

        var batch []Event

        if len(res.Events) > 0 {
            batch = lw.update(res.Events)
        } else {
            batch = lw.reset(res.Snapshot)
        }

        if len(batch) > 0 {
            // The consumer may already have stopped reading because ctx was
            // cancelled; a bare send would then block this goroutine forever.
            select {
            case receiver <- batch:
            case <-ctx.Done():
                return
            }
        }
    }
}

下面就开始循环select处理所有的misses和events。
如果是events事件,

// handleSubnetEvents applies a batch of subnet lease events to the local
// state.
//
// For an added subnet, its route is recorded in nw.routes with the lease's
// VTEP MAC and an L2 entry for the lease's public IP is added on the device.
// For a removed subnet, the L2 entry is deleted (only when a VTEP MAC is
// present) and the route is removed. Events whose lease is not of backend
// type "vxlan", or whose backend data cannot be decoded, are logged and
// skipped; unknown event types are logged as internal errors.
func (nw *network) handleSubnetEvents(batch []subnet.Event) {
    // decodeAttrs checks the lease's backend type and decodes its vxlan
    // backend data. It logs the reason and reports false when the event has
    // to be skipped.
    decodeAttrs := func(ev subnet.Event) (vxlanLeaseAttrs, bool) {
        var decoded vxlanLeaseAttrs
        if ev.Lease.Attrs.BackendType != "vxlan" {
            log.Warningf("Ignoring non-vxlan subnet: type=%v", ev.Lease.Attrs.BackendType)
            return decoded, false
        }
        if err := json.Unmarshal(ev.Lease.Attrs.BackendData, &decoded); err != nil {
            log.Error("Error decoding subnet lease JSON: ", err)
            return decoded, false
        }
        return decoded, true
    }

    for _, ev := range batch {
        switch ev.Type {
        case subnet.EventAdded:
            log.V(1).Info("Subnet added: ", ev.Lease.Subnet)

            attrs, ok := decodeAttrs(ev)
            if !ok {
                continue
            }

            vtepMAC := net.HardwareAddr(attrs.VtepMAC)
            nw.routes.set(ev.Lease.Subnet, vtepMAC)
            nw.dev.AddL2(neighbor{IP: ev.Lease.Attrs.PublicIP, MAC: vtepMAC})

        case subnet.EventRemoved:
            log.V(1).Info("Subnet removed: ", ev.Lease.Subnet)

            attrs, ok := decodeAttrs(ev)
            if !ok {
                continue
            }

            // Without a VTEP MAC there is no L2 entry to delete.
            if len(attrs.VtepMAC) > 0 {
                nw.dev.DelL2(neighbor{IP: ev.Lease.Attrs.PublicIP, MAC: net.HardwareAddr(attrs.VtepMAC)})
            }
            nw.routes.remove(ev.Lease.Subnet)

        default:
            log.Error("Internal error: unknown event type: ", int(ev.Type))
        }
    }
}

我只解释一下子网添加事件EventAdded,此时主要做两件事:第一,添加本地缓存nw.routes,这个里面记录了整个网络中所有的子网和对应vtep的mac;第二,添加本地flannel.1的fdb,设定目的vtep的mac对应的publicIP。通过之前的blog大家应该都比较清楚了。
如果是miss事件,


// handleL3Miss answers an L3 miss by looking up the route whose network
// contains miss.IP and adding an L3 neighbor entry that maps the IP to that
// route's VTEP MAC. A miss with no matching route is logged and ignored; the
// outcome of AddL3 is logged either way.
func (nw *network) handleL3Miss(miss *netlink.Neigh) {
    route := nw.routes.findByNetwork(ip.FromIP(miss.IP))
    if route == nil {
        log.V(0).Infof("L3 miss but route for %v not found", miss.IP)
        return
    }

    entry := neighbor{IP: ip.FromIP(miss.IP), MAC: route.vtepMAC}
    err := nw.dev.AddL3(entry)
    if err != nil {
        log.Errorf("AddL3 failed: %v", err)
        return
    }
    log.V(2).Infof("L3 miss: AddL3 for %s succeeded", miss.IP)
}

nw.routes.findByNetwork这个方法就是查找目标IP所在网络对应的vtep的mac地址,然后通过AddL3把它写到本地的邻居表(ip neigh)中。协议栈在封装包时会先读取neighbor表,这样vxlan的Inner目的mac地址就有了,网络也就能连通了。

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐