This article explains how to use Curator, a client for ZooKeeper. The content is simple and clear, and easy to learn.
ZooKeeper is not designed for high availability, but it achieves strong consistency through the ZAB protocol, so it is a CP system. That is why it is often chosen for scenarios such as a registry, a configuration center, or distributed locks.
Its performance is quite limited, though, and its API is not easy to use. Xjjdog tends to prefer Etcd or Consul, which are based on the Raft protocol and are more lightweight.
Curator is an open-source ZooKeeper client originally from Netflix and now a top-level Apache project. Compared with the native client that ships with ZooKeeper, Curator offers a higher level of abstraction and simplifies ZooKeeper client development. It takes care of many low-level details, including reconnection, re-registration of watchers, and NodeExistsException handling.
Curator consists of a series of modules. For ordinary developers, curator-framework and curator-recipes are the most commonly used; they are described in turn below.
1. Maven dependencies
The latest Curator version, 4.3.0, supports ZooKeeper 3.4.x and 3.5, but be aware of Curator's transitive dependencies: the ZooKeeper version must match the one actually used on the server side, for example ZooKeeper 3.4.6, which we currently use.
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
2. curator-framework
Here are some common zk-related operations.
public static CuratorFramework getClient() {
    return CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .connectionTimeoutMs(15 * 1000) // connection timeout, default 15 seconds
            .sessionTimeoutMs(60 * 1000)    // session timeout, default 60 seconds
            .namespace("arch")              // set the namespace
            .build();
}

public static void create(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
    client.create().creatingParentsIfNeeded().forPath(path, payload);
}

public static void createEphemeral(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
    client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
}

public static String createEphemeralSequential(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
    return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
}

public static void setData(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
    client.setData().forPath(path, payload);
}

public static void delete(final CuratorFramework client, final String path) throws Exception {
    client.delete().deletingChildrenIfNeeded().forPath(path);
}

public static void guaranteedDelete(final CuratorFramework client, final String path) throws Exception {
    client.delete().guaranteed().forPath(path);
}

public static String getData(final CuratorFramework client, final String path) throws Exception {
    return new String(client.getData().forPath(path));
}

public static List<String> getChildren(final CuratorFramework client, final String path) throws Exception {
    return client.getChildren().forPath(path);
}
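A minimal sketch tying these helpers together (it assumes a ZooKeeper server reachable at 127.0.0.1:2181; the paths and payloads are arbitrary, and everything lives under the /arch namespace set in getClient()):

public static void main(String[] args) throws Exception {
    CuratorFramework client = getClient();
    client.start();

    create(client, "/demo/node", "hello".getBytes());  // persistent node, parents created as needed
    System.out.println(getData(client, "/demo/node")); // prints "hello"
    setData(client, "/demo/node", "world".getBytes());
    System.out.println(getChildren(client, "/demo"));  // prints [node]
    delete(client, "/demo");                           // removes /demo and its children

    client.close();
}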
3. curator-recipes
curator-recipes provides reference implementations of typical ZooKeeper usage patterns. The components most commonly used in development are described below.
Event monitoring
ZooKeeper natively supports event listening by registering watchers, but this is not very convenient: a watcher fires only once, so developers have to re-register it again and again, which is cumbersome.
Curator introduces the Cache to listen for events on the ZooKeeper server. Cache is Curator's wrapper around event listening; conceptually, its event listening is a comparison between a local cached view and the remote ZooKeeper view. Curator also handles re-registration of listeners automatically, greatly simplifying the tedium of native API development.
1) Node Cache
public static void nodeCache() throws Exception {
    final String path = "/nodeCache";
    final CuratorFramework client = getClient();
    client.start();

    delete(client, path);
    create(client, path, "cache".getBytes());

    final NodeCache nodeCache = new NodeCache(client, path);
    nodeCache.start(true);
    nodeCache.getListenable().addListener(() ->
            System.out.println("node data changed, new data is " + new String(nodeCache.getCurrentData().getData())));

    setData(client, path, "cache1".getBytes());
    setData(client, path, "cache2".getBytes());
    Thread.sleep(1000);
    client.close();
}
NodeCache listens on a single node: once the listener is registered, any change to that node is delivered to it.
2) Path Cache
Path Cache listens for child-node events on a ZNode: added, updated, and removed. Path Cache keeps the state of the child nodes synchronized, and the resulting events are passed to the registered PathChildrenCacheListener.
public static void pathChildrenCache() throws Exception {
    final String path = "/pathChildrenCache";
    final CuratorFramework client = getClient();
    client.start();

    final PathChildrenCache cache = new PathChildrenCache(client, path, true);
    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    cache.getListenable().addListener((client1, event) -> {
        switch (event.getType()) {
            case CHILD_ADDED:
                System.out.println("CHILD_ADDED: " + event.getData().getPath());
                break;
            case CHILD_REMOVED:
                System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                break;
            case CHILD_UPDATED:
                System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                break;
            // connection and initialization events carry no node data
            case CONNECTION_LOST:
                System.out.println("CONNECTION_LOST");
                break;
            case CONNECTION_RECONNECTED:
                System.out.println("CONNECTION_RECONNECTED");
                break;
            case CONNECTION_SUSPENDED:
                System.out.println("CONNECTION_SUSPENDED");
                break;
            case INITIALIZED:
                System.out.println("INITIALIZED");
                break;
            default:
                break;
        }
    });

    // client.create().withMode(CreateMode.PERSISTENT).forPath(path);
    Thread.sleep(1000);
    client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
    Thread.sleep(1000);
    client.delete().forPath(path + "/c1");
    Thread.sleep(1000);
    client.delete().forPath(path); // changes to the listened node itself are not notified
    Thread.sleep(1000);
    client.close();
}
3) Tree Cache
The "combination" of Path Cache and Node Cache monitors the creation, update and deletion events under the path, and caches the data of all child nodes under the path.
public static void treeCache() throws Exception {
    final String path = "/treeChildrenCache";
    final CuratorFramework client = getClient();
    client.start();

    final TreeCache cache = new TreeCache(client, path);
    cache.start();
    cache.getListenable().addListener((client1, event) -> {
        switch (event.getType()) {
            case NODE_ADDED:
                System.out.println("NODE_ADDED: " + event.getData().getPath());
                break;
            case NODE_REMOVED:
                System.out.println("NODE_REMOVED: " + event.getData().getPath());
                break;
            case NODE_UPDATED:
                System.out.println("NODE_UPDATED: " + event.getData().getPath());
                break;
            // connection and initialization events carry no node data
            case CONNECTION_LOST:
                System.out.println("CONNECTION_LOST");
                break;
            case CONNECTION_RECONNECTED:
                System.out.println("CONNECTION_RECONNECTED");
                break;
            case CONNECTION_SUSPENDED:
                System.out.println("CONNECTION_SUSPENDED");
                break;
            case INITIALIZED:
                System.out.println("INITIALIZED");
                break;
            default:
                break;
        }
    });

    client.create().withMode(CreateMode.PERSISTENT).forPath(path);
    Thread.sleep(1000);
    client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
    Thread.sleep(1000);
    setData(client, path, "test".getBytes());
    Thread.sleep(1000);
    client.delete().forPath(path + "/c1");
    Thread.sleep(1000);
    client.delete().forPath(path);
    Thread.sleep(1000);
    client.close();
}
Election
Curator provides two approaches: Leader Latch and Leader Election.
1) Leader Latch
Leader Latch randomly selects one of the candidates as leader. After that, unless close() is called to release leadership, the other candidates cannot become leader.
public class LeaderLatchTest {

    private static final String PATH = "/demo/leader";

    public static void main(String[] args) {
        List<LeaderLatch> latchList = new ArrayList<>();
        List<CuratorFramework> clients = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient();
                client.start();
                clients.add(client);

                final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i);
                leaderLatch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        System.out.println(leaderLatch.getId() + ": I am leader. I am doing jobs!");
                    }

                    @Override
                    public void notLeader() {
                        System.out.println(leaderLatch.getId() + ": I am not leader. I will do nothing!");
                    }
                });
                latchList.add(leaderLatch);
                leaderLatch.start();
            }
            Thread.sleep(1000 * 60);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            for (LeaderLatch leaderLatch : latchList) {
                CloseableUtils.closeQuietly(leaderLatch);
            }
        }
    }

    public static CuratorFramework getClient() {
        return CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectionTimeoutMs(15 * 1000) // connection timeout, default 15 seconds
                .sessionTimeoutMs(60 * 1000)    // session timeout, default 60 seconds
                .namespace("arch")              // set the namespace
                .build();
    }
}

2) Leader Election

With a LeaderSelectorListener you can control leadership and release it at the appropriate time, so every node has a chance to obtain leadership. A LeaderLatch, by contrast, holds leadership until close() is called; otherwise it never releases it.

public class LeaderSelectorTest {

    private static final String PATH = "/demo/leader";

    public static void main(String[] args) {
        List<LeaderSelector> selectors = new ArrayList<>();
        List<CuratorFramework> clients = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient();
                client.start();
                clients.add(client);

                final String name = "client#" + i;
                LeaderSelector leaderSelector = new LeaderSelector(client, PATH, new LeaderSelectorListenerAdapter() {
                    @Override
                    public void takeLeadership(CuratorFramework client) throws Exception {
                        System.out.println(name + ": I am leader.");
                        Thread.sleep(2000);
                    }
                });
                leaderSelector.autoRequeue();
                leaderSelector.start();
                selectors.add(leaderSelector);
            }
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            for (LeaderSelector selector : selectors) {
                CloseableUtils.closeQuietly(selector);
            }
        }
    }

    public static CuratorFramework getClient() {
        return CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectionTimeoutMs(15 * 1000) // connection timeout, default 15 seconds
                .sessionTimeoutMs(60 * 1000)    // session timeout, default 60 seconds
                .namespace("arch")              // set the namespace
                .build();
    }
}

Distributed locks

1) Reentrant lock: Shared Reentrant Lock

"Shared" means the lock is globally visible and any client can request it. "Reentrant" is similar to JDK's ReentrantLock: the same client can acquire the lock multiple times while holding it, without blocking. It is implemented by the InterProcessMutex class, whose constructor is:

public InterProcessMutex(CuratorFramework client, String path)

Acquire the lock via acquire(), which also has a timeout variant:

/**
 * Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
 * re-entrantly. Each call to acquire must be balanced by a call to release()
 */
public void acquire();

/**
 * Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can
 * call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()
 * Parameters:
 * time - time to wait
 * unit - time unit
 * Returns:
 * true if the mutex was acquired, false if not
 */
public boolean acquire(long time, TimeUnit unit);

Release the lock via release(). An InterProcessMutex instance can be reused.

Revoking: the ZooKeeper recipes wiki defines a negotiable revocation mechanism. To make a mutex revocable, call the following method:

/**
 * Make the lock revocable. Your listener will be called when another process or thread
 * wants you to release the lock.
 * Parameters:
 * listener - the listener
 */
public void makeRevocable(RevocationListener<T> listener)

2) Non-reentrant lock: Shared Lock

Use InterProcessSemaphoreMutex; its methods are called in a similar way. The difference is that this lock is not reentrant, not even within the same thread.

3) Reentrant read-write lock: Shared Reentrant Read Write Lock

Similar to JDK's ReentrantReadWriteLock. A read-write lock manages a pair of related locks, one for read operations and one for writes. The read lock can be held by multiple processes at the same time as long as the write lock is not in use, while the write lock is exclusive: readers block while it is held. The lock is reentrant, and a thread holding the write lock can re-enter the read lock, but not the other way around. This also means the write lock can be downgraded to a read lock: acquire the write lock -> acquire the read lock -> release the write lock. Upgrading from a read lock to a write lock is not possible. It is mainly implemented by two classes:
* InterProcessReadWriteLock
* InterProcessLock
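A minimal sketch of the write-to-read downgrade described above, assuming the getClient() helper from earlier and an arbitrary lock path:

public static void readWriteLockDemo() throws Exception {
    CuratorFramework client = getClient();
    client.start();
    InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/demo/rwLock");

    rwLock.writeLock().acquire();     // exclusive: readers block while this is held
    rwLock.readLock().acquire();      // a write-lock holder may re-enter the read lock
    rwLock.writeLock().release();     // downgrade: only the read lock is now held
    try {
        System.out.println("reading under the downgraded lock");
    } finally {
        rwLock.readLock().release();  // every acquire needs a matching release
    }
    client.close();
}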
4) Semaphore: Shared Semaphore

A counting semaphore similar to JDK's Semaphore. What the JDK Semaphore maintains as a set of permits is called a set of leases in Curator. Note that all instances must use the same numberOfLeases value. Calling acquire() returns a Lease object. The client must close these leases in a finally block, or the leases will leak. However, if the client session is lost for some reason, such as a crash, the leases held by that client are automatically closed, so that other clients can continue to use them. Leases can also be returned with:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request more than one lease at a time; if the semaphore does not currently have enough leases available, the requesting thread blocks. Overloads with timeouts are also provided:

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes are:

* InterProcessSemaphoreV2
* Lease
* SharedCountReader
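A minimal InterProcessSemaphoreV2 sketch under the same assumptions (getClient() from earlier, arbitrary path; the lease count of 3 is illustrative):

public static void semaphoreDemo() throws Exception {
    CuratorFramework client = getClient();
    client.start();
    // At most 3 leases may be held across all clients of this path.
    InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/demo/semaphore", 3);
    Lease lease = semaphore.acquire();  // blocks until a lease is available
    try {
        System.out.println("holding one of 3 leases");
    } finally {
        semaphore.returnLease(lease);   // always return the lease
    }
    client.close();
}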
5) Multi-lock: Multi Shared Lock

Multi Shared Lock is a container of locks. When acquire() is called, all contained locks are acquired; if that fails, every acquired lock is released. Likewise, release() releases all locks in the container (failures are ignored). It is essentially a group lock: acquire and release operations on it are delegated to all the locks it contains. It mainly involves two classes:

* InterProcessMultiLock
* InterProcessLock

Its constructor takes either a collection of locks or a list of ZooKeeper paths:

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
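A minimal InterProcessMultiLock sketch under the same assumptions (getClient() from earlier; the two lock paths are arbitrary, and java.util.Arrays is assumed imported):

public static void multiLockDemo() throws Exception {
    CuratorFramework client = getClient();
    client.start();
    // Group two locks; acquire() takes both or neither.
    InterProcessMultiLock multiLock =
            new InterProcessMultiLock(client, Arrays.asList("/demo/lock1", "/demo/lock2"));
    multiLock.acquire();
    try {
        System.out.println("holding both locks");
    } finally {
        multiLock.release();  // releases every lock in the group
    }
    client.close();
}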
Barrier

1) Barrier

The barrierPath parameter in the DistributedBarrier constructor identifies a barrier: as long as the barrierPath is the same (the same path), it is the same barrier. In general, a barrier is used as follows:

1. The master client sets the barrier.
2. The other clients call waitOnBarrier() to wait for the barrier to be removed; their processing threads block.
3. The master client removes the barrier, and the other clients' handlers resume running at the same time.

The main methods of the DistributedBarrier class are:

setBarrier() - set the barrier
waitOnBarrier() - wait for the barrier to be removed
removeBarrier() - remove the barrier
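A minimal sketch of the three steps above, assuming the getClient() helper from earlier and an arbitrary path; for brevity the setter and the waiter share one client and one JVM, whereas in practice they would be separate processes:

public static void barrierDemo() throws Exception {
    CuratorFramework client = getClient();
    client.start();
    DistributedBarrier barrier = new DistributedBarrier(client, "/demo/barrier");
    barrier.setBarrier();           // step 1: the master client sets the barrier

    new Thread(() -> {
        try {
            DistributedBarrier waiter = new DistributedBarrier(client, "/demo/barrier");
            waiter.waitOnBarrier(); // step 2: blocks until the barrier is removed
            System.out.println("barrier removed, continuing");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();

    Thread.sleep(1000);
    barrier.removeBarrier();        // step 3: releases all waiters
    Thread.sleep(1000);
    client.close();
}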
2) Double Barrier

Double barriers let clients synchronize at both the start and the end of a computation. When enough processes have joined the barrier, the computation starts; when it finishes, the processes leave the barrier together. Double barriers are implemented by the DistributedDoubleBarrier class, whose constructor is as follows:

// client - the client
// barrierPath - path to use
// memberQty - the number of members in the barrier
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)

memberQty is the number of members. A call to enter() blocks until all members have called enter(); likewise, leave() blocks the calling thread until all members have called leave().

Note: memberQty is a threshold, not a limit. The barrier opens once the number of waiting members is greater than or equal to this value.

As with DistributedBarrier, the barrierPath parameter determines whether two double barriers are the same barrier. A double barrier is used as follows:

1. Multiple clients create a DistributedDoubleBarrier on the same path and call enter(), waiting until the number of members reaches memberQty before entering the barrier.
2. When the member count reaches memberQty, the clients stop blocking and run until they call leave(), where they block again until memberQty members are waiting in leave().
3. Once memberQty members are blocked in leave() at the same time, the leave() calls of all clients return and execution continues.

The main methods of the DistributedDoubleBarrier class are:

enter(), enter(long maxWait, TimeUnit unit) - wait to enter the barrier together
leave(), leave(long maxWait, TimeUnit unit) - wait to leave the barrier together

Exception handling: DistributedDoubleBarrier monitors the connection state; enter() and leave() throw an exception if the connection is lost.
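A minimal double-barrier sketch with three members, under the same assumptions (getClient() from earlier, arbitrary path); each thread stands in for a separate client process:

public static void doubleBarrierDemo() throws Exception {
    final int memberQty = 3;
    for (int i = 0; i < memberQty; i++) {
        final int id = i;
        new Thread(() -> {
            try {
                CuratorFramework client = getClient();
                client.start();
                DistributedDoubleBarrier barrier =
                        new DistributedDoubleBarrier(client, "/demo/doubleBarrier", memberQty);
                barrier.enter();  // blocks until all 3 members have entered
                System.out.println("member " + id + " computing");
                barrier.leave();  // blocks until all 3 members are done
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}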
Counter
Counters use ZooKeeper to implement a cluster-wide shared counter. As long as all clients use the same path, they see the latest counter value, which is guaranteed by ZooKeeper's consistency. Curator has two counters: one counts with an int, the other with a long.
1) SharedCount
This class counts with an int. It mainly involves three classes:

* SharedCount
* SharedCountReader
* SharedCountListener

SharedCount represents the counter. You can add a SharedCountListener to it: when the counter changes, the listener is notified of the change event, and a SharedCountReader can read the latest value, either the plain value or a VersionedValue that carries version information.
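A minimal SharedCount sketch under the same assumptions (getClient() from earlier, arbitrary path; the seed value 0 and the written values are illustrative):

public static void sharedCountDemo() throws Exception {
    CuratorFramework client = getClient();
    client.start();
    SharedCount count = new SharedCount(client, "/demo/sharedCount", 0); // seed value 0
    count.addListener(new SharedCountListener() {
        @Override
        public void countHasChanged(SharedCountReader sharedCount, int newCount) {
            System.out.println("count changed to " + newCount);
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            System.out.println("connection state: " + newState);
        }
    });
    count.start();

    count.setCount(10);  // unconditional set
    // optimistic set: succeeds only if the version has not changed in the meantime
    boolean ok = count.trySetCount(count.getVersionedValue(), 11);
    System.out.println("trySetCount succeeded: " + ok + ", value now " + count.getCount());

    count.close();
    client.close();
}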
2) DistributedAtomicLong
Besides having a larger counting range than SharedCount, DistributedAtomicLong first tries to set the counter with an optimistic lock; if that does not succeed (for example, because another client updated the counter in the meantime), it falls back to an InterProcessMutex to update the value. The counter offers a series of operations:

get(): get the current value
increment(): add one
decrement(): subtract one
add(): add a specific value
subtract(): subtract a specific value
trySet(): try to set the value
forceSet(): force-set the value

You must check succeeded() on the returned result; it indicates whether the operation succeeded. If it did, preValue() is the value before the operation and postValue() the value after it.
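A minimal DistributedAtomicLong sketch under the same assumptions (getClient() from earlier, arbitrary path):

public static void atomicLongDemo() throws Exception {
    CuratorFramework client = getClient();
    client.start();
    DistributedAtomicLong counter = new DistributedAtomicLong(
            client, "/demo/atomicLong", new ExponentialBackoffRetry(1000, 3));

    AtomicValue<Long> result = counter.increment(); // optimistic first, mutex fallback
    if (result.succeeded()) {                       // always check succeeded()
        System.out.println("before: " + result.preValue() + ", after: " + result.postValue());
    }
    client.close();
}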
Thank you for reading. That covers how to use the ZooKeeper client Curator; the specifics still need to be verified in practice.