(by Unmesh Joshi)
The Limits of a Single Server. (Basics)
Separate Business Logic and Data Layer. (Talks about the two-tier split, but doesn't give the rationale beyond: "This way, most of the application logic executes on the separate server utilizing a separate network, CPU, memory, and disk. This architecture works particularly well if most users can be served from caches put at different layers in the architecture. It makes sure that only a portion of all requests need to reach the database layer." Not impressed so far.)
Partitioning Data. (Basically, "partition data", and that's it! No other explanation.)
A Look at Failures.
Replication: Masking Failures.
Defining the Term "Distributed System." "A distributed system is a software architecture that consists of multiple interconnected nodes or servers working together to achieve a common goal. These nodes communicate with each other over a network and coordinate their actions to provide a unified and scalable computing environment. In a distributed system, the workload is distributed across multiple servers, allowing for parallel processing and improved performance. The system is designed to handle large amounts of data and accommodate a high number of concurrent users. Most importantly, it offers fault tolerance and resilience by replicating data and services across multiple nodes, ensuring that the system remains operational even in the presence of failures or network disruptions."
The Patterns Approach. (Focuses on "recurring solution" and--ugh--completely ignores consequences.)
(Simple example, four key-value pairs, replicated on three servers.)
Keeping Data Resilient on a Single Server.
Competing Updates.
Dealing with the Leader Failing.
Multiple Failures Need a Generation Clock.
Log Entries Cannot Be Committed until They Are Accepted by a Majority Quorum.
Followers Commit Based on a High-Water Mark.
Leaders Use a Series of Queues to Remain Responsive to Many Clients.
Followers Can Handle Read Requests to Reduce Load on the Leader.
A Large Amount of Data Can Be Partitioned over Multiple Nodes.
Partitions Can Be Replicated for Resilience.
A Minimum of Two Phases Are Needed to Maintain Consistency across Partitions.
In Distributed Computing, Ordering Cannot Depend on System Timestamps.
A Consistent Core Can Manage the Membership of a Data Cluster.
Gossip Dissemination for Decentralized Cluster Management.
Provide a durability guarantee without requiring the storage data structures to be flushed to disk, by persisting every state change as a command in an append-only log.
Also known as: Commit Log
Problem A strong durability guarantee is needed even when the server machines storing the data fail. Once a server agrees to perform an action, it should do so even if it fails and restarts, losing all of its in-memory state.
Solution Store each state change as a command in a file on a hard disk. A single log is maintained for each server process. It is sequentially appended. A single log, appended sequentially, simplifies handling of logs at restart and the subsequent online operations (when the log is appended with new commands). Each log entry is given a unique identifier. This identifier helps in implementing certain other operations on the log, such as Segmented Log, cleaning the log with Low-Water Mark, and so on. The log updates can be implemented with Singular Update Queue.
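To make the mechanics concrete, here is a minimal sketch of an append-only write-ahead log in Java (my own illustration, not the book's code; the WriteAheadLog class name and the simple text record format are assumptions):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Minimal write-ahead log sketch: each state change is appended as a
// command with a monotonically increasing entry identifier and forced
// to disk before the write is acknowledged.
public class WriteAheadLog {
    private final FileOutputStream out;
    private long lastLogEntryIndex = 0;

    public WriteAheadLog(File logFile) throws IOException {
        this.out = new FileOutputStream(logFile, true); // append mode
    }

    public synchronized long writeEntry(String command) throws IOException {
        long entryIndex = ++lastLogEntryIndex;
        String record = entryIndex + "|" + command + "\n";
        out.write(record.getBytes(StandardCharsets.UTF_8));
        out.flush();
        out.getFD().sync(); // force the entry to durable storage
        return entryIndex;
    }

    public static void main(String[] args) throws IOException {
        WriteAheadLog wal = new WriteAheadLog(new File("server.wal"));
        wal.writeEntry("SET key1 value1");
        wal.writeEntry("SET key2 value2");
    }
}
```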
Split log into multiple smaller files instead of a single large file for easier operations.
Problem A single log file can grow and become a performance bottleneck as it is read at startup. Older logs are cleaned up periodically, but doing cleanup operations on a single huge file is difficult to implement.
Solution The single log is split into multiple segments. Log files are rolled after a specified size limit. With log segmentation, there needs to be an easy way to map logical log offsets (or log sequence numbers) to the log segment files. This can be done in two ways: (1) Each log segment name is generated from a known prefix and a base offset (or a log sequence number). (2) Each log sequence number is divided into two parts, the name of the file and the transaction offset. With this information, the read operation takes two steps. For a given offset (or transaction ID), the log segment is identified and all the log records are read from subsequent log segments.
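A small sketch of the first option, where segment names carry the base offset of their first entry; the SegmentedLogIndex class and the "log-&lt;baseOffset&gt;.dat" naming are illustrative, not from the book:

```java
import java.util.TreeMap;

// Sketch of mapping a logical log offset to a segment file, assuming
// segments are named after the base offset of their first entry,
// e.g. "log-0.dat", "log-1000.dat", "log-2000.dat".
public class SegmentedLogIndex {
    // Maps each segment's base offset to its file name.
    private final TreeMap<Long, String> segmentsByBaseOffset = new TreeMap<>();

    public void addSegment(long baseOffset) {
        segmentsByBaseOffset.put(baseOffset, "log-" + baseOffset + ".dat");
    }

    // Step 1 of the read: find the segment whose base offset is the
    // greatest one not exceeding the requested offset.
    // (Returns null if the offset precedes the oldest segment.)
    public String segmentFor(long offset) {
        var entry = segmentsByBaseOffset.floorEntry(offset);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        SegmentedLogIndex index = new SegmentedLogIndex();
        index.addSegment(0);
        index.addSegment(1000);
        index.addSegment(2000);
        System.out.println(index.segmentFor(1500)); // prints log-1000.dat
    }
}
```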
An index in the write-ahead log showing which portion of the log can be discarded.
Problem A write-ahead log maintains every update to the persistent store. It can grow indefinitely over time. Segmented Log ensures smaller files, but the total disk storage can grow indefinitely if not checked.
Solution Have a mechanism to tell the logging machinery which portion of the log can be safely discarded. The mechanism gives the lowest offset, or low-water mark, before which the logs can be discarded. Have a task running in the background, in a separate thread, which continuously checks which portion of the log can be discarded and deletes the corresponding files on disk.
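A sketch of such a background cleanup task; the SegmentedLog and LowWaterMarkSource interfaces are hypothetical stand-ins for whatever supplies the low-water mark (snapshots, replication state, and so on):

```java
import java.io.File;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a background cleanup task: segment files whose highest
// entry index is below the low-water mark are safe to delete.
public class LogCleaner {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final SegmentedLog log;          // assumed log abstraction
    private final LowWaterMarkSource source; // e.g. snapshot or replication state

    public LogCleaner(SegmentedLog log, LowWaterMarkSource source) {
        this.log = log;
        this.source = source;
    }

    public void startup() {
        scheduler.scheduleAtFixedRate(this::cleanLogs, 1, 1, TimeUnit.MINUTES);
    }

    private void cleanLogs() {
        long lowWaterMark = source.lowWaterMark();
        for (File segment : log.segmentsBefore(lowWaterMark)) {
            segment.delete(); // entire segment is below the low-water mark
        }
    }

    interface SegmentedLog { List<File> segmentsBefore(long offset); }
    interface LowWaterMarkSource { long lowWaterMark(); }
}
```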
Have a single server to coordinate replication across a set of servers.
Problem To achieve fault tolerance in systems that manage data, the data needs to be replicated on multiple servers.
It’s also important to give clients some guarantee about consistency. When data is updated on multiple servers, you need to decide when to make it visible to clients. Write and read Majority Quorum is not sufficient, as some failure scenarios can cause clients to see data inconsistently. Each individual server does not know about the state of data on the other servers in the quorum. It’s only when data is read from multiple servers that the inconsistencies can be resolved. In some cases, this is not enough. Stronger guarantees are needed about the data sent to clients.
Solution Select one server in the cluster as a leader. The leader is responsible for taking decisions on behalf of the entire cluster and propagating the decisions to all the other servers.
Every server at startup looks for an existing leader. If no leader is found, it triggers a leader election. The servers accept requests only after a leader is elected successfully. Only the leader handles the client requests. If a request is sent directly to a follower server, the follower can forward it to the leader server.
Leader Election
In smaller clusters of three to five nodes, like those in systems that implement consensus, leader election can be done within the data cluster itself without depending on any external system. Leader election happens at server startup: every server starts a leader election at startup and tries to elect a leader. The system does not accept any client requests until a leader is elected. As explained in the Generation Clock pattern, every leader election also needs to update the generation number. A server can be in only one of three states: Leader, Follower, or Looking For Leader (sometimes referred to as Candidate). A HeartBeat mechanism is used to detect when an existing leader has failed, so that a new leader election can be started. State updates can be done without the hassle of synchronization and locking by using a Singular Update Queue. A new leader election is started by sending each of the peer servers a message requesting a vote.
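A rough sketch of the vote-request side of an election; the Election class, the Peer interface, and the simple synchronous vote counting are my own simplifications, not the book's implementation:

```java
import java.util.Map;

// Sketch of leader election by majority vote. Each election bumps the
// generation; a server becomes Leader once a majority of the cluster
// (including itself) has voted for it.
public class Election {
    enum ServerState { LOOKING_FOR_LEADER, FOLLOWER, LEADER }

    private ServerState state = ServerState.LOOKING_FOR_LEADER;
    private long generation = 0;
    private final Map<Integer, Peer> peers;
    private final int selfId;

    Election(int selfId, Map<Integer, Peer> peers) {
        this.selfId = selfId;
        this.peers = peers;
    }

    // Started at server startup, or when heartbeats from the leader stop.
    void startElection() {
        generation++;               // new election, new generation
        int votes = 1;              // vote for self
        for (Peer peer : peers.values()) {
            if (peer.requestVote(selfId, generation)) {
                votes++;
            }
        }
        int clusterSize = peers.size() + 1;
        if (votes >= clusterSize / 2 + 1) {
            state = ServerState.LEADER;   // majority reached
        } else {
            state = ServerState.FOLLOWER; // wait to hear from the elected leader
        }
    }

    interface Peer {
        boolean requestVote(int candidateId, long generation);
    }
}
```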
Show a server is available by periodically sending a message to all the other servers.
Problem When multiple servers form a cluster, each server is responsible for storing some portion of the data, based on the partitioning and replication schemes used. Timely detection of server failures is important for taking corrective actions by making some other server responsible for handling requests for the data on a failed server.
Solution Periodically send a request to all the other servers indicating liveness of the sending server. Select the request interval to be more than the network round trip time between the servers. All the listening servers wait for the timeout interval, which is a multiple of the request interval. In general, timeout interval > request interval > network round trip time between the servers
It is useful to know the network round trip times within and between data-centers when choosing values for the heartbeat interval and timeout. For example, if the network round trip time between the servers is 20 ms, heartbeats can be sent every 100 ms, and servers can check after 1 second to give enough time for multiple heartbeats to be sent without getting false negatives. If a server receives no heartbeat within this interval, it declares that the sending server has failed.
Both servers, the one sending the heartbeat and the one receiving it, have a scheduler defined as follows. The scheduler is given a method to be executed at a regular interval. When started, the task is scheduled to execute the given method. On the sending server, the scheduler executes a method to send heartbeat messages. On the receiving server, the failure detection mechanism starts a similar scheduler. At regular intervals, it checks whether a heartbeat was received.
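A sketch of both schedulers, assuming the 100 ms send interval and 1 second timeout from the example above; the HeartbeatScheduler and HeartbeatTracker names are illustrative:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of both sides of the heartbeat mechanism: a periodic sender
// and a periodic failure-detection check on the receiver.
public class HeartbeatScheduler {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Sending side: periodically announce liveness to all other servers,
    // e.g. every 100 ms.
    public void startSending(Runnable sendHeartbeatToAll, long intervalMillis) {
        executor.scheduleAtFixedRate(sendHeartbeatToAll, 0, intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    // Receiving side: check at the timeout interval (e.g. 1 second) when
    // the last heartbeat arrived; declare the sender failed if it is too old.
    public void startFailureDetection(HeartbeatTracker tracker, long timeoutMillis) {
        executor.scheduleAtFixedRate(() -> {
            long elapsedNanos = System.nanoTime() - tracker.lastHeartbeatNanos();
            if (TimeUnit.NANOSECONDS.toMillis(elapsedNanos) > timeoutMillis) {
                tracker.markSenderFailed();
            }
        }, timeoutMillis, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    interface HeartbeatTracker {
        long lastHeartbeatNanos();   // updated whenever a heartbeat is received
        void markSenderFailed();     // corrective action starts here
    }
}
```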
Liveness is the property of a system that says that the system always makes progress. Safety is the property that says that the system is always in the correct state. If we focus only on safety, then the system as a whole might not make progress. If we focus only on liveness, then safety might be compromised.
Avoid two groups of servers making independent decisions by requiring a majority for every decision.
Problem In a distributed system, whenever a server takes any action, it needs to ensure that in the event of a crash the results of the actions are available to the clients. This can be achieved by replicating the result to other servers in the cluster. But that leads to the question: How many other servers need to confirm the replication before the original server can be confident that the update is fully recognized? If the original server waits for too many replications, then it will respond slowly—reducing liveness. But if it doesn’t have enough replications, then the update could be lost—a failure of safety. It’s critical to balance between the overall system performance and system integrity.
Solution A cluster agrees that it’s received an update when a majority of the nodes in the cluster have acknowledged the update. We call this number a quorum. So if we have a cluster of five nodes, we need a quorum of three. For a cluster of n nodes, the quorum is n / 2 + 1 (using integer division).
The need for a quorum indicates how many failures can be tolerated—which is the size of the cluster minus the quorum. A cluster of five nodes can tolerate two of them failing. In general, if we want to tolerate f failures we need a cluster size of 2f + 1.
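The arithmetic is easy to check in a few lines (illustrative Java, not from the book):

```java
// Quorum arithmetic from the text: quorum = n / 2 + 1 (integer division)
// and the number of tolerated failures is n - quorum.
public class Quorum {
    static int quorum(int clusterSize) {
        return clusterSize / 2 + 1;           // integer division floors
    }

    static int failuresTolerated(int clusterSize) {
        return clusterSize - quorum(clusterSize);
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 7; n++) {
            System.out.println(n + " servers: quorum " + quorum(n)
                    + ", tolerates " + failuresTolerated(n) + " failures");
        }
    }
}
```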
Consider two examples that need a quorum:
Updating data in a cluster of servers. High-Water Mark is used to ensure that only data guaranteed to be available on the majority of servers is visible to clients.
Leader election. In Leader and Followers, a leader is selected only if it gets votes from a majority of the servers.
Deciding on Number of Servers in a Cluster The cluster can function only if the majority of servers are up and running. In systems doing data replication, there are two things to consider:
The throughput of write operations. Every time data is written to the cluster, it needs to be copied to multiple servers. Every additional server adds some overhead to complete this write. The latency of data write is directly proportional to the number of servers forming the quorum. As we will see below, doubling the number of servers in a cluster will reduce throughput to half of the value for the original cluster.
The number of failures which need to be tolerated. The number of server failures tolerated depends on the size of the cluster. But just adding one more server to an existing cluster doesn’t always give you more fault tolerance: with a three-server cluster, adding one server doesn’t increase failure tolerance, because a four-server cluster needs a quorum of three and so still tolerates only one failure.
Considering these two factors, most practical quorum-based systems have cluster sizes of three or five. A five-server cluster tolerates two server failures and has tolerable data write throughput of a few thousand requests per second.
A monotonically increasing number indicating the generation of the server.
Also known as: Term; Epoch; Generation
Problem In Leader and Followers setup, there is a possibility of the leader being temporarily disconnected from the followers. There might be a garbage collection pause in the leader process, or a temporary network disruption which disconnects the leader from the follower. In this case the leader process is still running, and after the pause or the network disruption is over, it will try sending replication requests to the followers. This is dangerous, as meanwhile the rest of the cluster might have selected a new leader and accepted requests from the client. It is important for the rest of the cluster to detect any requests from the old leader. The old leader itself should also be able to detect that it was temporarily disconnected from the cluster and take necessary corrective action to step down from leadership.
Solution Maintain a monotonically increasing number indicating the generation of the server. Every time a new leader election happens, it should be marked by incrementing the generation. The generation needs to be available beyond a server reboot, so it is stored with every entry in the Write-Ahead Log. As discussed in High-Water Mark, followers use this information to find conflicting entries in their log.
At startup, the server reads the last known generation from the log. With Leader and Followers, servers increment the generation every time there’s a new leader election. Servers send the generation to other servers as part of the vote requests. This way, after a successful leader election, all the servers have the same generation. Once the leader is elected, followers are told about the new generation. Thereafter, the leader includes the generation in each request it sends to the followers. It includes it in every HeartBeat message as well as the replication requests sent to followers.
Leader persists the generation along with every entry in its Write-Ahead Log. This way, it is also persisted in the follower log as part of the replication mechanism of Leader and Followers.
If a follower gets a message from a deposed leader, the follower can tell because its generation is too low. The follower then replies with a failure response. When a leader gets such a failure response, it becomes a follower and expects communication from the new leader.
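A sketch of that check on the follower side; the ReplicationRequest and ReplicationResponse message shapes are assumptions made for illustration:

```java
// Sketch of how a follower rejects requests from a deposed leader by
// comparing generations. The failure response carries the follower's
// newer generation so the old leader knows to step down.
public class FollowerServer {
    private long currentGeneration;

    static class ReplicationRequest {
        final long generation; // plus the log entries, omitted here
        ReplicationRequest(long generation) { this.generation = generation; }
    }

    static class ReplicationResponse {
        final boolean success;
        final long generation;
        ReplicationResponse(boolean success, long generation) {
            this.success = success;
            this.generation = generation;
        }
    }

    synchronized ReplicationResponse handleReplication(ReplicationRequest request) {
        if (request.generation < currentGeneration) {
            // Request from an old leader: reject it and report the
            // newer generation.
            return new ReplicationResponse(false, currentGeneration);
        }
        currentGeneration = request.generation;
        // ... append the entries to the local write-ahead log ...
        return new ReplicationResponse(true, currentGeneration);
    }
}
```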
The Generation Clock pattern is an example of a Lamport Clock: a simple technique used to determine the ordering of events across a set of processes, without depending on a system clock. Each process maintains an integer counter, which is incremented after every action the process performs. Each process also sends this integer to other processes along with the messages the processes exchange. The process receiving the message sets its integer counter by choosing the maximum of its own counter and the integer value in the message. This way, any process can figure out which action happened before the other by comparing the associated integers. The comparison is possible for actions across multiple processes as well, as long as messages were exchanged between the processes. Actions which can be compared this way are said to be causally related.
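A minimal Lamport clock sketch (my own, not the book's). The description above only takes the maximum on receipt; this sketch also increments afterwards, since the receive itself counts as an action of the receiving process:

```java
// Minimal Lamport clock: increment on every local action or send, and
// take the maximum of the local and received values (plus one for the
// receive action) when a message arrives.
public class LamportClock {
    private int counter = 0;

    public synchronized int tick() {              // before a local action or send
        return ++counter;
    }

    public synchronized int onMessageReceived(int remoteCounter) {
        counter = Math.max(counter, remoteCounter) + 1;
        return counter;
    }

    public synchronized int current() {
        return counter;
    }
}
```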
An index in the write-ahead log showing the last successful replication.
Also known as: CommitIndex
Problem The Write-Ahead Log pattern is used to recover state after a server crashes and restarts. But a write-ahead log is not enough to provide availability in case of server failure. If a single server fails, then clients won’t be able to function until the server restarts. To get a more available system, we can replicate the log on multiple servers. Using Leader and Followers, the leader replicates all its log entries to a Majority Quorum of followers. Now, should the leader fail, a new leader can be elected, and clients can mostly continue to work with the cluster as before. But there are still a couple of things that can go wrong:
The leader can fail before sending its log entries to any followers.
The leader can fail after sending log entries to some followers, but before sending them to the majority of followers.
In these error scenarios, some followers can be missing entries in their logs, and some followers can have more entries than others. So it becomes important for each follower to know what part of the log is safe to be made available to the clients.
Solution The high-water mark is an index into the log file that records the last log entry known to have successfully replicated to a Majority Quorum of followers. The leader also passes on the high-water mark to its followers during its replication. All servers in the cluster should only transmit to clients the data that reflects updates below the high-water mark.
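One way a leader might compute the high-water mark from per-server acknowledgements (an illustrative sketch, assuming the leader tracks the latest acknowledged log index for every server, itself included):

```java
import java.util.Arrays;

// Sketch: the high-water mark is the highest log index that is known
// to be present on at least a majority quorum of servers.
public class HighWaterMark {
    static long compute(long[] acknowledgedIndexes) {
        long[] sorted = acknowledgedIndexes.clone();
        Arrays.sort(sorted);
        int quorum = sorted.length / 2 + 1;
        // The entry at position (length - quorum) is replicated on at
        // least `quorum` servers.
        return sorted[sorted.length - quorum];
    }

    public static void main(String[] args) {
        // Leader has entry 5, one follower has 3, one has 5, two lag at 2.
        System.out.println(compute(new long[] {5, 3, 5, 2, 2})); // prints 3
    }
}
```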
Use two consensus-building phases to reach safe consensus even when nodes disconnect.
Problem When multiple nodes share state, they often need to agree between themselves on a particular value. With Leader and Followers, the leader decides and passes its value to the followers. But if there is no leader, then the nodes need to determine a value themselves. (Even with a leader-follower, they may need to do this to elect a leader.)
A leader can ensure that replicas safely acquire an update by using Two-Phase Commit, but without a leader we can have competing nodes attempt to gather a Majority Quorum. This process is further complicated because any node may fail or disconnect. A node may achieve majority quorum on a value, but disconnect before it is able to communicate this value to the entire cluster.
Solution The Paxos algorithm was developed by Leslie Lamport, published in his 1998 paper “The Part-Time Parliament” [Lamport1998]. Paxos works in three phases to make sure multiple nodes agree on the same value in spite of partial network or node failures. The first two phases act to build consensus around a value and the last phase then communicates that consensus to the remaining replicas.
Prepare phase: Establish the latest Generation Clock and gather any already accepted values.
Accept phase: Propose a value for this generation for replicas to accept.
Commit phase: Let all the replicas know that a value has been chosen.
In the first phase (called prepare phase), the node proposing a value (called a proposer) contacts all the nodes in the cluster (called acceptors) and asks them if they will promise to consider its value. Once a majority quorum of acceptors returns such a promise, the proposer moves onto the second phase. In the second phase (called the accept phase), the proposer sends out a proposed value. If a majority quorum of nodes accepts this value then the value is chosen. In the final phase (called the commit phase), the proposer can then commit the chosen value to all the nodes in the cluster.
Flow of the Protocol
Paxos is a difficult protocol to understand. We’ll start by showing an example of a typical flow of the protocol (Table 11.1), and then dig into some of the details of how it works. This brief explanation aims to provide an intuitive sense of how the protocol works, but it’s not a comprehensive description to base an implementation upon.
| Proposer | Acceptor |
|---|---|
| Obtains the next generation number from a Generation Clock. Sends a prepare request with this generation number to all acceptors. | - |
| - | If the generation number of the prepare request is later than its promised generation variable, it updates its promised generation with this later value and returns a promise response. If it has already accepted a proposal, it returns this proposal. |
| When it receives promises from a majority quorum of acceptors, it checks if any of these responses contain accepted values. If so, it changes its own proposed value to that of the returned proposal with the highest generation number. It sends accept requests to all acceptors with its generation number and proposed value. | - |
| - | If the generation number of the accept request is later than or equal to its promised generation variable, it stores the proposal as its accepted proposal and responds that it has accepted the request. |
| When it receives a successful response from a majority quorum of acceptors, it records the value as chosen and sends commit messages to all nodes. | - |
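To make the acceptor's two checks concrete, here is a sketch of the prepare and accept handlers; the Promise and Proposal types are illustrative, and it ignores persistence of the acceptor's state, which a real implementation needs:

```java
// Sketch of the acceptor side of the prepare and accept phases from
// the table above.
public class Acceptor {
    private long promisedGeneration = 0;
    private Proposal acceptedProposal = null;

    static class Proposal {
        final long generation;
        final String value;
        Proposal(long generation, String value) {
            this.generation = generation;
            this.value = value;
        }
    }

    static class Promise {
        final boolean promised;
        final Proposal alreadyAccepted; // null if nothing accepted yet
        Promise(boolean promised, Proposal alreadyAccepted) {
            this.promised = promised;
            this.alreadyAccepted = alreadyAccepted;
        }
    }

    // Prepare phase: promise to ignore older generations, and return any
    // proposal already accepted so the proposer can adopt its value.
    synchronized Promise prepare(long generation) {
        if (generation <= promisedGeneration) {
            return new Promise(false, null); // an equal or newer prepare was promised
        }
        promisedGeneration = generation;
        return new Promise(true, acceptedProposal);
    }

    // Accept phase: store the proposal if no newer generation was promised.
    synchronized boolean accept(long generation, String value) {
        if (generation < promisedGeneration) {
            return false;
        }
        promisedGeneration = generation;
        acceptedProposal = new Proposal(generation, value);
        return true;
    }
}
```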
Keep the state of multiple nodes synchronized by using a write-ahead log that is replicated to all the cluster nodes.
Problem When multiple nodes share a state, that state needs to be synchronized. All cluster nodes need to agree on the same state, even when some nodes crash or get disconnected. This requires achieving consensus for each state change request.
But achieving consensus on individual requests is not enough. Each replica also needs to execute requests in the same order, otherwise different replicas can get into a different final state, even if they have consensus on an individual request.
Solution Cluster nodes maintain a Write-Ahead Log. Each log entry stores the state required for consensus along with the user request. They coordinate to build consensus over the log entries, so that all cluster nodes have exactly the same write-ahead log. The requests are then executed sequentially as per the log. Because all cluster nodes agree on each log entry, they execute the same requests in the same order. This ensures that all the cluster nodes share the same state.
A fault-tolerant consensus-building mechanism using Majority Quorum needs two phases.
A phase to establish a Generation Clock and to know about the log entries replicated in the previous Majority Quorum.
A phase to replicate requests on all the cluster nodes.
Executing two phases for each state change request is inefficient. Therefore, cluster nodes elect a leader at startup. The leader election phase establishes the Generation Clock and detects all the log entries in the previous Majority Quorum (the entries the previous leader might have copied to the majority of the cluster nodes). Once there is a stable leader, only the leader coordinates the replication. Clients communicate with the leader. The leader adds each request to the log and makes sure it’s replicated on all the followers. Consensus is reached once a log entry is successfully replicated to the majority of the followers. This way, when there is a stable leader, only a single phase to reach consensus is needed for each state change operation.
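A sketch of that single-phase path on the leader, with an illustrative Follower interface and synchronous replication for brevity (real implementations replicate asynchronously and track per-follower progress):

```java
import java.util.List;

// Sketch of the leader's single-phase path once a stable leader exists:
// append to the local write-ahead log, replicate, and mark the entry
// committed once a majority (leader included) has acknowledged it.
public class ReplicatedLogLeader {
    private final List<Follower> followers;
    private final long generation;
    private long lastLogIndex = 0;
    private long highWaterMark = 0;

    ReplicatedLogLeader(List<Follower> followers, long generation) {
        this.followers = followers;
        this.generation = generation;
    }

    boolean propose(String command) {
        long index = ++lastLogIndex;
        // 1. Append to the leader's own write-ahead log (not shown).
        // 2. Replicate the entry to all followers.
        int acks = 1; // the leader's own copy counts toward the quorum
        for (Follower follower : followers) {
            if (follower.replicate(generation, index, command)) {
                acks++;
            }
        }
        // 3. Consensus is reached on a majority of acknowledgements.
        int clusterSize = followers.size() + 1;
        if (acks >= clusterSize / 2 + 1) {
            highWaterMark = index; // committed: apply it and reply to the client
            return true;
        }
        return false;
    }

    interface Follower {
        boolean replicate(long generation, long index, String command);
    }
}
```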