MIT6.824分布式系统Lab2-Raft-A笔记.md

MIT6.824分布式系统Lab2-Raft-A笔记.md

tk_sky 115 2022-11-23

一、原理

Lab2要求实现一套比较完整的Raft协议,分为A,B,C,D四个部分。

Raft原论文:In Search of an Understandable Consensus Algorithm (mit.edu)

Raft是一个分布式一致性算法。相比于Paxos,Raft的机制更简单直白适合学习。Raft将客户端请求转化为线性的Log,使用Log的一致性保证所有冗余副本(replica)的一致性。

Raft将一组一致的冗余服务器统一交给一个leader管理,但leader不是固定的,而是通过各个服务器的选举产生,保证在leader可能出现问题的情况下的容错。Leader将负责全部replica的log同步,以及与客户端的交互。

Lab2A

Lab2A需要实现Raft的leader election(领导选举)部分,实现Raft的两个rpc:AppendEntriesRPC和RequestVoteRPC,实现在网络中断、服务器高延迟或掉线等情况下的leader重选,而能够保持只有一个leader在机架中生效。

和其他lab一样,这里基于给出的部分框架go代码完成要求的功能。

论文中的各类声明、处理逻辑、rpc等可以完全按照论文的图2:

相关原理

Raft的领导选举主要基于Term(任期)和随机发起选举周期来处理。leader为了显示其功能正常,需要定期(本lab设为150ms)向其他replica(followers)发送心跳(heartBeat),从而维持其在followers中的领导地位。

任期是维护leader秩序的机制,一个任期最多只能有一个leader。随机选举周期是所有服务器均内置的机制,只对非leader服务器有效。若随机选举周期在一个随机的时间(本lab设为150ms-650ms)没有被重置,将会触发该服务器发起选举,推进任期。因此,根据随机选举周期的随机性,在leader失效后(或者Raft开始运行时),将有一个服务器率先发起选举,推进新的任期。

一个服务器发起选举后,将从follower状态转为candidate(候选人),并向所有其他服务器请求投票。(在lab2A中)如果candidate的term>=被请求投票服务器的term,且该服务器在本term没投过票,则可以将票投给candidate。获取了大多数(多于半数)服务器同意票的candidate可以成为新的leader。

当然,也可能出现多个follower同时转变为candidate的情况,就可能导致没有任何candidate可以获得多数投票。此时只需要等待某服务器的随机选举周期重新率先到来,发起新一轮term的选举即可。

选举安全性

在设计算法实现时,必须要能够保证选举的安全性,也就是每个term最多只能有一个有效的leader。不论是某个服务器的网络中断,或者是随机选举周期巧合冲突的情况,都要保证不能有多个leader同时有效。

term的设计就是为了保证同一时间leader的信息同步。为了确保leader的信息能够同步,我们要求所有服务器之间的term信息必须是最新同步的——也就是说,在服务器之间的所有通信(AppendEntriesRPC,RequestVoteRPC)的请求参数和返回参数中,都要对Term进行交换,即时更新可能的新term。

Term的更新是由于部分仍然联网可用的服务器发现leader已经失效并发起选举。为了保证leader信息和term强关联,一旦在通信中(包括两个rpc中交换的全部数据)发现有其他服务器的term比自己的更新,candidate和leader都应当认为自己的信息已过期,此时应该有更新的leader正在掌权(或者candidate正在收集选票),此时他们都应该转为follower。

二、项目结构

lab2的全部文件在src目录下的raft目录中。

所有需要编写的Raft的内容都储存在raft.go中,不需要修改其他go文件代码。

此外,raft目录中还提供了可供调用的rpc和持久化,在需要时进行调用即可。

测试程序test_test.go用于给我们运行并进行评测。可以查看其中的代码来了解测试的方式。

运行raft时,测试程序会调用Make函数来初始化你的raft,所以可以在该函数中对raft节点进行初始化,以及运行一些需要的go程(如rf.ticker())。

关于rpc调用,raft.go中已经提供了调用示例,仿照示例编写即可。

要运行2A的测试,执行go test -run 2A -race,即可运行raft并查看评测结果。-race是为了检查是否有race(多线程数据冲突)的情况,及时debug。

三、代码实现

1. 数据结构声明

在2A中不需要实现有关Log的操作,但还是先做好相关的声明。

按照论文Figure 2的信息,创建结构体如下:

type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	currentTerm int
	votedFor    int `default:"-1"`
	Log         []LogEntry
	commitIndex int
	lastApplied int

	State          int       //状态,0 follower 1: candidate 2: leader
	nextActiveTime time.Time //下次造反时间

	//candidate使用
	votesGot int //收到的票数

	//leader使用
	nextIndex  []int
	matchIndex []int
}

type LogEntry struct {
	Index   int
	Term    int
	Command interface{}
}

其中LogEntry是针对Log的数据结构,2A还用不到。

注意这里使用nextActiveTime来标记下次随机选举周期的时间,可以理解为下次造反的时间。

2. 完善交互相关

首先把评测会用到的rf.GetState()完成:

func (rf *Raft) GetState() (int, bool) {

	var term int
	var isleader bool
	// Your code here (2A).
	rf.mu.Lock()
	term = rf.currentTerm
	isleader = rf.State == 2
	rf.mu.Unlock()
	return term, isleader
}

然后按照论文完成两种RPC交互的请求和回复参数的数据结构:

type RequestVoteArgs struct {
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

type RequestVoteReply struct {
	Term        int
	VoteGranted bool
}

type AppendEntriesArg struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []LogEntry
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
}

Make函数用于框架来创建你的raft节点。所以在这里做好初始化:

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (2A, 2B, 2C).
	rf.votedFor = -1
	randomTerm := 150 + rand.Intn(500)
	rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// start ticker goroutine to start elections
	go rf.ticker()

	return rf
}

3. HeartBeat相关

HeartBeat相关代码主要用于leader定期向其他服务器发送心跳。

HeartBeat由伴随leader的rf.doHeartBeat()方法发起,每150ms循环一次,直到该节点不再是leader为止:

func (rf *Raft) doHeartBeat() {
	rf.mu.Lock()
	for rf.State == 2 && rf.killed() == false {

		var arg AppendEntriesArg
		if len(rf.Log) == 0 {
			arg = AppendEntriesArg{
				rf.currentTerm, rf.me, 0,
				0, nil, rf.commitIndex}
		} else {
			arg = AppendEntriesArg{
				rf.currentTerm, rf.me, rf.lastApplied,
				rf.Log[rf.lastApplied].Term, nil, rf.commitIndex}
		}

		rf.mu.Unlock()
		go rf.executeHeartBeat(arg)

		time.Sleep(time.Millisecond * 150)
		rf.mu.Lock()
	}
	rf.mu.Unlock()
}

这里先不对log进行处理。由于执行rpc可能会受网络影响(测试程序触发)等导致运行受阻,所以需要另开go程处理:

func (rf *Raft) executeHeartBeat(arg AppendEntriesArg) {
	maxTerm := 0
	allReplySuccess := true
	for i := 0; i < len(rf.peers); i++ {
		if i != rf.me {
			reply := AppendEntriesReply{}
			rf.sendAppendEntries(i, &arg, &reply)
			if reply.Term > maxTerm {
				maxTerm = reply.Term
			}
			if !reply.Success {
				allReplySuccess = false
			}
		}
	}

	if allReplySuccess {
		return
	}

	rf.mu.Lock()
	if rf.currentTerm < maxTerm {
		rf.currentTerm = maxTerm
		rf.State = 0
	}
	rf.mu.Unlock()
}

需要注意的是,leader必须借助heartbeat与其他服务器交换term信息,以保证term时刻是最新。如果任意一方发现对方的term更新,则应当转为follower并重置相关属性。

对心跳的处理:(不含Log部分)

func (rf *Raft) AppendEntriesHandler(args *AppendEntriesArg, reply *AppendEntriesReply) {
	//注意这个是handler
	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Term = rf.currentTerm
	success := true

	// 收到来自有效leader的心跳
	if args.Term >= rf.currentTerm {
		rf.votedFor = -1
		randomTerm := 150 + rand.Intn(500)
		rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
		//println(rf.me, "收到来自leader的有效心跳")
		if rf.State != 0 {
			rf.State = 0
			rf.currentTerm = args.Term
		}
	}

	if args.Term < rf.currentTerm {
		success = false
	} else {
		rf.currentTerm = args.Term
	}

	reply.Success = success
}

这里按论文要求对返回的success进行判断。

由于这里是rpc的handler部分,所以需要对被触发rpc的节点打上锁,避免race。

如果收到的leader心跳的term>=本节点的term,那么将这次心跳视为有效心跳,重置自身的选举随机周期,并同步可能的term不一致。如果本节点是leader或candidate,收到有效心跳后应当转为follower。

4. RequestVote相关

首先是go程ticker,用于循环检测是否已经到了随机选举周期,看是否该发起选举了。

ticker每10ms检查一次,这样设计的原因是raft要求能够在任意时刻更新随机选举周期,也就是要更新下次发起选举的时间点,而为了不耦合ticker循环,因此改变下次选举时间只需要更新raft中的数据,不需要涉及ticker循环。

func (rf *Raft) ticker() {

	for rf.killed() == false {
		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using
		// time.Sleep().
		rf.mu.Lock()
		if rf.State != 1 {
			rf.votesGot = 0
		}
		if rf.State != 2 {
			if time.Now().After(rf.nextActiveTime) {
				// 随机周期内没有收到心跳,转变为candidate
				randomTerm := 150 + rand.Intn(500)
				rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
				rf.State = 1
				println(rf.me, "号没有收到心跳,发起了投票")
				rf.launchElection()
			}
		}
		rf.mu.Unlock()
		time.Sleep(time.Millisecond * 10)
	}
}

注意在发起选举时,需要重置随机选举周期。

然后是发起选举的具体过程:

func (rf *Raft) launchElection() {

	rf.currentTerm++
	rf.votedFor = rf.me
	rf.votesGot = 1

	lenLog := len(rf.Log)
	var arg RequestVoteArgs
	if lenLog == 0 {
		arg = RequestVoteArgs{rf.currentTerm, rf.me, 0, 0}
	} else {
		arg = RequestVoteArgs{rf.currentTerm, rf.me, rf.Log[lenLog-1].Index, rf.Log[lenLog-1].Term}
	}

	for i := 0; i < len(rf.peers); i++ {
		if i != rf.me {
			go rf.executeElection(i, rf.currentTerm, arg)
		}
	}

}

由于发起选举涉及的更新需要分别、及时执行,所以针对除自己外的每一个服务器,都创建一个新的go程用于发送投票请求。这里暂时不处理log有关的事务。

注意发起选举时要给自己的term+1,同时要投票给自己。

请求投票go程:

func (rf *Raft) executeElection(server int, term int, arg RequestVoteArgs) {

	reply := RequestVoteReply{}
	rf.sendRequestVote(server, &arg, &reply)

	rf.mu.Lock()
	//println(rf.me, "在term", rf.currentTerm, "收到了", server, "的投票回信:", reply.VoteGranted)

	if rf.currentTerm < reply.Term {
		rf.State = 0
		rf.currentTerm = reply.Term
		rf.votedFor = -1
		//println(rf.me, "在candidate时收到来自更高term的消息,转follower")
	}

	if reply.VoteGranted && rf.State == 1 {
		rf.votesGot++
		if rf.votesGot > len(rf.peers)/2 {
			println(rf.me, "成为了leader!")
			rf.State = 2
			go rf.doHeartBeat()
		}
	}
	rf.mu.Unlock()

}

取得结果后,首先检查自己的term是否落后。如果落后,则转为follower,并跟进新term。

如果此时获得了服务器总数/2+1的票数,那么节点就可以变为leader。注意变为leader要启动leader专属的doHeartBeat go程。

最后是处理投票的Handler:

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	//注意这个是handler
	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Term = rf.currentTerm

	if args.Term < rf.currentTerm || rf.votedFor != -1 || rf.State != 0 {
		reply.VoteGranted = false
	} else {
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		randomTerm := 150 + rand.Intn(500)
		rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
	}
	if args.Term > rf.currentTerm && rf.State == 1 {
		rf.State = 0
		rf.currentTerm = args.Term
		rf.votedFor = -1
	}

}

在处理别处的vote请求时,首先还是检查term。只有candidate的term不比自己的term小时,才能投票,否则一律拒绝。另外,如果自己已经投票(votefor!=-1),也不能再投一次。

投票之后,也对选举周期进行一次重置,减少多次选举同时进行的情况。

当然,这次通信也需要对term进行同步检查。如果收到的term比自己的term大,则一律更新term并转为follower,不论自己是什么身份。

5. RPC

还有两个rpc的具体发送请求,附在下面。

func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	return ok
}

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArg, reply *AppendEntriesReply) {
	rf.peers[server].Call("Raft.AppendEntriesHandler", args, reply)
}

三、注意事项和总结

  • 编写分布式系统的很重要的原则:代码尽量简单明确,逻辑清晰。因为分布式系统的调试非常困难,所以必须先要对思想理论进行证明,理解后再进行代码编写。代码尽量清晰简洁,在能满足需求的情况下尽量简单,便于后期查错和调试。调试应当越调越简单,而不应该越调越混乱。如果需要,应当果断重构。
  • 考虑到分布式系统都存在多线程交互以及外部访问,所以在编写时要注意考虑多线程问题,比如加锁。在修改锁状态前先理清逻辑,以免出现race的问题。另外也不要编写多个锁制造理解困难。
  • 考虑到循环检测会出现大段sleep的时间,注意编写的逻辑不要在sleep时有锁。

测试结果截图: