rf.mu.Lock()
flag := (rf.state == Leader)
args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId: rf.me}
rf.mu.Unlock()
if !flag {
	return
}
For 2A, implement as much functionality as possible rather than just enough to pass the tests. I used the Chinese translation of the paper to get a rough picture of the Raft algorithm, and wrote the code against the English original. Issue: in the RequestVote RPC, "at least as up-to-date" corresponds to:
Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs. If the logs have last entries with different terms, then the log with the later term is more up-to-date. If the logs end with the same term, then whichever log is longer is more up-to-date.
Because it says "at least as", the equal case also counts as up-to-date.
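In code form the comparison is roughly the following (a standalone helper with illustrative parameter names, not the exact code in my RequestVote handler):

// atLeastAsUpToDate reports whether a candidate whose last log entry is
// (candTerm, candIndex) is at least as up-to-date as a log whose last entry
// is (myTerm, myIndex), per section 5.4.1: the later term wins; with equal
// terms the longer log wins, and equality counts as up-to-date.
func atLeastAsUpToDate(candTerm, candIndex, myTerm, myIndex int) bool {
	if candTerm != myTerm {
		return candTerm > myTerm
	}
	return candIndex >= myIndex
}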
votedFor: candidateId that received vote in current term (or null if none). In other words, the ID of the candidate this server voted for in the current term (initially null). This means that whenever the term changes, votedFor must be reset.
If election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate: convert to candidate. That is, the server becomes a candidate only if the election timer expires without it having received an AppendEntries RPC from the current leader and without it having granted a vote to a candidate. Don't overlook the "granting vote to candidate" part. Reference: MIT 6.824 Spring 2020 Lab2 Raft 实现笔记 (implementation notes).
Lab 2B itself didn't take long to implement, but the debugging did. VS Code's built-in debugging turned out to be quite handy (via the Go test plugin?). Some details:

currentTerm: latest term server has seen (initialized to 0 on first boot, increases monotonically). Whenever a larger term is seen it must be adopted: If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower.

votedFor: candidateId that received vote in current term (or null if none). The candidate voted for in the current term, so whenever the term changes, votedFor must be reset as well.

log[]: log entries; each entry contains command for state machine, and term when entry was received by leader (first index is 1). Since the first index is 1, it helps to insert a dummy entry with term 0 at initialization so that lastLogTerm/prevLogTerm can be handled uniformly.

commitIndex: the index of the highest log entry to be committed; on the leader a new commitIndex can be derived by copying matchIndex, sorting it, and taking the majority position.

AppendEntries RPC:
rule 3: delete an existing entry and everything after it only if it actually conflicts with an entry in entries; if nothing conflicts, delete nothing. When comparing, only iterate up to the smaller of the two last indexes so you don't run off the end of either slice.
rule 5: "index of last new entry" means prevLogIndex+len(entries), not the largest index in the local log (see the sketch after this list). Note that empty entries (heartbeats) need no special casing.

RequestVote RPC:
rule 2 has two conditions: votedFor is null or candidateId, and the candidate's log is at least as up-to-date as the receiver's log.
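A minimal sketch of the rule 5 computation mentioned above (again a standalone helper with illustrative names, not the exact code in my handler):

// followerCommitIndex returns the commitIndex a follower should adopt after
// accepting an AppendEntries RPC: if leaderCommit > commitIndex, take the
// minimum of the leader's commit index and the index of the last NEW entry,
// i.e. prevLogIndex+len(entries).
func followerCommitIndex(leaderCommit, prevLogIndex, nEntries, oldCommitIndex int) int {
	lastNewEntry := prevLogIndex + nEntries
	if leaderCommit <= oldCommitIndex {
		return oldCommitIndex
	}
	if leaderCommit < lastNewEntry {
		return leaderCommit
	}
	return lastNewEntry
}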
Intermittent failures: after the obvious bugs were fixed, a single run of go test -run 2B -race passed everything, but running it 100 times from a script still produced a few failures. Bugs that only show up with some probability are much more annoying to deal with.
for i in {1..100}; do go test -run 2B -race; done
Test (2B): concurrent Start()s ...
--- FAIL: TestConcurrentStarts2B (31.27s)
    config.go:549: only 1 decided for index 6; wanted 3
Test (2B): RPC counts aren't too high ...
--- FAIL: TestCount2B (32.34s)
    config.go:549: only 2 decided for index 11; wanted 3
Instead of debugging this head-on, I took a different route: I went back and carefully read the articles recommended at the top of lab 2 (I had only skimmed them before because they were long): Students' Guide to Raft, Raft Q&A, Debugging by Pretty Printing, Instructors' Guide to Raft. It turned out that several of the problems I was hitting are called out in the Students' Guide to Raft, and now I remember those points very well.

Notes from the Students' Guide to Raft:
1. The paper mostly says "must", not "should". For example, it does not say the election timer should be reset whenever a server receives any RPC; it is reset only on receiving an AppendEntries RPC from the current leader or granting a vote to a candidate.
2. Heartbeats must not be treated as a special kind of message. If a follower blindly accepts a heartbeat, it implicitly tells the leader that its log matches up through prevLogIndex, which can then lead to committing the wrong entries.
3. Simply truncating the follower's log after prevLogIndex when an AppendEntries arrives is also wrong. The paper says "If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it"; note that this is a conditional.
4. When the election timer fires, start a new election even if the previous election has not finished.
5. "Reply false" means return immediately.
6. In AppendEntries, if the entry at prevLogIndex does not exist at all, that also counts as a log mismatch.
7. Check that your AppendEntries handler succeeds even when entries is empty, and watch out for indexing past the end of the local log.
8. Make sure you understand what "last new entry" and "at least as up-to-date" mean.
9. When advancing commitIndex, remember the log[N].term == currentTerm condition.
10. nextIndex is an optimistic estimate and matchIndex a pessimistic one, even though much of the time they are assigned together as nextIndex = matchIndex + 1 (sketched just below).
11. When an old RPC reply arrives, compare the current term with the term in the args; if they differ, return without processing the reply.
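For point 10, the success branch of the AppendEntries reply handling (not shown in the code excerpts at the end of this post, which only cover the snapshot and conflict paths) should derive both values from the arguments that were actually sent, since the local log may have grown while the RPC was in flight. A sketch with illustrative names:

// advanceLeaderIndexes returns the matchIndex and nextIndex to record for a
// peer after it acknowledged an AppendEntries RPC that carried nEntries
// entries following prevLogIndex.
func advanceLeaderIndexes(prevLogIndex, nEntries int) (matchIndex, nextIndex int) {
	matchIndex = prevLogIndex + nEntries
	nextIndex = matchIndex + 1
	return matchIndex, nextIndex
}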
Goroutine 277 (running) created at:
  6.824/labrpc.(*Network).processReq()
      /root/mit6.824/6.824/src/labrpc/labrpc.go:239 +0x174

Goroutine 323 (running) created at:
  6.824/raft.(*Raft).agreementTask()
      /root/mit6.824/6.824/src/raft/raft.go:376 +0xa3
==================
--- FAIL: TestBackup2B (16.85s)
    testing.go:853: race detected during execution of test
The accelerated log backtracking optimization is very underspecified, probably because the authors do not see it as being necessary for most deployments. It is not clear from the text exactly how the conflicting index and term sent back from the client should be used by the leader to determine what nextIndex to use. We believe the protocol the authors probably want you to follow is: If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None. If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm. Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log. If it does not find an entry with that term, it should set nextIndex = conflictIndex.
I couldn't quite make sense of "the one beyond the index", so I simply set nextIndex to the last index whose entry has that term. nextIndex only affects efficiency, not really correctness. If 2B is solid, 2C should mostly be fine as well; if 2C does have problems, it is worth running the 2B tests a few thousand times to check.
Test (2C): basic persistence ...
  ... Passed -- 4.1 3 216 46172 6
Test (2C): more persistence ...
2022/10/30 06:44:20 next index retry. cur term:5 me:3 server:1 value:2 reply:term:-1 index:2
2022/10/30 06:44:20 next index retry. cur term:5 me:3 server:2 value:2 reply:term:-1 index:2
2022/10/30 06:44:22 next index retry. cur term:9 me:1 server:4 value:5 reply:term:-1 index:5
2022/10/30 06:44:22 next index retry. cur term:9 me:1 server:0 value:5 reply:term:-1 index:5
2022/10/30 06:44:24 next index retry. cur term:13 me:4 server:3 value:8 reply:term:-1 index:8
2022/10/30 06:44:24 next index retry. cur term:13 me:4 server:2 value:8 reply:term:-1 index:8
2022/10/30 06:44:29 next index retry. cur term:16 me:2 server:1 value:11 reply:term:-1 index:11
2022/10/30 06:44:29 next index retry. cur term:16 me:2 server:0 value:11 reply:term:-1 index:11
2022/10/30 06:44:31 next index retry. cur term:20 me:0 server:4 value:14 reply:term:-1 index:14
2022/10/30 06:44:31 next index retry. cur term:20 me:0 server:3 value:14 reply:term:-1 index:14
  ... Passed -- 16.0 5 982 207944 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
2022/10/30 06:44:35 next index retry. cur term:3 me:2 server:1 value:2 reply:term:-1 index:2
  ... Passed -- 1.9 3 51 12120 4
Test (2C): Figure 8 ...
2022/10/30 06:44:37 next index retry. cur term:4 me:0 server:1 value:3 reply:term:-1 index:3
2022/10/30 06:44:41 next index retry. cur term:11 me:2 server:1 value:9 reply:term:-1 index:9
2022/10/30 06:44:41 next index retry. cur term:12 me:1 server:4 value:4 reply:term:-1 index:4
2022/10/30 06:44:41 next index retry. cur term:12 me:1 server:3 value:5 reply:term:-1 index:5
2022/10/30 06:44:43 next index retry. cur term:13 me:3 server:2 value:9 reply:term:8 index:9
2022/10/30 06:44:43 next index retry. cur term:14 me:4 server:1 value:10 reply:term:-1 index:10
2022/10/30 06:44:44 next index retry. cur term:15 me:2 server:3 value:11 reply:term:-1 index:11
2022/10/30 06:44:45 next index retry. cur term:16 me:3 server:0 value:10 reply:term:-1 index:10
2022/10/30 06:44:45 next index retry. cur term:16 me:3 server:0 value:9 reply:term:8 index:9
2022/10/30 06:44:46 next index retry. cur term:17 me:0 server:2 value:13 reply:term:-1 index:13
2022/10/30 06:44:47 next index retry. cur term:19 me:1 server:4 value:12 reply:term:-1 index:12
2022/10/30 06:44:48 next index retry. cur term:20 me:2 server:3 value:14 reply:term:-1 index:14
2022/10/30 06:44:49 next index retry. cur term:22 me:4 server:0 value:16 reply:term:-1 index:16
2022/10/30 06:44:50 next index retry. cur term:24 me:3 server:2 value:19 reply:term:-1 index:19
2022/10/30 06:44:52 next index retry. cur term:27 me:3 server:0 value:23 reply:term:-1 index:23
2022/10/30 06:44:52 next index retry. cur term:27 me:3 server:2 value:23 reply:term:-1 index:23
2022/10/30 06:44:53 next index retry. cur term:29 me:3 server:2 value:25 reply:term:-1 index:25
2022/10/30 06:44:53 next index retry. cur term:29 me:3 server:0 value:25 reply:term:-1 index:25
2022/10/30 06:44:54 next index retry. cur term:30 me:0 server:1 value:17 reply:term:-1 index:17
2022/10/30 06:44:54 next index retry. cur term:31 me:2 server:4 value:21 reply:term:-1 index:21
2022/10/30 06:44:55 next index retry. cur term:34 me:4 server:2 value:29 reply:term:-1 index:29
2022/10/30 06:44:55 next index retry. cur term:34 me:4 server:1 value:29 reply:term:-1 index:29
2022/10/30 06:44:57 next index retry. cur term:36 me:2 server:0 value:28 reply:term:-1 index:28
2022/10/30 06:44:58 next index retry. cur term:37 me:1 server:3 value:27 reply:term:-1 index:27
2022/10/30 06:44:58 next index retry. cur term:39 me:3 server:0 value:33 reply:term:-1 index:33
2022/10/30 06:44:58 next index retry. cur term:39 me:3 server:4 value:32 reply:term:-1 index:32
2022/10/30 06:44:59 next index retry. cur term:40 me:4 server:2 value:33 reply:term:-1 index:33
2022/10/30 06:45:00 next index retry. cur term:41 me:2 server:1 value:34 reply:term:-1 index:34
2022/10/30 06:45:01 next index retry. cur term:42 me:0 server:3 value:35 reply:term:-1 index:35
2022/10/30 06:45:02 next index retry. cur term:43 me:1 server:4 value:35 reply:term:40 index:35
2022/10/30 06:45:02 next index retry. cur term:43 me:1 server:3 value:35 reply:term:-1 index:35
race: limit on 8128 simultaneously alive goroutines is exceeded, dying
//
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
//
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
}
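The matching killed() helper from the same skeleton just reads that flag atomically, so every long-running loop (ticker, heartbeat task, applier) can poll it cheaply:

func (rf *Raft) killed() bool {
	z := atomic.LoadInt32(&rf.dead)
	return z == 1
}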
When handling the retry/conflict path of the AppendEntries RPC, the "find the same term" search must stay above lastIncludedIndex.
Apply the snapshot and the log entries from the same goroutine.
The first point is easy to understand: no matter what, the log should always contain an entry at lastIncludedIndex. So even though step 7 of InstallSnapshot says "discard the entire log", you still keep a dummy entry for the snapshot's lastIncludedIndex; likewise, in the Snapshot() function, delete one entry fewer than you otherwise would, so that position 0 of the log is always the lastIncludedIndex entry.
The second point: the leader holds every committed entry, and committed entries at the same index are identical across logs, so there is no need to check terms for committed entries (this can be exploited in AppendEntries, both for the step 2 prevLogIndex match and for the step 3 different-term check). Keep this property in mind at all times.
Leader Completeness: if a log entry is committed in a given term, then that entry will be present in the logs of the leaders for all higher-numbered terms.
Previously, this lab recommended that you implement a function called CondInstallSnapshot to avoid the requirement that snapshots and log entries sent on applyCh are coordinated. This vestigial API interface remains, but you are discouraged from implementing it: instead, we suggest that you simply have it return true.
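Following that suggestion, the body is a one-liner (the signature below follows the lab's raft.go skeleton):

// CondInstallSnapshot is a vestigial API kept for compatibility; per the lab
// instructions it simply reports that the service should switch to the snapshot.
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
	return true
}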
for i := 0; i < iters; i++ {
	victim := (leader1 + 1) % servers
	sender := leader1
	if i%3 == 1 {
		sender = (leader1 + 1) % servers
		victim = leader1
	}
	DPrintf("%v time,vicitm is %v\n", i, victim)

	if disconnect {
		cfg.disconnect(victim)
		cfg.one(rand.Int(), servers-1, true)
	}
	if crash {
		cfg.crash1(victim)
		DPrintf("%v crash", victim)
		cfg.one(rand.Int(), servers-1, true)
	}

	// perhaps send enough to get a snapshot
	nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval)
	for i := 0; i < nn; i++ {
		cfg.rafts[sender].Start(rand.Int())
	}

	// let applier threads catch up with the Start()'s
	if disconnect == false && crash == false {
		// make sure all followers have caught up, so that
		// an InstallSnapshot RPC isn't required for
		// TestSnapshotBasic2D().
		cfg.one(rand.Int(), servers, true)
	} else {
		cfg.one(rand.Int(), servers-1, true)
	}

	if cfg.LogSize() >= MAXLOGSIZE {
		cfg.t.Fatalf("Log size too large")
	}
	if disconnect {
		// reconnect a follower, who maybe behind and
		// needs to rceive a snapshot to catch up.
		cfg.connect(victim)
		cfg.one(rand.Int(), servers, true)
		leader1 = cfg.checkOneLeader()
	}
	if crash {
		cfg.start1(victim, cfg.applierSnap)
		DPrintf("%v restart\n", victim)
		cfg.connect(victim)
		cfg.one(rand.Int(), servers, true)
		leader1 = cfg.checkOneLeader()
	}
}
cfg.end()
}
// take a snapshot
if (m.CommandIndex+1)%SnapShotInterval == 0 {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(m.CommandIndex)
	var xlog []interface{}
	for j := 0; j <= m.CommandIndex; j++ {
		xlog = append(xlog, cfg.logs[i][j])
	}
	e.Encode(xlog)
	rf.Snapshot(m.CommandIndex, w.Bytes())
}
// read back the snapshot
if cfg.saved[i] != nil {
	cfg.saved[i] = cfg.saved[i].Copy()

	snapshot := cfg.saved[i].ReadSnapshot()
	if snapshot != nil && len(snapshot) > 0 {
		// mimic KV server and process snapshot now.
		// ideally Raft should send it up on applyCh...
		err := cfg.ingestSnap(i, snapshot, -1)
		if err != "" {
			cfg.t.Fatal(err)
		}
	}
}
func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string {
	if snapshot == nil {
		log.Fatalf("nil snapshot")
		return "nil snapshot"
	}
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)
	var lastIncludedIndex int
	var xlog []interface{}
	if d.Decode(&lastIncludedIndex) != nil || d.Decode(&xlog) != nil {
		log.Panic()
		log.Fatalf("snapshot decode error")
		return "snapshot Decode() error"
	}
	if index != -1 && index != lastIncludedIndex {
		err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", i)
		return err
	}
	cfg.logs[i] = map[int]interface{}{}
	for j := 0; j < len(xlog); j++ {
		cfg.logs[i][j] = xlog[j]
	}
	cfg.lastApplied[i] = lastIncludedIndex
	return ""
}
Raft must discard old log entries in a way that allows the Go garbage collector to free and re-use the memory; this requires that there be no reachable references (pointers) to the discarded log entries.
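In Go that means copying the retained suffix into fresh backing storage instead of re-slicing in place; a minimal sketch using the logEntry type from the listing later in this post:

// truncateHead returns log[keepFrom:] copied into a new slice, so the old
// backing array (including the discarded entries) becomes unreachable and can
// be garbage-collected. keepFrom is an offset into the slice, not a Raft index.
func truncateHead(log []logEntry, keepFrom int) []logEntry {
	return append([]logEntry{}, log[keepFrom:]...)
}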
If, when the server comes back up, it reads the updated snapshot, but the outdated log, it may end up applying some log entries that are already contained within the snapshot. This happens since the commitIndex and lastApplied are not persisted, and so Raft doesn’t know that those log entries have already been applied. The fix for this is to introduce a piece of persistent state to Raft that records what “real” index the first entry in Raft’s persisted log corresponds to. This can then be compared to the loaded snapshot’s lastIncludedIndex to determine what elements at the head of the log to discard.
func (rf *Raft) compareStateAndSnapshot() {
	if rf.snapshot == nil || len(rf.snapshot) < 1 {
		return
	}
	r := bytes.NewBuffer(rf.snapshot)
	d := labgob.NewDecoder(r)
	var lastIncludedIndex int
	if d.Decode(&lastIncludedIndex) != nil {
		log.Panicf("compareStateAndSnapshot: snapshot decode error")
	}
	if rf.lastIncludedIndex != lastIncludedIndex {
		WPrintf("snapshot lastIncludedIndex is different. snapshot:%v state:%v\n", lastIncludedIndex, rf.lastIncludedIndex)
		rf.log = append([]logEntry{}, rf.log[lastIncludedIndex-rf.lastIncludedIndex:]...)
		rf.lastIncludedIndex = lastIncludedIndex
		rf.lastIncludedTerm = rf.log[0].Term // term of the entry at log position 0
		rf.commitIndex = rf.lastIncludedIndex
	}
}
This turned out to be useless: the WPrintf statement never fired.
Since lab 3B encodes additional data into the snapshot, I no longer do this comparison.

Part four: a change I'm not sure was needed. After 2D passed I ran the full test suite, and a few runs of the various 2D tests died with panic: test timed out after 10m0s. Looking at the goroutine stacks that were printed, many goroutines were waiting to acquire the lock, and the apply goroutine was blocked on the channel send (in my implementation at the time it held the lock at that point). My guess was that the service wasn't consuming ApplyMsg, so the server could never make progress; I therefore changed the apply goroutine so that it does not hold the lock while sending messages.
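The resulting apply loop looks roughly like this (a simplified sketch: it assumes an applyCh field saved in Make(), and the snapshot-delivery path is omitted; the real code also applies snapshots from this same goroutine):

func (rf *Raft) applier() {
	for !rf.killed() {
		// collect everything that is ready while holding the lock ...
		rf.mu.Lock()
		msgs := []ApplyMsg{}
		for rf.lastApplied < rf.commitIndex {
			rf.lastApplied++
			msgs = append(msgs, ApplyMsg{
				CommandValid: true,
				Command:      rf.log[rf.lastApplied-rf.lastIncludedIndex].Command,
				CommandIndex: rf.lastApplied,
			})
		}
		rf.mu.Unlock()

		// ... but never hold the lock while sending on the channel (rule 4 below),
		// otherwise a slow service blocks the whole Raft instance.
		for _, msg := range msgs {
			rf.applyCh <- msg
		}
		time.Sleep(10 * time.Millisecond)
	}
}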
Rule 1: Whenever you have data that more than one goroutine uses, and at least one goroutine might modify the data, the goroutines should use locks to prevent simultaneous use of the data.
Rule 2: Whenever code makes a sequence of modifications to shared data, and other goroutines might malfunction if they looked at the data midway through the sequence, you should use a lock around the whole sequence.
Rule 3: Whenever code does a sequence of reads of shared data (or reads and writes), and would malfunction if another goroutine modified the data midway through the sequence, you should use a lock around the whole sequence.
Rule 4: It's usually a bad idea to hold a lock while doing anything that might wait: reading a Go channel, sending on a channel, waiting for a timer, calling time.Sleep(), or sending an RPC (and waiting for the reply).
Rule 5: Be careful about assumptions across a drop and re-acquire of a lock. One place this can arise is when avoiding waiting with locks held.
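Rules 4 and 5 together give the shape used around every RPC in the code excerpts at the end of this post: capture what the call needs under the lock, release it for the call itself, then re-acquire and re-check the assumptions before trusting the reply. A sketch (sendOneHeartbeat is a hypothetical helper, not a function from my implementation):

func (rf *Raft) sendOneHeartbeat(server int) {
	rf.mu.Lock()
	if rf.state != Leader {
		rf.mu.Unlock()
		return
	}
	args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId: rf.me}
	rf.mu.Unlock()

	reply := AppendEntriesReply{}
	ok := rf.peers[server].Call("Raft.AppendEntries", &args, &reply) // rule 4: no lock held while waiting

	rf.mu.Lock()
	defer rf.mu.Unlock()
	if !ok || rf.state != Leader || args.Term != rf.currentTerm {
		return // rule 5: the world may have changed while we waited; drop the stale reply
	}
	// reply can be used safely here
}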
commitIndex is volatile because Raft can figure out a correct value for it after a reboot using just the persistent state. Once a leader successfully gets a new log entry committed, it knows everything before that point is also committed. A follower that crashes and comes back up will be told about the right commitIndex whenever the current leader sends it an AppendEntries RPC.
lastApplied starts at zero after a reboot because the Figure 2 design assumes the service (e.g., a key/value database) doesn’t keep any persistent state. Thus its state needs to be completely recreated by replaying all log entries. If the service does keep persistent state, it is expected to persistently remember how far in the log it has executed, and to ignore entries before that point. Either way it’s safe to start with lastApplied = 0 after a reboot.
Instead, the best approach is usually to work backwards and narrow down the size of phase 2 until it is as small as possible, so that the location of the fault is readily apparent. This is done by expanding the instrumentation of your code to surface errors sooner, and thereby spend less time in phase 2. This generally involves adding additional debugging statements and/or assertions to your code.
When possible, consider writing your code to “fail loudly”. Instead of trying to tolerate unexpected states, try to explicitly detect states that should never be allowed to happen, and immediately report these errors. Consider even immediately calling the Go ‘panic’ function in these cases to fail especially loudly. See also the Wikipedia page on Offensive programming techniques. Remember that the longer you allow errors to remain latent, the longer it will take to narrow down the true underlying fault.
When you’re failing a test, and it’s not obvious why, it’s usually worth taking the time to understand what the test is actually doing, and which part of the test is observing the problem. It can be helpful to add print statements to the test code so that you know when events are happening.
// test helper: reach agreement on one command
// do a complete agreement.
// it might choose the wrong leader initially,
// and have to re-submit after giving up.
// entirely gives up after about 10 seconds.
// indirectly checks that the servers agree on the
// same value, since nCommitted() checks this,
// as do the threads that read from applyCh.
// returns index.
// if retry==true, may submit the command multiple
// times, in case a leader fails just after Start().
// if retry==false, calls Start() only once, in order
// to simplify the early Lab 2B tests.
func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
	t0 := time.Now()
	starts := 0
	for time.Since(t0).Seconds() < 10 && cfg.checkFinished() == false {
		// try all the servers, maybe one is the leader.
		index := -1
		for si := 0; si < cfg.n; si++ {
			starts = (starts + 1) % cfg.n
			var rf *Raft
			cfg.mu.Lock()
			if cfg.connected[starts] {
				rf = cfg.rafts[starts]
			}
			cfg.mu.Unlock()
			if rf != nil {
				index1, _, ok := rf.Start(cmd)
				if ok {
					index = index1
					break
				}
			}
		}

		if index != -1 {
			// somebody claimed to be the leader and to have
			// submitted our command; wait a while for agreement.
			t1 := time.Now()
			for time.Since(t1).Seconds() < 2 {
				nd, cmd1 := cfg.nCommitted(index)
				if nd > 0 && nd >= expectedServers {
					// committed
					if cmd1 == cmd {
						// and it was the command we submitted.
						return index
					}
				}
				time.Sleep(20 * time.Millisecond)
			}
			if retry == false {
				cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
			}
		} else {
			time.Sleep(50 * time.Millisecond)
		}
	}

	if cfg.checkFinished() == false {
		for si := 0; si < cfg.n; si++ {
			starts = (starts + 1) % cfg.n
			var rf *Raft
			cfg.mu.Lock()
			if cfg.connected[starts] {
				rf = cfg.rafts[starts]
			}
			cfg.mu.Unlock()
			if rf != nil {
				rf.mu.Lock()
				DPrintf("%+v", rf)
				rf.mu.Unlock()
			}
		}
		log.Panicf("one(%v) failed to reach agreement", cmd)
		cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
	}
	return -1
}
// create concurrent clients
cfn := func(me int, ch chan []int) {
	var ret []int
	ret = nil
	defer func() { ch <- ret }()
	values := []int{}
	for atomic.LoadInt32(&stop) == 0 {
		x := rand.Int()
		index := -1
		ok := false
		for i := 0; i < servers; i++ {
			// try them all, maybe one of them is a leader
			cfg.mu.Lock()
			rf := cfg.rafts[i]
			cfg.mu.Unlock()
			if rf != nil {
				index1, _, ok1 := rf.Start(x)
				if ok1 {
					ok = ok1
					index = index1
				}
			}
		}
		if ok {
			// maybe leader will commit our value, maybe not.
			// but don't wait forever.
			for _, to := range []int{10, 20, 50, 100, 200} {
				nd, cmd := cfg.nCommitted(index)
				if nd > 0 {
					if xx, ok := cmd.(int); ok {
						if xx == x {
							values = append(values, x)
						}
					} else {
						cfg.t.Fatalf("wrong command type")
					}
					break
				}
				time.Sleep(time.Duration(to) * time.Millisecond)
			}
		} else {
			time.Sleep(time.Duration(79+me*17) * time.Millisecond)
		}
	}
	ret = values
}
ncli := 3
cha := []chan []int{}
for i := 0; i < ncli; i++ {
	cha = append(cha, make(chan []int))
	go cfn(i, cha[i])
}

for iters := 0; iters < 20; iters++ {
	if (rand.Int() % 1000) < 200 {
		i := rand.Int() % servers
		cfg.disconnect(i)
	}

	if (rand.Int() % 1000) < 500 {
		i := rand.Int() % servers
		if cfg.rafts[i] == nil {
			cfg.start1(i, cfg.applier)
		}
		cfg.connect(i)
	}

	if (rand.Int() % 1000) < 200 {
		i := rand.Int() % servers
		if cfg.rafts[i] != nil {
			cfg.crash1(i)
		}
	}

	// Make crash/restart infrequent enough that the peers can often
	// keep up, but not so infrequent that everything has settled
	// down from one change to the next. Pick a value smaller than
	// the election timeout, but not hugely smaller.
	time.Sleep((RaftElectionTimeout * 7) / 10)
}

time.Sleep(RaftElectionTimeout)
cfg.setunreliable(false)
for i := 0; i < servers; i++ {
	if cfg.rafts[i] == nil {
		cfg.start1(i, cfg.applier)
	}
	cfg.connect(i)
}

atomic.StoreInt32(&stop, 1)

values := []int{}
for i := 0; i < ncli; i++ {
	vv := <-cha[i]
	if vv == nil {
		t.Fatal("client failed")
	}
	values = append(values, vv...)
}
// recompute the commit index
func (rf *Raft) changeCommitIndex() {
	// leader rule 4
	matchIndex := append([]int{}, rf.matchIndex...) // deep copy
	sort.Ints(matchIndex)
	// If there exists an N such that N > commitIndex, a majority
	// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
	// set commitIndex = N
	for i := matchIndex[rf.peerNumber-rf.majorityPeer]; i > rf.commitIndex; i-- {
		if rf.log[i-rf.lastIncludedIndex].Term == rf.currentTerm {
			// DPrintf("%d change commindex from %d to %d\n", rf.me, rf.commitIndex, i)
			rf.commitIndex = i
			return
		}
	}
}
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int

	// For 2D:
	SnapshotValid bool
	Snapshot      []byte
	SnapshotTerm  int
	SnapshotIndex int
}

type logEntry struct {
	Command interface{}
	Term    int
}

type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	currentTerm int
	votedFor    int // ID of the candidate that received this server's vote in the current term
	log         []logEntry

	commitIndex int
	lastApplied int
	nextIndex   []int
	matchIndex  []int
if !matchResult { // AppendEntries rule 2
	reply.Success = false
	// If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None.
	// If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm.
	if maxLocalLogIndex < args.PrevLogIndex {
		reply.ConflictIndex = rf.lastIncludedIndex + len(rf.log) // conflictIndex = len(log)
		reply.ConflictTerm = NoneTerm
	} else {
		// nextIndex backtracking; lastIncludedTerm should not come into play here
		reply.ConflictTerm = rf.log[args.PrevLogIndex-rf.lastIncludedIndex].Term
		reply.ConflictIndex = rf.commitIndex + 1 // ConflictIndex must be greater than commitIndex
		for i := args.PrevLogIndex; i > rf.commitIndex+1; i-- {
			if rf.log[i-1-rf.lastIncludedIndex].Term != reply.ConflictTerm {
				// the previous entry's term differs, so this is the first entry with ConflictTerm
				reply.ConflictIndex = i
				break
			}
		}
	}
	DPrintf("prelog term doesn't match. server id:%v commit index:%d maxLocalLogIndex:%v args.PrevLogIndex:%v rf.lastIncludedIndex:%v ConflictTerm:%v ConflictIndex:%v \n", rf.me, rf.commitIndex, maxLocalLogIndex, args.PrevLogIndex, rf.lastIncludedIndex, reply.ConflictTerm, reply.ConflictIndex)
	if needPersist {
		rf.persist()
	}
	return
}
// DPrintf("%d preIndex %d entry size:%d\n", rf.me, args.PrevLogIndex, len(args.Entries))
// find the first index where the terms differ
lastNewEntryIndex := args.PrevLogIndex + len(args.Entries)
minLogIndex := Max(rf.commitIndex+1, args.PrevLogIndex+1) // committed entries are guaranteed to match; no need to check them
maxLogIndex := Min(lastNewEntryIndex, maxLocalLogIndex)   // the smaller of the local and incoming last indexes
diffItemIndex := maxLogIndex + 1
for i := minLogIndex; i <= maxLogIndex; i++ {
	if rf.log[i-rf.lastIncludedIndex].Term != args.Entries[i-args.PrevLogIndex-1].Term {
		diffItemIndex = i
		break
	}
}
// AppendEntries rule 3
if diffItemIndex != maxLogIndex+1 { // delete only if a conflicting entry actually exists
	rf.log = rf.log[:diffItemIndex-rf.lastIncludedIndex]
	needPersist = true
}
// run one round of log agreement with each peer
func (rf *Raft) agreementTask() {
	for i := 0; i < rf.peerNumber; i++ {
		if i == rf.me {
			continue
		}
		go func(server int) {
			rf.mu.Lock()
			defer rf.mu.Unlock()
			for !rf.killed() {
				if rf.state != Leader {
					return
				}
				if rf.nextIndex[server] <= rf.lastIncludedIndex { // the entries this peer needs are already covered by the snapshot
					if rf.snapshot == nil || len(rf.snapshot) < 1 {
						log.Panicf("agreementTask: wrong snapshot.\n")
					}
					DPrintf("%v send snapshot to %v LastIncludedInde:%v\n", rf.me, server, rf.lastIncludedIndex)
					args := InstallSnapshotArgs{Term: rf.currentTerm, LeaderId: rf.me, LastIncludedIndex: rf.lastIncludedIndex, LastIncludedTerm: rf.lastIncludedTerm, Data: rf.snapshot}
					reply := InstallSnapshotReply{}
					res := rf.sendInstallSnapshot(server, &args, &reply) // unlocks and re-locks rf.mu internally
					if !res {
						// DPrintf("%v send InstallSnapshot to %v failed\n", server, rf.me)
						return
					}

					if rf.state != Leader {
						return
					}

					if args.Term != rf.currentTerm { // old rpc reply
						DPrintf("old InstallSnapshot rpc reply\n")
						return
					}

					if rf.currentTerm < reply.Term { // set currentTerm = T, convert to follower
						rf.currentTerm = reply.Term
						rf.votedFor = InvalidId
						rf.state = Follower
						rf.persist()
						return
					}
					// ... (the branch that sends AppendEntries when nextIndex > lastIncludedIndex
					// is omitted in this excerpt; what follows is its conflict-reply handling)
					// Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log.
					// If it does not find an entry with that term, it should set nextIndex = conflictIndex.
					if reply.ConflictTerm == NoneTerm {
						rf.nextIndex[server] = reply.ConflictIndex
					} else {
						rf.nextIndex[server] = reply.ConflictIndex
						for i := args.PrevLogIndex; i > rf.lastIncludedIndex; i-- {
							if rf.log[i-rf.lastIncludedIndex].Term == reply.ConflictTerm {
								rf.nextIndex[server] = i
								break
							}
						}
					}
					if rf.nextIndex[server] < 1 {
						log.Panicf("next index set wrong value.")
					}
					DPrintf("next index retry. cur term:%v leader:%d server:%d commit index:%d next index:%d\n", rf.currentTerm, rf.me, server, rf.commitIndex, rf.nextIndex[server])
				}
			}
		}(i)
	}
}
// send heartbeat messages to all peers
func (rf *Raft) heartBeatTask() {
	for !rf.killed() {
		rf.mu.Lock()
		flag := (rf.state == Leader)
		rf.mu.Unlock()
		if !flag {
			return
		}
		go rf.agreementTask()
		time.Sleep(HeartBeatInterval)
	}
}
// send RequestVote to all peers
func (rf *Raft) sendVoteTask(args RequestVoteArgs) {
	ticket := 1
	for i := 0; i < rf.peerNumber; i++ {
		if i == rf.me {
			continue
		}
		go func(server int) {
			reply := RequestVoteReply{}
			res := rf.sendRequestVote(server, &args, &reply)
			if !res {
				// DPrintf("sendRequestVote failed server:%d me:%d\n", server, rf.me)
				return
			}
			rf.mu.Lock()
			defer rf.mu.Unlock()
			if rf.state != Candidate { // no longer a candidate, just return
				return
			}

			if args.Term != rf.currentTerm {
				DPrintf("old RequestVote reply\n")
				return
			}

			if reply.VoteGranted {
				ticket++
				if ticket >= rf.majorityPeer {
					// received a majority of votes: become leader and start the heartbeat task
					rf.state = Leader
					rf.leaderId = rf.me
					// initialize leader-only state
					rf.nextIndex = make([]int, rf.peerNumber)
					nextIndex := rf.lastIncludedIndex + len(rf.log)
					for i := 0; i < rf.peerNumber; i++ {
						rf.nextIndex[i] = nextIndex
					}
					rf.matchIndex = make([]int, rf.peerNumber)
					DPrintf("%d become leader. commit index:%d next index:%d\n", rf.me, rf.commitIndex, rf.nextIndex[0])
					go rf.heartBeatTask()
				}
			} else if rf.currentTerm < reply.Term {
				// the reply carries a larger term: update the term and step down to follower
				rf.currentTerm = reply.Term
				rf.state = Follower
				rf.votedFor = InvalidId
				rf.persist()
			}
		}(i)
	}
}
func (rf *Raft) ticker() {
	for !rf.killed() {
		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using
		// time.Sleep().
		sleepTime := time.Duration(rand.Intn(300)+300) * time.Millisecond // 300~600ms
		time.Sleep(sleepTime)
		rf.mu.Lock() // lock before reading state
		// `&& time.Since(rf.lastInstallSnapshot) > sleepTime` is not in the paper, so I left it out
		if rf.state != Leader && time.Since(rf.lastHeartBeat) > sleepTime && time.Since(rf.lastVote) > sleepTime {
			rf.currentTerm++
			rf.votedFor = rf.me
			rf.lastHeartBeat = time.Now()
			rf.state = Candidate
			lastLogIndex := rf.lastIncludedIndex + len(rf.log) - 1
			lastLogTerm := rf.log[lastLogIndex-rf.lastIncludedIndex].Term
			args := RequestVoteArgs{Term: rf.currentTerm, CandidateId: rf.me, LastLogIndex: lastLogIndex, LastLogTerm: lastLogTerm}
			// DPrintf("%d join vote term:%d\n", rf.me, rf.currentTerm)
			rf.persist()
			go rf.sendVoteTask(args)
		}
		rf.mu.Unlock()
	}
}