Shulou (Shulou.com), 05/31. Updated 2025-01-18. From: SLTechnology News&Howtos.
This article explains how to implement a synchronization framework on top of ZooKeeper. The approach is simple and practical: tasks that share a node path run one at a time, with distributed support, while tasks on different paths are not synchronized with each other.
First, define a synchronization interface with a single execute method, which is responsible for running a task under synchronization.
The path parameter is the task node (for example, one per user); only tasks that use the same node are synchronized with each other. Think of withdrawing money at a bank: if every customer has their own counter, nobody has to wait for anyone else.
The SynchronousProcessor parameter carries the actual business logic.
Synchronous.java
package org.bigmouth.nvwa.zookeeper.concurrent;

/**
 * Synchronization with distributed support.
 *
 * @author Allen Hu
 * 2015-4-17
 */
public interface Synchronous {

    /**
     * Runs the processor under synchronization. Work is distinguished by path;
     * tasks with different paths are not synchronized with each other.
     *
     * @param <T> processing result type
     * @param path task node, e.g. "/project/synchronous/0000001"
     * @param processor business processor
     * @return processing result
     */
    <T> T execute(String path, SynchronousProcessor<T> processor);
}
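To make the contract concrete without a running ZooKeeper ensemble, the sketch below mimics the same idea inside a single JVM: one lock per path, so tasks on the same path run one at a time and tasks on different paths do not block each other. The class name `LocalPathSynchronizer` and its use of `Callable` are assumptions made for illustration. It is not part of the original framework and gives no cross-process guarantee.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-process analogue of Synchronous: one lock per path. */
final class LocalPathSynchronizer {

    // Locks are created lazily and kept for the lifetime of this object.
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Runs the task while holding the lock that belongs to the given path. */
    <T> T execute(String path, Callable<T> task) throws Exception {
        ReentrantLock lock = locks.computeIfAbsent(path, p -> new ReentrantLock());
        lock.lock(); // waits only for tasks that use the same path
        try {
            return task.call();
        } finally {
            lock.unlock();
        }
    }
}
```

Because the lock map is keyed by path, the choice of path decides the granularity: one path per user serializes each user's tasks separately, while a single shared path would serialize everything.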
MutexLockSynchronous.java
This implementation of Synchronous is based on Curator's ordinary exclusive lock, InterProcessMutex.
package org.bigmouth.nvwa.zookeeper.concurrent;

import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.common.PathUtils;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;

/**
 * Synchronization based on an ordinary exclusive lock.
 *
 * @author Allen Hu
 * 2015-4-17
 */
public class MutexLockSynchronous implements Synchronous {

    private final ZkClientHolder zkClientHolder;

    public MutexLockSynchronous(ZkClientHolder zkClientHolder) {
        this.zkClientHolder = zkClientHolder;
    }

    @Override
    public <T> T execute(String path, SynchronousProcessor<T> processor) {
        PathUtils.validatePath(path);
        InterProcessLock lock = new InterProcessMutex(zkClientHolder.get(), path);
        boolean acquired = false;
        try {
            lock.acquire(); // blocks until the lock for this path is held
            acquired = true;
            if (null != processor)
                return processor.process();
        }
        catch (Exception e) {
            // Failures from acquire() or process() are handed to the processor.
            if (null != processor)
                processor.exceptionCaught(e);
        }
        finally {
            // Release only a lock that was actually acquired.
            if (acquired) {
                try {
                    lock.release();
                }
                catch (Exception e) {
                    // A failed release is ignored, as in the original code.
                }
            }
        }
        return null;
    }
}
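The `lock.acquire()` call in MutexLockSynchronous waits indefinitely. Curator's `InterProcessMutex` also has a timed `acquire(long, TimeUnit)` that returns `false` when the wait expires. The sketch below shows the acquire, guard, release shape for that variant, with a `java.util.concurrent.locks.Lock` standing in for the distributed lock so that it runs without ZooKeeper. `TimedGuard`, `runWithTimeout`, and the fallback parameter are hypothetical names, not part of the original code.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

/** Hypothetical helper illustrating a timed acquire with a guarded release. */
final class TimedGuard {

    private TimedGuard() {}

    /**
     * Runs the task if the lock is obtained within the timeout; otherwise
     * returns the fallback. The lock is released only if it was acquired.
     */
    static <T> T runWithTimeout(Lock lock, long timeout, TimeUnit unit,
                                Supplier<T> task, T fallback) throws InterruptedException {
        boolean acquired = lock.tryLock(timeout, unit);
        if (!acquired) {
            return fallback; // another holder kept the lock for the whole wait
        }
        try {
            return task.get();
        } finally {
            lock.unlock(); // reached only when tryLock succeeded
        }
    }
}
```

Whether a timeout is appropriate depends on the business case: a bounded wait avoids piling up blocked threads, but the caller then has to decide what a skipped task means.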
SynchronousProcessor.java
The task processor interface. Implement it to carry out the actual business work.
package org.bigmouth.nvwa.zookeeper.concurrent;

/**
 * Synchronous business processor.
 *
 * @author Allen Hu
 * 2015-4-17
 */
public interface SynchronousProcessor<T> {

    /**
     * Handles the specific business work.
     *
     * @return processing result
     */
    T process();

    /**
     * Called when an exception is caught.
     *
     * @param throwable the caught exception
     */
    void exceptionCaught(Throwable throwable);
}
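Because SynchronousProcessor has two methods, it cannot be written as a lambda, so callers need an anonymous class each time. One possible convenience, shown here as a sketch, is a small factory that builds a processor from two functions. The `Processors` class and its `of` method are hypothetical additions, and the interface is repeated locally only so that the example compiles on its own.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Local copy of the article's interface so the sketch is self-contained.
interface SynchronousProcessor<T> {
    T process();
    void exceptionCaught(Throwable throwable);
}

/** Hypothetical helper, not part of the original code. */
final class Processors {

    private Processors() {}

    /** Builds a processor from a work function and an error handler. */
    static <T> SynchronousProcessor<T> of(Supplier<T> work, Consumer<Throwable> onError) {
        return new SynchronousProcessor<T>() {
            @Override
            public T process() {
                return work.get();
            }

            @Override
            public void exceptionCaught(Throwable throwable) {
                onError.accept(throwable);
            }
        };
    }
}
```

A caller could then pass `Processors.of(() -> doWork(), Throwable::printStackTrace)` to `execute`, where `doWork` is whatever business method applies.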
ZkClientHolder.java
This class is also required: it holds the ZooKeeper client. You do not need to know the details of the parent class it inherits from; the parent simply defines two abstract methods, doInit and doDestroy.
package org.bigmouth.nvwa.zookeeper;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.bigmouth.nvwa.utils.BaseLifeCycleSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
 * ZooKeeper client holder.
 *
 * @author Allen Hu
 * 2015-4-16
 */
public class ZkClientHolder extends BaseLifeCycleSupport {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZkClientHolder.class);

    public static final int MAX_RETRIES = 3;
    public static final int BASE_SLEEP_TIMEMS = 3000;

    private CuratorFramework zkClient;
    private final String connectString;
    private final int sessionTimeout;

    public ZkClientHolder(String connectString, int sessionTimeout) {
        Preconditions.checkArgument(StringUtils.isNotBlank(connectString), "connectString cannot be blank");
        Preconditions.checkArgument(sessionTimeout >= 10000, "sessionTimeout must be at least 10000");
        this.connectString = connectString;
        this.sessionTimeout = sessionTimeout;
    }

    public CuratorFramework get() {
        return zkClient;
    }

    @Override
    protected void doInit() {
        // Build and start the Curator client with exponential backoff retries.
        zkClient = CuratorFrameworkFactory.builder()
                .sessionTimeoutMs(sessionTimeout)
                .connectString(connectString)
                .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES))
                .build();
        zkClient.start();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Connected to ZooKeeper server: {}", connectString);
        }
    }

    @Override
    protected void doDestroy() {
        if (null != zkClient)
            zkClient.close();
    }
}
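The parent class BaseLifeCycleSupport is not shown in the article. As an assumption about its general shape, a lifecycle base class of this kind usually exposes public `init()` and `destroy()` methods and delegates to the two abstract hooks (the test class later calls `zkClientHolder.init()`). The sketch below illustrates that template-method pattern under the hypothetical name `LifeCycleSketch`; the real class may well differ.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a lifecycle base class; not the real BaseLifeCycleSupport. */
abstract class LifeCycleSketch {

    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /** Calls doInit() once; repeated calls while initialized are no-ops. */
    public final void init() {
        if (initialized.compareAndSet(false, true)) {
            doInit();
        }
    }

    /** Calls doDestroy() only if init() ran before. */
    public final void destroy() {
        if (initialized.compareAndSet(true, false)) {
            doDestroy();
        }
    }

    /** Subclass hook: acquire resources, e.g. start a client. */
    protected abstract void doInit();

    /** Subclass hook: release resources, e.g. close a client. */
    protected abstract void doDestroy();
}
```

With this shape, a subclass such as ZkClientHolder only has to say how to start and stop its resource, and the base class decides when those hooks run.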
Finally, a test class simulates several users submitting tasks from multiple threads and shows that tasks for the same user are synchronized.
package org.bigmouth.nvwa.zookeeper.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.utils.ZKPaths;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Allen Hu
 * 2015-4-17
 */
public class ConcurrentTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentTest.class);

    private ZkClientHolder zkClientHolder = new ZkClientHolder("172.16.3.24:2181", 60000);
    private Synchronous synchronous;

    public ConcurrentTest() {
        zkClientHolder.init();
        synchronous = new MutexLockSynchronous(zkClientHolder);
    }

    public class Service implements Runnable {

        private final String id;
        private final long sleepInMillis;

        public Service(String id, long sleepInMillis) {
            this.id = id;
            this.sleepInMillis = sleepInMillis;
        }

        @Override
        public void run() {
            // The id becomes the last path segment, so each id has its own lock.
            synchronous.execute(ZKPaths.makePath("/nvwa/zookeeper/concurrent", id), new SynchronousProcessor<String>() {

                @Override
                public String process() {
                    LOGGER.info(id + " start...");
                    try {
                        Thread.sleep(sleepInMillis);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    LOGGER.info(id + " has executed!");
                    return id;
                }

                @Override
                public void exceptionCaught(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
        }
    }

    static ExecutorService executor = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        ConcurrentTest ct = new ConcurrentTest();
        executor.submit(ct.new Service("1", 5000));  // id 1, processing takes 5 seconds
        executor.submit(ct.new Service("1", 2000));  // id 1, processing takes 2 seconds
        executor.submit(ct.new Service("2", 5000));  // id 2, processing takes 5 seconds
        executor.submit(ct.new Service("3", 10000)); // id 3, processing takes 10 seconds
        executor.submit(ct.new Service("3", 500));   // id 3, processing takes 0.5 seconds
    }
}
As a result, tasks for ids 1, 2, and 3 run in parallel with each other, while tasks with the same id run serially. For example, the second task for id 1 starts only after the first task for id 1 has finished.
At this point, you should have a clearer picture of how to implement a ZooKeeper synchronization framework. The best way to confirm it is to try it in practice.