Roundup - May 11th
Roundup Part Two
Well, I already broke my rule and kept coding without writing, but here goes nothing. I worked on a lot. In the last roundup on Raft, I was working through supporting snapshots within RaftCore. I successfully finished that and had a good setup for the event loop. One thing I could’ve done was cluster membership changes (i.e. adding a node to the cluster), but that seems like a major rewrite that I wasn’t exactly motivated to work on; I wanted a Raft cluster running and that would’ve gotten in the way! So, I moved on to writing the event loop and will explain some of the design choices behind it in this post. Then, we’ll get into actually running the cluster (eek!) and how it replicates commands across processes. It’s pretty awesome.
Event Loop
The event loop is fairly straightforward. We’re going to have a type that
actually holds the event loop’s state (RaftDriver) and has a method that
consumes self to loop over the events that can happen. Then, there will be
another type (RaftNode) that will be owned by the application to communicate
with the event loop.
The event loop is purposely simple. It needs to worry about three concerns:
- Tick on the provided interval to call RaftCore’s tick() method. That method is responsible for things like sending out heartbeats from the leader, or letting followers know when it is time to start an election because the leader potentially died.
- Receive proposals (only as the leader) from the application. This is when the application has a new command that it wants to replicate on the log and get into the cluster.
- Receive messages from other nodes and process them. The node is part of a cluster, so it needs to take part in elections, log replication, etc., which means it has to be able to talk to other nodes.
Of course, RaftCore is the synchronous, sans-I/O approach to Raft. It has
methods to handle each message type and a method to propose a new command.
Those methods all emit a Vec<Action>, which are just ‘actions’ that the
event loop should take. There are some contracts around the ordering of the Vec
and whether the event loop is allowed to perform those Actions concurrently
or must complete them one at a time, in order. For example, messages can be
sent off to another thread/task to be sent because the node doesn’t care if the
recipient actually receives the messages (that’s the point of Raft, a node may
not receive a message when it has crashed but Raft should still continue).
However, saving persistent state (like the log or the current term) needs to be
done before allowing any other Action to be processed. In this setup with
tokio, calling .await directly in the event loop means that the loop’s task is
suspended until that future completes (no other ticks or messages will be
processed), which ensures that Action has been taken care of before moving on
to the next thing. A rule I’ve heard is ‘never block the event loop’, but in
this case, I do want to make sure no new commands come in or messages are
processed until that Action is done.
This means the event loop isn’t really that bad. It’s basically a
tokio::select! in a loop that will worry about its concerns by either calling
methods on RaftCore, saving files, or sending messages to other nodes.
Which brings me to the key of the event loop which is the Transport and
Storage traits. These traits define the networking and ‘durable’ storage
contract respectively. For Transport there are two simple methods: send()
and recv(). All that means is a type implementing the trait needs to provide a
way to send a message to a specified node and receive messages from a stream
from another node. Pretty simple. The only gotcha is that the send() method
can run ‘in parallel’, meaning that in our case I can spawn off a new tokio
task to send the message and not await it. And recv() needs to be cancel-safe
(it is polled inside tokio::select!, so a pending call can be dropped at any
time), which is fine because there should be a separate reader task that just
funnels the messages from all nodes into a single channel that the event loop
reads from.
For Storage, each type of storage in Raft has its own set of store/restore
methods. For example, the log has to be stored when new entries are added, so
there is a store_log_entries and restore_log_entries method. These can store
in any way the type chooses, but the guarantee is that it is durable storage
when it returns (or is awaited on in the tokio world). The storage could be
anything, but as you’ll see, I provide a straightforward (and likely normal?) way
to just store this information on the local filesystem. But you could imagine
a durable storage implementation that writes the information to an S3 bucket
somewhere, or something like that.
So, that’s the event loop. Pretty simple and generic on the actual way to send
messages and store/restore the persistent state. Yet again, I did the
engineering thing of doing a little bit of work and abstracting away a little
more to someone else, but I promise eventually I’ll get you there. Next, let’s explore the
Transport I designed for the example key-value store and then I’ll dive into
the Storage and finally the actual key-value store that is just a simple HTTP
server that interacts with the event loop.
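To make the ordering contract concrete, here is a minimal synchronous sketch of how a driver might walk a Vec<Action>. Everything in it is a simplified stand-in, not the code from the repo: the Action variants, the one-method Storage and Transport traits, and the recording test doubles are all hypothetical, and the real version is async on top of tokio. The point it shows is only that a persistence step must finish (or fail) before anything after it in the Vec is touched.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical, cut-down action set; the real enum has more variants.
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
    /// Persist the current term. Must complete before later actions run.
    SaveTerm(u64),
    /// Send a message to a peer. Fire-and-forget.
    Send { to: u64, payload: Vec<u8> },
}

/// Hypothetical synchronous stand-in for the durable storage contract.
pub trait Storage {
    fn store_term(&mut self, term: u64) -> Result<(), String>;
}

/// Hypothetical synchronous stand-in for the networking contract.
pub trait Transport {
    fn send(&mut self, to: u64, payload: Vec<u8>);
}

/// Process actions strictly in the order the core emitted them.
/// A failed write stops the batch, so nothing after it is sent.
pub fn run_actions<S: Storage, T: Transport>(
    actions: Vec<Action>,
    storage: &mut S,
    transport: &mut T,
) -> Result<(), String> {
    for action in actions {
        match action {
            // In the async version this is where the loop would `.await` the write.
            Action::SaveTerm(term) => storage.store_term(term)?,
            // In the async version this hands off to another task and returns.
            Action::Send { to, payload } => transport.send(to, payload),
        }
    }
    Ok(())
}

/// Shared, ordered record of side effects, used only by the test doubles.
pub type EventLog = Rc<RefCell<Vec<String>>>;

pub struct RecordingStorage {
    pub log: EventLog,
    pub fail: bool,
}

pub struct RecordingTransport {
    pub log: EventLog,
}

impl Storage for RecordingStorage {
    fn store_term(&mut self, term: u64) -> Result<(), String> {
        if self.fail {
            return Err("simulated write failure".to_string());
        }
        self.log.borrow_mut().push(format!("stored term {}", term));
        Ok(())
    }
}

impl Transport for RecordingTransport {
    fn send(&mut self, to: u64, payload: Vec<u8>) {
        self.log
            .borrow_mut()
            .push(format!("sent {} bytes to node {}", payload.len(), to));
    }
}
```

With the recording doubles, a batch of SaveTerm followed by Send records the store first and the send second, and a failing store leaves the record empty because the send never happens.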
Transport Implementation
For the Transport, I selected basic TCP connections to start. I played around
with the idea of QUIC but also didn’t want to work through the pain of setting
up certificates, etc. It can be done with a bit of work but for the first
implementation, I chose TCP.
Granted, I didn’t use TCP exactly in its perfect design and in reality used it
more like UDP than TCP. In fact, a UDP implementation probably would’ve been
less code, but conceptually it is the same. I say this because I didn’t leverage
the bi-directional nature of TCP. Connections were kept ‘alive’ but they were
only one-way communications. For example, a node will start up and first loop
through the list of nodes (to include the address that the current node (‘this’)
should listen on for other Raft participants). If it finds itself, it will spawn
off a listener task that will listen on the designated port and then upon any
client connection, spawn another task off to actually read from the connected
client and deserialize any messages into the Message struct and send them down
a shared channel that the Transport::recv() implementation for the trait will
read from to provide to the event loop. It generally looks like this:
let (receiver_tx, receiver_rx) = mpsc::unbounded_channel();
for n in nodes {
    if n.id == id {
        let rtx = receiver_tx.clone();
        let listener = TcpListener::bind(n.address)
            .await
            .expect("Unable to bind to address");
        tokio::spawn(async move {
            while let Ok((client_stream, _)) = listener.accept().await {
                let pipe = rtx.clone();
                tokio::spawn(async move {
                    let mut framed =
                        Framed::new(client_stream, LengthDelimitedCodec::new());
                    while let Some(Ok(buf)) = framed.next().await {
                        let Ok(msg) = postcard::from_bytes(&buf) else {
                            continue;
                        };
                        let _ = pipe.send(msg);
                    }
                });
            }
        });
    } else {
        ... /* talk about this below */
    }
}
The receiver_rx channel is then simply read from in the recv() impl of the
Transport trait:
impl Transport for TcpTransport {
    ...
    async fn recv(&mut self) -> Message {
        self.receiver_rx.recv().await.expect("Receiving pipe lost")
    }
}
Pretty simple trait implementation there! I’ll also be forthright that there are
a good deal of unwrap() and expect() here. My argument is that most of those
situations are fairly unrecoverable. If the process ran out of memory with an unbounded
channel or the sending side dropped on the receiver_rx that’s pretty much a
done deal for the Raft node and the process might as well die and restart.
That’s why Raft has storage and a consensus algorithm, things are expected to fail!
(I know, that shouldn’t be because of my code, but I don’t even know what the
right way to handle that error is; if I can’t receive from other nodes for some
reason, I’m dead anyway, it doesn’t change anything).
For sending messages to other nodes, I spawn off a ‘sending’ task for each node.
So in the else block in that loop above, I want to get another unbounded
channel up and continually receive from that channel for any messages destined
to that particular node. Then, I’ll try my best to keep the connection up and
reestablish it if it’s lost and send the message. Again, this is a case where I
heavily use tokio tasks and channels to make my life easier and just
receive from channel -> send down TCP stream. Here’s the else block:
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
let address = n.address;
tokio::spawn(async move {
    let mut stream: Option<Framed<TcpStream, LengthDelimitedCodec>> = None;
    while let Some(msg) = rx.recv().await {
        if stream.is_none() {
            let Ok(connected_stream) = TcpStream::connect(address).await else {
                // can't connect so just continue the loop for the next message to
                // try again
                continue;
            };
            let framed = Framed::new(connected_stream, LengthDelimitedCodec::new());
            stream = Some(framed);
        }
        if let Some(s) = stream.as_mut() {
            let Ok(buf) = postcard::to_allocvec(&msg) else {
                continue;
            };
            if s.send(buf.into()).await.is_err() {
                stream = None;
            }
        }
    }
});
senders.insert(n.id, tx);
The sending side of that channel is placed in a HashMap so that the send
trait implementation can just look up the channel for the node to send
to, place the message in the channel, and return. That way the message gets
sent in another task and in a ‘fire and forget’ sort of way:
async fn send(&self, node: NodeId, message: Message) {
    let Some(sender) = self.senders.get(&node) else {
        return;
    };
    let _ = sender.send(message);
}
Another simple one to conceptualize on the actual trait implementation (I love
when the struct impl actually hides all the complexity). Onward to Storage.
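One piece both halves of the transport lean on is the framing, since TCP is a byte stream and has no notion of message boundaries. LengthDelimitedCodec in its default configuration prefixes each frame with a 4-byte big-endian length. The sketch below is a hand-rolled, std-only illustration of that idea, not the codec's implementation; encode_frame and decode_frame are hypothetical names, and the real codec also enforces a maximum frame length that this sketch leaves out.

```rust
/// Prefix `payload` with its length as a 4-byte big-endian integer.
pub fn encode_frame(payload: &[u8]) -> Vec<u8> {
    assert!(
        payload.len() <= u32::MAX as usize,
        "payload too large for a u32 length prefix"
    );
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Try to pull one complete frame off the front of `buf`.
/// Returns `None` and leaves `buf` untouched until the whole frame has arrived.
pub fn decode_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.len() < 4 {
        return None; // the length prefix itself is incomplete
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None; // the payload is still in flight
    }
    let payload = buf[4..4 + len].to_vec();
    buf.drain(..4 + len);
    Some(payload)
}
```

A receiver keeps appending whatever bytes arrive to the buffer and calls decode_frame until it returns None, which is roughly the job Framed does for the reader task.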
Storage Implementation
This one is a little more straightforward. In the end, this is just writing files.
For snapshots and the ‘metadata’ as I like to call it, I really just slap
#[derive(Serialize, Deserialize)] on the types I want to write to the file
and then use postcard to serialize it into bytes. For example, here’s the type
for snapshots:
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    pub last_included_index: u64,
    pub last_included_term: u64,
    pub data: Vec<u8>,
}
Straightforward and it’s as simple as: postcard::to_allocvec(&snapshot) to get
the bytes and use tokio::fs to write the Vec to a temporary file, sync it, and
then do an atomic move (rename()) onto the actual file. Dead simple.
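The write, sync, rename sequence is worth seeing spelled out, because the order is what keeps a crash from leaving a half-written snapshot behind. This is a minimal sketch using blocking std::fs rather than tokio::fs; write_atomically and the ".tmp" suffix are hypothetical choices for illustration, and it assumes the temporary file lives on the same filesystem as the target.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Write `bytes` to `path` so a reader sees either the old contents or the
/// new contents, never a partially written file.
pub fn write_atomically(path: &Path, bytes: &[u8]) -> io::Result<()> {
    // Hypothetical naming scheme: a sibling file with ".tmp" appended.
    let mut tmp_name = path.as_os_str().to_owned();
    tmp_name.push(".tmp");
    let tmp_path = PathBuf::from(tmp_name);

    let mut file = File::create(&tmp_path)?;
    file.write_all(bytes)?;
    // Flush data and metadata to the device before the file becomes visible
    // under its real name.
    file.sync_all()?;
    drop(file);

    // Replaces any existing file at `path`.
    fs::rename(&tmp_path, path)
}
```

One thing this sketch does not do is sync the parent directory after the rename, which some filesystems need before the new name itself is guaranteed to survive a crash.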
The interesting part is the log. First, commands are arbitrary sizes, so the
easiest method is to do a length-prefixed frame with the length as a u32 and
then the serialized command. Nothing too crazy there. The kicker was that I
needed to also store off the index of the log entry because after a snapshot,
the entries in the log no longer start from index 1, they start from the
snapshot’s last included index + 1. There are also cases in the Raft algorithm
where previous log entries that would have been committed to the log file (maybe
not committed, but added to the durable file) would be overwritten if a leader
came in and corrected the log. As a result, for every read, the log file needed
to be processed and each frame needed to be read and then the file needed to be
truncated at the place new entries would start. Most of the time, this would
just be the end, but it could be in the middle. Also, if a partial write
occurred and a full frame didn’t get in there, I couldn’t return a half-baked
log entry, so I had to deal with that as well. The general logic for reading
through the log file and then selecting a truncate point looked like this (it’s
pretty similar for restore_log_entry but the store_log_entry needed this as
well to find the spot to write):
let mut reader = BufReader::new(log);
let mut pos = 0;
loop {
    let Ok(index) = reader.read_u64().await else {
        // no more frames to read
        break;
    };
    if index == addendum.start_index {
        // we should overwrite this entry
        break;
    }
    let Ok(length) = reader.read_u32().await else {
        break;
    };
    let mut entry = vec![0u8; length as usize];
    if reader.read_exact(&mut entry).await.is_err() {
        break;
    }
    // clean read of a frame, advance our position and keep going
    pos += size_of::<u64>() + size_of::<u32>() + length as usize;
}
Once each frame has been processed and I find the truncation point, I just start writing from that position in the file and the new entries are added.
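The on-disk layout described above (a u64 index, a u32 length, then the command bytes, all big-endian) can be sketched without any async machinery. This is a std-only illustration over an in-memory byte slice, with hypothetical names (encode_entry, truncate_point); it mirrors the scan logic but is not the repo's implementation.

```rust
/// Bytes taken by the per-entry header: u64 index + u32 length.
const HEADER_LEN: usize = 8 + 4;

/// Serialize one log entry as: index (u64 BE) | length (u32 BE) | command bytes.
pub fn encode_entry(index: u64, command: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER_LEN + command.len());
    out.extend_from_slice(&index.to_be_bytes());
    out.extend_from_slice(&(command.len() as u32).to_be_bytes());
    out.extend_from_slice(command);
    out
}

/// Byte offset at which entries starting at `start_index` should be written.
/// Everything from that offset onward is either being overwritten or is the
/// remains of a partial write.
pub fn truncate_point(log: &[u8], start_index: u64) -> usize {
    let mut pos = 0;
    while log.len() - pos >= HEADER_LEN {
        let mut index_bytes = [0u8; 8];
        index_bytes.copy_from_slice(&log[pos..pos + 8]);
        if u64::from_be_bytes(index_bytes) == start_index {
            break; // this entry and everything after it gets replaced
        }
        let mut len_bytes = [0u8; 4];
        len_bytes.copy_from_slice(&log[pos + 8..pos + HEADER_LEN]);
        let length = u32::from_be_bytes(len_bytes) as usize;
        if log.len() - pos - HEADER_LEN < length {
            break; // torn frame: the payload never fully landed
        }
        // Clean frame: step over it and keep scanning.
        pos += HEADER_LEN + length;
    }
    pos
}
```

Encoding index 1 with the two command bytes 0x05 0x00 gives exactly 14 bytes: eight for the index, four for the length, two for the command.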
Beyond that Storage was fairly straightforward. Once that was done, it was
time to wire it up with the key value server!
Example Key Value Server
In crates/raft-kv you can see it all come together. I will just hit on the
setup and the two handlers.
First, I defined a struct for arguments using clap to start. It’s a bit
manual but works for this demonstration. Once those are parsed, I create a
config type for the RaftNode and start up the node (meaning the event loop
kicks off and starts participating in the cluster).
let config = build_config(args).await;
tracing::info!("Initializing Raft node");
let (raft_node, mut applied_receiver) = RaftNode::new(config).await;
Then, I set up the state I share between the handlers. This state is defined like this:
struct AppState {
    node: RaftNode,
    store: Arc<RwLock<HashMap<String, String>>>,
}
It simply holds the RaftNode to communicate to the cluster and the key value
store itself (wrapped in an Arc<RwLock<>> so that readers can read quickly). It’s
initialized in main like this:
let store = Arc::new(RwLock::new(HashMap::new()));
let state = Arc::new(AppState {
    node: raft_node,
    store: store.clone(),
});
Before starting the HTTP server, though, I need to take the mpsc::Receiver
that RaftNode::new gives and spawn another task off to read from that applied
channel. That’s the channel that the event loop will write to when the
application needs to do something. The event loop will ask for one of three
things: 1) apply a command to the store (like a Get or Set), 2) install a new
snapshot (meaning the event loop has put a new snapshot file on durable storage
and the entire state of the store needs to change), or 3) hand back a
current snapshot of the application state, which the event loop will use as
the snapshot bytes for other nodes. The most interesting situation is actually
receiving a command to apply so that arm of the match statement after reading
from the applied channel looks like this:
AppliedEntry::Command(cmd) => {
    // deserialize the command to figure out what we do with it
    let Ok(kv_cmd) = postcard::from_bytes(&cmd) else {
        continue;
    };
    match kv_cmd {
        KvCommand::Get(_key) => {}
        KvCommand::Set(key, value) => {
            let mut writer = map.write().unwrap();
            let _ = writer.insert(key, value); // don't care if had been set
        }
        KvCommand::Delete(key) => {
            let mut writer = map.write().unwrap();
            let _ = writer.remove(&key); // don't care if had been set
        }
    }
}
It’s as simple as deserializing the command from the log, grabbing a write lock
to the HashMap and updating as necessary. The Get command is blank and
actually will likely get taken out when I make future changes, but I touch on
that more below. For the snapshot cases, it’s really just the difference between
deserializing the ‘full’ state and setting that state or just serializing the
current state and sending it over a oneshot channel for the event loop to
process more from there. Now, let’s talk about the simple HTTP handlers that
interact with the cluster!
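The two snapshot cases are not shown above, so here is a rough sketch of their shape. It is built on assumptions: SnapshotRequest and handle_snapshot are hypothetical names, a std mpsc Sender stands in for the oneshot channel, and a hand-rolled length-prefixed encoding stands in for postcard so the example has no dependencies.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the two snapshot-related requests.
pub enum SnapshotRequest {
    /// Replace the whole store with the state encoded in these bytes.
    Install(Vec<u8>),
    /// Serialize the current store and send the bytes back.
    Take(Sender<Vec<u8>>),
}

fn put_str(out: &mut Vec<u8>, s: &str) {
    out.extend_from_slice(&(s.len() as u32).to_be_bytes());
    out.extend_from_slice(s.as_bytes());
}

fn take_str(bytes: &[u8], pos: &mut usize) -> Option<String> {
    let header = bytes.get(*pos..*pos + 4)?;
    let len = u32::from_be_bytes([header[0], header[1], header[2], header[3]]) as usize;
    let body = bytes.get(*pos + 4..*pos + 4 + len)?;
    *pos += 4 + len;
    String::from_utf8(body.to_vec()).ok()
}

/// Stand-in encoding: each key and value as a length-prefixed string.
pub fn encode_store(store: &HashMap<String, String>) -> Vec<u8> {
    let mut out = Vec::new();
    for (key, value) in store {
        put_str(&mut out, key);
        put_str(&mut out, value);
    }
    out
}

/// Returns `None` if the bytes are truncated or not valid UTF-8.
pub fn decode_store(bytes: &[u8]) -> Option<HashMap<String, String>> {
    let mut store = HashMap::new();
    let mut pos = 0;
    while pos < bytes.len() {
        let key = take_str(bytes, &mut pos)?;
        let value = take_str(bytes, &mut pos)?;
        store.insert(key, value);
    }
    Some(store)
}

pub fn handle_snapshot(store: &mut HashMap<String, String>, request: SnapshotRequest) {
    match request {
        SnapshotRequest::Install(bytes) => {
            // Only swap the state in if the snapshot decodes cleanly.
            if let Some(restored) = decode_store(&bytes) {
                *store = restored;
            }
        }
        SnapshotRequest::Take(reply) => {
            // The requester may have gone away; that is not this side's problem.
            let _ = reply.send(encode_store(store));
        }
    }
}
```

Taking a snapshot on one store and installing it on another leaves the two maps equal, and whatever the second store held before is gone, which is the "entire state needs to change" behavior described earlier.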
For reading a key, I haven’t set things up in RaftCore to do linearizable
reads (meaning that, without them, a read can return stale data in the window
between a command being proposed and it actually being applied to the local
state machine). I plan to work on that next but for now, we do local reads so
the GET handler for that just looks like this:
async fn get_handler(
    State(state): State<Arc<AppState>>,
    Path(key): Path<String>,
) -> Result<Json<GetResponse>, StatusCode> {
    let state = state
        .store
        .read()
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let value = state.get(&key);
    Ok(Json(GetResponse {
        value: value.cloned(),
    }))
}
Simple again! Get a read lock on the HashMap, read the specified key, and send
it back to the user. Set is really just as simple:
async fn set_handler(
    State(state): State<Arc<AppState>>,
    Json(body): Json<SetRequest>,
) -> Result<Json<SetResponse>, StatusCode> {
    let command = postcard::to_allocvec(&KvCommand::Set(body.key, body.value))
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    state
        .node
        .propose(command)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(Json(SetResponse { success: true }))
}
This just uses the provided propose method on the RaftNode type and waits
until that returns. There are instances, such as when there is no majority,
where an HTTP client would hang here. I believe that is expected behavior
according to Raft, but could also change in the future. One way that propose
could fail is if the node is not the leader (because Raft prohibits anyone but
the leader from proposing new commands). Right now, I just throw up my hands and
give a 500 Internal Server Error but could probably enhance the knowledge of the
cluster and the return value to include the address of the current leader and
send back a redirect to instruct the client to try the request again there (a
307 Temporary Redirect is likely a better fit than a 302 here, since it tells
the client to repeat the POST as-is). For now, I think this is fine.
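As a sketch of what that enhancement could look like, the mapping from a propose failure to an HTTP response might be pulled into a small pure function. ProposeError and response_for are hypothetical; the post does not show what propose actually returns, and this assumes the node can learn the leader's HTTP address. It uses 307 rather than 302 because 307 asks the client to repeat the same method and body at the new location.

```rust
/// Hypothetical error type; the real return type of `propose` is not shown.
#[derive(Debug, Clone, PartialEq)]
pub enum ProposeError {
    /// This node is not the leader; it may or may not know who is.
    NotLeader { leader_addr: Option<String> },
    /// Anything else (channel closed, storage failure, and so on).
    Internal,
}

/// Returns the status code and, for redirects, the `Location` header value.
pub fn response_for(err: &ProposeError, path: &str) -> (u16, Option<String>) {
    match err {
        // Known leader: point the client at the same path on that node.
        ProposeError::NotLeader { leader_addr: Some(addr) } => {
            (307, Some(format!("http://{}{}", addr, path)))
        }
        // No known leader (for example mid-election): ask the client to retry.
        ProposeError::NotLeader { leader_addr: None } => (503, None),
        ProposeError::Internal => (500, None),
    }
}
```

Keeping this as a plain function means the handler only has to translate the tuple into its framework's response type, and the mapping can be tested without starting a server.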
Running it!
Now the moment of truth, running three nodes. I’ll start them up like so in three different shells:
cargo run -p raft-kv -- --nodes 127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003 --id 1 --addr 0.0.0.0:3001 --directory /tmp/raft-kv/node1
cargo run -p raft-kv -- --nodes 127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003 --id 2 --addr 0.0.0.0:3002 --directory /tmp/raft-kv/node2
cargo run -p raft-kv -- --nodes 127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003 --id 3 --addr 0.0.0.0:3003 --directory /tmp/raft-kv/node3
And then see the cluster get its leader elected fairly quickly:
INFO raft_kv: Initializing persistent storage and transport
INFO raft_kv: Initializing Raft node
INFO raftcore: initializing Raft from empty state id=1
INFO raft_event_loop: Starting event loop node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: election timeout reached; beginning election node_id=1
INFO raftcore: won election, transition to Leader node_id=1
INFO raftcore: update commit index from 0 to 1 (apply 1 entries) self.id=1
And can also see that log entries have made their way in (because of the no-op entry a leader makes upon being elected):
xxd /tmp/raft-kv/node1/log
# Me, here: notice the 64-bit big-endian index (0x0000000000000001)
#                  |
#         ___________________
00000000: 0000 0000 0000 0001 0000 0002 0500  ..............
                              --------- ^^^^
                                  |       |
                                  |       the no-op command (postcard did this)
                                  |
                                  the 32-bit length of the command
I can propose a command:
curl -X POST http://localhost:3001/key -H 'Content-Type: application/json' -d '{"key":"hello","value":"world"}'
{"success":true}%
And (because we allow for anyone to read and don’t have it linearizable) I can ask node2 for that value:
curl http://localhost:3002/key/hello
{"value":"world"}%
I can even kill off the current leader (node1 in my case) and see the cluster heal and still accept new commands on the new leader (node3 won this time):
curl -X POST http://localhost:3003/key -H 'Content-Type: application/json' -d '{"key":"dead","value":"leader"}'
{"success":true}%
And then start node1 back up and read the new command it missed (and see that, upon startup, it immediately healed and got the state it needed):
curl http://localhost:3001/key/dead
{"value":"leader"}%
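The reason the cluster kept accepting writes with node1 down comes from Raft's majority rule: a leader only needs acknowledgements from more than half of the nodes. The arithmetic is small enough to write out; quorum and tolerated_failures are just illustrative helper names.

```rust
/// Votes or acknowledgements needed for a majority in a cluster of `nodes`.
pub fn quorum(nodes: usize) -> usize {
    nodes / 2 + 1
}

/// How many nodes can be down while a majority is still reachable.
pub fn tolerated_failures(nodes: usize) -> usize {
    nodes.saturating_sub(quorum(nodes))
}
```

For the three-node cluster here, the quorum is two, so one node can be down. Losing a second node would leave no majority, which is the case where a proposal would hang.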
Besides cluster membership, a few cosmetic improvements, and linearizable reads, this project is pretty much complete. I may come back and work more on it, but I’m fairly proud that I was able to build this from the ground up. I’m likely going to shift more focus to the FoundryDB series but may even design that so it can use Raft as the storage layer to replicate the database across a cluster of Raft nodes (who knows!). This was a great project and I really learned a TON. It was my first sans-I/O approach and it really clicked how to separate the concerns.
I’m very thankful to have had support from Claude as my “LLM Instructor”, who guided me along, pointed out issues, and supported the design (weird world to be thanking an LLM? but I guess I should thank Anthropic for providing the product?). While I know Claude could’ve written this in a day, I wouldn’t have learned nearly as much, and I know the code inside and out. Claude was a helpful sanity check for me as a lone Raft programmer, but I now know Raft extremely well and the challenge of wiring this entire system up was exceptionally rewarding. Just seeing commands replicate is pretty awesome!!
Until next time! (I’ll try to follow my rules more).
You can check out the entire repo here (notice that both Claude and I diverged
from the original CLAUDE.md file once things got real). I did have Claude
generate the README because I have more code to write!!!
Roundup Series
In this section of the Roundups, I’ll post the “series” so you can cycle through!
| Previous Roundup | Next Roundup |
|---|---|
| Roundup - April 9th | Coming Soon! |