MIT6.824分布式系统Lab2-Raft-B笔记.md

MIT6.824分布式系统Lab2-Raft-B笔记.md

tk_sky 302 2022-11-29

一、概要

论文链接:

In Search of an Understandable Consensus Algorithm (mit.edu)

Lab2B需要实现Raft协议中的Log部分,也就是需要开始处理Log 有关 Replication的问题了。

Lab2B可能是整个lab中最困难的一步,需要大量调试,可能需要反复仔细阅读论文,揣摩论文里某些词句的意思,以及按照Figure 2 严格认真的实现。另外,还需要实现论文中没有实现的Fast Roll-back,能够快速回滚有冲突的Log。

在lab2A中已经实现了leader election的机制,但这个机制没有考虑Log相关内容,所以在2B中要进行完善。另外由于2A中对AppendEntry的实现比较简便,采用了线性发rpc的方法,而2B中有test对时限要求较高,所以需要先进行改写。

做2B之前,先确保看懂了论文的第五部分,并且确保lab2A的代码能在绝大多数情况下通过测试。

二、原理

Raft使用Log来管理传递到状态机的指令,并通过选出的leader来统一管理和确保Log的一致性。

Log在每个服务器内都由一个统一的序列Log[]管理。每个log在log列表中的位置称为LogIndex。第一个被添加进的log的logIndex为1 。

在集群中,leader负责和客户交接,传递新到的指令并维护集群的一致性,即集群稳定后所有服务器的log都一致,且尽可能存下多的客户指令。因此,只有拥有了最新log的服务器才能作为leader。

要对比哪个log是最新的,首先对比最后一个log的term,以term更新的那个为最新。如果term相同,取log[]最长的为最新。

leader通过AppendEntriesRPC与follower进行交流。如果有新的指令进入,也通过该RPC传递到follower。由于AppendEntriesRPC也有2A中涉及的维持心跳和选举功能的作用,所以其参数Entries可空,表示不添加新的log。但无论是否添加新log,follower收到心跳后都应当根据arg的信息进行一致性检查

如何进行一致性检查?

  • 由于2A中有关Term的特性,各服务器的log列表中Log们记录下的term必为递增排列,而一个Term只有一个Leader能掌权,所以只要Term和Index相同,对应的Log中的客户命令就是相同的。
  • leader可以储存下在正常运行情况下leader认为的所有follower接收下一个log时应该存放的位置,即nextIndex[]。在AppendEntriesRPC时,除了新logEntries[]外,leader就可以根据nextIndex[]发送prevLogInedx intprevLogTerm int,表示leader认为的该follower现在最后一个log的Index,以及其对应的Term(完美条件下应当和leader的一致)。
  • 借助这些信息,follower就可以找出自己log和leader的log对应位置Term不一致的情况。如果出现不一致,follower就拒绝接受新的log,并删除冲突的部分(具体做法可以一个个删,也可以用特殊办法快速处理,见fast roll-back)。因此,由于log是从左到右append上去的,所以可以保证通过一致性检查的log和leader的log一致。

通过上述方法,当follower接收新log时,就基于leader提供的信息进行一致性检查。如果不通过,就删除冲突部分,并告诉leader左移它的nextIndex,然后再次重试,直到不一致全部消除,成功append为止。成功append之后,leader调整对应的matchIndex(记录该follower已经和leader确保一致的位置)。如果发现大多数节点的matchIndex都>=该log的index,则认为已经达到了系统的目标,称为commited,修改自己的commitIndex。之后leader的commitIndex会随心跳下传,已经取得该log的follower也随之更新commitIndex。与此同时,在ticker(或者其他频繁执行的检查)中,节点发现自己的LastApplied(表示上一个给状态机执行的log位置)比commitIndex低,即有新的log可以apply了,就通过指定的channel传递给状态机。

三、关于testcase与注意事项

本部分共有8个test,实现应当从最简单的同步开始,然后严格按照论文要求实现各种边界和控制机制。

各个test的内容可以参考下图:

测试的case具有大量的随机性,除了满足一次能通过,应当保证绝大多数情况(多于200次测试)下没有异常出现。

注意事项

  • Start()函数是外部传送命令调用的,只有leader才应理会其中的command,并且应该在添加到log后快速返回,交由之后的心跳去同步。
  • 在开始之前尽量把2A的问题解决,然后在2A的基础上对投票规则做修改。注意有些地方是不能重置发起选举周期的,比如老leader收到新的心跳后。
  • 具体的代码实现要严格参照Figure2,如果按自己想法很容易出现纰漏。论文里已经详细论述了Figure2的正确性。
  • 如果需要进行大量测试,可以用这个脚本:https://gist.github.com/jonhoo/f686cacb4b9fe716d5aa
  • 对于一些特殊情况或者具体实现细节有疑惑,可以去raft算法的网站 https://raft.github.io/ 使用其中的可视化+交互式展示,可以手动丢弃一些包或者停用一些节点模拟testcase,看看真正的raft是怎么处理的。
  • 不要忘记传递ApplyCh来表明commit log到状态机。
  • 注意论文中对log不匹配的处理是一次rpc回退一个,而testBackup要求快速回退,否则无法在时限内通过。因此必须设计快速回退机制,可以参考6.824的课程提纲笔记,其中有提到一种方案:https://pdos.csail.mit.edu/6.824/notes/l-raft2.txt 。对应的,由于时限被用来检测是否完成了这一机制,所以不要擅自修改心跳的间隔快过lab的要求。

四、代码实现

首先需要对之前的AppenEntry做一点修改,将一个goroutine发全部心跳改为一个peer一个心跳。这么做是因为对2B来说,每个peer的心跳对应的arg不同。

for i := 0; i < lenPeers; i++ {
			if i != rf.me {
				go rf.executeHeartBeat(args[i], i)
			}
		}

修改好以后,就可以从Start()函数开始,编写有关Log的内容。

外部需要传递命令给raft时,就会尝试调用节点的Start函数,但只有leader才应该处理用户的命令:

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true

	// Your code here (2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	index = len(rf.Log)
	term = rf.currentTerm
	isLeader = rf.State == 2
	if isLeader {
		rf.Log = append(rf.Log, LogEntry{index, term, command})
		rf.matchIndex[rf.me] = len(rf.Log) - 1
	}
	return index, term, isLeader
}

这里要求快速处理,所以这里只应当把新log保存到leader的log表中,同时更新matchIndex(leader的matchIndex直接和他所有的log对应)。

下面就是lab2B的重头戏了,leader根据储存的nextIndex信息为各个follower准备好同步的参数。

这里的设计以简单起见,每次心跳最多只发送一个Log。如果没有要添加的log,则设置Entries为nil:

func (rf *Raft) doHeartBeat() {
	rf.mu.Lock()

	for rf.State == 2 && rf.killed() == false {
		lenPeers := len(rf.peers)
		args := make([]AppendEntriesArg, lenPeers)
		for i := 0; i < lenPeers; i++ {
			if i != rf.me {
				if rf.Log[len(rf.Log)-1].Index >= rf.nextIndex[i] { //按论文检测是否发entry
					if rf.nextIndex[i] == 0 {
						rf.nextIndex[i] = 1
					}
					args[i] = AppendEntriesArg{
						Term: rf.currentTerm, LeaderId: rf.me, PrevLogIndex: rf.nextIndex[i] - 1,
						PrevLogTerm: rf.Log[rf.nextIndex[i]-1].Term, Entry: []LogEntry{rf.Log[rf.nextIndex[i]]},
						LeaderCommit: rf.commitIndex}
				} else {
					// 只是普通心跳,不需要发Entry
					args[i] = AppendEntriesArg{
						Term: rf.currentTerm, LeaderId: rf.me, PrevLogIndex: rf.nextIndex[i] - 1,
						PrevLogTerm: rf.Log[rf.nextIndex[i]-1].Term, Entry: nil,
						LeaderCommit: rf.commitIndex}
				}
			}
		}

		rf.mu.Unlock()
		for i := 0; i < lenPeers; i++ {
			if i != rf.me {
				go rf.executeHeartBeat(args[i], i)
			}
		}
		time.Sleep(time.Millisecond * 100)
		rf.mu.Lock()
	}
	rf.mu.Unlock()
}

这里暂时先跳过发送和处理appendEntryRPC回复的go程,先看看接收的handler:

func (rf *Raft) AppendEntriesHandler(args *AppendEntriesArg, reply *AppendEntriesReply) {
	//注意这个是handler
	rf.mu.Lock()
	defer rf.mu.Unlock()

	success := true

	// 收到来自有效leader的心跳
	if args.Term >= rf.currentTerm {
		rf.votedFor = -1
		//rf.lastHeartBeat = time.Now()
		randomTerm := 150 + rand.Intn(500)
		rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
		if rf.State != 0 {
			rf.State = 0
			rf.currentTerm = args.Term
			//println("因为收到心跳", rf.me, "从", rf.State, "转为了follower")
		}
	}

	if args.Term < rf.currentTerm {
		success = false
		reply.Success = success
		return
	} else {
		rf.currentTerm = args.Term
	}

	//Log冲突检查
	if args.PrevLogIndex >= 0 && (args.PrevLogIndex <= len(rf.Log)-1 && rf.Log[args.PrevLogIndex].Term != args.PrevLogTerm) {
		success = false
		//println("检测到log冲突:prevLogIndex:", args.PrevLogIndex, " prevLogTerm: ", args.PrevLogTerm, "实际term:", rf.Log[args.PrevLogIndex].Term)
	} else {
		if args.PrevLogIndex > len(rf.Log)-1 {
			success = false
		} else {
			if args.Entry != nil {
				rf.Log = append(rf.Log[0:args.PrevLogIndex+1], args.Entry...)
				//println(rf.me, "添加log成功,现在长度为", len(rf.Log))
			}
		}
	}

	if success && args.LeaderCommit > rf.commitIndex {
		if args.LeaderCommit <= len(rf.Log)-1 {
			rf.commitIndex = args.LeaderCommit
		} else {
			rf.commitIndex = len(rf.Log) - 1
		}
	}

	reply.Success = success
}

这是没有考虑快速回退的简化版本。首先要注意,log的复制逻辑不应当影响leader election的有关逻辑,非有效的心跳也不应当影响log复制逻辑。

收到有效心跳后,首先根据leader发来的信息执行一致性检查。论文中的两个要求要在这里得到满足:

Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)

If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)

第一条通过对 rf.Log[args.PrevLogIndex].Term != args.PrevLogTermargs.Term < rf.currentTerm的检测完成,第二条的实现则是通过rf.Log = append(rf.Log[0:args.PrevLogIndex+1], args.Entry...) 这样的方式来保证复制到的log的位置都只取决于leader发来的prevLogInedx,同时删掉之后的冲突log。

之后,根据论文要求,根据leaderCommit对commitIndex进行修改:

If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)

下面就是leader收到心跳回复后的处理方法:

func (rf *Raft) executeHeartBeat(arg AppendEntriesArg, server int) {
	reply := AppendEntriesReply{}
	ok := rf.sendAppendEntries(server, &arg, &reply)
	if !ok {
		//println("对", server, "的rpc失败")
		return
	}
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if reply.Term > rf.currentTerm {
		rf.State = 0
		rf.currentTerm = reply.Term
		rf.votedFor = -1
		//println(rf.me, "发心跳时,收到来自更高term的消息,转follower")
		return
	}

	if reply.Success && arg.Entry != nil {
		rf.matchIndex[server] = arg.PrevLogIndex + 1
		rf.nextIndex[server] = rf.matchIndex[server] + 1
	}

	if reply.Success && arg.Entry == nil {
		rf.matchIndex[server] = arg.PrevLogIndex
	}

	if reply.Success {
		lenPeers := len(rf.peers)

		for i := rf.commitIndex + 1; i <= len(rf.Log)-1; i++ {
			n := 0
			for j := 0; j < lenPeers; j++ {
				if rf.matchIndex[j] >= i {
					n++
				}
			}
			if n > lenPeers/2 && rf.Log[i].Term == rf.currentTerm {
				rf.commitIndex = i
				//println("leader已commit一份log,现在commitIndex为", rf.commitIndex)
			}
		}
	}

	if !reply.Success {
		rf.nextIndex[server]--
    }
}

同样是不考虑快速回退的版本。首先注意到要先判断本次rpc发送是否成功,如果发送不成功那么不能做后续的处理,因为reply参数是无效的。由于test可能会对部分节点和请求进行断网,所以必须要手动判断rpc情况。

在leader收到回复后,就需要针对的调整相应nextIndex和matchIndex。如果回复是success == true,说明一致性检查是成功的。如果这时参数里entry!=nil,也就是说新logEntry已经被正确的传递,这时设置matchIndex为prevLogIndex+1,nextIndex为matchIndex+1。如果entry==nil,意味着一致性检查正确而没有增加新log,设置matchIndex = prevLogIndex即可。

如果收到的回复为true,则可以触发leader的commit检测。论文里是这么写的:

If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4)

这里的实现统计matchIndex中>N的服务器数量,如果大于中间值就可以设置leader的commitInedx为N。注意这里论文的表述为“大于commitIndex的任意N”,所以要遍历整个 len(rf.Log) > N > commitIndex,而不是找到一个就停止。

如果回复的success==false,意味着一致性检查失败。注意这里follower是不知道leader具体的log排布的,只能用rpc传递的参数信息对涉及的一个log做检测,所以需要leader主动降低nextIndex,在下次心跳时再试。这里暂时用一步一步退nextIndex的方法。

log传递到这里,已经可以做到在log传递到大部分服务器时修改commitIndex。那么就需要服务器依据另一个变量lastApplied和commitIndex对比,找出需要apply的部分并用channel发送。这个过程放在每5ms就触发一次的tiker里以便快速执行:

func (rf *Raft) ticker() {

	for rf.killed() == false {
		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using
		// time.Sleep().
		rf.mu.Lock()
		if rf.State != 1 {
			rf.votesGot = 0
		}
		if rf.State != 2 {
			if time.Now().After(rf.nextActiveTime) {
				// 随机周期内没有收到心跳,转变为candidate
				randomTerm := 150 + rand.Intn(500)
				rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
				rf.State = 1
				//println(rf.me, "号没有收到心跳,发起了投票")
				rf.launchElection()
			}
		}

		for rf.lastApplied < rf.commitIndex && rf.commitIndex < len(rf.Log) {
			rf.lastApplied++
			//println(rf.me, "commit了log", rf.lastApplied)
			rf.applyCh <- ApplyMsg{
				true, rf.Log[rf.lastApplied].Command, rf.lastApplied,
				false, nil, 0, 0}
		}

		rf.mu.Unlock()
		time.Sleep(time.Millisecond * 5)
	}
}

到这里已经基本完成了raft关于log的功能。但是由于所有log以leader为准,为了尽力保证在leader交替时保留更多的Log,需要对投票的规则做修改,使得保留了更多log的服务器能够胜出:

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	//注意这个是handler
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term < rf.currentTerm || rf.votedFor != -1 || rf.State != 0 {
		reply.VoteGranted = false
	} else {
		reply.VoteGranted = true
		randomTerm := 150 + rand.Intn(500)
		rf.nextActiveTime = time.Now().Add(time.Duration(randomTerm) * time.Millisecond)
	}
	if args.Term > rf.currentTerm {
		rf.State = 0
		rf.currentTerm = args.Term
		reply.VoteGranted = true
	}

	lenLog := len(rf.Log)
	if rf.Log[lenLog-1].Term > args.LastLogTerm {
		reply.VoteGranted = false
	}
	if rf.Log[lenLog-1].Term == args.LastLogTerm && args.LastLogIndex < lenLog-1 {
		reply.VoteGranted = false
	}
	if reply.VoteGranted {
		rf.votedFor = args.CandidateId
	}
	reply.Term = rf.currentTerm
}

这里投票的规则要求candidate的log比自己的“更新”(定义见上文)。

截止这里,应该已经可以通过除了testBackup之外的全部测试了。

为什么不能通过testBackup呢?

testBackup的测试内容是短时间内发布大量命令,同时快速隔断部分leader,一段时间后又重连,就可能导致大量index一致而term不一致的冲突log。面对几十条冲突log,一次心跳一条慢慢的往回回调nextIndex显然太慢了。

虽然这种情况比较极端,一般不容易出现,但还是值得解决的问题。这里就需要用到lecture里提到的fast roll-back方法了。这种方法让follower在冲突时向心跳的回复里添加更多信息,帮助leader快速定位到冲突点。

在心跳回复的定义里添加下面的参数:

	XTerm   int // 快速回退用,term of conflicting entry
	XIndex  int // 快速回退时用,term是Xterm的最早的index
	XLen    int // 快速回退时传递日志长度

根据定义,在心跳的Handler传入参数:

if args.PrevLogIndex >= 0 && (args.PrevLogIndex <= len(rf.Log)-1 && rf.Log[args.PrevLogIndex].Term != args.PrevLogTerm) {
		success = false

		// lecture里的方法,针对log冲突快速回退
		reply.XTerm = rf.Log[args.PrevLogIndex].Term
		XIndex := -1
		for i := 0; i < len(rf.Log); i++ {
			if rf.Log[i].Term == reply.XTerm {
				XIndex = i
				break
			}
		}
		reply.XIndex = XIndex
		reply.XLen = len(rf.Log)

		//println("检测到log冲突:prevLogIndex:", args.PrevLogIndex, " prevLogTerm: ", args.PrevLogTerm, "实际term:", rf.Log[args.PrevLogIndex].Term)
	} else {

		if args.PrevLogIndex > len(rf.Log)-1 {
			reply.XTerm = -1
			reply.XIndex = -1
			reply.XLen = len(rf.Log)
			success = false
		} else {
			if args.Entry != nil {

				rf.Log = append(rf.Log[0:args.PrevLogIndex+1], args.Entry...)
				//println(rf.me, "添加log成功,现在长度为", len(rf.Log))
			}
		}
	}

	if success && args.LeaderCommit > rf.commitIndex {
		if args.LeaderCommit <= len(rf.Log)-1 {
			rf.commitIndex = args.LeaderCommit
		} else {
			rf.commitIndex = len(rf.Log) - 1
		}
	}

然后在executeHeartBeat go程中被处理:

if !reply.Success {
		hasXTerm := false
		lastIndexOfXTerm := -1
		for i := 0; i < len(rf.Log); i++ {
			if rf.Log[i].Term == reply.XTerm {
				hasXTerm = true
				lastIndexOfXTerm = i
			}
		}
		if reply.XTerm != -1 {
			if hasXTerm {
				rf.nextIndex[server] = lastIndexOfXTerm
			} else {
				rf.nextIndex[server] = reply.XIndex
			}
		} else {
			rf.nextIndex[server] = reply.XLen
		}
	}

这里分为三种case,详情参见lecture。主要原理是leader需要删除follower冲突的那个term里leader没有的log,所以可以让follower在心跳返回时传递该term的log的信息,便于leader一次性判断后删除。

至此,lab2B圆满完成。

代码链接:lab-6.824/src/answer at master · tksky1/lab-6.824 (github.com)

推荐参考链接

课程提纲,主要参考快速回退部分写法

参考快速回退部分:6.824 分布式系统④ 快速恢复/ 持久化) [MIT公开课] - 知乎 (zhihu.com)

Raft网站,可以可视化和互动操作了解算法细节:Raft Consensus Algorithm

好用的批量测试脚本:Script for running go tests many times in parallel, printing the current status, and logging errors (github.com)