From cdecedcbfba06f8ba9563c557a7fd424cf47aead Mon Sep 17 00:00:00 2001 From: wego1236 <844740374@qq.com> Date: Sat, 16 Nov 2024 19:14:13 +0800 Subject: [PATCH] Added related tests Signed-off-by: wego1236 <844740374@qq.com> --- harness/tests/integration_cases/test_raft.rs | 105 ++++++++++++++----- 1 file changed, 80 insertions(+), 25 deletions(-) diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index a62266d2..103e9cdf 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -21,7 +21,7 @@ use std::panic::{self, AssertUnwindSafe}; use harness::*; use protobuf::Message as PbMessage; use raft::eraftpb::*; -use raft::storage::MemStorage; +use raft::storage::{GetEntriesContext, MemStorage}; use raft::*; use raft_proto::*; use slog::Logger; @@ -917,7 +917,7 @@ fn test_dueling_candidates() { // enough log. nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]); - let tests = [ + let tests = vec![ // role, term, committed, applied, last index. (StateRole::Follower, 2, (1, 0, 1)), (StateRole::Follower, 2, (1, 0, 1)), @@ -968,7 +968,7 @@ fn test_dueling_pre_candidates() { // With pre-vote, it does not disrupt the leader. nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]); - let tests = [ + let tests = vec![ // role, term, committed, applied, last index. (1, StateRole::Leader, 1, (1, 0, 1)), (2, StateRole::Follower, 1, (1, 0, 1)), @@ -1242,7 +1242,7 @@ fn test_commit() { #[test] fn test_pass_election_timeout() { let l = default_logger(); - let tests = [ + let tests = vec![ (5, 0f64, false), (10, 0.1, true), (13, 0.4, true), @@ -1728,7 +1728,7 @@ fn test_all_server_stepdown() { (StateRole::Leader, StateRole::Follower, 3, 1, 1), ]; - let tmsg_types = [MessageType::MsgRequestVote, MessageType::MsgAppend]; + let tmsg_types = vec![MessageType::MsgRequestVote, MessageType::MsgAppend]; let tterm = 3u64; for (i, (state, wstate, wterm, windex, entries)) in tests.drain(..).enumerate() { @@ -3162,7 +3162,7 @@ fn test_add_node() -> Result<()> { let mut r = new_test_raft(1, vec![1], 10, 1, new_storage(), &l); r.apply_conf_change(&add_node(2))?; assert_iter_eq!(o r.prs().conf().voters().ids(), - [1, 2] + vec![1, 2] ); Ok(()) @@ -3208,11 +3208,11 @@ fn test_remove_node() -> Result<()> { let l = default_logger(); let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); r.apply_conf_change(&remove_node(2))?; - assert_iter_eq!(o r.prs().conf().voters().ids(), [1]); + assert_iter_eq!(o r.prs().conf().voters().ids(), vec![1]); // Removing all voters is not allowed. assert!(r.apply_conf_change(&remove_node(1)).is_err()); - assert_iter_eq!(o r.prs().conf().voters().ids(), [1]); + assert_iter_eq!(o r.prs().conf().voters().ids(), vec![1]); Ok(()) } @@ -3223,8 +3223,8 @@ fn test_remove_node_itself() { let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l); assert!(n1.apply_conf_change(&remove_node(1)).is_err()); - assert_iter_eq!(n1.prs().conf().learners(), [2]); - assert_iter_eq!(o n1.prs().conf().voters().ids(), [1]); + assert_iter_eq!(n1.prs().conf().learners(), vec![2]); + assert_iter_eq!(o n1.prs().conf().voters().ids(), vec![1]); } #[test] @@ -3956,8 +3956,8 @@ fn test_restore_with_learner() { assert!(sm.restore(s.clone())); assert_eq!(sm.raft_log.last_index(), 11); assert_eq!(sm.raft_log.term(11).unwrap(), 11); - assert_iter_eq!(o sm.prs().conf().voters().ids(), [1, 2]); - assert_iter_eq!(sm.prs().conf().learners(), [3]); + assert_iter_eq!(o sm.prs().conf().voters().ids(), vec![1, 2]); + assert_iter_eq!(sm.prs().conf().learners(), vec![3]); let conf_state = s.get_metadata().get_conf_state(); for node in &conf_state.voters { @@ -3990,7 +3990,7 @@ fn test_restore_with_voters_outgoing() { ); assert_iter_eq!( o sm.prs().conf().voters().ids(), - [1, 2, 3, 4] + vec![1, 2, 3, 4] ); assert!(!sm.restore(s)); } @@ -4078,7 +4078,7 @@ fn test_add_learner() -> Result<()> { let mut n1 = new_test_raft(1, vec![1], 10, 1, new_storage(), &l); n1.apply_conf_change(&add_learner(2))?; - assert_iter_eq!(n1.prs().conf().learners(), [2]); + assert_iter_eq!(n1.prs().conf().learners(), vec![2]); assert!(n1.prs().conf().learners().contains(&2)); Ok(()) @@ -4091,12 +4091,12 @@ fn test_remove_learner() -> Result<()> { let l = default_logger(); let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l); n1.apply_conf_change(&remove_node(2))?; - assert_iter_eq!(o n1.prs().conf().voters().ids(), [1]); + assert_iter_eq!(o n1.prs().conf().voters().ids(), vec![1]); assert!(n1.prs().conf().learners().is_empty()); // Remove all voters are not allowed. assert!(n1.apply_conf_change(&remove_node(1)).is_err()); - assert_iter_eq!(o n1.prs().conf().voters().ids(), [1]); + assert_iter_eq!(o n1.prs().conf().voters().ids(), vec![1]); assert!(n1.prs().conf().learners().is_empty()); Ok(()) @@ -5286,7 +5286,7 @@ fn test_group_commit_consistent() { /// of the election with both priority and log. #[test] fn test_election_with_priority_log() { - let tests = [ + let tests = vec![ // log is up to date or not 1..3, priority 1..3, id, state (true, false, false, 3, 1, 1, 1, StateRole::Leader), (true, false, false, 2, 2, 2, 1, StateRole::Leader), @@ -5301,7 +5301,7 @@ fn test_election_with_priority_log() { (false, false, true, 1, 1, 3, 1, StateRole::Leader), ]; - for (l1, l2, l3, p1, p2, p3, id, state) in tests { + for (_i, &(l1, l2, l3, p1, p2, p3, id, state)) in tests.iter().enumerate() { let l = default_logger(); let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l); let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l); @@ -5349,7 +5349,7 @@ fn test_election_after_change_priority() { // check state assert_eq!(network.peers[&1].state, StateRole::Follower, "peer 1 state"); - let tests = [ + let tests = vec![ (1, 1, StateRole::Follower), //id, priority, state (1, 2, StateRole::Leader), (1, 3, StateRole::Leader), @@ -5427,16 +5427,16 @@ fn test_uncommitted_entries_size_limit() { nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); // should return ok - nt.dispatch([msg.clone()]).unwrap(); + nt.dispatch(vec![msg.clone()].to_vec()).unwrap(); // then next proposal should be dropped - let result = nt.dispatch([msg]); + let result = nt.dispatch(vec![msg].to_vec()); assert_eq!(result.unwrap_err(), raft::Error::ProposalDropped); // but entry with empty size should be accepted let entry = Entry::default(); let empty_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]); - nt.dispatch([empty_msg]).unwrap(); + nt.dispatch(vec![empty_msg].to_vec()).unwrap(); // after reduce, new proposal should be accepted let mut entry = Entry::default(); @@ -5453,18 +5453,18 @@ fn test_uncommitted_entries_size_limit() { let mut entry = Entry::default(); entry.data = (b"hello world and raft" as &'static [u8]).into(); let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]); - nt.dispatch([long_msg]).unwrap(); + nt.dispatch(vec![long_msg].to_vec()).unwrap(); // but another huge one will be dropped let mut entry = Entry::default(); entry.data = (b"hello world and raft" as &'static [u8]).into(); let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]); - nt.dispatch([long_msg]).unwrap_err(); + nt.dispatch(vec![long_msg].to_vec()).unwrap_err(); // entry with empty size should still be accepted let entry = Entry::default(); let empty_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]); - nt.dispatch([empty_msg]).unwrap(); + nt.dispatch(vec![empty_msg].to_vec()).unwrap(); } #[test] @@ -5851,3 +5851,58 @@ fn test_switching_check_quorum() { } assert_eq!(sm.state, StateRole::Leader); } + +fn expect_one_message(r: &mut Interface) -> Message { + let msgs = r.read_messages(); + assert_eq!(msgs.len(), 1, "expect one message"); + msgs[0].clone() +} + +#[test] +fn test_log_replication_with_reordered_message() { + let l = default_logger(); + let mut r1 = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); + r1.become_candidate(); + r1.become_leader(); + r1.read_messages(); + r1.mut_prs().get_mut(2).unwrap().become_replicate(); + + + let mut r2 = new_test_raft(2, vec![1, 2], 10, 1, new_storage(), &l); + + // r1 sends 2 MsgApp messages to r2. + let _ = r1.append_entry(&mut [new_entry(0, 0, SOME_DATA)]); + r1.send_append(2); + let req1 = expect_one_message(&mut r1); + let _ = r1.append_entry(&mut [new_entry(0, 0, SOME_DATA)]); + r1.send_append(2); + let req2 = expect_one_message(&mut r1); + + // r2 receives the second MsgApp first due to reordering. + let _ = r2.step(req2); + let resp2 = expect_one_message(&mut r2); + // r2 rejects req2 + assert_eq!(resp2.reject, true); + assert_eq!(resp2.reject_hint, 0); + assert_eq!(resp2.index, 2); + + // r2 handles the first MsgApp and responses to r1. + // And r1 updates match index accordingly. + let _ = r2.step(req1); + let m = expect_one_message(&mut r2); + assert_eq!(m.reject, false); + assert_eq!(m.index, 2); + let _ = r1.step(m); + let _ = expect_one_message(&mut r1); + assert_eq!(r1.prs().get(2).unwrap().matched, 2); + + // r1 observes a transient network issue to r2, hence transits to probe state. + let _ = r1.step(new_message(2, 1, MessageType::MsgUnreachable, 0)); + assert_eq!(r1.prs().get(2).unwrap().state, ProgressState::Probe); + + // now r1 receives the delayed resp2. + let _ = r1.step(resp2); + let m = expect_one_message(&mut r1); + // r1 shall re-send MsgApp from match index even if resp2's reject hint is less than matching index. + assert_eq!(r1.prs().get(2).unwrap().matched, m.index) +} \ No newline at end of file