Skip to content

cocky-punch/raft

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

40 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Raft [WIP]

An implementation of Raft Consensus Algorithm in Zig

Features

  • Leader tracking
  • In-memory transport
  • Cluster-wide coordination
  • Basic TCP transport
  • gRPC transport
  • Client redirection
  • Election timeout logic
  • Client command submission
  • Log replication
  • Apply committed entries
  • Heartbeats / ticks
  • State machine plumbing
  • Client acknowledgment / client request retries - confirmations
  • Persistent log
  • Rotation of the log files
  • Persistent snapshots
  • Proper handler of TimeoutNow - Leadership Transfer Mechanism - message (Raft extensions)
  • Read-only GET message (Raft extensions)

Installation

Fetch the master or a release

zig fetch --save https://github.com/cocky-punch/raft/archive/refs/heads/master.tar.gz
# or by a release version
# zig fetch --save https://github.com/cocky-punch/raft/archive/refs/tags/[RELEASE_VERSION].tar.gz

which will save a reference to the library into build.zig.zon

Then add this into build.zig

// after "b.installArtifact(exe)" line
const raft = b.dependency("raft", .{
  .target = target,
  .optimize = optimize,
});
exe.root_module.addImport("raft", raft.module("raft"));

Usage

  • Implement MyStateMachine#apply(...)
  • create a config for each node
  • in each config specify a node itself and all the nodes, including itself again, of a cluster
  • run each of the nodes
  • interract with them via the cli-client
const std = @import("std");
const raft = @import("raft");

const Allocator = std.mem.Allocator;

// Your own state machine implementation.
// This will apply committed commands (Set/Delete) to a file
const MyStateMachine = struct {
    db_file: std.fs.File,

    // Interface function
    // Custom implementation, for a custom storage
    pub fn apply(self: *MyStateMachine, entry: raft.LogEntry) void {
        switch (entry.command) {
            .Set => |cmd| {
                // Simulate persistent insert by appending to file
                _ = self.db_file.writer().print("SET {s} {any}\n", .{cmd.key, cmd.value}) catch {};
            },
            .Delete => |cmd| {
                _ = self.db_file.writer().print("DELETE {s}\n", .{cmd.key}) catch {};
            },


            //TODO
            .Update => |u| {
                // Optional: check if key exists first
                if (!self.db.contains(u.key)) return error.KeyNotFound;
                try self.db_file_update_key(u.key, u.value); // Overwrite
            },
            //TODO
            .Get => |cmd| {
                // find a value in the file
            },
        }
    }

    pub fn init() !MyStateMachine {
        const f = try std.fs.cwd().createFile("raft_db.txt", .{ .read = true, .truncate = false });
        return MyStateMachine{ .db_file = f };
    }

    pub fn deinit(self: *MyStateMachine) void {
        self.db_file.close();
    }

    //mock method
    inline fn db_file_contain_key(k: string) bool {
        return true;
    }

    //mock method
    inline fn db_file_update_key(k: string, v: string) void {
    }
};

pub fn main() !void {
    const allocator = std.heap.page_allocator;

    // Load config
    const config = try raft.loadConfig(allocator, "node1_raft.yaml");

    // Wrap the state machine logic
    var sm_impl = try MyStateMachine.init();
    const sm = raft.StateMachine(MyStateMachine){
        .ctx = &sm_impl,
    };

    const Node = raft.RaftNode(MyStateMachine);
    const ClusterT = raft.Cluster(MyStateMachine);
    var cluster = ClusterT.init(allocator);
    var node = try Node.init(allocator, config.self_id, sm);

    for (config.nodes) |peer| {
        try cluster.addNodeAddress(peer.id, peer.address);
    }

    // Start TCP server in background
    var server = raft.RaftTcpServer(MyStateMachine).init(
        allocator,
        &node,
        &cluster,
        50, // max clients
    );
    const self_port = blk: {
        for (config.nodes) |p| {
            if (p.id == config.self_id) break :blk p.address.port;
        }
        return error.SelfNotFound;
    };
    const t = try std.Thread.spawn(.{}, raft.RaftTcpServer(MyStateMachine).start, .{ &server, self_port });
    t.detach();

    // Main loop: Raft ticking (election, heartbeat, etc.)
    while (true) {
        try cluster.tick();
        try server.checkCommittedAcks(); // sends acks to clients if commit_index advanced
        std.time.sleep(50 * std.time.ns_per_ms);
    }
}

Examples

in the corresponding directory