
Roundup Part Two

Well, I already broke my rule and kept coding without writing, but here goes nothing. I worked on a lot. In the last roundup on Raft, I was working through supporting snapshots within RaftCore. I successfully finished that and had a good setup for the event loop. One thing I could’ve done was cluster membership changes (i.e. adding a node to the cluster), but that seemed like a major rewrite that I wasn’t exactly motivated to work on; I wanted a Raft cluster running and that would’ve gotten in the way! So, I moved on to writing the event loop and will explain some of its design choices in this post. Then, we’ll get into actually running the cluster (eek!) and how it replicates commands across processes. It’s pretty awesome.

Event Loop

The event loop is fairly straightforward. We’re going to have a type that actually holds the event loop’s state (RaftDriver) and has a method that consumes self to loop over the events that can happen. Then, there will be another type (RaftNode) that will be owned by the application to communicate with the event loop.

The event loop is purposely simple. It needs to worry about three concerns:

  1. Tick on the provided interval to call RaftCore’s tick() method. That method is responsible for things like sending out heartbeats from the leader and letting followers know when it is time to start an election because the leader may have died.
  2. Receive proposals (only as the leader) from the application. This is when the application has a new command that it wants to put on the log and replicate across the cluster.
  3. Receive messages from other nodes and process them. The node is part of a cluster and needs to take part in elections, log replication, etc., so it has to be able to talk to other nodes.

Of course, RaftCore is the synchronous, sans-I/O approach to Raft. It has methods to handle each message type and a method to propose a new command. Those methods all emit a Vec<Action>, which is just a list of ‘actions’ that the event loop should take. There are some contracts around the ordering of the Vec and whether the event loop is allowed to perform those Actions concurrently or must finish them one at a time, in order. For example, messages can be handed off to another thread/task to be sent because the node doesn’t care if the recipient actually receives them (that’s the point of Raft: a node may not receive a message when it has crashed, but Raft should still continue). However, saving persistent state (like the log or the current term) needs to be done before allowing any other Action to be processed. In this setup with tokio, calling .await inline in the event loop suspends the loop’s task until the future completes (no other ticks or messages will be processed in the meantime), which ensures that Action has been taken care of before moving on to the next thing. A rule I’ve heard is ‘never block the event loop’, but in this case, I do want to make sure no new commands come in or messages are processed until that Action is done.
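To make that ordering contract concrete, here is a minimal, dependency-free sketch. The Action variants, LoopState, and run_actions are hypothetical stand-ins (the real Action type surely has more variants), and the ‘durable’ save is just a field assignment. The point is only that a save finishes inline before anything after it in the Vec is touched, while a send is merely queued.

```rust
/// Hypothetical stand-in for the post's `Action` type; the real variants may differ.
#[derive(Debug)]
enum Action {
    /// Persist the current term; must complete before later actions run.
    SaveTerm(u64),
    /// Hand a message to a peer; delivery is best-effort.
    Send { to: u64 },
}

/// Hypothetical loop state, just enough to make the ordering observable.
#[derive(Debug, Default)]
struct LoopState {
    /// Stand-in for state that has reached durable storage.
    durable_term: u64,
    /// (peer, durable term at the moment the send was handed off)
    outbox: Vec<(u64, u64)>,
}

/// Process the actions strictly in the order the core emitted them.
fn run_actions(state: &mut LoopState, actions: Vec<Action>) {
    for action in actions {
        match action {
            // Finished inline: nothing later in the Vec runs until this is done.
            Action::SaveTerm(term) => state.durable_term = term,
            // Only queued: a background worker would deliver it later, or never.
            Action::Send { to } => state.outbox.push((to, state.durable_term)),
        }
    }
}
```

Every queued send here records the term that was already saved when it was handed off, which is the property the real loop gets by awaiting the storage call before moving on.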

This means the event loop isn’t really that bad. It’s basically a tokio::select! in a loop that handles its concerns by calling methods on RaftCore, saving files, or sending messages to other nodes.
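The real loop is a tokio::select! over separate sources. As a dependency-free approximation of the same shape, the sketch below funnels all three concerns into a single std::sync::mpsc channel. LoopEvent, Counters, run_loop, and demo_loop are hypothetical names, and the ‘handling’ is reduced to counting so the dispatch is easy to follow.

```rust
use std::sync::mpsc::{self, Receiver};

/// Hypothetical event type covering the three concerns of the loop.
#[derive(Debug)]
enum LoopEvent {
    /// The tick interval fired.
    Tick,
    /// The application proposed a command.
    Propose(Vec<u8>),
    /// Another node sent us a message.
    Peer { from: u64, bytes: Vec<u8> },
    /// Stop the loop (only here so the sketch terminates).
    Shutdown,
}

#[derive(Debug, Default, PartialEq)]
struct Counters {
    ticks: u32,
    proposals: u32,
    peer_messages: u32,
}

/// One event at a time: each arm is where the real loop would call into
/// RaftCore and then carry out the returned actions.
fn run_loop(events: Receiver<LoopEvent>) -> Counters {
    let mut seen = Counters::default();
    while let Ok(event) = events.recv() {
        match event {
            LoopEvent::Tick => seen.ticks += 1,
            LoopEvent::Propose(_command) => seen.proposals += 1,
            LoopEvent::Peer { .. } => seen.peer_messages += 1,
            LoopEvent::Shutdown => break,
        }
    }
    seen
}

/// Feed a few events through the loop and report what it saw.
fn demo_loop() -> Counters {
    let (tx, rx) = mpsc::channel();
    tx.send(LoopEvent::Tick).unwrap();
    tx.send(LoopEvent::Propose(b"set x".to_vec())).unwrap();
    tx.send(LoopEvent::Peer { from: 2, bytes: vec![1] }).unwrap();
    tx.send(LoopEvent::Tick).unwrap();
    tx.send(LoopEvent::Shutdown).unwrap();
    run_loop(rx)
}
```

Because only one event is handled at a time, nothing else is processed while an arm is still working, which is the behavior the post wants from its loop.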

Which brings me to the key pieces of the event loop: the Transport and Storage traits. These traits define the networking and ‘durable’ storage contracts respectively. For Transport there are two simple methods: send() and recv(). All that means is a type implementing the trait needs to provide a way to send a message to a specified node and a way to receive the stream of messages coming from the other nodes. Pretty simple. The only gotcha is that send() is allowed to run ‘in parallel’, meaning that in our case I can spawn off a new tokio task to send the message and not await it. And recv() needs to be cancel safe, which is fine because there should be a separate reader task that just funnels the messages from all nodes into a single channel that the event loop reads from.
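As a rough picture of that contract, here is a synchronous, std-only sketch. The real trait is async, and everything below (the simplified Message struct, the Option return from recv, ChannelTransport, two_node_pair) is an assumption made for illustration rather than the project’s actual API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

type NodeId = u64;

/// Hypothetical, simplified message; the real Message type carries Raft RPCs.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    from: NodeId,
    body: Vec<u8>,
}

/// Synchronous approximation of the two-method contract.
trait Transport {
    /// Best-effort send: never reports failure to the caller.
    fn send(&self, node: NodeId, message: Message);
    /// Next message from any peer, or None once every sender is gone.
    fn recv(&mut self) -> Option<Message>;
}

/// In-memory implementation: one channel per destination node.
struct ChannelTransport {
    senders: HashMap<NodeId, Sender<Message>>,
    inbox: Receiver<Message>,
}

impl Transport for ChannelTransport {
    fn send(&self, node: NodeId, message: Message) {
        // Unknown peer or closed channel: drop the message silently.
        if let Some(tx) = self.senders.get(&node) {
            let _ = tx.send(message);
        }
    }

    fn recv(&mut self) -> Option<Message> {
        self.inbox.recv().ok()
    }
}

/// Wire up nodes 1 and 2 so that each can send to the other.
fn two_node_pair() -> (ChannelTransport, ChannelTransport) {
    let (to_one, inbox_one) = mpsc::channel();
    let (to_two, inbox_two) = mpsc::channel();
    let one = ChannelTransport {
        senders: HashMap::from([(2, to_two)]),
        inbox: inbox_one,
    };
    let two = ChannelTransport {
        senders: HashMap::from([(1, to_one)]),
        inbox: inbox_two,
    };
    (one, two)
}
```

An in-memory transport like this is also handy for tests, since a whole cluster can run in one process without touching the network.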

For Storage, each type of storage in Raft has its own set of store/restore methods. For example, the log has to be stored when new entries are added, so there are store_log_entries and restore_log_entries methods. These can store in any way the type chooses, but the guarantee is that the data is on durable storage when the method returns (or is awaited on, in the tokio world). The storage could be anything, but as you’ll see, I provide a straightforward (and likely normal?) way to just store this information on the local filesystem. But you could imagine a durable storage implementation that writes the information to an S3 bucket somewhere, or something like that.
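A sketch of what the log half of that contract could look like, again synchronous and std-only. The method names come from the post, but the signatures, the LogEntry fields, and MemoryStorage are assumptions. MemoryStorage is only a test double: it is not durable, whereas a real implementation must not return until the bytes are safely stored.

```rust
use std::io;

/// Hypothetical log entry shape; the real type may carry different fields.
#[derive(Debug, Clone, PartialEq)]
struct LogEntry {
    index: u64,
    term: u64,
    command: Vec<u8>,
}

/// Log portion of the storage contract (the signatures are assumptions).
trait Storage {
    /// Must not return Ok until the entries are durably stored.
    fn store_log_entries(&mut self, entries: &[LogEntry]) -> io::Result<()>;
    /// Everything stored so far, in index order.
    fn restore_log_entries(&self) -> io::Result<Vec<LogEntry>>;
}

/// Test double that keeps the log in memory (NOT durable).
#[derive(Default)]
struct MemoryStorage {
    log: Vec<LogEntry>,
}

impl Storage for MemoryStorage {
    fn store_log_entries(&mut self, entries: &[LogEntry]) -> io::Result<()> {
        if let Some(first) = entries.first() {
            // New entries replace any stored suffix starting at their first index.
            self.log.retain(|e| e.index < first.index);
            self.log.extend_from_slice(entries);
        }
        Ok(())
    }

    fn restore_log_entries(&self) -> io::Result<Vec<LogEntry>> {
        Ok(self.log.clone())
    }
}
```

Storing an entry at an index that already exists drops the old suffix first, mirroring the case where a leader corrects a follower’s log.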

So, that’s the event loop. Pretty simple, and generic over the actual way to send messages and store/restore the persistent state. Yet again, I did the engineering thing of doing a little bit of work and abstracting away a little more to someone else, but I promise eventually I’ll get you there. Next, let’s explore the Transport I designed for the example key-value store, and then I’ll dive into the Storage and finally the actual key-value store, which is just a simple HTTP server that interacts with the event loop.

Transport Implementation

For the Transport, I selected basic TCP connections to start. I played around with the idea of QUIC but also didn’t want to work through the pain of setting up certificates, etc. It can be done with a bit of work but for the first implementation, I chose TCP.

Granted, I didn’t use TCP exactly as designed and in reality used it more like UDP. In fact, a UDP implementation probably would’ve been less code, but conceptually it is the same. I say this because I didn’t leverage the bi-directional nature of TCP. Connections were kept ‘alive’ but they were only one-way. For example, a node will start up and first loop through the list of nodes (which includes the address that the current node (‘this’) should listen on for other Raft participants). If it finds itself, it will spawn off a listener task that listens on the designated port. Upon any client connection, that task spawns another task to actually read from the connected client, deserialize any messages into the Message struct, and send them down a shared channel. The Transport::recv() implementation reads from that channel to provide messages to the event loop. It generally looks like this:

let (receiver_tx, receiver_rx) = mpsc::unbounded_channel();
for n in nodes {
    if n.id == id {
        let rtx = receiver_tx.clone();
        let listener = TcpListener::bind(n.address)
            .await
            .expect("Unable to bind to address");
        tokio::spawn(async move {
            while let Ok((client_stream, _)) = listener.accept().await {
                let pipe = rtx.clone();
                tokio::spawn(async move {
                    let mut framed =
                        Framed::new(client_stream, LengthDelimitedCodec::new());
                    while let Some(Ok(buf)) = framed.next().await {
                        let Ok(msg) = postcard::from_bytes(&buf) else {
                            continue;
                        };
                        let _ = pipe.send(msg);
                    }
                });
            }
        });
    } else {
        ... /* talk about this below */
    }
}

The receiver_rx channel is then simply read from in the recv() impl of the Transport trait:

impl Transport for TcpTransport {
    ...

    async fn recv(&mut self) -> Message {
        self.receiver_rx.recv().await.expect("Receiving pipe lost")
    }
}

Pretty simple trait implementation there! I’ll also be forthright that there is a good deal of unwrap() and expect() here. My argument is that most of those situations are fairly unrecoverable. If the process ran out of memory with an unbounded channel, or the sending side of the receiver_rx channel was dropped, that’s pretty much a done deal for the Raft node and the process might as well die and restart. That’s why Raft has storage and a consensus algorithm: things are expected to fail! (I know, that shouldn’t be because of my code, but I don’t even know what the right way to handle that error is; if I can’t receive from other nodes for some reason, I’m dead anyway, so it doesn’t change anything.)
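The reader task above leans on LengthDelimitedCodec to split the TCP byte stream back into messages. With its default settings that codec prefixes each payload with a 4-byte big-endian length. The std-only sketch below shows the same idea by hand; encode_frame and drain_frames are hypothetical helpers, not part of tokio-util.

```rust
/// Prefix `payload` with its length as a 4-byte big-endian integer.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    assert!(payload.len() <= u32::MAX as usize, "payload too large for a u32 length");
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Remove every complete frame from the front of `buf` and return the payloads.
/// A trailing partial frame stays in `buf` until more bytes arrive.
fn drain_frames(buf: &mut Vec<u8>) -> Vec<Vec<u8>> {
    let mut frames = Vec::new();
    let mut offset = 0;
    while buf.len() - offset >= 4 {
        let len = u32::from_be_bytes([
            buf[offset],
            buf[offset + 1],
            buf[offset + 2],
            buf[offset + 3],
        ]) as usize;
        if buf.len() - offset - 4 < len {
            break; // header is here but the payload is not complete yet
        }
        frames.push(buf[offset + 4..offset + 4 + len].to_vec());
        offset += 4 + len;
    }
    buf.drain(..offset);
    frames
}
```

TCP is free to deliver bytes in arbitrary chunks, so the receiving side has to tolerate a frame that is only partly there; that is what the early break handles.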

For sending messages to other nodes, I spawn off a ‘sending’ task for each node. So in the else block in that loop above, I set up another unbounded channel and continually receive from it any messages destined for that particular node. Then, I’ll try my best to keep the connection up, reestablish it if it’s lost, and send the message. A message that arrives while the peer can’t be reached is simply dropped. Again, this is a case where I heavily use tokio tasks and channels to make my life easier and just receive from channel -> send down TCP stream. Here’s the else block:

let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
let address = n.address;
tokio::spawn(async move {
    let mut stream: Option<Framed<TcpStream, LengthDelimitedCodec>> = None;
    while let Some(msg) = rx.recv().await {
        if stream.is_none() {
            let Ok(connected_stream) = TcpStream::connect(address).await else {
                // can't connect so just continue the loop for the next message to
                // try again
                continue;
            };
            let framed = Framed::new(connected_stream, LengthDelimitedCodec::new());
            stream = Some(framed);
        }

        if let Some(s) = stream.as_mut() {
            let Ok(buf) = postcard::to_allocvec(&msg) else {
                continue;
            };
            if s.send(buf.into()).await.is_err() {
                stream = None;
            }
        }
    }
});
senders.insert(n.id, tx);

The sending side of that channel is placed in a HashMap so that the send() trait implementation can just look up the channel for the target node, place the message in the channel, and return. That way the message gets sent in another task in a ‘fire and forget’ sort of way:

async fn send(&self, node: NodeId, message: Message) {
    let Some(sender) = self.senders.get(&node) else {
        return;
    };
    let _ = sender.send(message);
}

Another simple one to conceptualize on the actual trait implementation (I love when the struct impl actually hides all the complexity). Onward to Storage.
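Before leaving the transport, the connection handling inside each sender task can be boiled down to a small state machine: connect lazily when a message shows up, drop the message if the peer is unreachable, and forget the connection after a failed write so the next message triggers a reconnect. The sketch below models that with hypothetical names (Connection, PeerLink, Flaky, demo_link) and a fake connection instead of a TcpStream.

```rust
/// Hypothetical stand-in for a framed TCP stream.
trait Connection {
    fn write(&mut self, bytes: &[u8]) -> Result<(), ()>;
}

/// Per-peer link that connects on demand and forgets broken connections.
struct PeerLink<C, F> {
    connect: F,
    conn: Option<C>,
}

impl<C: Connection, F: FnMut() -> Option<C>> PeerLink<C, F> {
    /// Returns true if the message was written, false if it was dropped.
    fn deliver(&mut self, bytes: &[u8]) -> bool {
        if self.conn.is_none() {
            self.conn = (self.connect)();
        }
        let Some(conn) = self.conn.as_mut() else {
            return false; // peer unreachable: drop this message
        };
        if conn.write(bytes).is_err() {
            self.conn = None; // reconnect when the next message arrives
            return false;
        }
        true
    }
}

/// Fake connection that accepts a fixed number of writes, then fails.
struct Flaky {
    writes_left: u32,
}

impl Connection for Flaky {
    fn write(&mut self, _bytes: &[u8]) -> Result<(), ()> {
        if self.writes_left == 0 {
            return Err(());
        }
        self.writes_left -= 1;
        Ok(())
    }
}

/// First connect attempt fails; later ones yield a one-write connection.
fn demo_link() -> Vec<bool> {
    let mut attempts = 0;
    let mut link = PeerLink {
        connect: move || {
            attempts += 1;
            if attempts == 1 { None } else { Some(Flaky { writes_left: 1 }) }
        },
        conn: None::<Flaky>,
    };
    (0..5).map(|_| link.deliver(b"msg")).collect()
}
```

Dropped messages are acceptable here because Raft keeps retrying through heartbeats and repeated AppendEntries calls, so the protocol itself covers for a lossy transport.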

Storage Implementation

This one is a little more straightforward. In the end, this is just writing files. For snapshots and the ‘metadata’ as I like to call it, I really just slap #[derive(Serialize, Deserialize)] on the types I want to write to the file and then use postcard to serialize it into bytes. For example, here’s the type for snapshots:

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    pub last_included_index: u64,
    pub last_included_term: u64,
    pub data: Vec<u8>,
}

Straightforward, and it’s as simple as calling postcard::to_allocvec(&snapshot) to get the bytes, using tokio::fs to write the Vec to a temporary file, syncing it, and then doing an atomic move (rename()) onto the actual file. Dead simple.
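The write-then-rename pattern looks roughly like this with the blocking std::fs API (the post uses tokio::fs, so this is a swapped-in equivalent). write_atomically and the ‘.tmp’ naming are assumptions for the sketch.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Replace the file at `path` with `bytes` so that readers see either the
/// old contents or the new contents, never a half-written file.
fn write_atomically(path: &Path, bytes: &[u8]) -> io::Result<()> {
    // Hypothetical naming scheme: a sibling file with a ".tmp" extension.
    let tmp = path.with_extension("tmp");
    let mut file = File::create(&tmp)?;
    file.write_all(bytes)?;
    // Flush the data to disk before the rename makes it visible.
    file.sync_all()?;
    // The rename replaces any existing destination file.
    fs::rename(&tmp, path)
}
```

A stricter version would also fsync the parent directory after the rename so the new directory entry itself survives a crash; whether that is needed depends on the filesystem and how much durability the node promises.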

The interesting part is the log. First, commands are arbitrary sizes, so the easiest method is a length-prefixed frame with the length as a u32 followed by the serialized command. Nothing too crazy there. The kicker was that I also needed to store the index of the log entry, because after a snapshot the entries in the log no longer start from index 1; they start from the snapshot’s last included index + 1. There are also cases in the Raft algorithm where previous log entries that had been written to the log file (maybe not committed, but added to the durable file) would be overwritten if a new leader came in and corrected the log. As a result, every time the log file is opened, each frame needs to be read, and when storing, the file needs to be truncated at the place the new entries start. Most of the time, this is just the end, but it could be in the middle. Also, if a partial write occurred and a full frame didn’t make it in, I couldn’t return a half-baked log entry, so I had to deal with that as well. The general logic for reading through the log file and then selecting a truncate point looked like this (it’s pretty similar for restore_log_entries, but store_log_entries needed this as well to find the spot to write):

let mut reader = BufReader::new(log);
let mut pos = 0;

loop {
    let Ok(index) = reader.read_u64().await else {
        // no more frames to read
        break;
    };

    if index == addendum.start_index {
        // we should overwrite this entry
        break;
    }

    let Ok(length) = reader.read_u32().await else {
        break;
    };

    let mut entry = vec![0u8; length as usize];
    if reader.read_exact(&mut entry).await.is_err() {
        break;
    }
    // clean read of a frame, advance our position and keep going
    pos += size_of::<u64>() + size_of::<u32>() + length as usize;
}

Once each frame has been processed and I find the truncation point, I just start writing from that position in the file and the new entries are added.
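The same scan can be tried out on an in-memory byte buffer, which makes the edge cases easy to poke at. The layout below (8-byte big-endian index, 4-byte big-endian length, then the payload) matches what the async reads above produce; append_frame and truncate_point are hypothetical helper names.

```rust
/// Bytes in a frame header: 8-byte index plus 4-byte length.
const HEADER: usize = 8 + 4;

/// Append one frame: big-endian index, big-endian length, then the payload.
fn append_frame(file: &mut Vec<u8>, index: u64, entry: &[u8]) {
    file.extend_from_slice(&index.to_be_bytes());
    file.extend_from_slice(&(entry.len() as u32).to_be_bytes());
    file.extend_from_slice(entry);
}

/// Byte offset where entries beginning at `start_index` should be written.
/// Stops at the first frame with that index, or at the first torn frame.
fn truncate_point(file: &[u8], start_index: u64) -> usize {
    let mut pos = 0;
    while file.len() - pos >= HEADER {
        let mut index = [0u8; 8];
        index.copy_from_slice(&file[pos..pos + 8]);
        if u64::from_be_bytes(index) == start_index {
            break; // this entry and everything after it get overwritten
        }
        let mut length = [0u8; 4];
        length.copy_from_slice(&file[pos + 8..pos + HEADER]);
        let length = u32::from_be_bytes(length) as usize;
        if file.len() - pos - HEADER < length {
            break; // torn write: the payload is incomplete
        }
        // clean frame, keep it and move on
        pos += HEADER + length;
    }
    pos
}
```

Everything past the returned offset is either about to be overwritten or is a partial frame, so truncating there before appending keeps the file a clean sequence of whole frames.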

Beyond that, Storage was fairly straightforward. Once that was done, it was time to wire it up with the key value server!

Example Key Value Server

In crates/raft-kv you can see it all come together. I will just hit on the setup and the two handlers.

First, I defined a struct for arguments using clap to start. It’s a bit manual but works for this demonstration. Once those are parsed, I create a config type for the RaftNode and start up the node (meaning the event loop kicks off and starts participating in the cluster).

let config = build_config(args).await;
tracing::info!("Initializing Raft node");
let (raft_node, mut applied_receiver) = RaftNode::new(config).await;

Then, I set up the state I share between the handlers. This state is defined like this:

struct AppState {
    node: RaftNode,

    store: Arc<RwLock<HashMap<String, String>>>,
}

It simply holds the RaftNode to communicate with the cluster and the key value store itself (wrapped in an Arc<RwLock<>> so that readers can read quickly). It’s initialized in main like this:

let store = Arc::new(RwLock::new(HashMap::new()));
let state = Arc::new(AppState {
    node: raft_node,
    store: store.clone(),
});

Before starting the HTTP server, though, I need to take the mpsc::Receiver that RaftNode::new returns and spawn another task to read from that applied channel. That’s the channel the event loop writes to when the application needs to do something. The three things the event loop will ask for are: 1) apply a command to the store (like a Get or Set), 2) install a new snapshot (meaning the event loop has put a new snapshot file on durable storage and the entire state of the store needs to change), or 3) provide a current snapshot of the application state, which the event loop will use as the snapshot bytes for other nodes. The most interesting situation is actually receiving a command to apply, so that arm of the match statement after reading from the applied channel looks like this:

AppliedEntry::Command(cmd) => {
    // deserialize the command to figure out what we do with it
    let Ok(kv_cmd) = postcard::from_bytes(&cmd) else {
        continue;
    };
    match kv_cmd {
        KvCommand::Get(_key) => {}
        KvCommand::Set(key, value) => {
            let mut writer = map.write().unwrap();
            let _ = writer.insert(key, value); // don't care if had been set
        }
        KvCommand::Delete(key) => {
            let mut writer = map.write().unwrap();
            let _ = writer.remove(&key); // don't care if had been set
        }
    }
}

It’s as simple as deserializing the command from the log, grabbing a write lock to the HashMap and updating as necessary. The Get command is blank and actually will likely get taken out when I make future changes, but I touch on that more below. For the snapshot cases, it’s really just the difference between deserializing the ‘full’ state and setting that state or just serializing the current state and sending it over a oneshot channel for the event loop to process more from there. Now, let’s talk about the simple HTTP handlers that interact with the cluster!
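For a feel of the two snapshot arms, here is a std-only sketch. The project serializes with postcard; the hand-rolled length-prefixed encoding below is a stand-in so the example has no dependencies, and put_str, take_str, take_snapshot, and install_snapshot are hypothetical names.

```rust
use std::collections::HashMap;

/// Append one length-prefixed string (u32 big-endian length, then UTF-8 bytes).
fn put_str(out: &mut Vec<u8>, s: &str) {
    out.extend_from_slice(&(s.len() as u32).to_be_bytes());
    out.extend_from_slice(s.as_bytes());
}

/// Read one length-prefixed string from the front of `input`, advancing it.
fn take_str(input: &mut &[u8]) -> Option<String> {
    let data: &[u8] = *input;
    if data.len() < 4 {
        return None;
    }
    let (len, rest) = data.split_at(4);
    let len = u32::from_be_bytes([len[0], len[1], len[2], len[3]]) as usize;
    if rest.len() < len {
        return None;
    }
    let (text, rest) = rest.split_at(len);
    *input = rest;
    String::from_utf8(text.to_vec()).ok()
}

/// "Give me a snapshot" arm: serialize the whole store.
fn take_snapshot(store: &HashMap<String, String>) -> Vec<u8> {
    let mut pairs: Vec<_> = store.iter().collect();
    pairs.sort(); // same bytes regardless of HashMap iteration order
    let mut out = Vec::new();
    for (key, value) in pairs {
        put_str(&mut out, key);
        put_str(&mut out, value);
    }
    out
}

/// "Install snapshot" arm: replace the whole store, or leave it untouched
/// (returning false) if the bytes do not decode cleanly.
fn install_snapshot(store: &mut HashMap<String, String>, mut bytes: &[u8]) -> bool {
    let mut fresh = HashMap::new();
    while !bytes.is_empty() {
        let (Some(key), Some(value)) = (take_str(&mut bytes), take_str(&mut bytes)) else {
            return false;
        };
        fresh.insert(key, value);
    }
    *store = fresh;
    true
}
```

Installing replaces the entire map rather than merging into it, because a snapshot represents the complete state as of its last included index.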

For reading a key, I haven’t set things up in RaftCore to do linearizable reads (meaning reads could be stale between the time a command is proposed and actually applied to the state machine). I plan to work on that next but for now, we do local reads so the GET handler for that just looks like this:

async fn get_handler(
    State(state): State<Arc<AppState>>,
    Path(key): Path<String>,
) -> Result<Json<GetResponse>, StatusCode> {
    let state = state
        .store
        .read()
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let value = state.get(&key);
    Ok(Json(GetResponse {
        value: value.cloned(),
    }))
}

Simple again! Get a read lock on the HashMap, read the specified key, and send it back to the user. Set is really just as simple:

async fn set_handler(
    State(state): State<Arc<AppState>>,
    Json(body): Json<SetRequest>,
) -> Result<Json<SetResponse>, StatusCode> {
    let command = postcard::to_allocvec(&KvCommand::Set(body.key, body.value))
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    state
        .node
        .propose(command)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(Json(SetResponse { success: true }))
}

This just uses the provided propose method on the RaftNode type and waits until that returns. There are instances, such as when there is no majority, where an HTTP client would hang here. I believe that is expected behavior according to Raft, but it could also change in the future. One way that propose could fail is if the node is not the leader (because Raft prohibits anyone but the leader from proposing new commands). Right now, I just throw up my hands and give a 500 Internal Server Error, but I could probably enhance the node’s knowledge of the cluster and the return value to include the address of the current leader, then send back a redirect to instruct the client to try the request again there (a 307 Temporary Redirect would be a better fit than a 302 here, since clients commonly turn a redirected POST into a GET on a 302). For now, I think this is fine.
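One possible shape for that future enhancement, sketched without any HTTP framework. Nothing here exists in the project today: ProposeError, HttpReply, and reply_for are hypothetical, and the choice of 503 when no leader is known is just one reasonable option.

```rust
/// Hypothetical error type a future propose() might return.
#[derive(Debug, PartialEq)]
enum ProposeError {
    /// This node is not the leader; it may or may not know who is.
    NotLeader { leader_addr: Option<String> },
    /// Anything else that went wrong.
    Internal,
}

/// Just the parts of an HTTP response that matter for this sketch.
#[derive(Debug, PartialEq)]
struct HttpReply {
    status: u16,
    location: Option<String>,
}

/// Map the outcome of a proposal onto a status code and optional redirect.
fn reply_for(result: Result<(), ProposeError>, path: &str) -> HttpReply {
    match result {
        Ok(()) => HttpReply { status: 200, location: None },
        // 307 asks the client to repeat the same method and body at the leader.
        Err(ProposeError::NotLeader { leader_addr: Some(addr) }) => HttpReply {
            status: 307,
            location: Some(format!("http://{}{}", addr, path)),
        },
        // No known leader (for example mid-election): ask the client to retry later.
        Err(ProposeError::NotLeader { leader_addr: None }) => {
            HttpReply { status: 503, location: None }
        }
        Err(ProposeError::Internal) => HttpReply { status: 500, location: None },
    }
}
```

The catch is that the node has to know the leader’s HTTP address and not only its Raft address, so that mapping would need to live somewhere in the cluster configuration.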

Running it!

Now the moment of truth, running three nodes. I’ll start them up like so in three different shells:

cargo run -p raft-kv -- --nodes 127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003 --id 1 --addr 0.0.0.0:3001 --directory /tmp/raft-kv/node1
cargo run -p raft-kv -- --nodes 127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003 --id 2 --addr 0.0.0.0:3002 --directory /tmp/raft-kv/node2
cargo run -p raft-kv -- --nodes 127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003 --id 3 --addr 0.0.0.0:3003 --directory /tmp/raft-kv/node3

And then see the cluster get its leader elected fairly quickly:

INFO raft_kv: Initializing persistent storage and transport
INFO raft_kv: Initializing Raft node
INFO raftcore: initializing Raft from empty state id=1
INFO raft_event_loop: Starting event loop node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: won election, transition to Leader node_id=1
INFO raftcore: update commit index from 0 to 1 (apply 1 entries) self.id=1

And can also see that log entries have made their way in (because of the no-op entry a leader makes upon being elected):

xxd /tmp/raft-kv/node1/log
# Me, here, notice the 64-bit big endian index (0x0000000000000001)
#                  |
#         ___________________ 
00000000: 0000 0000 0000 0001 0000 0002 0500       ..............
                              --------- ^^^^
                                  |       |
                                  |       |
the 32-bit length of the command          |
                                          |
the no-op command (postcard did this)     |

I can propose a command:

curl -X POST http://localhost:3001/key -H 'Content-Type: application/json' -d '{"key":"hello","value":"world"}'
{"success":true}%

And (because we allow for anyone to read and don’t have it linearizable) I can ask node2 for that value:

curl http://localhost:3002/key/hello
{"value":"world"}%

I can even kill off the current leader (node1 in my case) and see the cluster heal and still accept new commands on the new leader (node3 won this time):

curl -X POST http://localhost:3003/key -H 'Content-Type: application/json' -d '{"key":"dead","value":"leader"}'
{"success":true}%

And then start node1 back up and read the new command it missed (and see that upon startup, it immediately healed and got the state it needed):

curl http://localhost:3001/key/dead
{"value":"leader"}%

Besides cluster membership, a few cosmetic improvements, and linearizable reads, this project is pretty much complete. I may come back and work more on it, but I’m fairly proud that I was able to build this from the ground up. I’m likely going to shift more focus to the FoundryDB series but may even design that so it can use Raft as the storage layer to replicate the database across a cluster of Raft nodes (who knows!). This was a great project and I really learned a TON. It was my first sans-I/O approach and it really clicked how to separate the concerns. I’m very thankful to have had support from Claude as my “LLM Instructor” who guided me along, pointed out issues, and supported design (weird world to be thanking an LLM? but I guess I should thank Anthropic for providing the product?). While I know Claude could’ve written this in a day, I wouldn’t have learned nearly as much, and now I know the code inside and out. Claude was a supportive sanity check for me as a lone Raft programmer, but I now know Raft extremely well and the challenge of wiring this entire system up was exceptionally rewarding. Just seeing commands replicate is pretty awesome!!

Until next time! (I’ll try to follow my rules more).

You can check out the entire repo here (notice that both Claude and I diverged from the original CLAUDE.md file once things got real). I did have Claude generate the README because I have more code to write!!!

Link to the Repo

Roundup Series

In this section of the Roundups, I’ll post the “series” so you can cycle through!

Previous Roundup: Roundup - April 9th
Next Roundup: Coming Soon!