Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

The use of Curator

2025-02-03 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/02 Report--

Curator

In order to better implement the Java operation of the zookeeper server, the Curator framework appeared later, which is very powerful. It is now a top-level project of Apache, which provides more rich operations, such as session timeout reconnection, master-slave election, distributed counters, distributed locks and so on. API encapsulation suitable for a variety of complex zookeeper scenarios

1 use of Curator framework (1)

The chained programming style is used in the Curator framework for greater readability, and factory methods are used to create connection objects.

1. Using two static factory methods of CuratorFrameworkFactory (with different parameters) to implement

1.1 connectString: connection string

1.2 retryPolicy: retry the connection policy. There are four implementations, which are: ExponentialBackoffRetry, RetryNTimes, RetryOneTimes, RetryUntilElapsed

1.3sessionTimeoutMs: session timeout. Default is 60000ms.

1.4connectionTimeoutMs connection timeout. Default is 15000ms.

Note that users can customize the implementation of the retryPolicy policy through an interface.

2 use of Curator framework (2)

2.1 create a connection

/ * * retry strategy: initial time is 1s, retry 10 times * /

RetryPolicy retryPolicy = new ExponentialBackoffRetry (1000, 10)

/ * create a connection through the factory * /

CuratorFramework cf = CuratorFrameworkFactory.builder ()

.connectString (ZK_ADDR)

.sessionTimeoutMs (SESSION_TIMEOUT)

.retryPolicy (retryPolicy)

.build ()

/ * * Open connection * /

Cf.start ()

2.2 New nodes

/ * *

* New node: specify node type (default is persistent type node without withMode), path, data content

* 1.creatingParentsIfNeeded () recursively creates a parent directory

* 2.withMode () node type (persistent | temporary)

* 3.forPath () specifies the path

, /

Cf.create ()

.creatingParentsIfNeutral ()

.withMode (CreateMode.PERSISTENT)

.forPath ("/ super/c1", "C1 content" .getBytes ())

2.3 Delete Node

/ * *

* Delete a node

* 1.deletingChildrenIfNeeded () Recursive deletion

* 2.guaranteed () ensures that the node is deleted

* 3. WithVersion (int version) / / specific version number

, /

Cf.delete (). DeletingChildrenIfNeeded (). ForPath ("/ super")

2.4 read and modify data

/ * *

* read and modify data: getData () and setData ()

, /

Cf.create () .creatingParentsIfNeeded () .withMode (CreateMode.PERSISTENT) .forPath ("/ super/c1", "C1 content" .getBytes ())

Cf.create () .creatingParentsIfNeeded () .withMode (CreateMode.PERSISTENT) .forPath ("/ super/c2", "c2 content" .getBytes ())

/ * * read node content * /

String c2_data = new String (cf.getData () .forPath ("/ super/c2"))

System.out.println ("c2percent data copyright->" + c2_data)

/ * * modify node content * /

Cf.setData () .forPath ("/ super/c2", "modify the contents of c2" .getBytes ())

String update_c2_data = new String (cf.getData () .forPath ("/ super/c2"))

System.out.println ("update_c2_data-- >" + update_c2_data)

2.5 bind callback function

ExecutorService pool = Executors.newCachedThreadPool ()

Cf.create (). CreatingParentsIfNeeded (). WithMode (CreateMode.PERSISTENT)

.inBackground (new BackgroundCallback () {

@ Override

Public void proce***esult (CuratorFramework cf, CuratorEvent event)

Throws Exception {

System.out.println ("code-- >" + event.getResultCode ())

System.out.println ("type-- >" + event.getType ())

System.out.println ("thread is-- >" + Thread.currentThread () .getName ())

}

}, pool) .forPath ("/ super/c3", "contents of c2" .getBytes ())

System.out.println ("main thread->" + Thread.currentThread () .getName ())

Thread.sleep (Integer.MAX_VALUE)

2.6 read the child node and determine whether the node exists

/ * *

* method for reading child nodes: getChildren ()

* determine whether the node exists: checkExists ()

, /

List list = cf.getChildren () .forPath ("/ super")

For (String p: list) {

System.out.println (p)

}

/ / if the identity for null does not exist

Stat stat = cf.checkExists () .forPath ("/ super/c4")

System.out.println (stat)

3. Use of Curator framework (3)

If you want to use a snooping feature like Wather, Curator must rely on a jar package, and Maven depends on

Org.apache.curator

Curator-recipes

2.4.2

With this dependency package, use NodeCache to register a listening cache in the client instance, and then implement the corresponding listening method. There are two main listening methods.

NodeCacheListener: add and modify operations of listening nodes

PathChildrenCacheListener: listens to add, modify and delete operations of child nodes

4 Curator usage scenario

4.1 distributed Lock

In distributed scenarios, in order to ensure data consistency, synchronization is often needed at some point where the program is running (java provides synchronized or Reentrantlock implementation). For example, look at a small example, which has the problem of distributed non-synchronization.

For example, it used to access a program under high concurrency, but now it accesses multiple server nodes under high concurrency (distributed).

The distributed locks provided by Curator based on zookeeper features are used to deal with data consistency in distributed scenarios. The distribution of zookeeper itself has a writing problem, which has been encountered before. It is highly recommended to use Curator distributed locks here.

Public class Lock2 {/ * * zk address * / private static final String ZK_ADDR = "192.168.1.127 session 2181"; / * * session timeout * / private static final int SESSION_TIMEOUT = 5000; / / MSstatic int count = 10 CuratorFramework cf = CuratorFrameworkFactory.builder (). ConnectString (ZK_ADDR) .sessionTimeoutMs (SESSION_TIMEOUT) .retryPolicy (new ExponentialBackoffRetry (1000, 10). Build (); return cf } public static void main (String [] args) throws Exception {final CountDownLatch countDown = new CountDownLatch (1); for (int I = 0; I

< 10; i++) {new Thread(new Runnable() {@Overridepublic void run() {CuratorFramework cf = createCuratorFramework();cf.start();//锁对象 client 锁节点final InterProcessMutex lock = new InterProcessMutex(cf, "/super");try {countDown.await();lock.acquire(); //获得锁number();Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();} finally {try {lock.release();//释放锁} catch (Exception e) {e.printStackTrace();}}}},"t" + i).start();;}Thread.sleep(2000);countDown.countDown();} public static void number() {count--;System.out.println(Thread.currentThread().getName() + "-->

"+ count);}}

4.2 distributed counter function

When it comes to distributed counters, you may think of the classic method of AtomicInteger (atomic accumulation). It is no problem if you aim at a JVM scenario, but now in a distributed scenario, you need to use the DistributedAtomicInteger of the Curator framework.

Public class CuratorAtomicInteger {/ * * zk address * / private static final String ZK_ADDR = "192.168.1.127 session 2181"; / * session timeout * / private static final int SESSION_TIMEOUT = 5000; / / MSpublic static void main (String [] args) throws Exception {CuratorFramework cf = CuratorFrameworkFactory.builder (). ConnectString (ZK_ADDR) .sessionTimeoutMs (SESSION_TIMEOUT) .retryPolicy (new ExponentialBackoffRetry (1000, 10). Build (); cf.start () / use DistributedAtomicIntegerDistributedAtomicInteger atomicInteger = new DistributedAtomicInteger (cf, "/ superM", new RetryNTimes (3, 1000)); / / atomicInteger.increment (); atomicInteger.add (1); AtomicValue atomicValue = atomicInteger.get (); System.out.println ("atomicValue.succeeded ()->" + atomicValue.succeeded ()); System.out.println ("atomicValue.postValue ()->" + atomicValue.postValue ()); System.out.println ("atomicValue.preValue ()->" + atomicValue.preValue ());}}

4.3 Barrier

4.3.1 DistributedDoubleBarrier

Distributed Barrier class DistributedDoubleBarrier: it blocks waiting processes on all nodes until one is satisfied, and then all nodes start at the same time. It doesn't care who runs first and who runs later, but in the end it must be an exit.

Public class CuratorBarrier {/ * * zk address * / private static final String ZK_ADDR = "192.168.1.220 private static final String ZK_ADDR 2181192.168.1.127 private static final String ZK_ADDR 2181192.168.1.128public class CuratorBarrier 2181"; / * session timeout * / private static final int SESSION_TIMEOUT = 5000; / / MSpublic static void main (String [] args) throws Exception {for (int I = 0; I < 5) New Thread (new Runnable () {@ Overridepublic void run () {try {/ * * instantiate 5 client objects * / CuratorFramework cf = CuratorFrameworkFactory.builder (). ConnectString (ZK_ADDR) .sessionTimeoutMs (SESSION_TIMEOUT). RetryPolicy (new ExponentialBackoffRetry (1000, 10). Build (); cf.start (); DistributedDoubleBarrier barrier = new DistributedDoubleBarrier (cf, "/ superBarrier", 5); Thread.sleep (1000 * (new Random ()) .nextInt (3)) System.out.println (Thread.currentThread () .getName () + "ready!") ; barrier.enter (); System.out.println ("start running at the same time..."); Thread.sleep (1000 * (new Random ()) .nextInt (3)); System.out.println ("run finished..."); barrier.leave (); System.out.println ("quit running at the same time");} catch (Exception e) {e.printStackTrace ();}, "t" + I). Start ();;}

4.3.2 DistributedBarrier

Distributed Barrier class DistributedBarrier: it blocks waiting processes on all nodes (all nodes enter the pending state) until "someone whistles" to start execution, and then all nodes start at the same time

Public class CuratorBarrier2 {/ * * zk address * / private static final String ZK_ADDR = "192.168.1.220 session 2181192.168.1.127 session 2181"; / * session timeout * / private static final int SESSION_TIMEOUT = 5000; / / MSstatic DistributedBarrier barrier = null;public static void main (String [] args) throws Exception {for (int I = 0; I < 5) New Thread (new Runnable () {@ Overridepublic void run () {try {/ * * instantiate 5 client objects * / CuratorFramework cf = CuratorFrameworkFactory.builder (). ConnectString (ZK_ADDR) .sessionTimeoutMs (SESSION_TIMEOUT). RetryPolicy (new ExponentialBackoffRetry (1000, 10). Build (); cf.start (); barrier = new DistributedBarrier (cf, "/ superBarrier"); System.out.println (Thread.currentThread (). GetName () + "set barrier"); barrier.setBarrier () / / set barrier.waitOnBarrier (); / / wait for System.out.println ("start executing program...");} catch (Exception e) {e.printStackTrace ();}, "t" + I). Start ();;} Thread.sleep (5000); barrier.removeBarrier (); / / release}}

5 Curator retry strategy

Several retry strategies implemented within Curator:

1.ExponentialBackoffRetry: retries the specified number of times, and the pause time between each retry increases gradually.

2.RetryNTimes: a retry policy that specifies the maximum number of retries

3.RetryOneTime: retry only once

4.RetryUntilElapsed: try again until the specified time is reached

5.1 ExponentialBackoffRetry

ExponentialBackoffRetry (int baseSleepTimeMs, int maxRetries)

ExponentialBackoffRetry (int baseSleepTimeMs, int maxRetries, int maxSleepMs)

Parameter description

1.baseSleepTimeMs initial sleep time

Maximum number of 2.maxRetries retries

3.maxSleepMs maximum retry time

5.2 RetryNTimes

RetryNTimes (int n, int sleepMsBetweenRetries)

Parameter description

1.N maximum number of retries

The interval between each retry of 2.sleepMsBetweenRetries

5.3 RetryOneTime

RetryOneTime (int sleepMsBetweenRetry)

Parameter description

1.sleepMsBetweenRetry is the time between retries

5.4 RetryUntilElapsed

RetryUntilElapsed (int maxElapsedTimeMs, int sleepMsBetweenRetries)

Parameter description

1.maxElapsedTimeMs maximum retry time

The interval between each retry of 2.sleepMsBetweenRetries

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report