Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <[email protected]>
  • Loading branch information
CalvinNeo committed Apr 17, 2024
1 parent 79da4a9 commit 088581c
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 1 deletion.
17 changes: 17 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2547,12 +2547,15 @@ where
return Ok(());
}

debug!("!!!!!! handle 3");

if msg.get_is_tombstone() {
// we receive a message tells us to remove ourself.
self.handle_gc_peer_msg(&msg);
return Ok(());
}

debug!("!!!!!! handle 4");
if msg.has_merge_target() {
fail_point!("on_has_merge_target", |_| Ok(()));
if self.need_gc_merge(&msg)? {
Expand All @@ -2561,17 +2564,20 @@ where
return Ok(());
}

debug!("!!!!!! handle 5");
if self.check_msg(&msg) {
return Ok(());
}

debug!("!!!!!! handle 6");
if msg.has_extra_msg() {
self.on_extra_message(msg);
return Ok(());
}

let is_snapshot = msg.get_message().has_snapshot();

debug!("!!!!!! handle 7");
// TODO: spin off the I/O code (delete_snapshot)
let regions_to_destroy = match self.check_snapshot(&msg)? {
Either::Left(key) => {
Expand All @@ -2588,6 +2594,7 @@ where
Either::Right(v) => v,
};

debug!("!!!!!! handle 8");
if util::is_vote_msg(msg.get_message()) || msg_type == MessageType::MsgTimeoutNow {
if self.fsm.hibernate_state.group_state() != GroupState::Chaos {
self.fsm.reset_hibernate_state(GroupState::Chaos);
Expand All @@ -2600,25 +2607,35 @@ where
let from_peer_id = msg.get_from_peer().get_id();
self.fsm.peer.insert_peer_cache(msg.take_from_peer());

debug!("!!!!!! handle 8.1");
let result = if msg_type == MessageType::MsgTransferLeader {
self.on_transfer_leader_msg(msg.get_message(), peer_disk_usage);
Ok(())
} else {
debug!("!!!!!! handle 8.2");
// This can be a message that sent when it's still a follower. Nevertheleast,
// it's meaningless to continue to handle the request as callbacks are cleared.
if msg.get_message().get_msg_type() == MessageType::MsgReadIndex
&& self.fsm.peer.is_leader()
&& (msg.get_message().get_from() == raft::INVALID_ID
|| msg.get_message().get_from() == self.fsm.peer_id())
{
debug!(
"!!!!!! handle 8.2.2 {} {} {}",
self.fsm.peer.is_leader(),
msg.get_message().get_from(),
msg.get_message().get_from()
);
self.ctx.raft_metrics.message_dropped.stale_msg.inc();
return Ok(());
}
debug!("!!!!!! handle 8.3");
self.fsm.peer.step(self.ctx, msg.take_message())
};

stepped.set(result.is_ok());

debug!("!!!!!! handle 9");
if is_snapshot {
if !self.fsm.peer.has_pending_snapshot() {
// This snapshot is rejected by raft-rs.
Expand Down
2 changes: 2 additions & 0 deletions components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,7 @@ where
"msg_size" => msg.get_message().compute_size(),
"to" => to_peer_id,
"disk_usage" => ?msg.get_disk_usage(),
"!!!!msg" => ?msg
);

for (term, index) in msg
Expand Down Expand Up @@ -1776,6 +1777,7 @@ where
ctx: &mut PollContext<EK, ER, T>,
mut m: eraftpb::Message,
) -> Result<()> {
info!("!!!!!! raft step {:?}", m);
fail_point!(
"step_message_3_1",
self.peer.get_store_id() == 3 && self.region_id == 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
}
}

pub fn get_router(&self, node_id: u64) -> Option<RaftRouter<TiFlashEngine, ProxyRaftEngine>> {
self.sim.rl().get_router(node_id)
}

fn valid_leader_id(&self, region_id: u64, leader_id: u64) -> bool {
let store_ids = match self.voter_store_ids_of_region(region_id) {
None => return false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ unsafe extern "C" fn ffi_release_pre_handled_snapshot(
pub fn gen_engine_store_server_helper(
wrap: Pin<&EngineStoreServerWrap>,
) -> EngineStoreServerHelper {
info!("mock gen_engine_store_server_helper");
EngineStoreServerHelper {
magic_number: interfaces_ffi::RAFT_STORE_PROXY_MAGIC_NUMBER,
version: interfaces_ffi::RAFT_STORE_PROXY_VERSION,
Expand Down
3 changes: 2 additions & 1 deletion proxy_components/proxy_ffi/src/read_index_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ fn into_read_index_response<S: engine_traits::Snapshot>(
resp
}

fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
pub fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
let region_id = req.get_context().get_region_id();
let mut cmd = RaftCmdRequest::default();
{
Expand All @@ -91,6 +91,7 @@ fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
inner_req.set_cmd_type(CmdType::ReadIndex);
inner_req.mut_read_index().set_start_ts(req.get_start_ts());
if !req.get_ranges().is_empty() {
tikv_util::info!("!!!!!! not empty");
let r = &mut req.mut_ranges()[0];
let mut range = kvproto::kvrpcpb::KeyRange::default();
range.set_start_key(r.take_start_key());
Expand Down
Loading

0 comments on commit 088581c

Please sign in to comment.