SECRET OF CSS

Paxos


The Paxos algorithm was developed by Leslie Lamport,
published in his 1998 paper The
Part-Time Parliament
. Paxos works in three phases to make sure
multiple nodes agree on the same value in spite of partial network or
node failures. The first two phases act to build consensus around a
value, the last phase then communicates that consensus to the remaining
replicas.

In the first phase (called prepare phase), the node
proposing a value (called a proposer) contacts all the nodes
in the cluster (called acceptors) and asks them if they will
promise to consider its value. Once a quorum of acceptors return such a
promise, the proposer moves onto the second phase. In the second phase
(called the accept phase) the proposer sends out a proposed
value, if a quorum of nodes accepts this value then the value is
chosen. In the final phase (called the commit
phase
), the proposer can then commit the chosen value to all the
nodes in the cluster.

Flow of the Protocol

Paxos is a difficult protocol to understand. We’ll start by showing
an example of a typical flow of the protocol, and then dig into some of
the details of how it works. We intend this explanation to provide an
intuitive sense of how the protocol works, but not as a comprehensive
description to base an implementation upon.

Here’s a very brief summary of the protocol.

Proposer

Acceptor

Obtains the next generation number from a Generation Clock. Sends a prepare request with this generation
number to all acceptors.

If the generation number of the prepare request is later than
its promised generation variable, it updates its promise generation
with this later value and returns a promise response. If it has already
accepted a proposal it returns this proposal.

When it receives promises from quorum of acceptors, it looks to
see if any of these responses contain accepted values. If so it changes its
own proposed value to that of the returned proposal with the highest
generation number. Sends accept requests to all acceptors with its generation number and
proposed value.

If the generation number of the accept request is later than
its promised generation variable it stores the proposal as its accepted
proposal and responds that it has accepted the request.

When it receives a successful response from a quorum of
acceptors, it records the value as chosen and sends commit messages to
all nodes.

Those are basic rules for paxos, but it’s very hard to understand how
they combine for an effective behavior. So here’s an example to show how
this works.

mfpaxos initial requests

Consider a cluster of five nodes: Athens, Byzantium, Cyrene,
Delphi, and Ephesus. A client contacts the Athens node, requesting to set
the name to “alice”. The Athens node now needs to initiate a Paxos
interaction to see if all the nodes will agree to this change. Athens is
called the proposer, in that Athens will propose to all the other nodes that
the name of the cluster become “alice”. All the nodes in the cluster
(including Athens) are “acceptors”, meaning they are capable of accepting
proposals.

At the same time that Athens is proposing
“alice”, the node Ephesus gets a request to set the name to “elanor”. This
makes Ephesus also be a proposer.

mfpaxos initial prepare

In the prepare phase the proposers begin by sending some prepare
requests, which all include a generation number. Since Paxos is intended to
avoid single points of failure, we don’t take this from a single generation
clock. Instead each node maintains its own generation clock where it
combines a generation number with a node ID. The node ID is used to break
ties, so [2,a] > [1,e] > [1,a]. Each acceptor records the
latest promise it’s seen so far.

Node Athens Byzantium Cyrene Delphi Ephesus
promised generation 1,a 1,a 0 1,e 1,e
accepted value none none none none none

Since they haven’t seen any requests before this, they all return a
promise to the calling proposer. We call the returned value a “promise”
because it indicates that the acceptor promises to not consider any messages
with an earlier generation clock than the promised one.

mfpaxos a prepare c

Athens sends its prepare message to Cyrene. When it receives a promise in
return, this means it has now got promises from from three of the five nodes, which
represents a Quorum. Athens now shifts from sending
prepare messages to sending accept messages.

It is possible that Athens fails to receive a promise
from a majority of the cluster nodes. In that case Athens
retries the prepare request by incrementing the generation clock.

Node Athens Byzantium Cyrene Delphi Ephesus
promised generation 1,a 1,a 1,a 1,e 1,e
accepted value none none none none none
mfpaxos accept ac

Athens now starts sending accept messages, containing the generation and
the proposed value. Athens and Byzantium accept the proposal.

Node Athens Byzantium Cyrene Delphi Ephesus
promised generation 1,a 1,a 1,a 1,e 1,e
accepted value alice alice none none none
mfpaxos e prepare c

Ephesus now sends a prepare message to Cyrene. Cyrene had sent a promise to
Athens, but Ephesus’s request has a higher generation, so it takes
precedence. Cyrene sends back a promise to Ephesus.

mfpaxos c refuses a

Cyrene now gets an accept request from Athens but rejects it as the
generation number is behind its promise to Ephesus.

Node Athens Byzantium Cyrene Delphi Ephesus
promised generation 1,a 1,a 1,e 1,e 1,e
accepted value alice alice none none none
mfpaxos e send accepts

Ephesus has now got a quorum from its prepare messages, so can move on to
sending accepts. It sends accepts to itself and to Delphi but then crashes
before it can send any more accepts.

Node Athens Byzantium Cyrene Delphi Ephesus
promised generation 1,a 1,a 1,e 1,e 1,e
accepted value alice alice none elanor elanor

Meanwhile, Athens has to deal with the rejection of its accept request from
Cyrene. This indicates that its quorum is no longer promised to it and thus
its proposal will fail. This will always happen to a proposer who loses its
initial quorum like this; for another proposer to achieve quorum at least
one member of the first proposer’s quorum will defect.

In a situation with a simple two phase commit, we would then expect
Ephesus to just go on and get its value chosen, but such a scheme would now
be in trouble since Ephesus has crashed. If it had a lock on a quorum of
acceptors, its crash would deadlock the whole proposal process. Paxos,
however, expects this kind of thing to happen, so Athens will make another
try, this time with a higher generation.

mfpaxos a 2nd prep

It sends prepare messages again, but this time with a higher generation
number. As with the first round, it gets back a trio of promises, but with
an important difference. Athens already accepted “alice”
earlier, and Delphi had accepted “elanor”. Both of these acceptors return a
promise, but also the value that they already accepted, together with the
generation number of that accepted proposal. When they return that
value, they update their promised generation to [2,a] to reflect the
promise they made to Athens.

Node Athens Byzantium Cyrene Delphi Ephesus
promised generation 2,a 1,a 2,a 2,a 1,e
accepted value alice alice none elanor elanor

Athens, with a quorum, must now move onto the accept phase, but
it must propose the already-accepted value with the highest generation,
which is “elanor”, who was accepted by Delphi with a generation of [1,e], which is
greater than Athens’s acceptance of “alice” with [1,a].

mfpaxos a 2nd accept

Athens starts to send out accept requests, but now with “elanor” and its current
generation. Athens sends an accept request to itself, which is accepted. This is a
crucial acceptance because now there are three
nodes accepting “elanor”, which is a quorum for “elanor”, therefore we can consider
“elanor” to be the chosen value.

Node Athens Byzantium Cyrene Delphi Ephesus
promised generation 2,a 1,a 2,a 2,a 1,e
accepted value elanor alice none elanor elanor

But although “elanor” is now the chosen value, nobody is yet aware of it.
Within the accept stage Athens only knows itself having “elanor” as the
value, which isn’t a quorum and Ephesus is offline. All Athens needs to do is
have a couple more accept requests accepted and it will be able to commit.
But now Athens crashes.

At this point Athens and Ephesus have now crashed. But the cluster still
has a quorum of nodes operating, so they should be able to keep working, and
indeed by following the protocol they can discover that “elanor” is the
chosen value.

mfpaxos c prepare

Cyrene gets a request to set the name to “carol”, so it becomes a
proposer. It’s seen generation [2,a] so it kicks off a prepare phase with
generation [3,c]. While it wishes to propose “carol” as the name, for
the moment it’s just issuing prepare requests.

Cyrene sends prepare messages to the remaining nodes in the cluster. As
with Athens’s earlier prepare phase, Cyrene gets accepted values back, so
“carol” never gets proposed as a value. As before, Delphi’s “elanor” is
later than Byzantium’s “alice”, so Cyrene starts an accept phase with
“elanor” and [3,c].

Node Athens Byzantium Cyrene Delphi Ephesus
promised generation 2,a 3,c 3,c 3,c 1,e
accepted value elanor alice none elanor elanor
mfpaxos c accept

While I could continue to crash and wake up nodes, it’s clear now that
“elanor” will win out. As long as a quorum of nodes are up, at least one of
them will have “elanor” as its value, and any node attempting a prepare will
have to contact one node that’s accepted “elanor” in order to get a quorum
for its prepare phase. So we’ll finish with Cyrene sending out commits.

mfpaxos c commit

At some point Athens and Ephesus will come back online and they will
discover what the quorum has chosen.

An example key-value store

The Paxos protocol explained here, builds consensus on a single value
(often called as single-decree Paxos).
Most practical implementations used in mainstream products like
Cosmos DB or Spanner
use a modification of paxos called multi-paxos which is implemented
as a Replicated Log.

But a simple key-value store can be built using basic Paxos. [cassandra]
uses basic Paxos in a similar way to implement it’s light-weight transactions.

The key-value store maintains Paxos instance per key.

class PaxosPerKeyStore…

  int serverId;
  public PaxosPerKeyStore(int serverId) {
      this.serverId = serverId;
  }

  Map<String, Acceptor> key2Acceptors = new HashMap<String, Acceptor>();
  List<PaxosPerKeyStore> peers;

The Acceptor stores the promisedGeneration, acceptedGeneration
and acceptedValue.

class Acceptor…

  public class Acceptor {
      MonotonicId promisedGeneration = MonotonicId.empty();
  
      Optional<MonotonicId> acceptedGeneration = Optional.empty();
      Optional<Command> acceptedValue = Optional.empty();
  
      Optional<Command> committedValue = Optional.empty();
      Optional<MonotonicId> committedGeneration = Optional.empty();
  
      public AcceptorState state = AcceptorState.NEW;
      private BiConsumer<Acceptor, Command> kvStore;

When the key and value is put in the kv store, it runs the Paxos protocol.

class PaxosPerKeyStore…

  int maxKnownPaxosRoundId = 1;
  int maxAttempts = 4;
  public void put(String key, String defaultProposal) {
      int attempts = 0;
      while(attempts <= maxAttempts) {
          attempts++;
          MonotonicId requestId = new MonotonicId(maxKnownPaxosRoundId++, serverId);
          SetValueCommand setValueCommand = new SetValueCommand(key, defaultProposal);

          if (runPaxos(key, requestId, setValueCommand)) {
              return;
          }

          Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS);
          logger.warn("Experienced Paxos contention. Attempting with higher generation");
      }
      throw new WriteTimeoutException(attempts);
  }

  private boolean runPaxos(String key, MonotonicId generation, Command initialValue) {
      List<Acceptor> allAcceptors = getAcceptorInstancesFor(key);
      List<PrepareResponse> prepareResponses = sendPrepare(generation, allAcceptors);
      if (isQuorumPrepared(prepareResponses)) {
          Command proposedValue = getValue(prepareResponses, initialValue);
          if (sendAccept(generation, proposedValue, allAcceptors)) {
              sendCommit(generation, proposedValue, allAcceptors);
          }
          if (proposedValue == initialValue) {
              return true;
          }
      }
      return false;
  }

  public Command getValue(List<PrepareResponse> prepareResponses, Command initialValue) {
      PrepareResponse mostRecentAcceptedValue = getMostRecentAcceptedValue(prepareResponses);
      Command proposedValue
              = mostRecentAcceptedValue.acceptedValue.isEmpty() ?
              initialValue : mostRecentAcceptedValue.acceptedValue.get();
      return proposedValue;
  }

  private PrepareResponse getMostRecentAcceptedValue(List<PrepareResponse> prepareResponses) {
      return prepareResponses.stream().max(Comparator.comparing(r -> r.acceptedGeneration.orElse(MonotonicId.empty()))).get();
  }

class Acceptor…

  public PrepareResponse prepare(MonotonicId generation) {

      if (promisedGeneration.isAfter(generation)) {
          return new PrepareResponse(false, acceptedValue, acceptedGeneration, committedGeneration, committedValue);
      }
      promisedGeneration = generation;
      state = AcceptorState.PROMISED;
      return new PrepareResponse(true, acceptedValue, acceptedGeneration, committedGeneration, committedValue);

  }

class Acceptor…

  public boolean accept(MonotonicId generation, Command value) {
      if (generation.equals(promisedGeneration) || generation.isAfter(promisedGeneration)) {
          this.promisedGeneration = generation;
          this.acceptedGeneration = Optional.of(generation);
          this.acceptedValue = Optional.of(value);
          return true;
      }
      state = AcceptorState.ACCEPTED;
      return false;
  }

The value is stored in the kvstore only when it can be successfully committed.

class Acceptor…

  public void commit(MonotonicId generation, Command value) {
      committedGeneration = Optional.of(generation);
      committedValue = Optional.of(value);
      state = AcceptorState.COMMITTED;
      kvStore.accept(this, value);
  }

class PaxosPerKeyStore…

  private void accept(Acceptor acceptor, Command command) {
      if (command instanceof SetValueCommand) {
          SetValueCommand setValueCommand = (SetValueCommand) command;
          kv.put(setValueCommand.getKey(), setValueCommand.getValue());
      }
      acceptor.resetPaxosState();
  }

The paxos state needs to be persisted.
It can be easily done by using a Write-Ahead Log.

Handling multiple values.

It is important to note that Paxos is specified and proven to work on single value.
So handling multiple values with the single value Paxos protocol needs to be done
outside of the protocol specification. One alternative is to reset the state,
and store committed values separately to make sure they are not lost.

class Acceptor…

  public void resetPaxosState() {
      //This implementation has issues if committed values are not stored
      //and handled separately in the prepare phase.
      //See Cassandra implementation for details.
      //https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/SystemKeyspace.java#L1232
      promisedGeneration = MonotonicId.empty();
      acceptedGeneration = Optional.empty();
      acceptedValue = Optional.empty();
  }

There is an alternative, as suggested in [gryadka], which slightly modifies the
basic Paxos to allow setting multiple values.
This need for executing steps beyond the basic algorithm
is the reason that in practice Replicated Log is preferred.

Reading the values

Paxos relies on the prepare phase to detect any uncommitted values.
So if basic Paxos is used to implement a key-value store as shown above,
the read operation also needs to run the full Paxos algorithm.

class PaxosPerKeyStore…

  public String get(String key) {
      int attempts = 0;
      while(attempts <= maxAttempts) {
          attempts++;
          MonotonicId requestId = new MonotonicId(maxKnownPaxosRoundId++, serverId);
          Command getValueCommand = new NoOpCommand(key);
          if (runPaxos(key, requestId, getValueCommand)) {
              return kv.get(key);
          }

          Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS);
          logger.warn("Experienced Paxos contention. Attempting with higher generation");

      }
      throw new WriteTimeoutException(attempts);
  }



News Credit

%d bloggers like this: