一、原理
Lab2要求实现一套比较完整的Raft协议,分为A,B,C,D四个部分。
Raft原论文:In Search of an Understandable Consensus Algorithm (mit.edu)
Raft是一个分布式一致性算法。相比于Paxos,Raft的机制更简单直白适合学习。Raft将客户端请求转化为线性的Log
,使用Log
的一致性保证所有冗余副本(replica
)的一致性。
Raft将一组一致的冗余服务器统一交给一个leader
管理,但leader不是固定的,而是通过各个服务器的选举产生,保证在leader可能出现问题的情况下的容错。Leader将负责全部replica的log同步,以及与客户端的交互。
Lab2A
Lab2A需要实现Raft的leader election(领导选举)部分,实现Raft的两个rpc:AppendEntriesRPC和RequestVoteRPC,实现在网络中断、服务器高延迟或掉线等情况下的leader重选,而能够保持只有一个leader在机架中生效。
和其他lab一样,这里基于给出的部分框架go代码完成要求的功能。
论文中的各类声明、处理逻辑、rpc等可以完全按照论文的图2:
相关原理
Raft的领导选举主要基于Term
(任期)和随机发起选举周期
来处理。leader为了显示其功能正常,需要定期(本lab设为150ms)向其他replica(followers
)发送心跳(heartBeat
),从而维持其在followers中的领导地位。
任期是维护leader秩序的机制,一个任期最多只能有一个leader。随机选举周期是所有服务器均内置的机制,只对非leader服务器有效。若随机选举周期在一个随机的时间(本lab设为150ms-650ms)没有被重置,将会触发该服务器发起选举,推进任期。因此,根据随机选举周期的随机性,在leader失效后(或者Raft开始运行时),将有一个服务器率先发起选举,推进新的任期。
一个服务器发起选举后,将从follower状态转为candidate
(候选人),并向所有其他服务器请求投票。(在lab2A中)如果candidate的term>=被请求投票服务器的term,且该服务器在本term没投过票,则可以将票投给candidate。获取了大多数(多于半数)服务器同意票的candidate可以成为新的leader。
当然,也可能出现多个follower同时转变为candidate的情况,就可能导致没有任何candidate可以获得多数投票。此时只需要等待某服务器的随机选举周期重新率先到来,发起新一轮term的选举即可。
选举安全性
在设计算法实现时,必须要能够保证选举的安全性,也就是每个term最多只能有一个有效的leader。不论是某个服务器的网络中断,或者是随机选举周期巧合冲突的情况,都要保证不能有多个leader同时有效。
term的设计就是为了保证同一时间leader的信息同步。为了确保leader的信息能够同步,我们要求所有服务器之间的term信息必须是最新同步的——也就是说,在服务器之间的所有通信(AppendEntriesRPC,RequestVoteRPC)的请求参数和返回参数中,都要对Term进行交换,即时更新可能的新term。
Term的更新是由于部分仍然联网可用的服务器发现leader已经失效并发起选举。为了保证leader信息和term强关联,一旦在通信中(包括两个rpc中交换的全部数据)发现有其他服务器的term比自己的更新,candidate和leader都应当认为自己的信息已过期,此时应该有更新的leader正在掌权(或者candidate正在收集选票),此时他们都应该转为follower。
二、项目结构
lab2的全部文件在src目录下的raft目录中。
所有需要编写的Raft的内容都储存在raft.go
中,不需要修改其他go文件代码。
此外,raft目录中还提供了可供调用的rpc和持久化,在需要时进行调用即可。
测试程序test_test.go
用于给我们运行并进行评测。可以查看其中的代码来了解测试的方式。
运行raft时,测试程序会调用Make
函数来初始化你的raft,所以可以在该函数中对raft节点进行初始化,以及运行一些需要的go程(如rf.ticker()
)。
关于rpc调用,raft.go
中已经提供了调用示例,仿照示例编写即可。
要运行2A的测试,执行go test -run 2A -race
,即可运行raft并查看评测结果。-race是为了检查是否有race(多线程数据冲突)的情况,及时debug。
三、代码实现
1. 数据结构声明
在2A中不需要实现有关Log的操作,但还是先做好相关的声明。
按照论文Figure 2的信息,创建结构体如下:
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int
votedFor int `default:"-1"`
Log []LogEntry
commitIndex int
lastApplied int
State int //状态,0 follower 1: candidate 2: leader
nextActiveTime time.Time //下次造反时间
//candidate使用
votesGot int //收到的票数
//leader使用
nextIndex []int
matchIndex []int
}
type LogEntry struct {
Index int
Term int
Command interface{}
}
其中LogEntry
是针对Log的数据结构,2A还用不到。
注意这里使用nextActiveTime来标记下次随机选举周期的时间,可以理解为下次造反的时间。
2. 完善交互相关
首先把评测会用到的rf.GetState()
完成:
func (rf *Raft) GetState() (int, bool) {
var term int
var isleader bool
// Your code here (2A).
rf.mu.Lock()
term = rf.currentTerm
isleader = rf.State == 2
rf.mu.Unlock()
return term, isleader
}
然后按照论文完成两种RPC交互的请求和回复参数的数据结构:
type RequestVoteArgs struct {
Term int
CandidateId int
LastLogIndex int
LastLogTerm int
}
type RequestVoteReply struct {
Term int
VoteGranted bool
}
type AppendEntriesArg struct {
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
LeaderCommit int
}
type AppendEntriesReply struct {
Term int
Success bool
}
Make
函数用于框架来创建你的raft节点。所以在这里做好初始化:
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here (2A, 2B, 2C).
rf.votedFor = -1
randomTerm := 150 + rand.Intn(500)
rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
// start ticker goroutine to start elections
go rf.ticker()
return rf
}
3. HeartBeat相关
HeartBeat相关代码主要用于leader定期向其他服务器发送心跳。
HeartBeat由伴随leader的rf.doHeartBeat()
方法发起,每150ms循环一次,直到该节点不再是leader为止:
func (rf *Raft) doHeartBeat() {
rf.mu.Lock()
for rf.State == 2 && rf.killed() == false {
var arg AppendEntriesArg
if len(rf.Log) == 0 {
arg = AppendEntriesArg{
rf.currentTerm, rf.me, 0,
0, nil, rf.commitIndex}
} else {
arg = AppendEntriesArg{
rf.currentTerm, rf.me, rf.lastApplied,
rf.Log[rf.lastApplied].Term, nil, rf.commitIndex}
}
rf.mu.Unlock()
go rf.executeHeartBeat(arg)
time.Sleep(time.Millisecond * 150)
rf.mu.Lock()
}
rf.mu.Unlock()
}
这里先不对log进行处理。由于执行rpc可能会受网络影响(测试程序触发)等导致运行受阻,所以需要另开go程处理:
func (rf *Raft) executeHeartBeat(arg AppendEntriesArg) {
maxTerm := 0
allReplySuccess := true
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
reply := AppendEntriesReply{}
rf.sendAppendEntries(i, &arg, &reply)
if reply.Term > maxTerm {
maxTerm = reply.Term
}
if !reply.Success {
allReplySuccess = false
}
}
}
if allReplySuccess {
return
}
rf.mu.Lock()
if rf.currentTerm < maxTerm {
rf.currentTerm = maxTerm
rf.State = 0
}
rf.mu.Unlock()
}
需要注意的是,leader必须借助heartbeat与其他服务器交换term信息,以保证term时刻是最新。如果任意一方发现对方的term更新,则应当转为follower并重置相关属性。
对心跳的处理:(不含Log部分)
func (rf *Raft) AppendEntriesHandler(args *AppendEntriesArg, reply *AppendEntriesReply) {
//注意这个是handler
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
success := true
// 收到来自有效leader的心跳
if args.Term >= rf.currentTerm {
rf.votedFor = -1
randomTerm := 150 + rand.Intn(500)
rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
//println(rf.me, "收到来自leader的有效心跳")
if rf.State != 0 {
rf.State = 0
rf.currentTerm = args.Term
}
}
if args.Term < rf.currentTerm {
success = false
} else {
rf.currentTerm = args.Term
}
reply.Success = success
}
这里按论文要求对返回的success进行判断。
由于这里是rpc的handler部分,所以需要对被触发rpc的节点打上锁,避免race。
如果收到的leader心跳的term>=本节点的term,那么将这次心跳视为有效心跳,重置自身的选举随机周期,并同步可能的term不一致。如果本节点是leader或candidate,收到有效心跳后应当转为follower。
4. RequestVote相关
首先是go程ticker
,用于循环检测是否已经到了随机选举周期,看是否该发起选举了。
ticker每10ms检查一次,这样设计的原因是raft要求能够在任意时刻更新随机选举周期,也就是要更新下次发起选举的时间点,而为了不耦合ticker循环,因此改变下次选举时间只需要更新raft中的数据,不需要涉及ticker循环。
func (rf *Raft) ticker() {
for rf.killed() == false {
// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
rf.mu.Lock()
if rf.State != 1 {
rf.votesGot = 0
}
if rf.State != 2 {
if time.Now().After(rf.nextActiveTime) {
// 随机周期内没有收到心跳,转变为candidate
randomTerm := 150 + rand.Intn(500)
rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
rf.State = 1
println(rf.me, "号没有收到心跳,发起了投票")
rf.launchElection()
}
}
rf.mu.Unlock()
time.Sleep(time.Millisecond * 10)
}
}
注意在发起选举时,需要重置随机选举周期。
然后是发起选举的具体过程:
func (rf *Raft) launchElection() {
rf.currentTerm++
rf.votedFor = rf.me
rf.votesGot = 1
lenLog := len(rf.Log)
var arg RequestVoteArgs
if lenLog == 0 {
arg = RequestVoteArgs{rf.currentTerm, rf.me, 0, 0}
} else {
arg = RequestVoteArgs{rf.currentTerm, rf.me, rf.Log[lenLog-1].Index, rf.Log[lenLog-1].Term}
}
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
go rf.executeElection(i, rf.currentTerm, arg)
}
}
}
由于发起选举涉及的更新需要分别、及时执行,所以针对除自己外的每一个服务器,都创建一个新的go程用于发送投票请求。这里暂时不处理log有关的事务。
注意发起选举时要给自己的term+1,同时要投票给自己。
请求投票go程:
func (rf *Raft) executeElection(server int, term int, arg RequestVoteArgs) {
reply := RequestVoteReply{}
rf.sendRequestVote(server, &arg, &reply)
rf.mu.Lock()
//println(rf.me, "在term", rf.currentTerm, "收到了", server, "的投票回信:", reply.VoteGranted)
if rf.currentTerm < reply.Term {
rf.State = 0
rf.currentTerm = reply.Term
rf.votedFor = -1
//println(rf.me, "在candidate时收到来自更高term的消息,转follower")
}
if reply.VoteGranted && rf.State == 1 {
rf.votesGot++
if rf.votesGot > len(rf.peers)/2 {
println(rf.me, "成为了leader!")
rf.State = 2
go rf.doHeartBeat()
}
}
rf.mu.Unlock()
}
取得结果后,首先检查自己的term是否落后。如果落后,则转为follower,并跟进新term。
如果此时获得了服务器总数/2+1的票数,那么节点就可以变为leader。注意变为leader要启动leader专属的doHeartBeat go程。
最后是处理投票的Handler:
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
//注意这个是handler
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm || rf.votedFor != -1 || rf.State != 0 {
reply.VoteGranted = false
} else {
reply.VoteGranted = true
rf.votedFor = args.CandidateId
randomTerm := 150 + rand.Intn(500)
rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
}
if args.Term > rf.currentTerm && rf.State == 1 {
rf.State = 0
rf.currentTerm = args.Term
rf.votedFor = -1
}
}
在处理别处的vote请求时,首先还是检查term。只有candidate的term不比自己的term小时,才能投票,否则一律拒绝。另外,如果自己已经投票(votefor!=-1),也不能再投一次。
投票之后,也对选举周期进行一次重置,减少多次选举同时进行的情况。
当然,这次通信也需要对term进行同步检查。如果收到的term比自己的term大,则一律更新term并转为follower,不论自己是什么身份。
5. RPC
还有两个rpc的具体发送请求,附在下面。
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArg, reply *AppendEntriesReply) {
rf.peers[server].Call("Raft.AppendEntriesHandler", args, reply)
}
三、注意事项和总结
- 编写分布式系统的很重要的原则:代码尽量简单明确,逻辑清晰。因为分布式系统的调试非常困难,所以必须先要对思想理论进行证明,理解后再进行代码编写。代码尽量清晰简洁,在能满足需求的情况下尽量简单,便于后期查错和调试。调试应当越调越简单,而不应该越调越混乱。如果需要,应当果断重构。
- 考虑到分布式系统都存在多线程交互以及外部访问,所以在编写时要注意考虑多线程问题,比如加锁。在修改锁状态前先理清逻辑,以免出现race的问题。另外也不要编写多个锁制造理解困难。
- 考虑到循环检测会出现大段sleep的时间,注意编写的逻辑不要在sleep时有锁。
测试结果截图: