In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-14 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article introduces the knowledge of "how to implement the Zookeeper Queue queue". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
1: Barries: fence, meet and know each other.
2:Queue:Queue is what we call the queue.
1:Barries:
1.1: it means that all the sites have reached the barrier before the follow-up calculation can be carried out.
1.2: all threads cannot leave barrier until they have completed their own calculations.
Enter the fence: 1, create a new root node "/ root" 2, want to enter the barrier thread under the "/ root" to establish a word node "/ root/c-i" 3, cycle to listen for changes in the number of "/ root" child nodes, every time it reaches Size, it means that there are Size threads have met the requirements of Barrier.
2:Queue: a model of a producer or consumer
Leave Barrier 1: if you want to leave the Barrier site, delete the child node 2 created under "/ root": loop monitor "/ root" the change in the number of child nodes, when the Size is reduced to 0, it can leave.
3: implementation of Queue queue
1: establish a root node "/ root" 2: the production thread establishes a SEQUENTAIL node under "/ root" 3: the consumer thread checks "/ root" if there is no loop to listen for changes in the "/ root" node until it has its own child node, deleting the sequence number of the smallest node. Package sync; import java.io.IOException;import java.net.InetAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.util.List;import java.util.Random; import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat; public class SyncPrimitive implements Watcher {static ZooKeeper zk = null; static Integer mutex String root; / / synchronization primitive SyncPrimitive (String address) {if (zk = = null) {try {System.out.println ("Starting ZK:"); / / establish a Zookeeper connection and specify watcher zk = new ZooKeeper (address, 3000, this) / / initialize lock object mutex = new Integer (- 1); System.out.println ("Finished starting ZK:" + zk);} catch (IOException e) {System.out.println (e.toString ()); zk = null }} @ Override synchronized public void process (WatchedEvent event) {synchronized (mutex) {/ / when an event occurs, notify is called to enable other wait () points to continue mutex.notify ();}} static public class Barrier extends SyncPrimitive {int size; String name Barrier (String address, String root, int size) {super (address); this.root = root; this.size = size; if (zk! = null) {try {/ / A barrier set up a root directory Stat s = zk.exists (root, false) / / do not register watcher if (s = = null) {zk.create (root, new byte [0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) } catch (KeeperException e) {System.out.println ("keeper exception when instantiating queue:" + e.toString ());} catch (InterruptedException e) {System.out.println ("Interrupted exception.") }} try {/ / get your hostname name = new String (InetAddress.getLocalHost () .getCanonicalHostName (). ToString ());} catch (UnknownHostException e) {System.out.println (e.toString ()) }} boolean enter () throws KeeperException, InterruptedException {/ / create a child node under the root directory. Both create and delete trigger children wathes so that getChildren will be notified and process () will be called zk.create (root + "/" + name, new byte [0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL) Wait until the number of child nodes in the root directory reaches size, the function exits while (true) {synchronized (mutex) {List list = zk.getChildren (root, true); if (list.size ())
< size) { mutex.wait(); //释放mutex上的锁 } else { return true; } } } } boolean leave() throws KeeperException, InterruptedException { //删除自己创建的节点 zk.delete(root + "/" + name, 0); //一直等,直到根目录下有子节点时,函数退出 while (true) { synchronized (mutex) { List list = zk.getChildren(root, true); if (list.size() >0) {mutex.wait ();} else {return true;} static public class Queue extends SyncPrimitive {Queue (String address, String name) {super (address); this.root = name If (zk! = null) {try {/ / A queue set up a root directory Stat s = zk.exists (root, false) If (s = = null) {zk.create (root, new byte [0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) } catch (KeeperException e) {System.out.println ("keeper exception when instantiating queue:" + e.toString ());} catch (InterruptedException e) {System.out.println ("Interrupted exception.") } / / Parameter I is the data boolean produce (int I) throws KeeperException of the node to be created, InterruptedException {ByteBuffer b = ByteBuffer.allocate (4); byte [] value; b.putInt (I); value = b.array () Create a child node under the root directory. Because it is SEQUENTIAL, the node created first has a smaller sequence number zk.create (root + "/ element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true;} int consume () throws KeeperException, InterruptedException {int retvalue =-1 Stat stat = null; while (true) {synchronized (mutex) {List list = zk.getChildren (root, true) / / there is no guarantee that list [0] is the lowest sequence number / / if there are no child nodes in the root directory, wait for if (list.size () = = 0) {System.out.println ("Going to wait"); mutex.wait () } / / find the node with the lowest sequence number and delete it else {Integer min = new Integer (list.get (0). Substring (7); for (String s: list) {Integer tmp = new Integer (s.substring (7)) If (tmp < min) min = tmp;} System.out.println ("Temporary value:" + root + "/ element" + min) Byte [] b = zk.getData (root + "/ element" + min, false, stat); zk.delete (root + "/ element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap (b); retvalue = buffer.getInt () Return retvalue;}} public static void main (String [] args) {if (args [0] .equals ("qTest")) queueTest (args); else barrierTest (args) } private static void barrierTest (String [] args) {Barrier b = new Barrier (args [1], "/ b1", new Integer (args [2])); try {boolean flag = b.enter (); System.out.println ("Enter barrier:" + args [2]); if (! flag) System.out.println ("Error when entering the barrier") } catch (KeeperException e) {} catch (InterruptedException e) {} Random rand = new Random (); int r = rand.nextInt (100); for (int I = 0; I < r; iTunes +) {try {Thread.sleep (100) } catch (InterruptedException e) {} try {b.leave ();} catch (KeeperException e) {} catch (InterruptedException e) {} System.out.println ("Left barrier");} private static void queueTest (String [] args) {Queue Q = new Queue (args [1], "/ app1") System.out.println ("Input:" + args [1]); int i; Integer max = new Integer (args [2]); if (args [3] .equals ("p")) {System.out.println ("Producer"); for (I = 0; I < max; iTunes +) try {q.produce (10 + 1) } catch (KeeperException e) {} catch (InterruptedException e) {}} else {System.out.println ("Consumer"); for (I = 0; I < max; iTunes +) try {int r = q.consume () System.out.println ("Item:" + r);} catch (KeeperException e) {iMeltel;} catch (InterruptedException e) {} "how to implement the Zookeeper Queue queue" ends here. Thank you for your reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.