Skip to content

Commit

Permalink
raft: next index shall be larger than match index (#557)
Browse files Browse the repository at this point in the history
* Fixed a case in progress.go maybe_decr_to that could cause the next value to be less than or equal to the match value in the probe state

Close #555.

Signed-off-by: wego1236 <[email protected]>
  • Loading branch information
wego1236 authored Dec 6, 2024
1 parent 63aec46 commit 2fbeee5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 2 deletions.
53 changes: 53 additions & 0 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5851,3 +5851,56 @@ fn test_switching_check_quorum() {
}
assert_eq!(sm.state, StateRole::Leader);
}

fn expect_one_message(r: &mut Interface) -> Message {
let msgs = r.read_messages();
assert_eq!(msgs.len(), 1, "expect one message");
msgs[0].clone()
}

#[test]
fn test_log_replication_with_reordered_message() {
let l = default_logger();
let mut r1 = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l);
r1.become_candidate();
r1.become_leader();
r1.read_messages();
r1.mut_prs().get_mut(2).unwrap().become_replicate();

let mut r2 = new_test_raft(2, vec![1, 2], 10, 1, new_storage(), &l);

// r1 sends 2 MsgApp messages to r2.
let _ = r1.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r1.send_append(2);
let req1 = expect_one_message(&mut r1);
let _ = r1.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r1.send_append(2);
let req2 = expect_one_message(&mut r1);

// r2 receives the second MsgApp first due to reordering.
let _ = r2.step(req2);
let resp2 = expect_one_message(&mut r2);
// r2 rejects req2
assert!(resp2.reject);
assert_eq!(resp2.reject_hint, 0);
assert_eq!(resp2.index, 2);

// r2 handles the first MsgApp and responses to r1.
// And r1 updates match index accordingly.
let _ = r2.step(req1);
let m = expect_one_message(&mut r2);
assert!(!m.reject);
assert_eq!(m.index, 2);
let _ = r1.step(m);
assert_eq!(r1.prs().get(2).unwrap().matched, 2);

// r1 observes a transient network issue to r2, hence transits to probe state.
let _ = r1.step(new_message(2, 1, MessageType::MsgUnreachable, 0));
assert_eq!(r1.prs().get(2).unwrap().state, ProgressState::Probe);

// now r1 receives the delayed resp2.
let _ = r1.step(resp2);
let m = expect_one_message(&mut r1);
// r1 shall re-send MsgApp from match index even if resp2's reject hint is less than matching index.
assert_eq!(r1.prs().get(2).unwrap().matched, m.index)
}
4 changes: 2 additions & 2 deletions src/tracker/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ impl Progress {
// Do not decrease next index if it's requesting snapshot.
if request_snapshot == INVALID_INDEX {
self.next_idx = cmp::min(rejected, match_hint + 1);
if self.next_idx < 1 {
self.next_idx = 1;
if self.next_idx < self.matched + 1 {
self.next_idx = self.matched + 1;
}
} else if self.pending_request_snapshot == INVALID_INDEX {
// Allow requesting snapshot even if it's not Replicate.
Expand Down

0 comments on commit 2fbeee5

Please sign in to comment.