db-19 — ZAB (ZooKeeper Atomic Broadcast)

This lab implements ZAB — the atomic broadcast protocol that drives Apache ZooKeeper — in Rust, Go, and C++, all three producing a byte-identical sha256 of a canonical cluster dump for any (seed, nodes, rounds, proposals, partition) configuration. It inherits the deterministic-simulator discipline of db-16 and db-17: same splitmix64 seeding, same (delivery_time, sender, seq) heap tie-break, same "sorted iteration on the wire" rule.

Where db-17 Raft taught you that one consensus algorithm can be pinned down to a single byte sequence across three languages, db-19 ZAB does the same exercise for a different algorithm with a meaningfully different recovery story. It has an explicit Discovery / Synchronization phase between leader election and steady-state broadcast. Its transaction identifier (zxid) pairs an epoch with a per-epoch counter, whereas Raft uses a term plus a log index that keeps growing across terms.


What is it?

ZAB (Reed & Junqueira, LADIS 2008; Junqueira, Reed & Serafini, DSN 2011) is the primary-backup atomic broadcast protocol that ZooKeeper uses to keep its replicated state machine consistent. It is not a generic consensus library. It was designed specifically for ZooKeeper's workload: a small, well-known cluster (3, 5, or 7 nodes), a small in-memory state machine, and a strong primary-order guarantee that client requests served by the same primary are delivered in the order that primary chose.

ZAB decomposes into four phases. The sections below number them Phase 0 through Phase 3, corresponding to items 1 to 4 in this list. Phase 0 is the original FastLeaderElection; later papers fold it into Phase 1.

  1. Leader election (FastLeaderElection). Every node starts in Looking. Each node broadcasts its current vote, initially for itself, carrying (last_zxid, server_id). When a Looking node receives a peer's vote, it compares (peer.last_zxid, peer.id) lexicographically against (target.last_zxid, target.id), where target is the node it currently votes for. If the peer's pair is greater, the node switches its vote to that peer and re-broadcasts. When a quorum of voters agree on the same target, that node is elected. It transitions to Leading, and every node that voted for it transitions to Following.

  2. Discovery. The prospective leader picks a fresh new_epoch = max(accepted_epoch, current_epoch) + 1, sets its own accepted_epoch = new_epoch, and broadcasts NewEpoch(new_epoch). Each follower that accepts the new epoch updates its accepted_epoch and replies with AckEpoch(current_epoch, last_zxid). Once a quorum of AckEpochs arrives, the leader knows the highest (current_epoch, last_zxid) in the surviving quorum; that node's history is the one that must survive. In the pseudocode below, the leader does not fetch a follower's history at this point. It relies on the election tie-break, which already favours the highest last_zxid, and carries its own history into Synchronization.

  3. Synchronization. The leader sets current_epoch = new_epoch, resets the per-epoch counter, and broadcasts NewLeader(new_epoch, history), the full history that this epoch starts from. Followers replace their local history with the leader's, set current_epoch = new_epoch, and reply AckLeader(new_epoch). On a quorum of AckLeaders the leader marks itself synced. If that advances its commit point, it broadcasts Commit(last_zxid_of_history) so followers can advance last_committed to the end of the synced history.

  4. Broadcast (steady state). This phase closely mirrors Raft's log replication under different names. For each client proposal, the leader assigns zxid = (current_epoch, ++counter), appends the transaction to its history, and broadcasts Propose(txn). Followers append in zxid order and reply Ack(zxid). On an Ack quorum the leader broadcasts Commit(zxid). Heartbeats are periodic re-sends of the last Commit, or of NewLeader before sync completes. Receiving one from the current leader refreshes the follower's election timer.
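Most of the ordering logic above rests on zxid comparison. The following minimal Rust sketch assumes a ZxId struct with two u32 fields, matching the u32 pair in the canonical dump. The next_zxid helper is hypothetical and may not match the lab's actual API.

```rust
/// A zxid pairs the leader's epoch with a per-epoch counter.
/// Deriving Ord on a struct compares fields in declaration order,
/// so `epoch` dominates and `counter` only breaks ties.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

/// Hypothetical helper: the zxid the leader assigns to its next proposal.
/// The caller resets `next_counter` to 0 when it installs a new epoch,
/// so the first proposal of every epoch gets counter 1.
pub fn next_zxid(current_epoch: u32, next_counter: &mut u32) -> ZxId {
    *next_counter += 1;
    ZxId { epoch: current_epoch, counter: *next_counter }
}
```

Because epoch is compared first, any zxid from a newer epoch outranks every zxid from an older one, regardless of counter.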

The simulator advances sim time in integer ticks. Messages are scheduled into a heap with deterministic (delivery_time, sender, seq) order. An optional partition set drops messages on the listed directed (source, destination) links.


Why does it matter?

  • ZAB is the algorithm under ZooKeeper, and ZooKeeper is the coordination kernel under Kafka (pre-KRaft), HBase, Hadoop YARN, Mesos, Druid, and a long list of production systems. Knowing exactly how the NewLeader / Sync handshake works is the difference between operating ZooKeeper and understanding it.

  • ZAB and Raft solve the same problem with meaningfully different shapes. ZAB has an explicit recovery handshake that Raft folds into the AppendEntries consistency check. ZAB's zxid = (epoch, counter) resembles Raft's (term, index), but the counter restarts in every epoch, while Raft's log index keeps growing across terms. Implementing both back-to-back makes the contrast concrete instead of conceptual.

  • Three byte-identical implementations force the spec to be unambiguous. Anywhere ZAB "depends on the implementation" — election tie-break, vote rebroadcast on update, AckEpoch idempotency, heap scheduling — has to be pinned down. The cross-language sha256 makes drift loud.

  • Reproducible partitions. With a deterministic --partition s,d,... flag and a seeded simulator, you can replay the exact sequence of message drops, leader churn, and committed transactions that triggered a bug, on any machine, in any of the three languages.

  • Foundation for the rest of the track. db-20 distributed-kv will plug a consensus engine into a real key-value store; db-23 capstone composes the simulator harness across multiple replicated shards.


How does it work?

State (per node)

persistent  : current_epoch   : u32     # epoch of the leader we accepted into sync
              accepted_epoch  : u32     # epoch we've ack'd via NewEpoch (>= current_epoch)
              history         : Vec<Txn>
              last_committed  : ZxId

volatile    : role            : Looking | Following | Leading
              leader_id       : Option<u32>

election    : vote_target_id  : u32              # who we currently vote for
              vote_target_zxid: ZxId             # last_zxid of the node we vote for
              vote_view       : Map<voter_id, leader_id>   # tally

leader-only : pending_new_epoch : u32
              epoch_acks        : Set<follower_id>   # AckEpoch quorum tracker
              leader_acks       : Set<follower_id>   # AckLeader quorum tracker
              synced            : bool
              next_counter      : u32                # zxid counter under current_epoch
              ack_set           : Map<ZxId, Set<follower_id>>

timers      : election_deadline   : u64                # sim-time tick
              last_heartbeat_sent : u64

Election timer

reset_election_deadline(t):
    election_deadline = t + 150 + splitmix64(seed ^ node_id ^ t) % 150

The 150-tick base plus 0 to 149 ticks of seeded jitter places the deadline in [t+150, t+299], which makes repeated split votes unlikely. Once a leader is synced, it sends heartbeats every 50 ticks.
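The following Rust sketch implements the deadline formula. It assumes splitmix64 is applied as a single stateless step: add the 0x9E3779B97F4A7C15 increment, then run the two multiply/xorshift rounds. That is the standard SplitMix64 output function. The lab's exact wiring is an assumption here, as is widening node_id from u32 to u64 before the XOR.

```rust
/// One SplitMix64 step using the three constants from the invariants table.
/// Assumption: the lab hashes its input exactly this way.
pub fn splitmix64(x: u64) -> u64 {
    let mut z = x.wrapping_add(0x9E37_79B9_7F4A_7C15);
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E7B5);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// election_deadline = t + 150 + splitmix64(seed ^ node_id ^ t) % 150,
/// i.e. a 150-tick base plus 0..=149 ticks of deterministic jitter.
pub fn election_deadline(seed: u64, node_id: u64, t: u64) -> u64 {
    t + 150 + splitmix64(seed ^ node_id ^ t) % 150
}
```

Because the jitter depends only on (seed, node_id, t), every language computes the same firing tick for the same node.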

FastLeaderElection (Phase 0)

on entering Looking:
    vote_target_id   = self.id
    vote_target_zxid = self.last_zxid()
    vote_view.clear(); vote_view[self.id] = self.id
    broadcast LookForLeader { self.id, self.last_zxid, current_epoch }
    broadcast Vote          { self.id, self.last_zxid, current_epoch, leader=self.id }
    check_election()

on Vote(voter, peer_zxid, _, leader_chosen) while Looking:
    if (peer_zxid, voter) > (vote_target_zxid, vote_target_id):
        vote_target_id   = voter
        vote_target_zxid = peer_zxid
        vote_view.clear(); vote_view[self.id] = voter
        broadcast Vote { self.id, self.last_zxid, current_epoch, leader=voter }
    vote_view[voter] = leader_chosen
    check_election()

check_election():
    target = vote_target_id
    if count(v in vote_view.values() : v == target) >= quorum:
        if target == self.id: become_leading()
        else:                 become_following(target)

LookForLeader is structurally a Vote for the sender. It lets a late-arriving node bootstrap a tally without waiting for the next broadcast cycle. Non-Looking peers reply to a Vote with their own current vote, which points at the live leader, so isolated nodes converge quickly once a partition heals.
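The vote update and quorum check can be sketched in Rust as follows. Election, on_vote, and elected are hypothetical names. Message sending is left to the caller, and the quorum size is passed in rather than derived from the cluster size.

```rust
use std::collections::BTreeMap;

/// (epoch, counter); derived Ord compares epoch first, then counter.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

/// Hypothetical per-node election state while Looking.
pub struct Election {
    pub self_id: u32,
    pub quorum: usize,
    pub vote_target_id: u32,
    pub vote_target_zxid: ZxId,
    /// voter id -> leader that voter backs; BTreeMap iterates in ascending id.
    pub vote_view: BTreeMap<u32, u32>,
}

impl Election {
    /// Entering Looking: vote for ourselves.
    pub fn new(self_id: u32, last_zxid: ZxId, quorum: usize) -> Self {
        let mut vote_view = BTreeMap::new();
        vote_view.insert(self_id, self_id);
        Election { self_id, quorum, vote_target_id: self_id, vote_target_zxid: last_zxid, vote_view }
    }

    /// Handle Vote(voter, peer_zxid, leader_chosen). Returns true when our own
    /// vote switched, in which case the caller re-broadcasts it.
    pub fn on_vote(&mut self, voter: u32, peer_zxid: ZxId, leader_chosen: u32) -> bool {
        // Tuples compare lexicographically: last_zxid first, id breaks ties.
        let switched = (peer_zxid, voter) > (self.vote_target_zxid, self.vote_target_id);
        if switched {
            self.vote_target_id = voter;
            self.vote_target_zxid = peer_zxid;
            self.vote_view.clear();
            self.vote_view.insert(self.self_id, voter);
        }
        self.vote_view.insert(voter, leader_chosen);
        switched
    }

    /// check_election(): Some(target) once a quorum of tallied votes back it.
    pub fn elected(&self) -> Option<u32> {
        let target = self.vote_target_id;
        let backers = self.vote_view.values().filter(|&&v| v == target).count();
        if backers >= self.quorum { Some(target) } else { None }
    }
}
```

In a three-node cluster (quorum 2), node 1 ignores a vote from node 2 with an older zxid. It switches to node 3 when node 3 reports a newer one, and then the tally {1: 3, 3: 3} elects node 3.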

Discovery & Synchronization (Phases 1–2)

become_leading():
    role = Leading
    pending_new_epoch = max(accepted_epoch, current_epoch) + 1
    accepted_epoch    = pending_new_epoch
    epoch_acks = {self.id}
    broadcast NewEpoch(pending_new_epoch)
    try_finish_discovery()      # handles the n=1 case immediately

on NewEpoch(e) from L:
    if e > accepted_epoch:
        accepted_epoch = e
        if role != Following: become_following(L)
        reply AckEpoch(current_epoch, last_zxid)
    elif e == accepted_epoch:
        reply AckEpoch(current_epoch, last_zxid)   # idempotent
    reset_election_deadline()

on AckEpoch from F (only if Leading):
    epoch_acks += F
    try_finish_discovery()

try_finish_discovery():
    if |epoch_acks| < quorum: return
    current_epoch = pending_new_epoch
    next_counter  = 0
    leader_acks   = {self.id}
    broadcast NewLeader(current_epoch, history.clone())
    try_finish_sync()

on NewLeader(e, hist) from L:
    if e >= accepted_epoch:
        accepted_epoch = e
        current_epoch  = e
        history        = hist          # follower truncates / extends to leader's history
        if role != Following: become_following(L)
        reset_election_deadline()
        reply AckLeader(e)

on AckLeader(e) from F (only if Leading and e == current_epoch):
    leader_acks += F
    try_finish_sync()

try_finish_sync():
    if synced or |leader_acks| < quorum: return
    synced = true
    if last_zxid() > last_committed:
        last_committed = last_zxid()
        broadcast Commit(last_committed)
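On the leader's side, Discovery and Synchronization reduce to two quorum trackers. This Rust sketch keeps only that bookkeeping, marks broadcasts with comments, and uses hypothetical names. It also adds a discovered flag so that a late AckEpoch arriving after the quorum cannot re-run try_finish_discovery and reset next_counter mid-epoch. That flag is an assumption of this sketch, not something the pseudocode shows.

```rust
use std::collections::BTreeSet;

/// Hypothetical leader-side state for Discovery (Phase 1) and Sync (Phase 2).
pub struct LeaderSync {
    pub self_id: u32,
    pub quorum: usize,
    pub current_epoch: u32,
    pub accepted_epoch: u32,
    pub pending_new_epoch: u32,
    pub epoch_acks: BTreeSet<u32>,
    pub leader_acks: BTreeSet<u32>,
    pub discovered: bool, // assumption of this sketch: discovery completes once per epoch
    pub synced: bool,
    pub next_counter: u32,
}

impl LeaderSync {
    /// become_leading(): pick a fresh epoch and count our own AckEpoch.
    pub fn become_leading(self_id: u32, quorum: usize, current_epoch: u32, accepted_epoch: u32) -> Self {
        let e = current_epoch.max(accepted_epoch) + 1;
        let mut s = LeaderSync {
            self_id, quorum, current_epoch,
            accepted_epoch: e, pending_new_epoch: e,
            epoch_acks: BTreeSet::new(), leader_acks: BTreeSet::new(),
            discovered: false, synced: false, next_counter: 0,
        };
        s.epoch_acks.insert(self_id);
        // broadcast NewEpoch(e) would go here
        s.try_finish_discovery(); // a single-node cluster finishes immediately
        s
    }

    pub fn on_ack_epoch(&mut self, from: u32) {
        self.epoch_acks.insert(from);
        self.try_finish_discovery();
    }

    fn try_finish_discovery(&mut self) {
        if self.discovered || self.epoch_acks.len() < self.quorum {
            return;
        }
        self.discovered = true;
        self.current_epoch = self.pending_new_epoch;
        self.next_counter = 0;
        self.leader_acks.clear();
        self.leader_acks.insert(self.self_id);
        // broadcast NewLeader(current_epoch, history) would go here
        self.try_finish_sync();
    }

    pub fn on_ack_leader(&mut self, from: u32, e: u32) {
        if !self.discovered || e != self.current_epoch {
            return;
        }
        self.leader_acks.insert(from);
        self.try_finish_sync();
    }

    fn try_finish_sync(&mut self) {
        if self.synced || self.leader_acks.len() < self.quorum {
            return;
        }
        self.synced = true;
        // broadcast Commit(last_zxid) would go here if it advances last_committed
    }
}
```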

Broadcast (Phase 3)

propose(payload):
    require role == Leading and synced
    next_counter += 1
    zxid = (current_epoch, next_counter)
    history.push(Txn { zxid, payload })
    ack_set[zxid] = {self.id}
    broadcast Propose(Txn { zxid, payload })
    try_commit(zxid)                   # single-node case

on Propose(txn) from L (only if Following and L == leader_id):
    if txn.zxid > last_zxid():
        history.push(txn)
        reset_election_deadline()
        reply Ack(txn.zxid)

on Ack(zxid) from F (only if Leading):
    ack_set[zxid] += F
    try_commit(zxid)

try_commit(zxid):
    if zxid <= last_committed: return
    if |ack_set[zxid]| >= quorum:
        last_committed = zxid
        broadcast Commit(zxid)

on Commit(zxid) from L:
    if L is current leader:
        reset_election_deadline()
    if last_committed < zxid <= last_zxid():
        last_committed = zxid
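The following Rust sketch covers the leader's propose / Ack / commit path and the follower's Propose gate. The names are hypothetical and messages are omitted. For brevity, the history stores (zxid, payload) pairs instead of a Txn type.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

/// Hypothetical leader-side Phase 3 state.
pub struct Leader {
    pub self_id: u32,
    pub quorum: usize,
    pub current_epoch: u32,
    pub next_counter: u32,
    pub history: Vec<(ZxId, Vec<u8>)>,
    pub ack_set: BTreeMap<ZxId, BTreeSet<u32>>,
    pub last_committed: ZxId,
}

impl Leader {
    /// propose(): assign the next zxid, self-ack, and try to commit.
    pub fn propose(&mut self, payload: &[u8]) -> ZxId {
        self.next_counter += 1;
        let zxid = ZxId { epoch: self.current_epoch, counter: self.next_counter };
        self.history.push((zxid, payload.to_vec()));
        self.ack_set.entry(zxid).or_default().insert(self.self_id);
        // broadcast Propose(txn) would go here
        self.try_commit(zxid); // commits at once when quorum == 1
        zxid
    }

    pub fn on_ack(&mut self, from: u32, zxid: ZxId) {
        self.ack_set.entry(zxid).or_default().insert(from);
        self.try_commit(zxid);
    }

    fn try_commit(&mut self, zxid: ZxId) {
        if zxid <= self.last_committed {
            return;
        }
        if self.ack_set.get(&zxid).map_or(0, |s| s.len()) >= self.quorum {
            self.last_committed = zxid;
            // broadcast Commit(zxid) would go here
        }
    }
}

/// Follower's Propose gate: append only zxids strictly newer than the last one.
pub fn follower_accepts(history: &mut Vec<ZxId>, txn: ZxId) -> bool {
    let last = history.last().copied().unwrap_or(ZxId { epoch: 0, counter: 0 });
    if txn > last {
        history.push(txn);
        true
    } else {
        false
    }
}
```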

Simulator loop (per tick t in 0..rounds)

1. enqueue scheduled proposals : if t == schedule[i], push payload onto pending
2. inject pending into leader  : pick (Leading and synced, lowest id); call propose
3. deliver in-flight           : pop heap entries with delivery_time <= t
4. tick all nodes              : iterate in ascending id; on_tick may fire election or heartbeat
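Step 3 depends on a heap whose order is fully determined by (delivery_time, sender, seq). One way to get that in Rust is to key each in-flight message on that triple and wrap it in std::cmp::Reverse, since BinaryHeap is a max-heap. The sketch below uses a hypothetical Network type and leaves out partitions and the delivery-delay formula.

```rust
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;

/// An in-flight message ordered only by its (delivery_time, sender, seq) key.
struct InFlight<M> {
    key: (u64, u32, u64),
    dst: u32,
    msg: M,
}
impl<M> PartialEq for InFlight<M> {
    fn eq(&self, o: &Self) -> bool { self.key == o.key }
}
impl<M> Eq for InFlight<M> {}
impl<M> PartialOrd for InFlight<M> {
    fn partial_cmp(&self, o: &Self) -> Option<Ordering> { Some(self.cmp(o)) }
}
impl<M> Ord for InFlight<M> {
    fn cmp(&self, o: &Self) -> Ordering { self.key.cmp(&o.key) }
}

/// Hypothetical network: a min-heap plus a global, monotonic seq counter.
pub struct Network<M> {
    heap: BinaryHeap<Reverse<InFlight<M>>>,
    next_seq: u64,
}

impl<M> Network<M> {
    pub fn new() -> Self {
        Network { heap: BinaryHeap::new(), next_seq: 0 }
    }

    pub fn send(&mut self, delivery_time: u64, src: u32, dst: u32, msg: M) {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.heap.push(Reverse(InFlight { key: (delivery_time, src, seq), dst, msg }));
    }

    /// Pop every message due at or before tick t, as (sender, dst, msg),
    /// in ascending (delivery_time, sender, seq) order.
    pub fn drain_due(&mut self, t: u64) -> Vec<(u32, u32, M)> {
        let mut out = Vec::new();
        while self.heap.peek().map_or(false, |r| r.0.key.0 <= t) {
            let Reverse(m) = self.heap.pop().unwrap();
            out.push((m.key.1, m.dst, m.msg));
        }
        out
    }
}
```

Because seq is globally unique, keys never tie, so the message payload never needs to be comparable.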

Proposal schedule: schedule[i] = (i+1) * rounds / (K+1) for i in 0..K (integer division), where K is the configured number of proposals. Each payload is the byte string "zab-<i>" (plain decimal, no padding). The schedule is deterministic, evenly spread, and independent of cluster behaviour.
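The schedule fits in one expression. Here it is as a Rust sketch with a hypothetical proposal_schedule function:

```rust
/// (tick, payload) for each of the k proposals:
/// tick = (i+1) * rounds / (k+1) with integer division, payload = "zab-<i>".
pub fn proposal_schedule(rounds: u64, k: u64) -> Vec<(u64, Vec<u8>)> {
    (0..k)
        .map(|i| ((i + 1) * rounds / (k + 1), format!("zab-{}", i).into_bytes()))
        .collect()
}
```

Ticks are non-decreasing in i, so pushing proposals in index order keeps the pending queue in payload order. This holds even when two proposals land on the same tick, which is possible when rounds is small.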

Wire format (Rpc)

Nine variants. The simulator never serializes RPCs — it passes them as typed values in memory. The only bytes that ever get hashed are the canonical dump.

LookForLeader { src_id, last_zxid, peer_epoch }
Vote          { voter_id, last_zxid, peer_epoch, leader_id }
NewEpoch      { new_epoch }
AckEpoch      { current_epoch, last_zxid }
NewLeader     { new_epoch, history: Vec<Txn> }
AckLeader     { new_epoch }
Propose       { txn: Txn }
Ack           { zxid }
Commit        { zxid }
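Since RPCs stay in memory, a plain Rust enum is enough. This sketch mirrors the nine variants above, with u32 field types assumed from the dump format. The as_vote helper is hypothetical; it shows how LookForLeader can be handled as a Vote for its sender.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Txn {
    pub zxid: ZxId,
    pub payload: Vec<u8>,
}

/// The nine RPC variants as typed in-memory values; nothing is serialized.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Rpc {
    LookForLeader { src_id: u32, last_zxid: ZxId, peer_epoch: u32 },
    Vote { voter_id: u32, last_zxid: ZxId, peer_epoch: u32, leader_id: u32 },
    NewEpoch { new_epoch: u32 },
    AckEpoch { current_epoch: u32, last_zxid: ZxId },
    NewLeader { new_epoch: u32, history: Vec<Txn> },
    AckLeader { new_epoch: u32 },
    Propose { txn: Txn },
    Ack { zxid: ZxId },
    Commit { zxid: ZxId },
}

impl Rpc {
    /// Hypothetical helper: view an election message as (voter, zxid, leader).
    /// LookForLeader counts as a vote by the sender for itself.
    pub fn as_vote(&self) -> Option<(u32, ZxId, u32)> {
        match self {
            Rpc::LookForLeader { src_id, last_zxid, .. } => Some((*src_id, *last_zxid, *src_id)),
            Rpc::Vote { voter_id, last_zxid, leader_id, .. } => Some((*voter_id, *last_zxid, *leader_id)),
            _ => None,
        }
    }
}
```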

Canonical dump format

file := magic[8 = "DSEZAB01"] u32_le(node_count) node*

node := u32_le id
        u8     role               # Looking=0, Following=1, Leading=2
        u32_le current_epoch
        u32_le accepted_epoch
        u32_le last_zxid.epoch
        u32_le last_zxid.counter
        u32_le last_committed.epoch
        u32_le last_committed.counter
        u32_le history_len
        txn * history_len

txn  := u32_le zxid.epoch
        u32_le zxid.counter
        u32_le payload_len
        u8 payload[payload_len]

Nodes appear in ascending id order. All multi-byte numbers are little-endian. The dump is hashed with SHA-256; the lowercase hex digest is what zabctl prints (no trailing newline).
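The following is a Rust sketch of the encoder. It assumes NodeDump, Txn, and Role shapes that follow the field list above, and it assumes an empty history reports last_zxid = (0, 0). SHA-256 is left out because the Rust standard library does not include it; the returned bytes would be fed to a SHA-256 implementation from an external crate.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

pub struct Txn {
    pub zxid: ZxId,
    pub payload: Vec<u8>,
}

/// Role discriminants fixed by the dump format.
#[derive(Clone, Copy)]
pub enum Role { Looking = 0, Following = 1, Leading = 2 }

/// Hypothetical snapshot of one node's dumped state.
pub struct NodeDump {
    pub id: u32,
    pub role: Role,
    pub current_epoch: u32,
    pub accepted_epoch: u32,
    pub last_committed: ZxId,
    pub history: Vec<Txn>,
}

fn put_u32(out: &mut Vec<u8>, v: u32) {
    out.extend_from_slice(&v.to_le_bytes()); // all integers are little-endian
}

/// Encode the canonical dump; `nodes` must already be sorted by ascending id.
pub fn canonical_dump(nodes: &[NodeDump]) -> Vec<u8> {
    let mut out = b"DSEZAB01".to_vec();
    put_u32(&mut out, nodes.len() as u32);
    for n in nodes {
        put_u32(&mut out, n.id);
        out.push(n.role as u8);
        put_u32(&mut out, n.current_epoch);
        put_u32(&mut out, n.accepted_epoch);
        // Assumption: an empty history has last_zxid = (0, 0).
        let last = n.history.last().map_or(ZxId { epoch: 0, counter: 0 }, |t| t.zxid);
        put_u32(&mut out, last.epoch);
        put_u32(&mut out, last.counter);
        put_u32(&mut out, n.last_committed.epoch);
        put_u32(&mut out, n.last_committed.counter);
        put_u32(&mut out, n.history.len() as u32);
        for t in &n.history {
            put_u32(&mut out, t.zxid.epoch);
            put_u32(&mut out, t.zxid.counter);
            put_u32(&mut out, t.payload.len() as u32);
            out.extend_from_slice(&t.payload);
        }
    }
    out
}
```

For one node holding one five-byte payload, the layout gives 12 header bytes, 33 fixed node bytes, and 17 transaction bytes, 62 in total.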

Primary-order property

ZAB's defining guarantee, distinct from generic atomic broadcast, is primary order: if a primary (leader) broadcasts proposals p then q in that order, every follower that delivers both delivers p before q. This is enforced trivially by the leader's monotonically increasing next_counter and the follower's txn.zxid > last_zxid() gate on Propose. Primary order is a per-primary property; across leadership changes the guarantee is provided by the Discovery / Sync handshake that explicitly chooses the surviving primary's history.
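Primary order can be checked mechanically: a follower's delivered zxids must form a subsequence of the leader's issue order. Here is a hypothetical Rust checker:

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

/// True if `delivered` lists a subset of `issued` in the same relative order:
/// whenever p precedes q in the primary's issue order and both were delivered,
/// p was delivered first.
pub fn respects_primary_order(issued: &[ZxId], delivered: &[ZxId]) -> bool {
    let mut it = issued.iter();
    // `any` advances the shared iterator, so each match must come after the last.
    delivered.iter().all(|d| it.any(|p| p == d))
}
```

A follower that applies the txn.zxid > last_zxid() gate to proposals from a single leader always produces a strictly increasing subset of that leader's zxids, which this check accepts.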

Cross-language invariants

Invariant | Why it matters
--- | ---
splitmix64 constants 0x9E3779B97F4A7C15, 0xBF58476D1CE4E7B5, 0x94D049BB133111EB | identical PRNG output
election_deadline = t + 150 + splitmix64(seed ^ node_id ^ t) % 150 | identical election firing times
delivery_delay = 1 + splitmix64(seed ^ src ^ dst ^ t) % 3 | identical message scheduling
heap order (delivery_time, sender, seq); seq global monotonic | identical delivery sequence
peers iterated in ascending id (BTreeMap / std::map / explicit loop) | identical broadcast order
vote_view keyed by voter id, iterated in ascending id | identical election tally
election tie-break: lexicographic (last_zxid, voter_id) | identical leader choice
leader-pick for proposal injection: Leading && synced && min id | identical client routing
proposal schedule (i+1)*rounds/(K+1); payload "zab-<i>" unpadded decimal | identical pending queue contents
propose() calls try_commit() | identical last_committed for n=1
Role enum order Looking=0, Following=1, Leading=2 | identical dump bytes
dump magic "DSEZAB01"; all integers u32 LE; nodes in ascending id | identical dump bytes

If any one of these drifts, scripts/cross_test.sh will fail. Running cmp on the two raw dumps then reports the byte offset of the first divergence, and cmp -l lists every differing byte.


Files

  • src/rust/: zab19 crate + zabctl binary.
  • src/go/: module github.com/10xdev/dse/db19 + cmd/zabctl.
  • src/cpp/: db19_lib static library + zabctl binary + test_db19.
  • scripts/verify.sh — runs the unit tests for all three.
  • scripts/cross_test.sh — proves the three binaries produce byte-identical canonical dumps for six seeded scenarios.

See docs/ for the long-form write-up and steps/ for the staged implementation path.