|
|
@@ -57,10 +57,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
move || {
|
|
|
log::info!("{:?} apply command daemon running ...", me);
|
|
|
|
|
|
+ let mut last_applied = 0;
|
|
|
while keep_running.load(Ordering::Relaxed) {
|
|
|
let messages = {
|
|
|
let mut rf = rf.lock();
|
|
|
- if rf.last_applied >= rf.commit_index {
|
|
|
+ if last_applied >= rf.commit_index {
|
|
|
// We have applied all committed log entries, wait until
|
|
|
// new log entries are committed.
|
|
|
condvar.wait_for(&mut rf, HEARTBEAT_INTERVAL);
|
|
|
@@ -69,17 +70,17 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
// always smaller than or equal to commit index, as
|
|
|
// guaranteed by the SNAPSHOT_INDEX_INVARIANT.
|
|
|
assert!(rf.log.start() <= rf.commit_index);
|
|
|
- if rf.last_applied < rf.log.start() {
|
|
|
+ if last_applied < rf.log.start() {
|
|
|
let (index_term, data) = rf.log.snapshot();
|
|
|
let messages =
|
|
|
vec![ApplyCommandMessage::Snapshot(Snapshot {
|
|
|
last_included_index: index_term.index,
|
|
|
data: data.to_vec(),
|
|
|
})];
|
|
|
- rf.last_applied = rf.log.start();
|
|
|
+ last_applied = rf.log.start();
|
|
|
messages
|
|
|
- } else if rf.last_applied < rf.commit_index {
|
|
|
- let index = rf.last_applied + 1;
|
|
|
+ } else if last_applied < rf.commit_index {
|
|
|
+ let index = last_applied + 1;
|
|
|
let last_one = rf.commit_index + 1;
|
|
|
// This is safe because commit_index is always smaller
|
|
|
// than log.end(), see COMMIT_INDEX_INVARIANT.
|
|
|
@@ -95,7 +96,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
)
|
|
|
})
|
|
|
.collect();
|
|
|
- rf.last_applied = rf.commit_index;
|
|
|
+ last_applied = rf.commit_index;
|
|
|
messages
|
|
|
} else {
|
|
|
continue;
|