etcd 分布式 k-v 存储与 raft 应用 上

etcd 分布式 k-v 存储与 raft 应用 上

tk_sky 217 2024-02-27

etcd 分布式 k-v 存储与 raft 应用 上

一起读,@CD (

简介

etcd 是 go 语言实现的 分布式 k-v 存储,通过 raft 协议维护一致性,对外提供强一致性和高可用性。强一致性意味着超过最终一致性的可靠性,它可以有效应对各种类型的网络问题和机器故障问题,常用于提供高可靠要求的服务发现、分布式锁、分布式消息队列等功能。作为 CNCF 的核心项目,etcd 现在在 kubernetes 中用于维护集群的各种状态和元信息,保证这些信息的实时同步和持久化。

etcd 的 github 开源地址:etcd-io/etcd: Distributed reliable key-value store for the most critical data of a distributed system (github.com)

etcd 提供的能力包括:

  • 存储和获取数据的接口(get set)并保证数据的强一致性
  • 监听机制,可以监听 key 的变更
  • 提供 key 的过期和续约
  • 提供原子的 CAS(Compare-and-Swap)和 CAD(Compare-and-Delete)支持

etcd 经常作为一个 raft 的工程案例讲解,所以这里主要总结一下 etcd 对 raft 进行的应用和有关实现。

架构

etcd 在实现 raft 时将模块拆分为应用层和算法层两个模块。其中算法层是一个被应用层引用的静态库,在程序中两个模块会分布在两个独立的 goroutine 中,使用 channel 进行模块间通信。注意这里只讨论和 raft 有关的部分,不考虑其他功能层。另外代码主要以 v3 为准,v3 引入了 grpc,所以可以在数据结构里看到 pb 的 tag。

image-20240226224501424

算法层

算法是架构中专门用来实现 raft 算法的核心模块,主要工作是根据共识机制进行请求内容的正确性校验、对预写日志(write ahead log,WAL)的状态进行维护,以及可提交日志的进度推进,就是 raft 的那一套机制。

总之,算法层主要负责把应用层传来的输入日志做共识,然后返回达成一致的日志。

应用层

应用层是相对于 raft 算法面向服务对象的应用层,是 raft 节点里的中间层,向上与外部客户端通信,向下使用算法库,同时负责和其他模块交互。应用层主要涉及到网络通信、日志持久化、状态机管理等。

算法层数据结构

接下来看看算法层的一些核心数据结构。

首先是 Entry,和 raft 定义一致,一条 Entry 就是一笔预写日志,包含了普通类型和配置变更两种:

const (
    EntryNormal     EntryType = 0
    // 配置变更类的日志
    EntryConfChange EntryType = 1
)

type Entry struct {
    Term             uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
    Index            uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
    Type             EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
    Data             []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
}

Entry 包含了任期、索引、Data 三个字段。根据 raft 算法,term 和 indedx 构成了 entry 的全局索引。

对于 raft 算法来说,节点本质上是一个状态机,由输入的消息来不断地进行状态变更。这个改变状态机的输入消息就是 Message 结构体:

type Message struct {
    Type             MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
    To               uint64      `protobuf:"varint,2,opt,name=to" json:"to"`
    From             uint64      `protobuf:"varint,3,opt,name=from" json:"from"`
    Term             uint64      `protobuf:"varint,4,opt,name=term" json:"term"`
    LogTerm          uint64      `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
    Index            uint64      `protobuf:"varint,6,opt,name=index" json:"index"`
    Entries          []Entry     `protobuf:"bytes,7,rep,name=entries" json:"entries"`
    Commit           uint64      `protobuf:"varint,8,opt,name=commit" json:"commit"`
    // ...
    Reject           bool        `protobuf:"varint,10,opt,name=reject" json:"reject"`
    RejectHint       uint64      `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
    // ...
}

Message 包含了日志同步消息、leader 选举消息、心跳消息等等内容,全都放在 MessageType里。(颇有我自己实现时候的不良美感)

需要注释的是,LogTerm、Index 对应的是 raft 论文的 LastLogTerm 和 LastLogIndex;Entries 是要完成同步的日志列表;Commit 是 leader 已提交的日志 inedx;Reject 标识响应结果为拒绝或赞同;RejectHint 用于日志同步时快速寻找冲突点(参考论文原文)。

raftLog 负责管理算法层里的预写日志,包含未持久和持久化的日志查询:

type raftLog struct {
    // 用于保存自从最后一次snapshot之后提交的数据
    storage Storage
    // 用于保存还没有持久化的数据和快照,这些数据最终都会保存到storage中
    unstable unstable
    // committed数据索引
    committed uint64
    // committed保存是写入持久化存储中的最高index,而applied保存的是传入状态机中的最高index
    // 即一条日志首先要提交成功(即committed),才能被applied到状态机中
    // 因此以下不等式一直成立:applied <= committed
    applied uint64
    // ...
}

这里的 Storage 是论文里没有的持久化日志的存储接口:

type Storage interface {
    // 返回保存的初始状态
    InitialState() (pb.HardState, pb.ConfState, error)
    // 返回索引范围在[lo,hi)之内并且不大于maxSize的entries数组
    Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
    // 传入一个索引值,返回这个索引值对应的任期号,如果不存在则error不为空,其中:
    // ErrCompacted:表示传入的索引数据已经找不到,说明已经被压缩成快照数据了。
    // ErrUnavailable:表示传入的索引值大于当前的最大索引
    Term(i uint64) (uint64, error)
    // 获得最后一条数据的索引值
    LastIndex() (uint64, error)
    // 返回第一条数据的索引值
    FirstIndex() (uint64, error)
    // ...
}

这个东西主要就是维护持久化日志的查询能力。

剩下未持久化的数据就留给 unstable 结构去管理:

type unstable struct {
    // ...
    // 还未持久化的数据
    entries []pb.Entry
    // offset用于保存entries数组中的数据的起始index
    offset  uint64
    // ...
}

其实就是一个未持久化log的列表,并维护一个在全局 logEntries 里的偏移量。

关于算法层与应用层交互的数据结构,etcd 里叫做 Ready,每当算法层完成一轮处理逻辑,就会往 channel 里塞一个这东西,封装处理好的结果:

type Ready struct {
    // 软状态是异变的,包括:当前集群leader、当前节点状态
    *SoftState
    // 硬状态需要被保存,包括:节点当前Term、Vote、Commit
    // 如果当前这部分没有更新,则等于空状态
    pb.HardState
    // 需要在消息发送之前被写入到持久化存储中的entries数据数组
    Entries []pb.Entry
    // ...
    // 需要输入到状态机中的数据数组,这些数据之前已经被保存到持久化存储中了
    CommittedEntries []pb.Entry
    // 在entries被写入持久化存储中以后,需要发送出去的数据
    Messages []pb.Message
}
  • 其中 SoftState 标识 raft 节点的软状态,就是不做持久化、通过协议机制和通信来维护的一些状态信息,包括自己的 leader 信息和自己的职位信息;

  • HardState 是 raft 节点的当前 term、自己投票的目标和自己已 commit 的 index,这部分信息根据 raft 协议必须要持久化;

  • Entries 是本轮算法层产生的 log 的列表,需要由应用层来完成持久化;

  • CommittedEntris 如其名,是本轮算法层已提交的 log,下面的工作是由应用层应用到状态机;

  • Message 是本轮算法层要往外发的网络信息,由应用层调网络模块发出。

至此,有了两层交换的数据结构,肯定也要有交换的接口,etcd 叫做 Node,主要对算法层的 raft 节点做抽象,负责两层的交互:

// Node represents a node in a raft cluster.
type Node interface {
    Tick()
    Propose(ctx context.Context, data []byte) error
    ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
    ApplyConfChange(cc pb.ConfChange) *pb.ConfState
    ReadIndex(ctx context.Context, rctx []byte) error
    Step(ctx context.Context, msg pb.Message) error
    Ready() <-chan Ready
    Advance()
    // ...
}
  • Tick:定时驱动信号,leader 心跳计时和选举计时都是以 tick 为单位;

  • Propose:应用层向算法层发起一笔写数据的请求,向 channel 传入一条消息,算法层的 goroutine 读到以后会驱动 raft 节点进入写请求的提议流程;

  • ProposeConfChange:应用层推动 raft 节点做配置变更;

  • ApplyConfChange:在配置变更提议在 raft 内已提交之后,向算法层发布配置变更内容,使变更立即生效;

  • ReadIndex:应用层向算法层发起读数据请求;

  • Step:向算法层传送一条任意类型消息;

  • Ready和Advance:获取 Ready channel 给应用层监听,当读取到 ready 信号时表明算法层已产生了新一轮的处理结果,应用层进行响应;处理完后调用 advance 方法向算法层表示应用层已处理完毕,进入下一轮。

接下来是剩下的和 raft 算法有关的数据结构。

type Progress struct {
    Match, Next uint64
}

progress 是 leader 用于记录其他节点日志同步进度的结构,对应论文的 matchIndex 数组 和 nextIndex 数组。

type raft struct {
    id uint64
    // 任期号
    Term uint64
    // 投票给哪个节点ID
    Vote uint64
    raftLog *raftLog
    prs         map[uint64]*Progress
    state StateType
    // 该map存放哪些节点投票给了本节点
    votes map[uint64]bool
    msgs []pb.Message
    lead uint64
    // 标识当前还有没有applied的配置数据
    pendingConf bool
    readOnly *readOnly
    electionElapsed int
    heartbeatElapsed int
    preVote     bool
    heartbeatTimeout int
    electionTimeout  int
    randomizedElectionTimeout int
    // tick函数,在到期的时候调用,不同的角色该函数不同
    tick func()
    step stepFunc
}

raft 结构是 raft 节点的总体抽象,和论文中对应,包含当前节点id、term、state 等等,随机的选举间隔等等也可以在这里找到。

应用层数据结构

下面根据 etcd 提供的 raft 运行示例的设计作为参考,看一下应用层的数据结构,如何维护与算法层的交互、状态机和 http 接口等等。

raftNode 是应用层对 raft 节点的封装,除了算法层的入口,还有与客户端、状态机、持久化相关的结构:

type raftNode struct {
    proposeC    <-chan string            // proposed messages (k,v)
    confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
    commitC     chan<- *string           // entries committed to log (k,v)
    // ...
    id          int      // client ID for raft session
    peers       []string // raft peer URLs
    // ...

    appliedIndex  uint64

    // raft backing for the commit/error channel
    node        raft.Node
    raftStorage *raft.MemoryStorage

    // ...
    transport *rafthttp.Transport
    // ...
}
  • 三个 channel 分别是接收客户端的写请求、配置变更请求,以及将已提交的日志应用到状态机;
  • 保留有 peers、appendIndex、node(上面说的算法层入口)等与算法层相关的信息;
  • raftStorage 持久化日志的存储模块;
  • transport 网络通信模块。

kvstore 是 example 实现的简易版k-v存储模块,用来模拟 etcd 的 kv 和 raft 节点的交互:

type kvstore struct {
    proposeC    chan<- string // channel for proposing updates
    mu          sync.RWMutex
    kvStore     map[string]string // current committed key-value pairs
}

其中 proposeC 是和 raftNode 同一个的 channel,负责向 raftNode 发来自客户端的写请求。

在 kvstore 模块启动时,会注入一个 commitC channel,kvstore 会持续监听它,收到 raftNode 提交的日志后将其应用到状态机(map)。

另外 example 还实现了一个简单的 http api,通过 PUT 表示写请求,POST 表示添加节点的配置变更请求,DELETE 表示删除节点的配置变更请求。

应用层与算法层的交互流程

应用层运行

example 实现的简易版应用层中,startRaft 方法包含了应用层和算法层的初始化和启动过程:

func (rc *raftNode) startRaft() {
    // ...
    rpeers := make([]raft.Peer, len(rc.peers))
    for i := range rpeers {
        rpeers[i] = raft.Peer{ID: uint64(i + 1)}
    }
    c := &raft.Config{
        ID:              uint64(rc.id),
        ElectionTick:    10,
        HeartbeatTick:   1,
        Storage:         rc.raftStorage,
    }

    startPeers := rpeers
    rc.node = raft.StartNode(c, startPeers)

    // ...
    rc.transport = &rafthttp.Transport{
        ID:          types.ID(rc.id),
        ClusterID:   0x1000,
        Raft:        rc,
        ServerStats: ss,
        LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
        ErrorC:      make(chan error),
    }

    rc.transport.Start()
    for i := range rc.peers {
        if i+1 != rc.id {
            rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
        }
    }
    
    go rc.serveRaft()
    go rc.serveChannel
}

这个过程包含了获取其他 raft 节点的信息、创建 raft 节点配置、启动算法层 Node、启动通信模块、开启 raftNode 主循环(用于与算法层的 goroutine 联系)。

这个循环主要是干什么呢:

首先,启动新协程监听 proposeC 和 ConfChangeC 两个 channel 来接收客户端的 PUT 和配置变更请求,然后调用 Node 接口的 api将其发给算法层:

go func() {
        // ...
        for {
            select {
            case prop := <-rc.proposeC:
                 rc.node.Propose(context.TODO(), []byte(prop))
            case cc, ok := <-rc.confChangeC:
                  cc.ID = confChangeCount
                  rc.node.ProposeConfChange(context.TODO(), cc)
            }
        }
        // client closed channel; shutdown raft if not already
        close(rc.stopc)
   }

接下来,启动 tick 定时器,每个 tick 默认 100ms,定时调用 Node.Tick 方法驱动算法层执行定时函数:

ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    for{
        select {
        case <-ticker.C:
            rc.node.Tick()
    }

之后,raftNode 会通过上面提到的 node.Ready 接收算法层处理结果、用 node.Advance 进行响应;期间对待持久化的日志做持久化,调用通信模块发算法层要发的消息,与状态机交互并应用已提交的日志。

算法层运行

算法层的启动是从 StartNode 开始的:

func StartNode(c *Config, peers []Peer) Node {
    r := newRaft(c)
    // 初次启动以term为1来启动
    r.becomeFollower(1, None)
    for _, peer := range peers {
        cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
        d, err := cc.Marshal()
        if err != nil {
            panic("unexpected marshal error")
        }
        e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
        r.raftLog.append(e)
    }
    
    r.raftLog.committed = r.raftLog.lastIndex()
    
    for _, peer := range peers {
        r.addNode(peer.ID)
    }

    n := newNode()
    go n.run(r)
    return &n
}

这里的 becomeFollower 等等和自己实现的 raft 基本一致,就是做节点启动的初始化,和论文一样。

启动之初,会把集群中的其他节点封装成配置变更信息,添加到非持久化日志里,并且视为已提交。

之后,异步调用 node.Run 方法,启动算法层 goroutine:

func (n *node) run(r *raft) {
    var propc chan pb.Message
    var readyc chan Ready
    var advancec chan struct{}
    var prevLastUnstablei, prevLastUnstablet uint64
    var havePrevLastUnstablei bool
    var rd Ready


    lead := None
    prevSoftSt := r.softState()
    prevHardSt := emptyState


    for {
        if advancec != nil {
            // advance channel不为空,说明还在等应用调用Advance接口通知已经处理完毕了本次的ready数据
            readyc = nil
        } else {
            rd = newReady(r, prevSoftSt, prevHardSt)
            if rd.containsUpdates() {
                // 如果这次ready消息有包含更新,那么ready channel就不为空
                readyc = n.readyc
            } else {
                // 否则为空
                readyc = nil
            }
        }
        
        // ...
        select {
        case m := <-propc:
            // 处理本地收到的提交值
            m.From = r.id
            r.Step(m)
        case m := <-n.recvc:
            // 处理其他节点发送过来的提交值
            // filter out response message from unknown From.
            if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) {
                // 需要确保节点在集群中或者不是应答类消息的情况下才进行处理
                r.Step(m) // raft never returns an error
            }
        case cc := <-n.confc:
            // 接收到配置发生变化的消息
            if cc.NodeID == None {
                // NodeId为空的情况,只需要直接返回当前的nodes就好
                r.resetPendingConf()
                select {
                case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
                case <-n.done:
                }
                break
            }
            switch cc.Type {
            case pb.ConfChangeAddNode:
                r.addNode(cc.NodeID)
            case pb.ConfChangeRemoveNode:
                // 如果删除的是本节点,停止提交
                if cc.NodeID == r.id {
                    propc = nil
                }
                r.removeNode(cc.NodeID)
            // ...
            }
            // ...
        case <-n.tickc:
            r.tick()
        case readyc <- rd:
            // 通过channel写入ready数据
            // 以下先把ready的值保存下来,等待下一次循环使用,或者当advance调用完毕之后用于修改raftLog的
            if rd.SoftState != nil {
                prevSoftSt = rd.SoftState
            }
            if len(rd.Entries) > 0 {
                // 保存上一次还未持久化的entries的index、term
                prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
                prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
                havePrevLastUnstablei = true
            }
            if !IsEmptyHardState(rd.HardState) {
                prevHardSt = rd.HardState
            }
            // ...
            r.msgs = nil
            r.readStates = nil
            // 修改advance channel不为空,等待接收advance消息
            advancec = n.advancec
        case <-advancec:
            // 收到advance channel的消息
            if prevHardSt.Commit != 0 {
                // 将committed的消息applied
                r.raftLog.appliedTo(prevHardSt.Commit)
            }
            advancec = nil
            // ...
    }
}

这里用了一大组 select,还有嵌套,主要是为了确保 ready 和 advance 两个方法是成对调用的,且算法层没有新结果产生时不会向应用层提交 ready 消息。