Raft Lite: An Easy-to-understand Implementation of Raft Consensus Algorithm

2024-01-02
23 min read

Raft Lite is a simple and easy-to-understand implementation of the Raft consensus algorithm. In this post, I will explain the internal implementation of Raft Lite. Please find the complete implementation of Raft Lite in the Raft Lite repository if you are interested.

Consensus algorithm and total order broadcast

State machine replication (SMR) is a fundamental problem in distributed systems. The problem is as follows: suppose we have a set of nodes, each of which maintains a state machine. The state machine can be any arbitrary state machine, such as a key-value pair storage system, a database, or a file system. The state machine can transition to the next state by executing an instruction. The problem is how we can ensure that all nodes execute the same sequence of instructions, so that all nodes can reach the same state.

Consensus algorithms are designed to solve the SMR problem. In a consensus algorithm, each node can propose a value, with only one value being ultimately chosen. Through multiple iterations of this process, the nodes can reach a consensus on a sequence of values. The sequence of values can be used as the sequence of instructions to be executed by the state machine.

A formally equivalent form of the consensus algorithm is total order broadcast, which I believe is easier to understand for most developers. It functions as a library, just like a network protocol does. The following figure shows the idea. We assume that the network is point-to-point, and messages may encounter loss, disorder, or delays. Such a network could be built using TCP, UDP, or any other suitable protocol.

Figure from Distributed System Course.

Although you can use raw TCP connections to achieve broadcast, the total order broadcast library gives stronger guarantees. The library ensures that the arrival order of messages is consistent across all nodes. For example, if node A and node B broadcasts messages m1 and m2 respectively, then the arrival order of m1 and m2 on one node must be the same as the arrival order of m1 and m2 on any other node.

Using the total order broadcast library, we can solve the SMR problem as follows:

  1. Each node maintains a state machine.

  2. Whenever the application layer wants to execute an instruction, it broadcasts the instruction to all nodes using the total order broadcast library.

  3. When a node receives an instruction, it executes the instruction and updates its state machine.

That is to say, the total order broadcast library ensures that all nodes deliver the same sequence of messages. Therefore, all nodes can execute the same sequence of instructions, and reach the same state.

Please note that “deliver” is different from “receive”. The total order broadcast library might receive a message, but not deliver it to the application layer immediately. The library will wait until all the messages before it are delivered to the application layer.

While Raft is explained in the form of consensus algorithm in the original paper, I’ll explain and implement Raft Lite in the form of total order broadcast, which presumably facilitates a more intuitive understanding.

API of Raft Lite

As mentioned above, Raft Lite is a total order broadcast library. The API of Raft Lite is as follows:

raft := new_raft_instance(id, peers);
(broadcast_channel, delivery_channel) := raft.run();

And you can use the channels to build a state machine:

on request to execute an instruction do
  send the instruction to broadcast_channel
end on

on receiving an instruction from delivery_channel do
  execute the instruction on the state machine
end on

Internal overview of Raft Lite

While Raft paper is much clearer than Paxos paper, translating it into usable code remains challenging. The complexity arises due to the paper’s heavy reliance on natural language rather than pseudocode to articulate the algorithm. Furthermore, apart from the AppendEntries and RequestVote RPCs, algorithmic descriptions are dispersed across different sections and figures in the paper. A reader needs to combine those algorithm snippets carefully to get a complete Raft algorithm.

Therefore, Raft Lite uses a event loop to describe the Raft algorithm, since the event loop is a more concrete and intuitive way to describe the algorithm. The events include:

enum Event {
    ElectionTimeout,
    ReplicationTimeout,
    VoteRequest,
    VoteResponse,
    LogRequest,
    LogResponse,
    Broadcast,
}

And the event loop looks like this:

loop {
    let event = wait_for_event();
    switch event {
        case ElectionTimeout:
            start_election();
        case ReplicationTimeout:
            handle_replicate_log();
        case VoteRequest:
            handle_vote_request();
        case VoteResponse:
            handle_vote_response();
        case LogRequest:
            handle_log_request();
        case LogResponse:
            handle_log_response();
        case Broadcast:
            handle_broadcast();
    }
}

In the following, I will give pseudo-code for Raft’s processing of each event, as well as explanations of these pseudo-codes. The rust implementation of Raft Lite is also given. If you want to see the complete implementation, please refer to the Raft Lite repository.

Most of the pseudocode for Raft Lite comes from my master’s supervisor Martin Kleppmann’s Distributed Systems Course . I modified it slightly to make it easier to understand and implement.

The basic idea of Raft

The basic idea of Raft is as follows:

  1. A leader is elected among all nodes. The leader is responsible for receiving broadcast requests from all the nodes, and sorting them into a log.

  2. The leader sends the log to all nodes. The nodes will deliver the log entries to application in the same order.

If we can find an election algorithm that guarantees there is only one leader from the beginning to the end of the system, then the implementation of Raft will be very simple - just the description in the previous paragraph will suffice.

However, a leader may fail or be disconnected from the network. In this case, a new leader must be elected. Therefore, the election algorithm cannot guarantee a unique leader throughout the system’s operation. Raft’s election algorithm offers a weaker assurance: at any given time, there exists at most one leader, or possibly none.

“At any given time” is the key phrase here. In distributed systems, we usually do not use physical clocks to describe time given their tendency to drift. Instead, logical clocks are used. Raft uses a logical clock known as “term” represented as an integer where higher values denote later time. The Raft election algorithm can guarantee that there exists at most one leader in any term.

Even if we’ve found an election algorithm that guarantees at most one leader in any term, the log replication process won’t be as straightforward as outlined earlier. This is because the transition from one leader to another may result in different delivery orders of log entries on different nodes.

For example, suppose node A is the leader in term 1, and node B is the leader in term 2. Let’s assume node A broadcasts message m1 in term 1, all the nodes except node B receive and deliver m1. Then node B becomes the leader in term 2, and broadcasts message m2. In this case, the node B will deliver m2 without delivering m1, while all other nodes has already delivered m1 before m2.

To solve this problem, Raft’s log replication algorithm guarantees that the log entry is delivered only if it is acknowledged by a majority of the nodes, and only the node receiving the majority of the votes can become the leader. A node can vote another one only if the log of the candidate is more up-to-date than its own log. Therefore, the intersection of the two majorities guarantees that the a node must deliver the most up-to-date log entries before it become a leader. Don’t worry if you don’t understand this paragraph, we’ll give a detailed explanation later.

In the following sections, we’ll explain how Raft implements the election algorithm and log replication algorithm.

Initialization

on initialisation do 
    currentTerm := 0; 
    votedFor := null;
    currentRole := follower; 
    currentLeader := null;
    votesReceived := {}; 
    
    log := []; 
    commitLength := 0
    sentLength := []; 
    ackedLength := [];
end on

Lines 2 to 6 are related to elections, while lines 8 to 11 are related to message broadcasting.

These variables consistently maintain the following semantics throughout the algorithm:

  1. log: all log entries of the node. These log entries may or may not have been delivered to the application layer. Log entries across different nodes might vary temporarily, but they will eventually converge to the same state.
  2. commitLength: log[0..commitLength - 1] are the message that has been delivered to the application layer, and log[commitLength..] are the message yet to be delivered to the application layer. It’s important to note that uncommitted log entries might be removed, whereas committed entries are retained permanently.
  3. ackedLength: node k acknowledge the log[0..ackedLength[k]-1] has been received.
  4. sentLength: the leader assumes that log[0..sentLength[k] - 1] of node k matches the leader’s log entries.
  5. current_term: the logical clock of the node.
  6. votes_received: votes received during the ongoing term.

Rust implementation in Raft Lite:

struct NodeState {
    id: u64, // id of the node
    current_term: u64, // 0
    voted_for: Option<u64>, // None
    current_role: Role, // Follower
    current_leader: Option<u64>, // None
    votes_received: HashSet<u64>, // {}
  
  	log: Vec<LogEntry>, // []
    commit_length: u64, // 0
    sent_length: Vec<u64>, // []
    acked_length: Vec<u64>, // []
}

ElectionTimeout

Each node has a election timer, which is reset when receving the heartbeat from its leader.

When the Election Timer expires, the follower will transition to the role of “candidate”. Following this transition, it will proceed to send voting requests to all nodes.

on election timeout do 
    currentTerm := currentTerm + 1;  // start election in a new term, since we need to guarantee that at most one leader exists in a term
    if currentRole == Leader then
        return; // do nothing if the node is already a leader
    end if
    currentRole := candidate; // become a candidate (not yet a leader)
    votedFor := nodeId; // vote for self
    votesReceived := {nodeId};  // currently, the node have one vote (the node itself)
    
    lastTerm := 0  // this is a temporary value, denoting the term of the last log entry. we will explain it later
    if log.length > 0 then 
        lastTerm := log[log.length − 1].term; 
    end if 
    
    msg := (VoteRequest, nodeId , currentTerm , log.length, lastTerm) // send vote request to all nodes
    for each node in nodes: 
        send msg to node
    start election timer // reset election timer, if the node does not receive a majority of votes within the election timeout, it will start a new election
end on

Rust implmenetation in Raft Lite:

fn start_election(&mut self) {
    let state = &mut self.state;
    if state.current_role == Role::Leader {
        return;
    }
    let id = state.id;
    state.current_term += 1;
    state.voted_for = Some(id);
    state.current_role = Role::Candidate;
    state.votes_received.clear();
    state.votes_received.insert(id);

    let mut last_term = 0;
    if state.log.len() > 0 {
        last_term = state.log.last().unwrap().term;
    }

    let msg = VoteRequestArgs {
        cid: id,
        cterm: state.current_term,
        clog_length: state.log.len() as u64,
        clog_term: last_term,
    };
    for i in 0..self.config.peers.len() {
        if i == id as usize {
            continue;
        }
        self.runner.vote_request(i as u64, msg.clone());
    }
}

VoteRequest

When node A receives a voting request from node B, it will perform the following steps:

  1. Check if the term of B is greater than or equal the current term of A. If not, A will reject the voting request, since voting for B might result in multiple leaders in B’s term.
  2. Check if the log of B is more or equal up-to-date than the log of A. If not, A will reject the voting request, since voting for B might result in log entries being lost.
  3. Check if A has already voted for another candidate in the current term. If so, A will reject the voting request, since voting for B might result in multiple leaders in the current term.
on receiving (VoteRequest,cId,cTerm,cLogLength,cLogTerm) at node nodeId do
    if cTerm > currentTerm then 
    // If the term of the candidate is greater than the current term of the node, then the node should update its current term to the term of the candidate, and become a follower. This is because:
    // 1. If the current node is a follower: it doesn't make sense to stay in the current term, since the leader may crash or disconnect.
    // 2. If the current node is a leader: it might be disconnected from the network or crashed for a while. In this case, the current node should step down and become a follower.
        currentTerm := cTerm; 
        currentRole := follower;
        votedFor := null
    end if
    
    lastTerm := 0 // get the term of the last log entry
    if log.length > 0 then 
        lastTerm := log[log.length − 1].term; 
    end if r
    
    logOk := (cLogTerm > lastTerm) or (cLogTerm == lastTerm and cLogLength >= log.length) 
    // check if the log of the candidate is more up-to-date than the log of the node. logOk means the log of the candidate is more up-to-date than the log of the current node.
    
    if cTerm == currentTerm and logOk and votedFor in {cId , null} then 
    // 1. If the term of the candidate is less than the current term of the node, then the node should reject the vote request.
    // 2. If the log of the candidate is not more up-to-date than the log of the node, then the node should reject the vote request.
    // 3. If the node has already voted for another candidate in the current term, then the node should reject the vote request. 
        votedFor := cId
        send (VoteResponse, nodeId, currentTerm, true) to node cId
    else
        send (VoteResponse, nodeId, currentTerm, false) to node cId 
    // false means the node does not vote for the candidate, and the node will inform the candidate its current term. This is because the candidate may have a smaller term, and the node should make the candidate to update its term.
    end if 
end on

Rust implementation in Raft Lite:

fn handle_vote_request(&mut self, args: VoteRequestArgs) {
    let state = &mut self.state;
    if args.cterm > state.current_term {
        state.current_term = args.cterm;
        state.current_role = Role::Follower;
        state.voted_for = None;
    }

    let mut last_term = 0;
    if state.log.len() > 0 {
        last_term = state.log.last().unwrap().term;
    }

    let log_ok = args.clog_term > last_term
        || (args.clog_term == last_term && args.clog_length >= state.log.len() as u64);

    let mut granted = false;
    if args.cterm == state.current_term
        && log_ok
        && (state.voted_for.is_none() || state.voted_for.unwrap() == args.cid)
    {
        state.voted_for = Some(args.cid);
        granted = true;
    }

    let msg = VoteResponseArgs {
        voter_id: state.id,
        term: state.current_term,
        granted,
    };
    self.runner.vote_response(args.cid, msg);
}

VoteResponse

Upon receiving voting responses, a node should check whether it has received a majority of votes. If so, it should transition to the role of leader. Otherwise, it should remain a candidate.

on receiving (VoteResponse, voterId, term, granted) at nodeId do
    if currentRole == candidate and term == currentTerm and granted then
        votesReceived.append(voterId) 
    // If the node is a candidate, and the term of the vote response is the same as the current term of the node, and the vote response is granted, then the node should add the voter to the list of votes received.
    if |votesReceived| >= upper_bound[(|nodes| + 1)/2] then // if the node receives a majority of votes, then the node becomes a leader
        currentRole := leader; 
        currentLeader := nodeId  // the node becomes the leader of itself
        for each follower in nodes \ {nodeId} do // for each follower, the node should send a heartbeat message to it
            sentLength[follower] := log.length // the node assumes that the log of the follower is the same as its own log
            ackedLength[follower] := 0 // the node does not receive any ack from the follower
            ReplicateLog(nodeId,follower)
        end for 
    end if
    else if term > currentTerm then 
    // However, if the term of the vote response is greater than the current term of the node, then the node should update its current term to the term of the vote response, and become a follower. This is because the current node is already out of date, and it should step down and become a follower to avoid multiple leaders in the current term.
        currentTerm := term 
        currentRole := follower 
        votedFor := null
    reset election timer
    end if 

end on

Rust implementation in Raft Lite:

fn handle_vote_response(&mut self, args: VoteResponseArgs) {
    let state = &mut self.state;
    if state.current_role == Role::Candidate && args.term == state.current_term && args.granted
    {
        state.votes_received.insert(args.voter_id);
        if state.votes_received.len() >= ((self.config.peers.len() + 1) + 1) / 2 {
            state.current_role = Role::Leader;
            state.current_leader = Some(state.id);
            for i in 0..self.config.peers.len() {
                if i == state.id as usize {
                    continue;
                }
                state.sent_length[i] = state.log.len() as u64;
                state.acked_length[i] = 0;
            }
            self.handle_replicate_log();
        }
    } else if args.term > state.current_term {
        state.current_term = args.term;
        state.current_role = Role::Follower;
        state.voted_for = None;
        self.runner.election_timer_reset();
    }
}

Broadcast

When the application layer triggers a broadcast, the leader will append the broadcast message to its log, and send the log entry to all followers. If the current node is not a leader, it will forward the broadcast message to the leader.

on request to broadcast msg at node nodeId do 
    if currentRole = leader then // if the node is a leader, then it can directly append the message to its log
        append the record (msg : msg, term : currentTerm) to log 
        ackedLength[nodeId] := log.length // the node is synchronized with itself
        for each follower in nodes \ {nodeId} do // synchronize the log with all followers
            ReplicateLog(nodeId,follower) 
        end for
    else if currentLeader != null then // if the node is not a leader, but it follows a leader, then it should forward the request to the leader
        forward the request to currentLeader via a FIFO link     
    else
        buffer the message until currentLeader != null // if the node is not a leader, and it does not follow a leader, then it should buffer the message until it follows a leader
    end if
end on

Rust implementation in Raft Lite: please note that the buffer mechanism is handled by the runner, and is not shown here.

fn handle_broadcast(&mut self, payload: Vec<u8>) {
    let state = &mut self.state;
    if state.current_role == Role::Leader {
        let entry = Arc::new(LogEntry {
            term: state.current_term,
            payload,
        });
        state.log.push(entry.clone());
        let id = state.id as usize;
        state.acked_length[id] = state.log.len() as u64;
        self.handle_replicate_log();
    } else {
        self.runner.forward_broadcast(BroadcastArgs { payload });
    }
}

ReplicationTimeout

When the replication timer expires, the leader will synchronize its log with all followers. The synchronization message also serves as a heartbeat message.

on replication timeout at node nodeId do 
  if currentRole = leader then
    for each follower in nodes \ {nodeId} do 
      ReplicateLog(nodeId,follower)
    end for 
  end if
end do

Rust implementation in Raft Lite:

fn handle_replicate_log(&mut self) {
    let state = &self.state;
    let runner = &mut self.runner;
    if state.current_role != Role::Leader {
        return;
    }
    let id = state.id;

    for i in 0..self.config.peers.len() {
        if i == id as usize {
            continue;
        }
        Self::replicate_log(state, id, i as u64, runner);
    }
}

ReplicateLog

ReplicateLog is a helper function that synchronizes the log of the leader with a follower.

The simplest way to synchronize the log is to send the entire log to the follower. However, this is inefficient. As mentioned earlier, the leader assumes that the log of the follower is the same as its own log when it becomes a leader. Therefore, the leader only needs to send the log entries that the follower does not have.

sentLength[follower] := log.length // the node assumes that the log of the follower is the same as its own log

The leader maintains a variable sentLength for each follower. sentLength[follower] denotes the length of the log that the leader believes the follower has. When the leader synchronizes the logs with the follower, it will send the log entries after sentLength[follower]. If the synchronization is failed, the leader will decrease sentLength[follower] by 1, and try again.

function ReplicateLog(leaderId, followerId) 
    prefixLen := sentLength[followerId] // we call the log entries that the leader believes are already replicated on the follower as prefix
    suffix := [log[prefixLen], log[prefixLen + 1], ..., log[log.length − 1]] // only send the suffix of the log to the follower
    
    prefixTerm := 0 // prefixTerm is the term of the last log entry in the prefix. We will explain it later. 
    if prefixLen > 0 then
        prefixTerm := log[prefixLen − 1].term
    end if
    
    send (LogRequest,leaderId,currentTerm,prefixLen,prefixTerm,commitLength,suffix) to followerId 
end function

Rust implementation in Raft Lite:

fn replicate_log(state: &NodeState, leader_id: u64, follower_id: u64, runner: &mut T) {
    let prefix_len = state.sent_length[follower_id as usize];
    let suffix = state.log[prefix_len as usize..].to_vec();
    let mut prefix_term = 0;
    if prefix_len > 0 {
        prefix_term = state.log[prefix_len as usize - 1].term;
    }
    let msg = LogRequestArgs {
        leader_id,
        term: state.current_term,
        prefix_len,
        prefix_term,
        leader_commit: state.commit_length,
        suffix,
    };
    runner.log_request(follower_id, msg);
}

LogRequest

When a follower receives a synchronization message from the leader, it will perform the following steps:

  1. The follower will check whether the log is consistent with the log entries that the leader believes the follower has. If not, the follower will reject the synchronization request.

  2. If the log is consistent, the follower will append the suffix log entries to its own log.

  3. The follower will check whether the leader has committed any log entries. If so, the follower will commit the log entries that the leader has committed.

To check whether the log is consistent, the follower will compare the term of the last log entry in the prefix with leader’s prefix_term. If they are not equal, the log is inconsistent. It is true due to a property of Raft: if two nodes have the same log term at the same index, then they have the same log entries at and before that index. Here we don’t give the proof of this property, but you can find it in the original paper.

on receiving (LogRequest,leaderId,term,prefixLen,prefixTerm, leaderCommit,suffix) at node nodeId do
    if term > currentTerm then // if the term of the log request is greater than the current term of the node, then the node should become a follower of the leader
        currentTerm := term; 
        votedFor := null 
        reset election timer
    end if
    
    if term == currentTerm then // if the term of the log request is the same as the current term of the node, then the node should become a follower of the leader (the current node might be a candidate)
        currentRole := follower; 
        currentLeader := leaderId 
    end if
    
    logOk := (log.length >= prefixLen) and (prefixLen == 0 or log[prefixLen − 1].term == prefixTerm) // if logOk is true, then the prefix of the leader is the same as the prefix of the follower. Otherwise, the leader should send the log request again.
    if term = currentTerm and logOk then
        AppendEntries(prefixLen , leaderCommit , suffix ) // update the log using suffix
        ack := prefixLen + suffix.length // the node should notify the leader that it has received the log entries
        send (LogResponse,nodeId,currentTerm,ack,true) to leaderId
    else
        send (LogResponse, nodeId, currentTerm, 0, false) to leaderId
    end if 
end on

Rust implementation in Raft Lite:

fn handle_log_request(&mut self, args: LogRequestArgs) {
    let state = &mut self.state;
    if args.term > state.current_term {
        state.current_term = args.term;
        state.voted_for = None;
        self.runner.election_timer_reset();
    }
    if args.term == state.current_term {
        state.current_role = Role::Follower;
        state.current_leader = Some(args.leader_id);
        self.runner.election_timer_reset();
    }
    let log_ok = (state.log.len() >= args.prefix_len as usize)
        && (args.prefix_len == 0
            || state.log[args.prefix_len as usize - 1].term == args.prefix_term);

    let mut ack = 0;
    let mut success = false;
    let runner = &mut self.runner;
    if args.term == state.current_term && log_ok {
        Self::append_entries(
            state,
            args.prefix_len,
            args.leader_commit,
            args.suffix.clone(),
            runner,
        );
        ack = args.prefix_len + args.suffix.len() as u64;
        success = true;
    }
    let msg = LogResponseArgs {
        follower: state.id,
        term: state.current_term,
        ack,
        success,
    };
    self.runner.log_response(args.leader_id, msg);
}

AppendEntries

AppendEntries is a helper function that appends the suffix log entries to the log of the follower.

Here we check whether the follower has the same suffix log entries as the leader. If not, the follower will remove all the log entries after prefix from its log, and append the suffix log entries from leader to its log.

function AppendEntries(prefixLen,leaderCommit,suffix) 

    if suffix.length > 0 and log.length > prefixLen then // if the suffix of the leader is not empty, and the suffix of the follower is not empty
        index := min(log.length, prefixLen + suffix.length) − 1  // get the index of the last log entry that can be compared
        if log[index].term != suffix[index − prefixLen].term then // if they have different terms, then the suffix of the follower might be different from the suffix of the leader
            log := [log [0], log [1], . . . , log [prefixLen − 1]] // remove the suffix of the follower
        end if
    end if
    
    if prefixLen + suffix.length > log.length then // if the we can find log entries that can be appended
        for i := log.length − prefixLen to suffix.length − 1 do 
            append suffix[i] to log // append the log entries to the log
        end for 
    end if
    
    if leaderCommit > commitLength then // logs[0..leaderCommit − 1] are acknowledged by the majority of nodes, so we can commit those log entries
        for i := commitLength to leaderCommit − 1 do
            deliver log[i].msg to the application 
        end for
        commitLength := leaderCommit 
    end if
end function

Rust implementation in Raft Lite:

fn append_entries(
    state: &mut NodeState,
    prefix_len: u64,
    leader_commit: u64,
    suffix: Vec<Arc<LogEntry>>,
    runner: &mut T,
) {
    if suffix.len() > 0 && state.log.len() > prefix_len as usize {
        let index = min(state.log.len(), prefix_len as usize + suffix.len()) - 1;
        if state.log[index].term != suffix[index - prefix_len as usize].term {
            state.log.truncate(prefix_len as usize);
        }
    }
    if prefix_len as usize + suffix.len() > state.log.len() {
        for i in state.log.len() - prefix_len as usize..suffix.len() {
            state.log.push(suffix[i].clone());
        }
    }
    if leader_commit > state.commit_length {
        for i in state.commit_length..leader_commit {
            runner.deliver_message(state.log[i as usize].payload.to_vec());
        }
        state.commit_length = leader_commit;
    }
}

LogResponse

When the leader receives a log response from a follower, it will perform the following steps:

  1. If the synchronization is successful, the leader will update ackedLength and sentLength of the follower.

  2. If the synchronization is failed, the leader will decrease sentLength of the follower by 1, and try again.

on receiving (LogResponse,follower,term,ack,success) at nodeId do 
    if term == currentTerm and currentRole == leader then
        if success = true and ack >= ackedLength[follower] then // if it is a successful response, and the follower might acknowledge some new log entries
            sentLength[follower] := ack // update the sentLength of the follower
            ackedLength[follower] := ack  // update the ackedLength of the follower
            CommitLogEntries()
        else if sentLength[follower] > 0 then // if it is a failed response, and the leader should try a lower sentLength
            sentLength[follower] := sentLength[follower] − 1 
            ReplicateLog(nodeId,follower) // try again
        end if
    else if term > currentTerm then // if the term of the log response is greater than the current term of the node, then the current node should step down and become a follower
  	    currentTerm := term 
  	    currentRole := follower 
  	    votedFor := null
  	    reset election timer
    end if 
end on

Rust implementation in Raft Lite:

fn handle_log_response(&mut self, args: LogResponseArgs) {
    let state = &mut self.state;
    let runner = &mut self.runner;
    if args.term == state.current_term && state.current_role == Role::Leader {
        if args.success && args.ack >= state.acked_length[args.follower as usize] {
            state.sent_length[args.follower as usize] = args.ack;
            state.acked_length[args.follower as usize] = args.ack;
            Self::commit_log_entries(state, &self.config, runner);
        } else if state.sent_length[args.follower as usize] > 0 {
            state.sent_length[args.follower as usize] -= 1;
            let id = state.id;
            Self::replicate_log(state, id, args.follower, runner);
        }
    } else if args.term > state.current_term {
        state.current_term = args.term;
        state.current_role = Role::Follower;
        state.voted_for = None;
        self.runner.election_timer_reset();
    }
}

CommitLogEntries

If the leader receives a majority of acknowledgements for a log entry, it will commit the log entry.

define acks(x) = |{n in nodes | ackedLength[n] >= x}| // acks(x) returns the number of nodes whose ackedLength is greater than or equal to x

function CommitLogEntries
    minAcks := upper_bound(|nodes| + 1)/2) // minAcks is the minimum number of majority nodes. 
    
    ready := {len in {commitLength + 1, ... , log.length} | acks(len) >=  minAcks } // ready is a set of integers, which contains the log indexes of the log entries that are acknowledged by the majority of nodes
    
    if ready != {} and log[max(ready) − 1].term = currentTerm then // max(ready) is the largest log index in ready.
        for i := commitLength to max(ready) − 1 do
            deliver log[i].msg to the application  
    end for
        commitLength := max(ready) // update commitLength
    end if
end function

Rust implementation in Raft Lite:

fn commit_log_entries(state: &mut NodeState, config: &RaftConfig, runner: &mut T) {
    let min_acks = ((config.peers.len() + 1) + 1) / 2;
    let mut ready_max = 0;
    // here is an optimization: we don't need to iterate through the whole log
    for i in state.commit_length as usize + 1..state.log.len() + 1 {
        if Self::acks(&state.acked_length, i as u64) >= min_acks as u64 {
            ready_max = i;
        }
    }
    if ready_max > 0 && state.log[ready_max - 1].term == state.current_term {
        for i in state.commit_length as usize..ready_max {
            runner.deliver_message(state.log[i].payload.to_vec());
        }
        state.commit_length = ready_max as u64;
    }
}

fn acks(acked_length: &Vec<u64>, length: u64) -> u64 {
    let mut acks = 0;
    for i in 0..acked_length.len() {
        if acked_length[i] >= length {
            acks += 1;
        }
    }
    acks
}