2025-01-19 Update From: SLTechnology News&Howtos
This article looks at the algorithms behind distributed NoSQL database systems. The material is analyzed and described from a professional point of view, and I hope you find it useful.
System scalability is the main driver of the NoSQL movement, and it covers distributed coordination, failover, resource management, and many other capabilities. This makes NoSQL sound like a big basket that anything can be thrown into. Although the NoSQL movement has not brought fundamentally new techniques to distributed data processing, it has triggered an avalanche of research and practice on protocols and algorithms. Through these attempts, a set of effective approaches to building databases has gradually emerged. In this article, I give a systematic description of the distributed features of NoSQL databases.
Next we will look at a number of distributed strategies, such as replication and failure detection. They are divided into three sections:
Data consistency. NoSQL has to trade off consistency, fault tolerance, and performance in order to deliver low latency and high availability. These tradeoffs all revolve around data consistency, so this section is mainly about data replication and data repair.
Data placement. A database product should be able to cope with different data distributions, cluster topologies, and hardware configurations. In this section we discuss how to distribute and rebalance data so that failures are handled promptly, durability guarantees are kept, queries are efficient, and resources such as memory and disk space are used evenly across the cluster.
System coordination. Techniques such as leader election are used in many database products to achieve fault tolerance and strong data consistency. However, even decentralized databases (those with no central node) track their global state and detect failures and topology changes. This section describes several techniques that keep the system in a consistent state.
Data consistency
As is well known, distributed systems frequently experience network partitions and high latency. When that happens, the isolated part of the system is unavailable, so high availability cannot be maintained without sacrificing consistency. This fact is commonly referred to as the CAP theorem. However, consistency is very expensive in distributed systems, so concessions are often made not only for availability but for a variety of other tradeoffs as well. To study these tradeoffs, note that consistency problems in distributed systems are caused by the separation and replication of data, so we start with the desirable properties of replication:
Availability. Under a network partition, the remaining parts of the system can still handle read and write requests.
Read/write latency. Read and write requests are processed in a short time.
Read/write scalability. The read and write load can be spread evenly over multiple nodes.
Fault tolerance. The processing of read and write requests does not depend on any particular node.
Data durability. Node failures under certain conditions do not cause data loss.
Consistency. Consistency is much more complex than the previous properties, and several points of view need to be discussed in detail. We will not go far into consistency theory and concurrency models, because that is beyond the scope of this article. I will use only a simplified scheme made up of a few simple properties.
Read-write consistency. From the read-write point of view, the basic goal of a database is to keep replica convergence time (the time it takes for an update to reach all replicas) as short as possible and so guarantee eventual consistency. Beyond this weak guarantee, there are stronger consistency properties:
Read-after-write consistency. The effect of a write to data item X is always visible to subsequent reads of X.
Read-after-read consistency. After a read of data item X, subsequent reads of X return the same value or a newer one.
Write consistency. Write conflicts often occur in partitioned databases. The database should either be able to handle such conflicts or guarantee that concurrent writes are not processed by different partitions. Databases offer several consistency models in this regard:
Atomic writes. If the database API allows a write to be only an independent atomic assignment, write conflicts can be avoided by determining the "latest version" of each data item. All nodes then end up with the same version regardless of the order in which updates arrive, even though network failures and delays often cause updates to arrive in different orders on different nodes. The data version can be a timestamp or a user-specified value. This is the approach used by Cassandra.
Atomic read-modify-write. Applications sometimes need a read-modify-write sequence rather than an independent atomic write. If two clients read the same version of the data, modify it, and write it back, then under the atomic write model the later update overwrites the earlier one. This behavior is incorrect in some cases (for example, when two clients each add a new value to the same list). Databases offer at least two solutions:
Conflict prevention. Read-modify-write can be regarded as a special case of a transaction, so distributed locking or a consensus protocol such as PAXOS [20, 21] can solve the problem. This technique supports both atomic read-modify-write semantics and transactions with arbitrary isolation levels. An alternative is to avoid distributed concurrent writes altogether and route all writes for a given data item to a single node (either a global master or a per-partition master). To prevent conflicts, the database must sacrifice availability under network partitions. This approach is common in systems that provide strong consistency guarantees (such as most relational databases, HBase, and MongoDB).
Conflict detection. The database tracks conflicting concurrent updates and either rolls one of them back or keeps both versions for the client to resolve. Concurrent updates are usually tracked with vector clocks [19] (a form of optimistic locking) or by maintaining a complete version history. This approach is used in Riak, Voldemort, and CouchDB.
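As a rough illustration of conflict detection with vector clocks, the sketch below compares two versions and reports whether one descends from the other or whether they were written concurrently. The dictionary representation and the function names `compare` and `merge` are assumptions made for this example; they are not taken from Riak, Voldemort, or CouchDB.

```python
def compare(a, b):
    """Compare two vector clocks given as {node_id: counter} dicts."""
    nodes = set(a) | set(b)
    a_ahead = any(a.get(n, 0) > b.get(n, 0) for n in nodes)
    b_ahead = any(b.get(n, 0) > a.get(n, 0) for n in nodes)
    if a_ahead and b_ahead:
        return "concurrent"   # neither version descends from the other: a conflict
    if a_ahead:
        return "after"
    if b_ahead:
        return "before"
    return "equal"


def merge(a, b):
    """Clock to attach to a value that reconciles both versions."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}
```

When `compare` returns "concurrent", the database can either roll one update back or hand both versions to the client, as described above.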
Now let's take a closer look at commonly used replication techniques and classify them by the properties described above. The first figure shows the logical relationships between the techniques and where each sits among the tradeoffs of consistency, scalability, availability, and latency. The second figure illustrates each technique in detail.
The replication factor is 4. The read/write coordinator can be either an external client or an internal proxy node.
We will go through the techniques from the weakest consistency to the strongest:
(A, Anti-entropy) This offers the weakest consistency and works as follows. A write updates any single node. If the new data has not yet reached the node being read through the background anti-entropy protocol, the read returns stale data. The next section describes anti-entropy protocols in more detail. The main properties of this approach are:
High propagation latency makes it a poor fit for data synchronization, so it is typically used only as an auxiliary mechanism that detects and repairs unplanned inconsistencies. Cassandra uses an anti-entropy algorithm to propagate database topology and other metadata between nodes.
Consistency guarantees are weak: write conflicts and read-write inconsistencies can occur even without a failure.
High availability and robustness under network partitions. Updates are applied in asynchronous batches rather than one by one, which gives excellent performance.
Durability guarantees are weak because new data initially exists in only a single copy.
(B) An improvement on the previous mode is to send updates asynchronously to all available nodes as soon as any node receives an update request. This can be regarded as directed anti-entropy.
Compared with pure anti-entropy, this greatly improves consistency at a small cost in performance. However, the formal consistency and durability guarantees remain the same.
If a node is unavailable at that moment because of a network or node failure, the update eventually reaches it through anti-entropy propagation.
(C) In the previous mode, the failure of a node can be handled better with hinted handoff [8]. Updates intended for the failed node are recorded on an additional proxy node together with a hint that they must be delivered to the target node once it becomes available. This improves consistency and reduces replica convergence time.
(D, Read One Write One) Because the node holding the hints may itself fail before the updates are delivered, consistency must in that case be enforced through so-called read repair. Each read starts an asynchronous process that requests a data digest (such as a signature or hash) from all nodes storing the data and reconciles the versions on those nodes if the digests differ. We use the name Read One Write One for the combination of techniques A, B, C, and D. None of them provides strict consistency guarantees, but together they are already usable in practice as a self-contained approach.
(E, Read Quorum Write Quorum) The strategies above are heuristic enhancements that reduce replica convergence time. To guarantee stronger consistency, some availability must be sacrificed to ensure that reads and writes overlap. The usual practice is to write W replicas at once instead of one and to read R replicas on each read.
First, the number of replicas written can be configured as W > 1.
Second, because R+W > N, the set of nodes written and the set of nodes read are bound to overlap, so at least one of the replicas read will be reasonably up to date (W=2, R=3, N=4 in the figure above). This guarantees consistency when read and write requests are issued sequentially (read-after-write consistency for a single client), but not global read-after-read consistency. Consider the example in the following figure, where R=2, W=2, N=3: because the write is not transactional across the two replicas, a read issued before the update completes may return two old values or one new and one old:
Different values of R and W trade write latency and durability against read latency, and vice versa.
If W > N/2, conflicts are guaranteed to be detected promptly in the atomic read-modify-write model with rollback.
Strictly speaking, although this model tolerates the failure of individual nodes, it does not tolerate network partitions well. In practice, a "sloppy quorum" is often used to improve availability in some scenarios at the expense of consistency.
(F, Read All Write Quorum) Read consistency problems can be mitigated by contacting all replicas on each read (reading the data or checking digests). This ensures that new data is visible to readers as soon as it is present on at least one node. Under a network partition, however, this guarantee does not hold.
(G, Master-Slave) The techniques above are often used to provide atomic writes or read-modify-write with conflict detection. To reach the conflict prevention level, some form of centralization or locking is required. The simplest strategy is asynchronous master-slave replication. All writes for a given data item are routed to a central node and executed sequentially there. The master then becomes a bottleneck, so the data must be divided into independent shards (with different masters for different shards) to provide scalability.
(H, Transactional Read Quorum Write Quorum and Read One Write All) Updating multiple replicas can avoid write conflicts by using transaction control techniques. The well-known approach is the two-phase commit protocol. However, two-phase commit is not entirely reliable, because a failure of the coordinator can leave resources blocked. The PAXOS commit protocol [20, 21] is a more reliable choice at some cost in performance. A small step further is to read one replica and write all replicas. This method puts the updates to all replicas in one transaction, which provides strong fault-tolerant consistency but loses some performance and availability.
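The overlap condition used by technique E can be checked by brute force. The sketch below is only an illustration (the function name `quorums_always_overlap` is made up for this example): it enumerates every possible write set of size W and read set of size R among N replicas and confirms that they always share a node exactly when R+W > N.

```python
from itertools import combinations


def quorums_always_overlap(n, r, w):
    """True if every R-node read set shares a node with every W-node write set."""
    nodes = range(n)
    return all(
        set(read_set) & set(write_set)
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )


print(quorums_always_overlap(4, 3, 2))  # W=2, R=3, N=4: R+W > N
print(quorums_always_overlap(4, 2, 2))  # R+W = N: disjoint quorums exist
```

The first call prints True and the second prints False: with R+W = N a reader can pick exactly the replicas that the writer skipped.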
Some of the tradeoffs in the analysis above are worth emphasizing again:
Consistency and availability. The strict tradeoff is given by the CAP theorem. Under a network partition, the database must either centralize the data in a single partition or accept the risk of data conflicts.
Consistency and scalability. Even read-write consistency guarantees noticeably limit the scalability of a replica set, and write conflicts can be handled in a relatively scalable way only in the atomic write model. The atomic read-modify-write model introduces short-lived dependencies between data items, which calls for global locking to prevent conflicts. This shows that dependencies between data or operations, even small in scope or short in duration, can damage scalability. Careful data modeling and storing data in separate partitions is therefore very important for scalability.
Consistency and latency. As shown above, a database that needs to provide strong consistency or durability tends toward reading and writing all replicas. These guarantees clearly come at the cost of request latency, so quorum techniques are a more acceptable middle ground.
Failover and consistency / scalability / latency. Interestingly, the conflict between fault tolerance on the one hand and consistency, scalability, and latency on the other is not severe. By giving up a reasonable amount of performance and consistency, a cluster can tolerate node failures up to a certain limit. This compromise is evident in the difference between two-phase commit and the PAXOS protocol. Another example is adding specific consistency guarantees such as "read your writes" by means of strict sessions, which increases the complexity of failover [22].
Anti-entropy protocols and gossip algorithms
Let's start with the following scenario:
There are many nodes, and each data item is replicated on several of them. Each node can process update requests on its own, and each node periodically synchronizes its state with other nodes so that all replicas converge after some time. How does the synchronization process work? When does synchronization start? How is the peer chosen? How is data exchanged? We assume that two nodes always either overwrite older data with the newer version or keep both versions for resolution by the application layer.
This problem is common in scenarios such as maintaining data consistency and synchronizing cluster state (for example, propagating cluster membership information). Introducing a coordinator that monitors the database and draws up a synchronization plan would solve the problem, but a decentralized database provides better fault tolerance. The main decentralized approach is to use a carefully designed epidemic protocol [7], which is relatively simple yet offers good convergence time and tolerates both the failure of any node and network partitions. Although there are many kinds of epidemic algorithms, we focus only on anti-entropy protocols because they are the ones used by NoSQL databases.
An anti-entropy protocol assumes that synchronization runs on a fixed schedule: each node periodically selects another node, at random or according to some rule, and exchanges data with it to eliminate differences. There are three styles of anti-entropy protocol: push, pull, and hybrid (push-pull). In the push protocol, a node simply picks a random peer and sends it its data state. Pushing all the data is obviously wasteful in a real application, so nodes generally work in the way shown in the following figure.
Node A, the initiator of the synchronization, prepares a digest that contains fingerprints of the data on A. After receiving the digest, node B compares it with its local data and returns a digest of the differences to A. Finally, A sends the updates to B, and B applies them. The pull and hybrid protocols work similarly, as shown in the figure above.
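The three-step push exchange can be sketched as follows. This is a simplified model under stated assumptions: each node's data is a dictionary mapping a key to a (version, value) pair, the digest carries only versions, and the names `make_digest`, `missing_or_stale`, and `push_sync` are invented for the example. Real systems use more compact fingerprints than a full key-to-version map.

```python
def make_digest(store):
    """Summary of a node's data: key -> version only, no values."""
    return {key: version for key, (version, _value) in store.items()}


def missing_or_stale(local_store, remote_digest):
    """Keys for which the remote side holds a newer version than we do."""
    return [
        key for key, remote_version in remote_digest.items()
        if key not in local_store or local_store[key][0] < remote_version
    ]


def push_sync(a_store, b_store):
    """One push round: A offers a digest, B reports what it lacks, A sends it."""
    digest = make_digest(a_store)                # step 1: A -> B
    wanted = missing_or_stale(b_store, digest)   # step 2: B -> A
    for key in wanted:                           # step 3: A -> B, B applies
        b_store[key] = a_store[key]
    return wanted
```

Only the keys that B is actually missing travel in full, which is the point of exchanging digests first. A pull round would run the same steps with the roles of A and B swapped.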
Anti-entropy protocols provide good convergence time and scalability. The following figure shows simulation results for propagating an update in a 100-node cluster. In each iteration, each node contacts only one randomly selected peer.
It can be seen that the pull method converges better than the push method, which can be proved theoretically [7]. The push method also suffers from a "convergence tail": after many iterations, almost all nodes have been reached but a few still have not. The hybrid method is more efficient than pure push or pure pull, so it is the one usually used in practice. Anti-entropy is scalable because the average convergence time grows as a logarithmic function of cluster size.
Although these techniques look simple, there is a good deal of research on the performance of anti-entropy protocols under different constraints. One approach replaces random peer selection with a more efficient scheme that exploits the network topology [10]. Another adjusts the transmission rate under limited network bandwidth or uses advanced rules to select the data to be synchronized [9]. Computing digests is also challenging, so a database may maintain a log of recent updates to make digest computation easier.
Eventually consistent data types
In the previous section we assumed that two nodes can always merge their versions of the data. However, resolving update conflicts is not easy, and it is surprisingly difficult to make all replicas eventually converge to a semantically correct value. A well-known example is that deleted items can resurface in the Amazon Dynamo database [8].
Consider an example that illustrates the problem: the database maintains a logical global counter, and each node can increment or decrement it. Although each node can maintain its own value locally, these local counts cannot be merged by simple addition and subtraction. Suppose there are three nodes A, B, and C, and each performs one increment. If A obtains the value from B and adds it to its local copy, then C obtains the value from B, and then C obtains the value from A, C ends up with the value 4, which is wrong. The solution is to maintain a pair of counters for each node, using a data structure similar to a vector clock [19].
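A minimal sketch of such a counter is shown below. Each replica keeps one "plus" slot and one "minus" slot per node, writes only to its own slots, and merges by taking the element-wise maximum. The class layout and method names are assumptions made for this illustration, not the code of any particular database.

```python
class Counter:
    def __init__(self, node_id, num_nodes):
        self.node_id = node_id
        self.plus = [0] * num_nodes    # increments observed, per node
        self.minus = [0] * num_nodes   # decrements observed, per node

    def increment(self):
        self.plus[self.node_id] += 1

    def decrement(self):
        self.minus[self.node_id] += 1

    def value(self):
        return sum(self.plus) - sum(self.minus)

    def merge(self, other):
        # Taking the maximum per slot makes merging idempotent, so the same
        # remote increment is never counted twice.
        self.plus = [max(x, y) for x, y in zip(self.plus, other.plus)]
        self.minus = [max(x, y) for x, y in zip(self.minus, other.minus)]


a, b, c = Counter(0, 3), Counter(1, 3), Counter(2, 3)
for node in (a, b, c):
    node.increment()
a.merge(b)   # A learns about B
c.merge(b)   # C learns about B
c.merge(a)   # C learns about A (and about B again, harmlessly)
print(c.value())  # 3
```

Replaying the scenario from the text gives 3 on node C rather than 4, because B's increment reaches C twice but occupies the same slot both times.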
Cassandra implements counters in a similar way [11]. More complex eventually consistent data structures can be designed using state-based or operation-based replication theory. For example, [1] describes a family of such data structures, including:
Counters (increment and decrement operations)
Sets (add and remove operations)
Graphs (add or remove edges and vertices)
Lists (insert at or remove from a position)
Eventually consistent data types usually offer limited functionality and carry additional performance overhead.
Data placement
This section focuses on algorithms that control the placement of data in a distributed database. These algorithms are responsible for mapping data items to appropriate physical nodes, migrating data between nodes, and global allocation of resources such as memory.
Data balancing
Let's start with a simple protocol that provides seamless data migration between cluster nodes. Migration is needed in scenarios such as cluster expansion (new nodes are added), failover (some nodes go down), or rebalancing (data has become unevenly distributed among nodes). Consider the scenario depicted in figure A below: there are three nodes, and the data is randomly distributed across them (we assume all data is key-value).
If the database does not support data rebalancing internally, several database instances can be deployed on each node, as shown in figure B. Cluster expansion is then manual: stop the database instance to be migrated, move it to the new node, and start it there, as shown in figure C. Although a database could track every record individually, many systems, including MongoDB, Oracle Coherence, and the Redis Cluster under development, use the same technique for automatic balancing. That is, they group records into shards and use the shard as the smallest unit of migration for the sake of efficiency. Naturally, the number of shards should be larger than the number of nodes so that shards can be distributed evenly. Seamless migration can be achieved with a simple protocol that redirects clients between the exporting node and the importing node while a shard is being migrated. The following figure depicts the state machine of the get(key) logic as implemented in Redis Cluster.
Assume that every node knows the cluster topology, so that any key can be mapped to its shard and any shard to a node. If a node determines that the requested key belongs to a local shard, it looks the key up locally (the upper box in the figure). If the node determines that the requested key belongs to another node X, it sends the client a permanent redirect (the lower box in the figure). A permanent redirect means that the client may cache the mapping between the shard and the node. If a shard migration is in progress, the exporting node and the importing node mark the shard accordingly, then lock and move its records one by one. The exporting node first looks for the key locally and, if it is not found, assumes the key has already been migrated and redirects the client to the importing node. This redirect is one-time and must not be cached. The importing node serves these redirected requests locally, but regular queries are permanently redirected until the migration is complete.
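The get(key) logic described above can be modeled with a small state machine. The sketch below is a hypothetical simplification, not Redis Cluster's implementation: the `Node` class, the 16-shard layout, and the `redirected` flag are assumptions for illustration, and the reply labels MOVED (permanent, cacheable) and ASK (one-time) are borrowed from the names Redis Cluster gives its two redirections.

```python
import zlib

NUM_SHARDS = 16  # hypothetical shard count for this sketch


def shard_of(key):
    # zlib.crc32 is stable across runs, unlike the built-in hash() for strings
    return zlib.crc32(key.encode()) % NUM_SHARDS


class Node:
    def __init__(self, name, topology):
        self.name = name
        self.owner = dict(topology)  # shard -> name of the owning node
        self.data = {}               # records stored on this node
        self.migrating = {}          # shard -> node it is being exported to
        self.importing = {}          # shard -> node it is being imported from

    def get(self, key, redirected=False):
        shard = shard_of(key)
        if shard in self.importing:
            if redirected:                       # client arrived via a one-time redirect
                return ("VALUE", self.data.get(key))
            return ("MOVED", self.importing[shard])
        if self.owner[shard] != self.name:
            return ("MOVED", self.owner[shard])  # permanent redirect, may be cached
        if key in self.data:
            return ("VALUE", self.data[key])     # local lookup succeeded
        if shard in self.migrating:
            return ("ASK", self.migrating[shard])  # one-time redirect, do not cache
        return ("VALUE", None)                   # key does not exist
```

A client would follow a MOVED reply by updating its cached shard map, and an ASK reply by retrying once against the named node with `redirected=True` without touching the cache.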
Data sharding and replication in dynamic environments
Another question we focus on is how to map records to physical nodes. A straightforward approach is to keep a table that maps each range of keys to a node, with one key range per node, or to use the hash of the key modulo the number of nodes as the node ID. However, hash-modulo mapping works poorly when the cluster changes, because adding or removing a node causes a complete reshuffle of the data in the cluster. This makes replication and failure recovery difficult.
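The cost of hash-modulo mapping is easy to quantify. The sketch below uses plain integers as stand-ins for key hashes (an assumption for the example, as is the function name `moved_fraction`) and counts how many keys change nodes when a three-node cluster grows to four.

```python
def moved_fraction(hashes, old_count, new_count):
    """Share of keys whose node changes when the node count changes."""
    hashes = list(hashes)
    moved = sum(1 for h in hashes if h % old_count != h % new_count)
    return moved / len(hashes)


print(moved_fraction(range(1200), 3, 4))  # 0.75
```

Three quarters of the keys move even though only one node was added, whereas an ideal scheme would move about one quarter (the share the new node should end up holding).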
There are many ways to improve on this from the perspective of replication and failure recovery. The best known is consistent hashing. There are already many introductions to consistent hashing on the Internet, so I give only a basic description here for the sake of completeness. The following figure depicts the basic idea of consistent hashing:
Consistent hashing is basically a key-to-node mapping structure: it maps keys (usually hashed) to physical nodes. The space of hashed key values is an ordered space of fixed-length binary strings, so each range of keys in it is clearly assigned to one of the three nodes A, B, and C in figure A. For replication, the value space is closed into a ring, and replicas are placed by moving clockwise along the ring until all of them are mapped to nodes, as shown in figure B. In other words, Y is placed on node B because it falls within B's range, its first replica is placed on C, its second replica on A, and so on.
The benefit of this structure shows when a node is added or removed, because only the neighboring regions are rebalanced. As shown in figure C, adding node D affects only data item X and has no effect on Y. Similarly, removing node B (or a failure of B) affects only Y and the replica of X, not X itself. However, as noted in reference [8], this approach has a downside as well as a benefit: the burden of rebalancing falls on the neighboring nodes, which have to move a large amount of data. The effect can be mitigated to some extent by mapping each node to multiple ranges rather than one, as shown in figure D. This is a compromise: it avoids excessive load concentration during rebalancing while still keeping the total amount of rebalancing reasonably low compared with modulo-based mapping.
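A compact sketch of a consistent hash ring with several ranges per node is given below. It is an illustration under assumptions rather than any database's actual implementation: ring positions come from MD5 digests, each node is placed at `vnodes` points labeled "node#i", and the class and method names are invented for the example.

```python
import bisect
import hashlib


def ring_position(label):
    """Stable position on the ring derived from an MD5 digest."""
    return int(hashlib.md5(label.encode()).hexdigest(), 16)


class HashRing:
    def __init__(self, nodes, vnodes=8):
        self.vnodes = vnodes
        self.points = []  # sorted list of (position, node)
        for node in nodes:
            self.add(node)

    def add(self, node):
        # Each node owns several points, i.e. several ranges of the ring.
        for i in range(self.vnodes):
            bisect.insort(self.points, (ring_position(f"{node}#{i}"), node))

    def remove(self, node):
        self.points = [p for p in self.points if p[1] != node]

    def replicas(self, key, count=1):
        """Walk clockwise from the key and collect `count` distinct nodes."""
        start = bisect.bisect_right(self.points, (ring_position(key), ""))
        found = []
        for offset in range(len(self.points)):
            node = self.points[(start + offset) % len(self.points)][1]
            if node not in found:
                found.append(node)
                if len(found) == count:
                    break
        return found
```

With this structure, `HashRing(["A", "B", "C"]).replicas("Y", 3)` returns the primary node followed by the next distinct nodes clockwise, and adding a node D changes the primary only for keys that now fall into one of D's ranges.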
Maintaining a complete and coherent view of the hash ring is not easy for a very large cluster. This is not a problem for relatively small database clusters, but it is interesting to study how data placement is combined with network routing in peer-to-peer networks. A good example is the Chord algorithm, which trades the completeness of the ring view for lookup efficiency at each node. Chord also maps keys to nodes on a ring, which makes it very similar to consistent hashing in this respect. The difference is that each node maintains only a short list of peers whose logical positions on the ring grow exponentially in distance (see figure below). This makes it possible to locate a key with a kind of binary search in just a few network hops.
This figure shows a cluster of 16 nodes and depicts how node A looks up a key placed on node D. (A) depicts the route, and (B) depicts the partial views of the ring held by nodes A, B, and C. Reference [15] has more on data replication in distributed systems.
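The routing idea can be sketched on a small ring. The code below is a simplified model, not the full Chord protocol: it assumes a 4-bit identifier space (16 positions), computes finger tables from a global list of node IDs instead of maintaining them through a join protocol, and uses invented function names. It shows how each hop forwards to the farthest known peer that still precedes the key.

```python
M = 4            # identifier bits, so the ring has 2**M = 16 positions
RING = 2 ** M


def in_interval(x, start, end):
    """True if x lies in the ring interval (start, end]."""
    if start < end:
        return start < x <= end
    return x > start or x <= end   # the interval wraps past zero


def successor(nodes, ident):
    """First node at or after `ident`, moving clockwise."""
    ident %= RING
    candidates = [n for n in nodes if n >= ident]
    return min(candidates) if candidates else min(nodes)


def fingers(nodes, n):
    """Finger i points at the node responsible for position n + 2**i."""
    return [successor(nodes, n + 2 ** i) for i in range(M)]


def lookup(nodes, start, key):
    """Return the list of nodes visited while routing from `start` to the owner of `key`."""
    if successor(nodes, key) == start:
        return [start]
    path, current = [start], start
    while True:
        nxt = successor(nodes, current + 1)
        if in_interval(key, current, nxt):
            return path + [nxt]            # the next node on the ring owns the key
        for f in reversed(fingers(nodes, current)):
            if f != key and in_interval(f, current, key):
                current = f                # farthest finger that precedes the key
                break
        path.append(current)


print(lookup(list(range(16)), 0, 13))  # [0, 8, 12, 13]
```

With all 16 positions occupied, node 0 knows only nodes 1, 2, 4, and 8, yet it reaches the owner of key 13 in three hops because each hop roughly halves the remaining distance.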
Sharding data by multiple attributes
Consistent hashing works as a placement strategy when data is accessed only by primary key, but things become much more complicated when queries involve multiple attributes. A simple approach (used by MongoDB) is to distribute data by primary key regardless of the other attributes. As a result, queries on the primary key can be routed to the appropriate node, but every other query has to visit all nodes in the cluster. This imbalance in query efficiency leads to the following problem:
Given a dataset in which each item has several attributes with corresponding values, is there a data placement strategy that lets a query restricted on any number of attributes be served by as few nodes as possible?
The HyperDex database offers a solution. The basic idea is to treat each attribute as an axis of a multidimensional space and to map regions of that space to physical nodes. A query corresponds to a hyperplane that intersects a set of adjacent regions, so only those regions are relevant to the query. Let's look at an example from reference [6]:
Each data item is a user record with three attributes: First Name, Last Name, and Phone Number. The attributes are treated as a three-dimensional space, and a feasible placement strategy is to map each octant (one of the eight regions of the space) to a physical node. A query such as "First Name = John" corresponds to a plane that passes through four octants, meaning that only four nodes take part in processing it. A query that restricts two attributes corresponds to a straight line that passes through two octants, as shown in the figure above, so only two nodes are involved.
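The region counting in this example can be reproduced with a few lines of code. This is a toy model of the idea, not HyperDex's implementation: each attribute is hashed to one of two halves of its axis (so three attributes give eight regions), and the attribute names and function names are assumptions chosen for the illustration.

```python
import zlib
from itertools import product

ATTRIBUTES = ("first_name", "last_name", "phone")


def half(value):
    """Which half of an axis a value falls in (0 or 1), using a stable hash."""
    return zlib.crc32(str(value).encode()) % 2


def region_of(record):
    """The single region (one coordinate per attribute) that stores a record."""
    return tuple(half(record[attr]) for attr in ATTRIBUTES)


def regions_for_query(query):
    """All regions a query must visit; unrestricted attributes span both halves."""
    axes = [
        [half(query[attr])] if attr in query else [0, 1]
        for attr in ATTRIBUTES
    ]
    return set(product(*axes))


print(len(regions_for_query({"first_name": "John"})))                        # 4
print(len(regions_for_query({"first_name": "John", "last_name": "Smith"})))  # 2
```

Each restricted attribute halves the number of regions, and therefore nodes, that a query has to contact.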
The problem with this method is that the number of regions grows exponentially with the number of attributes. As a result, a query that restricts only a few attributes is projected onto many regions, and therefore onto many servers. The problem can be alleviated to some extent by splitting a data item with many attributes into several sub-items with fewer attributes each, and mapping each sub-item to its own subspace instead of mapping the whole item into one high-dimensional space:
This provides a better mapping of queries to nodes but makes cluster coordination more complex, because a data item is now scattered across several independent subspaces, each backed by its own set of physical nodes. Transactional issues must be considered when data is updated. Reference [6] gives a fuller introduction to this technique and its implementation details.
Passivated replicas
Some applications have heavy random-read requirements, which means all the data has to be kept in memory. In this case, sharding the data and replicating each shard master-slave usually requires at least twice the memory, because every data item has one copy on a master node and one on a slave node. To be able to replace the master after a failure, a slave needs as much memory as the master. If the system can tolerate a short interruption or performance degradation when a node fails, the replica shards can instead be passivated, that is, kept on disk rather than in memory.
The following figure depicts 16 shards on 4 nodes. Each shard has an active copy in memory and a passive replica on disk:
The gray arrows highlight the replication of the shards on node 2. Shards on the other nodes are replicated in the same way. The red arrows show how the replicas are loaded into memory if node 2 fails. Because replicas are spread uniformly across the cluster, only a small amount of memory has to be reserved for replicas that are activated after a node failure. In the figure above, the cluster can withstand the failure of a single node with only 1/3 of memory held in reserve. Note that activating a replica (loading it from disk into memory) takes some time, which can cause a short period of degraded performance or an interruption of service for the data being restored.
System coordination
In this section we will discuss two technologies related to system coordination. Distributed coordination is a relatively large field, and many people have done in-depth research on it for decades. This article deals with only two technologies that have been put into practice. Information on distributed locking, consensus protocol, and other basic technologies can be found in many books or network resources, or you can see Resources [17, 18, 21].
Fault detection
Fault detection is a basic function of any fault-tolerant distributed system. In practice, all fault detection protocols are based on a heartbeat mechanism, and the principle is simple: the monitored component periodically sends heartbeat messages to the monitoring process (or the monitoring process polls the monitored component), and if no heartbeat is received for a certain period, the component is considered to have failed. A real distributed system imposes some additional requirements:
Adaptivity. Fault detection should cope with temporary network failures and delays, as well as changes in cluster topology, load, and bandwidth. This is difficult because there is no way to tell whether a process that has not responded for a long time has really failed. Fault detection therefore has to trade off detection time (how long it takes to identify a real failure, that is, how long a process may stay silent before it is considered failed) against the false alarm rate. This tradeoff should be adjusted dynamically and automatically.
Flexibility. At first glance, fault detection only needs to output a Boolean value indicating whether the monitored process is working, but this is not enough in practice. Consider the MapReduce-like example from reference [12]: a distributed application consists of a master node and several worker nodes. The master maintains a job list and assigns the jobs in the list to the workers. The master can distinguish between different degrees of failure. First, if the master suspects that a worker is dead, it stops assigning new jobs to that worker. Second, if no heartbeat arrives from the worker as more time passes, the master reassigns the jobs running on that worker to other nodes. Finally, the master concludes that the worker has failed and releases all related resources.
Scalability and robustness. As a system function, failure detection should scale with the system. It should also be robust and consistent: even when communication fails, all nodes in the system should have a consistent view, that is, all nodes should agree on which nodes are available and which are not. The views of different nodes should not conflict, and there should be no situation in which one node knows that node A is unavailable while another node does not.
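The graded reaction described under the flexibility requirement can be sketched as follows. This is a minimal illustration, not code from reference [12]: the class name `HeartbeatMonitor`, the `Status` levels, and the three timeout values are all assumptions chosen for the example, and fixed timeouts are used here only for simplicity (they do not satisfy the adaptivity requirement).

```python
from enum import Enum


class Status(Enum):
    ALIVE = "alive"
    SUSPECTED = "suspected"  # master stops assigning new jobs
    REASSIGN = "reassign"    # master moves running jobs to other workers
    FAILED = "failed"        # master releases all related resources


class HeartbeatMonitor:
    """Hypothetical monitor that maps heartbeat silence to a failure level."""

    def __init__(self, suspect_after=2.0, reassign_after=5.0, fail_after=10.0):
        # Thresholds are illustrative values in seconds, checked from the
        # most severe level down to the least severe one.
        self.thresholds = [
            (fail_after, Status.FAILED),
            (reassign_after, Status.REASSIGN),
            (suspect_after, Status.SUSPECTED),
        ]
        self.last_seen = {}

    def heartbeat(self, node, now):
        """Record that a heartbeat from `node` arrived at time `now`."""
        self.last_seen[node] = now

    def status(self, node, now):
        """Return the failure level of `node` based on how long it has been silent."""
        silence = now - self.last_seen[node]
        for limit, status in self.thresholds:
            if silence >= limit:
                return status
        return Status.ALIVE
```

A master node could call `status()` for every worker on each scheduling pass and act on the returned level instead of on a single Boolean.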
The so-called accrual failure detector [12] addresses the first two requirements, and Cassandra [16] uses a modified version of it in its product. The basic workflow is as follows:
For each monitored resource, the detector records the heartbeat arrival times Ti.
The mean and variance of the arrival times are calculated over a recent statistical window.
Assuming that the distribution of arrival times is known (the following figure includes the formula for a normal distribution), we can calculate the probability of the current heartbeat delay (the difference between the current time t_now and the last arrival time Tc) and use this probability to decide whether a failure has occurred. As suggested in reference [12], the value can be rescaled with a logarithmic function to make it easier to use. With this scaling, an output of 1 means that the probability of a mistake (wrongly declaring the node failed) is about 10%, an output of 2 means about 1%, and so on.
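The workflow above can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the implementation from reference [12] or from Cassandra: it assumes normally distributed heartbeat intervals, and the class name `AccrualDetector`, the window size, and the `min_stdev` floor are hypothetical choices made for the example.

```python
import math
from collections import deque
from statistics import NormalDist, mean, pstdev


class AccrualDetector:
    """Hypothetical accrual-style detector over a sliding window of intervals."""

    def __init__(self, window=100, min_stdev=0.1):
        self.intervals = deque(maxlen=window)  # recent inter-arrival times
        self.last_arrival = None
        self.min_stdev = min_stdev  # assumed floor so the distribution never collapses

    def heartbeat(self, now):
        """Record a heartbeat that arrived at time `now`."""
        if self.last_arrival is not None:
            self.intervals.append(now - self.last_arrival)
        self.last_arrival = now

    def suspicion(self, now):
        """Return -log10 of the probability that a heartbeat arrives later than now."""
        if len(self.intervals) < 2:
            return 0.0  # not enough samples to estimate anything
        mu = mean(self.intervals)
        sigma = max(pstdev(self.intervals), self.min_stdev)
        delay = now - self.last_arrival
        p_later = 1.0 - NormalDist(mu, sigma).cdf(delay)
        p_later = max(p_later, 1e-300)  # avoid log10(0) for very long delays
        return -math.log10(p_later)
```

A caller would compare `suspicion()` against one or more thresholds, for example treating a value above 1 as "suspected" and a much larger value as "failed". The thresholds are a tuning choice and are not prescribed here.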
Monitoring zones are organized into a hierarchy according to importance, and the zones are synchronized through a gossip protocol or a central fault-tolerant repository. This satisfies the scalability requirement and also prevents heartbeat messages from flooding the network [14]. The following figure shows six failure detectors that form two zones and communicate with each other through a gossip protocol or a robust repository such as ZooKeeper:
Coordinator election
Coordinator election is an important technique for strongly consistent databases. First, it allows a system with a master-slave structure to recover from the failure of the master node. Second, in the case of a network partition, it makes it possible to fence off the minority of nodes and so avoid write conflicts.
The Bully algorithm is a relatively simple coordinator election algorithm. MongoDB has used a variant of this algorithm to determine the primary in a replica set. The main idea of the Bully algorithm is that any member of the cluster can declare itself the coordinator and notify the other nodes. The other nodes can either accept the claim or reject it and enter the competition themselves. Only a node that is accepted by all other nodes becomes the coordinator. The nodes decide who should win according to some attribute, which can be a static ID or a changing metric such as the most recent transaction ID (the most up-to-date node wins).
The example in the following figure shows an execution of the Bully algorithm. A static ID is used as the metric, and nodes with higher ID values win:
1. Initially, the cluster has five nodes, and node 5 is the recognized coordinator.
2. Suppose node 5 goes down, and node 2 and node 3 detect this at the same time. Both nodes start an election and send election messages to the nodes with larger IDs.
3. Node 4 eliminates nodes 2 and 3, and node 3 eliminates node 2.
4. At this point, node 1 detects the failure of node 5 and sends election messages to all nodes with larger IDs.
5. Nodes 2, 3, and 4 all eliminate node 1.
6. Node 4 sends an election message to node 5.
7. Node 5 does not respond, so node 4 declares itself elected and announces this to the other nodes.
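The steps above can be traced with a small simulation. This is a minimal sketch that assumes static integer IDs and instantaneous, reliable messages between live nodes; the function name `bully_election` and its `trace` parameter are hypothetical, and timeouts and concurrent elections are deliberately left out.

```python
def bully_election(initiator, all_nodes, alive, trace=None):
    """Return the ID of the node elected when `initiator` starts an election.

    all_nodes: IDs of every configured node, including failed ones.
    alive:     IDs of the nodes that currently respond to messages.
    trace:     optional list that receives (sender, targets, responders) tuples.
    """
    # The initiator sends election messages to every node with a larger ID.
    higher = [n for n in sorted(all_nodes) if n > initiator]
    # Only live nodes answer; each answer eliminates the initiator.
    responders = [n for n in higher if n in alive]
    if trace is not None:
        trace.append((initiator, higher, responders))
    if not responders:
        # Nobody with a larger ID answered, so the initiator declares itself elected.
        return initiator
    # Every responder starts its own election. They all converge on the same
    # winner, so following the lowest responder is enough for this sketch.
    return bully_election(responders[0], all_nodes, alive, trace)


if __name__ == "__main__":
    steps = []
    winner = bully_election(2, {1, 2, 3, 4, 5}, {1, 2, 3, 4}, steps)
    for sender, targets, responders in steps:
        print(f"node {sender} -> {targets}, answered by {responders}")
    print("elected:", winner)
```

With node 5 down, an election started by node 2 ends with node 4 sending a message to node 5, receiving no answer, and declaring itself the coordinator, which matches steps 6 and 7.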
The coordinator election process counts the number of participating nodes and checks that more than half of the nodes in the cluster take part in the election. This ensures that, in the case of a network partition, only one partition can elect a coordinator. Suppose the network is split into several areas that cannot reach each other: the coordinator is then elected in the area with the largest number of nodes, provided that this area contains more than half of the original nodes. If the cluster is split into several pieces and none of them contains more than half of the nodes, no coordinator can be elected, and the cluster cannot be expected to continue serving requests.
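The participation check can be written as a short arithmetic test. This sketch assumes a strict majority (more than half of the configured nodes), which is what the partition argument above needs; the function names `has_quorum` and `partitions_able_to_elect` are hypothetical.

```python
def has_quorum(partition_size, cluster_size):
    """True if a partition holds more than half of the configured nodes."""
    return 2 * partition_size > cluster_size


def partitions_able_to_elect(partitions, cluster_size):
    """Return the partitions that are allowed to elect a coordinator."""
    return [p for p in partitions if has_quorum(len(p), cluster_size)]
```

For a five-node cluster split into {1, 2, 3} and {4, 5}, only the first partition passes the check. For a split into {1, 2}, {3, 4}, and {5}, no partition passes. Because two disjoint partitions cannot both hold more than half of the nodes, at most one coordinator is ever elected.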