db-19 — ZAB (ZooKeeper Atomic Broadcast)
This lab implements ZAB — the atomic broadcast protocol that drives
Apache ZooKeeper — in Rust, Go, and C++, all three producing a
byte-identical sha256 of a canonical cluster dump for any
(seed, nodes, rounds, proposals, partition) configuration. It inherits
the deterministic-simulator discipline of db-16 and db-17: same
splitmix64 seeding, same (delivery_time, sender, seq) heap tie-break,
same "sorted iteration on the wire" rule.
Where db-17 Raft taught you that one consensus algorithm can be pinned
down to a single byte sequence across three languages, db-19 ZAB does
the same exercise for a different algorithm with a meaningfully
different recovery story: an explicit Discovery / Synchronization phase
between leader election and steady-state broadcast, and a transaction
identifier (zxid) that pairs an epoch with a per-epoch counter, where
Raft pairs a term with a log index that keeps growing across terms.
What is it?
ZAB (Reed & Junqueira, LADIS 2008; Junqueira, Reed & Serafini, DSN 2011) is the primary-backup atomic broadcast protocol that ZooKeeper uses to keep its replicated state machine consistent. It is not a generic consensus library; it was designed specifically for ZooKeeper's workload: a small, well-known cluster (3, 5, 7 nodes), a small in-memory state machine, and a strong primary-order guarantee that arbitrary client requests served by the same primary are delivered in the order the primary chose.
ZAB decomposes into four phases. Phase 0 is the original FastLeaderElection; later papers fold it into Phase 1.
- Leader election (FastLeaderElection). Every node starts in Looking. Each node broadcasts its current vote (initially for itself), carrying (last_zxid, server_id). On receiving a peer vote, a Looking node updates its own vote to that peer if (peer.last_zxid, peer.id) > (own.last_zxid, own.id) lexicographically, then re-broadcasts. When a quorum of voters agree on the same target, that node is elected: it transitions to Leading, and everyone else who voted for it transitions to Following.
- Discovery. The new prospective leader picks a fresh new_epoch = max(accepted_epoch, current_epoch) + 1, sets its own accepted_epoch = new_epoch, and broadcasts NewEpoch(new_epoch). Each follower that accepts updates its accepted_epoch and replies with AckEpoch(current_epoch, last_zxid). Once a quorum of AckEpochs arrives, the leader knows the highest (current_epoch, last_zxid) in the surviving quorum; that node's history is the one that must survive.
- Synchronization. The leader bumps its current_epoch = new_epoch, resets the per-epoch counter, and broadcasts NewLeader(new_epoch, history), carrying the whole history that this epoch will start from. Followers replace their local history with the leader's, set current_epoch = new_epoch, and reply AckLeader(new_epoch). On a quorum of AckLeaders the leader declares itself synced and broadcasts Commit(last_zxid_of_history) so followers can advance last_committed past the synced tail.
- Broadcast (steady state). Now indistinguishable from Raft's replication phase, modulo names. For each client proposal, the leader assigns zxid = (current_epoch, ++counter), appends to its history, and broadcasts Propose(txn). Followers append in zxid order and reply Ack(zxid). On Ack quorum the leader broadcasts Commit(zxid). Heartbeats are implemented as periodic re-sends of the last Commit (or NewLeader during pre-sync); receiving one from the current leader refreshes the follower's election timer.
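The zxid ordering that the election and broadcast phases rely on can be sketched in a few lines of Rust. This is a minimal illustration, not the lab's actual code: the names peer_wins and next_zxid are hypothetical, and the only thing taken from the text above is that a zxid is an (epoch, counter) pair compared lexicographically, with server id as the final tie-break.

```rust
/// Transaction identifier: an epoch paired with a per-epoch counter.
/// Field order matters: the derived `Ord` compares `epoch` first, then `counter`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

/// Election preference (hypothetical helper): lexicographic comparison of
/// (last_zxid, server_id). Returns true when the peer's vote should replace ours.
pub fn peer_wins(peer_zxid: ZxId, peer_id: u32, own_zxid: ZxId, own_id: u32) -> bool {
    (peer_zxid, peer_id) > (own_zxid, own_id)
}

/// Next zxid a synced leader would assign under `current_epoch`
/// (hypothetical helper mirroring `zxid = (current_epoch, ++counter)`).
pub fn next_zxid(current_epoch: u32, counter: &mut u32) -> ZxId {
    *counter += 1;
    ZxId { epoch: current_epoch, counter: *counter }
}
```

Deriving Ord on the struct keeps the comparison rule in one place, so a higher epoch always beats a higher counter from an older epoch.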
The simulator drives sim time forward in integer ticks; messages are
scheduled into a heap with deterministic (delivery_time, sender, seq)
order; an optional partition set drops messages in named directions.
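One way to get the (delivery_time, sender, seq) ordering out of Rust's max-heap is to wrap entries in Reverse and let the derived Ord follow field order. The sketch below is an assumption-laden illustration: the Network and InFlight types, the string payload, and the representation of a partition as a set of directed (src, dst) pairs are all hypothetical, and whether a dropped message consumes a seq number is a choice the three implementations would have to agree on.

```rust
use std::cmp::Reverse;
use std::collections::{BTreeSet, BinaryHeap};

/// In-flight message. Derived `Ord` compares fields top to bottom, so the
/// effective key is (delivery_time, sender, seq); seq is unique, so later
/// fields never decide the order. The payload is a placeholder string.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct InFlight {
    pub delivery_time: u64,
    pub sender: u32,
    pub seq: u64,
    pub dst: u32,
    pub payload: String,
}

#[derive(Default)]
pub struct Network {
    heap: BinaryHeap<Reverse<InFlight>>, // Reverse turns the max-heap into a min-heap
    next_seq: u64,                       // global, monotonically increasing
    blocked: BTreeSet<(u32, u32)>,       // assumed: directed (src, dst) pairs to drop
}

impl Network {
    pub fn block(&mut self, src: u32, dst: u32) {
        self.blocked.insert((src, dst));
    }

    /// Schedules a message unless the (src, dst) direction is partitioned.
    /// In this sketch a dropped message does not consume a seq number.
    pub fn send(&mut self, now: u64, delay: u64, src: u32, dst: u32, payload: &str) {
        if self.blocked.contains(&(src, dst)) {
            return;
        }
        let seq = self.next_seq;
        self.next_seq += 1;
        self.heap.push(Reverse(InFlight {
            delivery_time: now + delay,
            sender: src,
            seq,
            dst,
            payload: payload.to_string(),
        }));
    }

    /// Pops every message due at or before `now`, in heap order.
    pub fn deliver_due(&mut self, now: u64) -> Vec<InFlight> {
        let mut out = Vec::new();
        while self.heap.peek().map_or(false, |m| m.0.delivery_time <= now) {
            if let Some(Reverse(m)) = self.heap.pop() {
                out.push(m);
            }
        }
        out
    }
}
```

Two messages due on the same tick come out ordered by sender id, then by seq, regardless of the order in which they were pushed.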
Why does it matter?
- ZAB is the algorithm under ZooKeeper, and ZooKeeper is the coordination kernel under Kafka (pre-KRaft), HBase, Hadoop YARN, Mesos, Druid, and a long list of production systems. Knowing exactly how the NewLeader / Sync handshake works is the difference between operating ZooKeeper and understanding it.
- ZAB and Raft cover the same problem with meaningfully different shapes. ZAB has an explicit recovery handshake that Raft folds into the AppendEntries consistency check; ZAB's zxid = (epoch, counter) is essentially Raft's (term, index), but the role each plays is subtly different. Implementing both back-to-back makes the contrast concrete instead of conceptual.
- Three byte-identical implementations force the spec to be unambiguous. Anywhere ZAB "depends on the implementation" (election tie-break, vote rebroadcast on update, AckEpoch idempotency, heap scheduling) has to be pinned down. The cross-language sha256 makes drift loud.
- Reproducible partitions. With a deterministic --partition s,d,... flag and a seeded simulator, you can replay the exact sequence of message drops, leader churn, and committed transactions that triggered a bug, on any machine, in any of the three languages.
- Foundation for the rest of the track. db-20 distributed-kv will plug a consensus engine into a real key-value store; db-23 capstone composes the simulator harness across multiple replicated shards.
How does it work?
State (per node)
persistent  : current_epoch       : u32   # epoch of the leader we accepted into sync
              accepted_epoch      : u32   # epoch we've ack'd via NewEpoch (>= current_epoch)
              history             : Vec<Txn>
              last_committed      : ZxId
volatile    : role                : Looking | Following | Leading
              leader_id           : Option<u32>
election    : vote_target_id      : u32   # who we currently vote for
              vote_target_zxid    : ZxId  # the (last_zxid) we voted on
              vote_view           : Map<voter_id, leader_id>  # tally
leader-only : pending_new_epoch   : u32
              epoch_acks          : Set<follower_id>  # AckEpoch quorum tracker
              leader_acks         : Set<follower_id>  # AckLeader quorum tracker
              synced              : bool
              next_counter        : u32   # zxid counter under current_epoch
              ack_set             : Map<ZxId, Set<follower_id>>
timers      : election_deadline   : u64   # sim-time tick
              last_heartbeat_sent : u64
Election timer
reset_election_deadline(t):
    election_deadline = t + 150 + splitmix64(seed ^ node_id ^ t) % 150
A 150-tick base plus 0 to 149 ticks of seeded jitter spreads the nodes' timeouts apart, which keeps repeated split votes unlikely. Heartbeats fire every 50 ticks once a leader is synced.
FastLeaderElection (Phase 0)
on entering Looking:
    vote_target_id = self.id
    vote_target_zxid = self.last_zxid()
    vote_view.clear(); vote_view[self.id] = self.id
    broadcast LookForLeader { self.id, self.last_zxid, current_epoch }
    broadcast Vote { self.id, self.last_zxid, current_epoch, leader=self.id }
    check_election()

on Vote(voter, peer_zxid, _, leader_chosen) while Looking:
    if (peer_zxid, voter) > (vote_target_zxid, vote_target_id):
        vote_target_id = voter
        vote_target_zxid = peer_zxid
        vote_view.clear(); vote_view[self.id] = voter
        broadcast Vote { self.id, self.last_zxid, current_epoch, leader=voter }
    vote_view[voter] = leader_chosen
    check_election()

check_election():
    target = vote_target_id
    if count(v in vote_view.values() : v == target) >= quorum:
        if target == self.id: become_leading()
        else: become_following(target)
LookForLeader is structurally a Vote for the sender: it lets a
late-arriving node bootstrap a tally without waiting for the next
broadcast cycle. Non-Looking peers reply to a Vote with their own
current vote (which points at the live leader), so isolated nodes
converge fast on heal.
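The tally in check_election() can be sketched as a pure function over the vote view. This is an illustration only: the Outcome enum and the function signature are hypothetical, and quorum is assumed to be a simple majority (floor(n/2) + 1), which the README does not spell out.

```rust
use std::collections::BTreeMap;

/// Result of one tally pass (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    StillLooking,
    BecomeLeading,
    BecomeFollowing(u32),
}

/// Assumed quorum rule: a strict majority of the cluster.
pub fn quorum(cluster_size: usize) -> usize {
    cluster_size / 2 + 1
}

/// Counts voters in `vote_view` (voter id -> chosen leader) that agree with
/// our current target and decides whether the election is over.
pub fn check_election(
    self_id: u32,
    vote_target_id: u32,
    vote_view: &BTreeMap<u32, u32>,
    cluster_size: usize,
) -> Outcome {
    let agreeing = vote_view
        .values()
        .filter(|&&leader| leader == vote_target_id)
        .count();
    if agreeing < quorum(cluster_size) {
        return Outcome::StillLooking;
    }
    if vote_target_id == self_id {
        Outcome::BecomeLeading
    } else {
        Outcome::BecomeFollowing(vote_target_id)
    }
}
```

A BTreeMap keeps the view sorted by voter id, which matches the ascending-id iteration rule even though a plain count does not depend on order.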
Discovery & Synchronization (Phases 1–2)
become_leading():
    role = Leading
    pending_new_epoch = max(accepted_epoch, current_epoch) + 1
    accepted_epoch = pending_new_epoch
    epoch_acks = {self.id}
    broadcast NewEpoch(pending_new_epoch)
    try_finish_discovery()   # handles the n=1 case immediately

on NewEpoch(e) from L:
    if e > accepted_epoch:
        accepted_epoch = e
        if role != Following: become_following(L)
        reply AckEpoch(current_epoch, last_zxid)
    elif e == accepted_epoch:
        reply AckEpoch(current_epoch, last_zxid)   # idempotent
    reset_election_deadline()

on AckEpoch from F (only if Leading):
    epoch_acks += F
    try_finish_discovery()

try_finish_discovery():
    if |epoch_acks| < quorum: return
    current_epoch = pending_new_epoch
    next_counter = 0
    leader_acks = {self.id}
    broadcast NewLeader(current_epoch, history.clone())
    try_finish_sync()

on NewLeader(e, hist) from L:
    if e >= accepted_epoch:
        accepted_epoch = e
        current_epoch = e
        history = hist   # follower truncates / extends to leader's history
        if role != Following: become_following(L)
        reset_election_deadline()
        reply AckLeader(e)

on AckLeader(e) from F (only if Leading and e == current_epoch):
    leader_acks += F
    try_finish_sync()

try_finish_sync():
    if synced or |leader_acks| < quorum: return
    synced = true
    if last_zxid() > last_committed:
        last_committed = last_zxid()
    broadcast Commit(last_committed)
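The follower half of Discovery is small enough to sketch directly. The code below is a simplified illustration under stated assumptions: the Follower and AckEpoch types are hypothetical, role transitions and the election timer are left out, and a stale epoch is modelled as "no reply". It covers only the two rules from the pseudocode above: the leader's epoch choice and the idempotent AckEpoch.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

/// Reply a follower sends back during Discovery.
#[derive(Debug, PartialEq, Eq)]
pub struct AckEpoch {
    pub current_epoch: u32,
    pub last_zxid: ZxId,
}

/// Only the fields Discovery touches (hypothetical, simplified node state).
#[derive(Debug, Default)]
pub struct Follower {
    pub current_epoch: u32,
    pub accepted_epoch: u32,
    pub last_zxid: ZxId,
}

/// Epoch a prospective leader proposes: one past anything it has seen.
pub fn pick_new_epoch(accepted_epoch: u32, current_epoch: u32) -> u32 {
    accepted_epoch.max(current_epoch) + 1
}

impl Follower {
    /// Handles NewEpoch(e). Returns the AckEpoch to send, or None when `e`
    /// is older than an epoch this node has already accepted.
    pub fn on_new_epoch(&mut self, e: u32) -> Option<AckEpoch> {
        if e < self.accepted_epoch {
            return None; // stale leader: ignore
        }
        // For e == accepted_epoch this is a no-op, so a re-sent NewEpoch
        // produces the same reply (idempotent).
        self.accepted_epoch = e;
        Some(AckEpoch {
            current_epoch: self.current_epoch,
            last_zxid: self.last_zxid,
        })
    }
}
```

Note that accepting an epoch changes accepted_epoch only; current_epoch stays put until NewLeader arrives in Synchronization.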
Broadcast (Phase 3)
propose(payload):
    require role == Leading and synced
    next_counter += 1
    zxid = (current_epoch, next_counter)
    history.push(Txn { zxid, payload })
    ack_set[zxid] = {self.id}
    broadcast Propose(Txn { zxid, payload })
    try_commit(zxid)   # single-node case

on Propose(txn) from L (only if Following and L == leader_id):
    if txn.zxid > last_zxid():
        history.push(txn)
    reset_election_deadline()
    reply Ack(txn.zxid)

on Ack(zxid) from F (only if Leading):
    ack_set[zxid] += F
    try_commit(zxid)

try_commit(zxid):
    if zxid <= last_committed: return
    if |ack_set[zxid]| >= quorum:
        last_committed = zxid
        broadcast Commit(zxid)

on Commit(zxid) from L:
    if L is current leader:
        reset_election_deadline()
    if last_committed < zxid <= last_zxid():
        last_committed = zxid
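The leader-side ack bookkeeping can be sketched as follows. This is a minimal model, not the lab's implementation: the Leader struct is hypothetical, networking is replaced by boolean return values ("would broadcast Commit now"), history is omitted, and quorum is assumed to be a simple majority.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

/// Leader-only broadcast state (hypothetical, simplified).
pub struct Leader {
    pub id: u32,
    pub quorum: usize,
    pub current_epoch: u32,
    pub next_counter: u32,
    pub last_committed: ZxId,
    pub ack_set: BTreeMap<ZxId, BTreeSet<u32>>,
}

impl Leader {
    pub fn new(id: u32, cluster_size: usize, current_epoch: u32) -> Self {
        Leader {
            id,
            quorum: cluster_size / 2 + 1, // assumed majority quorum
            current_epoch,
            next_counter: 0,
            last_committed: ZxId::default(),
            ack_set: BTreeMap::new(),
        }
    }

    /// Assigns the next zxid and records the leader's own ack.
    /// Returns (zxid, true) if the proposal commits immediately (n = 1).
    pub fn propose(&mut self) -> (ZxId, bool) {
        self.next_counter += 1;
        let zxid = ZxId { epoch: self.current_epoch, counter: self.next_counter };
        self.ack_set.entry(zxid).or_default().insert(self.id);
        (zxid, self.try_commit(zxid))
    }

    /// Records an Ack; returns true when this ack completes the quorum.
    pub fn on_ack(&mut self, zxid: ZxId, follower: u32) -> bool {
        self.ack_set.entry(zxid).or_default().insert(follower);
        self.try_commit(zxid)
    }

    fn try_commit(&mut self, zxid: ZxId) -> bool {
        if zxid <= self.last_committed {
            return false; // already committed; late acks are ignored
        }
        let acks = self.ack_set.get(&zxid).map_or(0, |s| s.len());
        if acks < self.quorum {
            return false;
        }
        self.last_committed = zxid;
        true // the real node would broadcast Commit(zxid) here
    }
}
```

Using a set per zxid makes duplicate acks from the same follower harmless, and the early return on zxid <= last_committed keeps Commit from being broadcast twice.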
Simulator loop (per tick t in 0..rounds)
1. enqueue scheduled proposals : if t == schedule[i], push payload onto pending
2. inject pending into leader : pick (Leading and synced, lowest id); call propose
3. deliver in-flight : pop heap entries with delivery_time <= t
4. tick all nodes : iterate in ascending id; on_tick may fire election or heartbeat
Proposal schedule: schedule[i] = (i+1) * rounds / (K+1) for i in 0..K (integer division), where K is the number of proposals. Each payload is the byte string "zab-<i>"
(plain decimal, no padding). The schedule is deterministic, evenly spread, and
independent of cluster behaviour.
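The schedule formula is easy to check by hand. A small sketch, assuming K is the proposal count and using a hypothetical function name:

```rust
/// Returns (tick, payload) for each of the `k` scheduled proposals.
/// tick = (i + 1) * rounds / (k + 1) with integer division;
/// payload = "zab-<i>" with i in plain unpadded decimal.
pub fn proposal_schedule(rounds: u64, k: u64) -> Vec<(u64, Vec<u8>)> {
    (0..k)
        .map(|i| {
            let tick = (i + 1) * rounds / (k + 1);
            (tick, format!("zab-{}", i).into_bytes())
        })
        .collect()
}
```

For rounds = 100 and K = 4 this yields ticks 20, 40, 60, 80; for rounds = 10 and K = 3 the integer division gives 2, 5, 7. Multiplying before dividing matters: (i+1) * (rounds / (K+1)) would give 2, 4, 6 in the second case and break the cross-language match.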
Wire format (Rpc)
Nine variants. The simulator never serializes RPCs — it passes them as typed values in memory. The only bytes that ever get hashed are the canonical dump.
LookForLeader { src_id, last_zxid, peer_epoch }
Vote { voter_id, last_zxid, peer_epoch, leader_id }
NewEpoch { new_epoch }
AckEpoch { current_epoch, last_zxid }
NewLeader { new_epoch, history: Vec<Txn> }
AckLeader { new_epoch }
Propose { txn: Txn }
Ack { zxid }
Commit { zxid }
Canonical dump format
file := magic[8 = "DSEZAB01"] u32_le(node_count) node*
node := u32_le id
        u8     role                    # Looking=0, Following=1, Leading=2
        u32_le current_epoch
        u32_le accepted_epoch
        u32_le last_zxid.epoch
        u32_le last_zxid.counter
        u32_le last_committed.epoch
        u32_le last_committed.counter
        u32_le history_len
        txn * history_len
txn  := u32_le zxid.epoch
        u32_le zxid.counter
        u32_le payload_len
        u8     payload[payload_len]
Nodes appear in ascending id order. All multi-byte numbers are
little-endian. The dump is hashed with SHA-256; the lowercase hex
digest is what zabctl prints (no trailing newline).
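The grammar above translates almost line for line into an encoder. The sketch below is illustrative rather than the lab's code: the NodeDump struct is hypothetical, last_zxid is assumed to be the zxid of the last history entry (or (0, 0) for an empty history), and hashing is left out because Rust's standard library has no SHA-256.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct ZxId {
    pub epoch: u32,
    pub counter: u32,
}

pub struct Txn {
    pub zxid: ZxId,
    pub payload: Vec<u8>,
}

/// Discriminants are fixed explicitly because they end up in the dump.
#[derive(Debug, Clone, Copy)]
pub enum Role {
    Looking = 0,
    Following = 1,
    Leading = 2,
}

/// Per-node fields that the dump serializes (hypothetical struct).
pub struct NodeDump {
    pub id: u32,
    pub role: Role,
    pub current_epoch: u32,
    pub accepted_epoch: u32,
    pub last_committed: ZxId,
    pub history: Vec<Txn>,
}

fn put_u32(out: &mut Vec<u8>, v: u32) {
    out.extend_from_slice(&v.to_le_bytes());
}

fn put_zxid(out: &mut Vec<u8>, z: ZxId) {
    put_u32(out, z.epoch);
    put_u32(out, z.counter);
}

/// Serializes nodes in ascending id order, following the grammar above.
pub fn canonical_dump(nodes: &[NodeDump]) -> Vec<u8> {
    let mut sorted: Vec<&NodeDump> = nodes.iter().collect();
    sorted.sort_by_key(|n| n.id);
    let mut out = Vec::new();
    out.extend_from_slice(b"DSEZAB01");
    put_u32(&mut out, sorted.len() as u32);
    for n in sorted {
        put_u32(&mut out, n.id);
        out.push(n.role as u8);
        put_u32(&mut out, n.current_epoch);
        put_u32(&mut out, n.accepted_epoch);
        // Assumption: last_zxid is the tail of history, (0, 0) when empty.
        let last_zxid = n.history.last().map(|t| t.zxid).unwrap_or_default();
        put_zxid(&mut out, last_zxid);
        put_zxid(&mut out, n.last_committed);
        put_u32(&mut out, n.history.len() as u32);
        for t in &n.history {
            put_zxid(&mut out, t.zxid);
            put_u32(&mut out, t.payload.len() as u32);
            out.extend_from_slice(&t.payload);
        }
    }
    out
}
```

A node with an empty history occupies 33 bytes, and each transaction adds 12 bytes plus its payload, which gives a quick sanity check on dump sizes before comparing hashes.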
Primary-order property
ZAB's defining guarantee, distinct from generic atomic broadcast, is
primary order: if a primary (leader) broadcasts proposals p then
q in that order, every follower that delivers both delivers p
before q. This is enforced trivially by the leader's monotonically
increasing next_counter and the follower's txn.zxid > last_zxid()
gate on Propose. Primary order is a per-primary property; across
leadership changes the guarantee is provided by the Discovery / Sync
handshake that explicitly chooses the surviving primary's history.
Cross-language invariants
| Invariant | Why it matters |
|---|---|
| splitmix64 constants 0x9E3779B97F4A7C15, 0xBF58476D1CE4E7B5, 0x94D049BB133111EB | identical PRNG output |
| election_deadline = t + 150 + splitmix64(seed ^ node_id ^ t) % 150 | identical election firing times |
| delivery_delay = 1 + splitmix64(seed ^ src ^ dst ^ t) % 3 | identical message scheduling |
| heap order (delivery_time, sender, seq); seq global monotonic | identical delivery sequence |
| peers iterated in ascending id (BTreeMap / std::map / explicit loop) | identical broadcast order |
| vote_view keyed by voter id, iterated in ascending id | identical election tally |
| election tie-break: lexicographic (last_zxid, voter_id) | identical leader choice |
| leader-pick for proposal injection: Leading && synced && min id | identical client routing |
| proposal schedule (i+1)*rounds/(K+1); payload "zab-<i>" unpadded decimal | identical pending queue contents |
| propose() calls try_commit() | identical last_committed for n=1 |
| Role enum order Looking=0, Following=1, Leading=2 | identical dump bytes |
| dump magic "DSEZAB01"; role is a single u8, every other integer is u32 LE; nodes in ascending id | identical dump bytes |
If any one of these drifts, scripts/cross_test.sh will fail, and
cmp -l on any two raw dumps will list every differing byte offset,
starting with the first divergence.
Files
- src/rust/: zab19 crate + zabctl binary.
- src/go/: module github.com/10xdev/dse/db19 + cmd/zabctl.
- src/cpp/: db19_lib static library + zabctl binary + test_db19.
- scripts/verify.sh: runs the unit tests for all three.
- scripts/cross_test.sh: proves the three binaries produce byte-identical canonical dumps for six seeded scenarios.
See docs/ for the long-form write-up and steps/ for the staged
implementation path.