What is the logic of TopicLookup request processing in Apache Pulsar

Updated 2025-02-28, from SLTechnology News&Howtos (shulou)

This article walks through how an Apache Pulsar broker processes a TopicLookup request, following the call chain from the lookup entry point down to the ZooKeeper (zk) ownership records.

The core logic comes down to these two statements:

    LookupOptions options = LookupOptions.builder()
            .authoritative(authoritative)
            .advertisedListenerName(advertisedListenerName)
            .loadTopicsInBundle(true) // this flag is set to true here
            .build();

    pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)

Note that loadTopicsInBundle is set to true. Let's check whether topics are actually loaded while the lookup request is being processed.

NamespaceService.findBrokerServiceUrl

Two calls in this function, ownershipCache.getOwnerAsync and searchForCandidateBroker, are not expanded in the listing that follows.

Let's look at ownershipCache first.

    private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
            NamespaceBundle bundle, LookupOptions options) {
        ...
        return targetMap.computeIfAbsent(bundle, (k) -> {
            ...
            ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> {
                // nsData: Optional<NamespaceEphemeralData>
                if (!nsData.isPresent()) {
                    ...
                    // No broker currently owns this bundle, so try to find an owner for it.
                    pulsar.getExecutor().execute(() -> {
                        searchForCandidateBroker(bundle, future, options);
                    });
                }
                ...
            });
            ...
        });
    }

OwnershipCache class

The Javadoc summarizes what this class does:

It caches the ownership information of service units stored in zk

It provides read and write access to zk

It can be used to look up owner information

It can be used to acquire ownership of a service unit

getOwnerAsync first checks the local cache. If nothing is cached, it tries to read the bundle's node from zk.

If the node holds data, some broker has already acquired ownership of this bundle.

If that owner is the current broker, listeners are notified that the bundle has been loaded.

If the node holds no data, no broker currently owns the bundle.
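The decision flow above can be sketched with plain maps standing in for the local cache and for zk. This is only an illustration under stated assumptions: `OwnerLookupSketch`, its fields, and the broker URLs are hypothetical names, and the sketch compares broker URLs where the real code compares the ZooKeeper session ID of the ephemeral node.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of the ownership lookup; none of these names are Pulsar APIs. */
final class OwnerLookupSketch {
    // Stand-in for the local cache of bundles owned by this broker.
    final Map<String, Boolean> ownedBundles = new ConcurrentHashMap<>();
    // Stand-in for zk: bundle path -> URL of the broker that holds the node.
    final Map<String, String> zk;
    final String selfUrl;

    OwnerLookupSketch(String selfUrl, Map<String, String> zk) {
        this.selfUrl = selfUrl;
        this.zk = zk;
    }

    /** Returns the owner's URL, or empty when no broker owns the bundle. */
    Optional<String> getOwner(String bundlePath) {
        // 1. Local cache hit: this broker is the owner, no zk read is needed.
        if (ownedBundles.containsKey(bundlePath)) {
            return Optional.of(selfUrl);
        }
        // 2. Cache miss: read the node from the zk stand-in.
        String owner = zk.get(bundlePath);
        if (owner == null) {
            return Optional.empty(); // nobody owns the bundle yet
        }
        // 3. The node points at this broker: re-establish the local cache entry.
        if (owner.equals(selfUrl)) {
            ownedBundles.put(bundlePath, Boolean.TRUE);
        }
        return Optional.of(owner);
    }

    public static void main(String[] args) {
        Map<String, String> zk = new ConcurrentHashMap<>();
        zk.put("/namespace/public/default/0x00000000_0x80000000", "broker-a:8080");
        OwnerLookupSketch lookup = new OwnerLookupSketch("broker-a:8080", zk);
        System.out.println(lookup.getOwner("/namespace/public/default/0x00000000_0x80000000"));
        System.out.println(lookup.getOwner("/namespace/public/default/0x80000000_0xffffffff"));
    }
}
```

An empty result is the case that matters for the rest of the walkthrough, because it is what sends the request on to searchForCandidateBroker.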

    // org.apache.pulsar.broker.namespace.OwnershipCache
    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle suName) {
        // The path is /namespace/{namespace}/0x{lowerEndpoint}_0x{upperEndpoint}
        String path = ServiceUnitZkUtils.path(suName);

        // ownedBundlesCache is an AsyncLoadingCache, but getIfPresent is called here,
        // so no load is attempted.
        CompletableFuture<OwnedBundle> ownedBundleFuture = ownedBundlesCache.getIfPresent(path);

        // If an entry already exists, the current broker is the owner
        // (that part of the logic lives in the cache loader, covered later).
        if (ownedBundleFuture != null) {
            // Either we're the owners or we're trying to become the owner.
            return ownedBundleFuture.thenApply(serviceUnit -> {
                // We are the owner of the service unit
                return Optional.of(serviceUnit.isActive() ? selfOwnerInfo : selfOwnerInfoDisabled);
            });
        }

        // Not in the cache, so find out who the current owner is.
        // If we're not the owner, we need to check if anybody else is
        return resolveOwnership(path).thenApply(optional -> optional.map(Map.Entry::getKey));
    }

    private CompletableFuture<Optional<Map.Entry<NamespaceEphemeralData, Stat>>> resolveOwnership(String path) {
        // Read the content stored under the bundle path from zk.
        return ownershipReadOnlyCache.getWithStatAsync(path).thenApply(optionalOwnerDataWithStat -> {
            // If there is data under this path, some broker has already acquired
            // ownership of this bundle.
            if (optionalOwnerDataWithStat.isPresent()) {
                Map.Entry<NamespaceEphemeralData, Stat> ownerDataWithStat = optionalOwnerDataWithStat.get();
                Stat stat = ownerDataWithStat.getValue();
                // If the ephemeral zk node belongs to the current broker's session
                if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) {
                    LOG.info("Successfully reestablish ownership of {}", path);
                    // Re-establish the local ownership entry
                    OwnedBundle ownedBundle =
                            new OwnedBundle(ServiceUnitZkUtils.suBundleFromPath(path, bundleFactory));
                    if (selfOwnerInfo.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) {
                        ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
                    }
                    ownershipReadOnlyCache.invalidate(path);
                    // Notify listeners through a callback (independent of the main logic)
                    namespaceService.onNamespaceBundleOwned(ownedBundle.getNamespaceBundle());
                }
            }
            // The result is an Optional. If the node does not exist it is empty, meaning no
            // broker owns the bundle. Otherwise it carries the owner's data, and the owner
            // may be the current broker or another one.
            return optionalOwnerDataWithStat;
        });
    }

Now let's look at what happens when no broker owns the bundle.

NamespaceService.searchForCandidateBroker

This method decides which broker should become the owner of the bundle.

The choice is made mainly through LeaderElectionService and LoadManager.

If the selected broker is the local one, it tries to acquire ownership of the bundle.

If it is another broker, the lookup is redirected to that broker, which is then expected to acquire ownership.
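The selection rule can be reduced to a small pure function. The sketch below is an illustration only: `CandidateSelectionSketch`, `pickCandidate`, and the parameter names are hypothetical, and the boolean inputs stand in for the checks that the real method performs against the load manager and the leader election service.

```java
import java.util.Optional;

/** Toy version of the candidate selection rule; not Pulsar's actual API. */
final class CandidateSelectionSketch {

    /** Returns the chosen broker, or empty when no broker is available. */
    static Optional<String> pickCandidate(
            boolean authoritative,          // the leader already assigned this broker
            boolean centralizedLoadManager, // load manager relies on a central leader
            boolean selfIsLeader,           // this broker is the elected leader
            boolean leaderActive,           // the elected leader is reachable
            String self,
            String leader,
            Optional<String> leastLoaded) {
        if (authoritative) {
            // The decision was already made elsewhere: this broker is the owner.
            return Optional.of(self);
        }
        if (!centralizedLoadManager || selfIsLeader || !leaderActive) {
            // This broker may decide on its own: take the least loaded broker.
            return leastLoaded;
        }
        // Otherwise defer the assignment to the leader.
        return Optional.of(leader);
    }

    public static void main(String[] args) {
        System.out.println(pickCandidate(false, true, false, true,
                "broker-a", "broker-l", Optional.of("broker-b")));
    }
}
```

Writing the rule as a function of plain booleans makes the three outcomes (self, least loaded broker, leader) easy to check in isolation.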

    private void searchForCandidateBroker(NamespaceBundle bundle,
            CompletableFuture<Optional<LookupResult>> lookupFuture, LookupOptions options) {
        ...
        // First pick a candidate broker for this bundle according to a set of rules.
        String candidateBroker = null;
        boolean authoritativeRedirect = les.isLeader();

        try {
            // check if this is Heartbeat or SLAMonitor namespace
            ...
            if (candidateBroker == null) {
                if (options.isAuthoritative()) {
                    // leader broker already assigned the current broker as owner
                    candidateBroker = pulsar.getSafeWebServiceAddress();
                } else
                // if the loadManager is not centralized (this is for load balancing),
                // or this broker is the leader,
                // or the current leader broker is not active
                if (!this.loadManager.get().isCentralized()
                        || pulsar.getLeaderElectionService().isLeader()
                        // If leader is not active, fallback to pick the least loaded from current broker loadmanager
                        || !isBrokerActive(pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl())) {
                    // Pick the least loaded broker from the loadManager
                    Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
                    if (!availableBroker.isPresent()) {
                        lookupFuture.complete(Optional.empty());
                        return;
                    }
                    candidateBroker = availableBroker.get();
                    authoritativeRedirect = true;
                } else {
                    // forward to leader broker to make assignment
                    candidateBroker = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
                }
            }
        } catch (Exception e) {
            ...
        }

        // At this point a candidate broker address has been selected.
        try {
            checkNotNull(candidateBroker);
            // If the candidate broker is the current machine
            if (candidateBroker.equals(pulsar.getSafeWebServiceAddress())) {
                ...
                // Use ownershipCache to try to acquire ownership of the bundle
                ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> {
                    ...
                    // This is the flag mentioned at the start of the article:
                    // whether all topics in the bundle should be loaded
                    if (options.isLoadTopicsInBundle()) {
                        // Schedule the task to pre-load topics
                        pulsar.loadNamespaceTopics(bundle);
                    }
                    // find the target
                    // Reaching this point means the current broker owns the bundle,
                    // so the local information is returned to the requester directly.
                    lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
                    return;
                }).exceptionally(exception -> {
                    ...
                });
            } else {
                ...
                // Here the lookup request is redirected to another broker.
                // Load managed decider some other broker should try to acquire ownership
                // Now setting the redirect url
                createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
                        .thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
                        .exceptionally(ex -> {
                            lookupFuture.completeExceptionally(ex);
                            return null;
                        });
            }
        } catch (Exception e) {
            ...
        }
    }

OwnershipCache.tryAcquiringOwnership

This is the logic for acquiring ownership of the bundle.

It only needs to record the current broker's information in zk.

(It also contains the logic that keeps the local cache up to date.)
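The "first writer wins" behaviour of this step can be illustrated without ZooKeeper. In the sketch below, a `ConcurrentMap` stands in for zk and `putIfAbsent` plays the role of creating an ephemeral node, which fails when the node already exists. `OwnershipAcquireSketch` and `tryAcquire` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Toy model of acquiring bundle ownership; not Pulsar's actual API. */
final class OwnershipAcquireSketch {
    // Stand-in for zk: bundle path -> URL of the broker that created the node.
    private final ConcurrentMap<String, String> zk = new ConcurrentHashMap<>();

    /** Tries to claim the bundle for brokerUrl and returns whoever owns it afterwards. */
    String tryAcquire(String bundlePath, String brokerUrl) {
        // putIfAbsent is atomic: it returns null only for the caller whose write won.
        String existing = zk.putIfAbsent(bundlePath, brokerUrl);
        // A non-null result corresponds to the "node already exists" failure,
        // after which the existing owner is read back and returned.
        return existing == null ? brokerUrl : existing;
    }

    public static void main(String[] args) {
        OwnershipAcquireSketch cache = new OwnershipAcquireSketch();
        String path = "/namespace/public/default/0x00000000_0x80000000";
        System.out.println(cache.tryAcquire(path, "broker-a:8080"));
        System.out.println(cache.tryAcquire(path, "broker-b:8080"));
    }
}
```

Either way the caller ends up with an owner, so a lookup that loses the race still resolves to the broker that won it.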

    public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle bundle)
            throws Exception {
        String path = ServiceUnitZkUtils.path(bundle);

        CompletableFuture<NamespaceEphemeralData> future = new CompletableFuture<>();

        LOG.info("Trying to acquire ownership of {}", bundle);

        // get() is called here, and this call triggers the cache loading logic.
        // Doing a get() on the ownedBundlesCache will trigger an async ZK write to acquire the lock over the
        // service unit
        ownedBundlesCache.get(path).thenAccept(namespaceBundle -> {
            // Ownership of this bundle has been acquired, so return directly.
            LOG.info("Successfully acquired ownership of {}", path);
            namespaceService.onNamespaceBundleOwned(bundle);
            future.complete(selfOwnerInfo);
        }).exceptionally(exception -> {
            // Something went wrong during loading (another broker may already have become the owner)
            // Failed to acquire ownership
            if (exception instanceof CompletionException
                    && exception.getCause() instanceof KeeperException.NodeExistsException) {
                // Confirm who the current owner is
                resolveOwnership(path).thenAccept(optionalOwnerDataWithStat -> {
                    // This yields the data of the broker that acquired ownership earlier.
                    if (optionalOwnerDataWithStat.isPresent()) {
                        Map.Entry<NamespaceEphemeralData, Stat> ownerDataWithStat =
                                optionalOwnerDataWithStat.get();
                        NamespaceEphemeralData ownerData = ownerDataWithStat.getKey();
                        Stat stat = ownerDataWithStat.getValue();
                        if (stat.getEphemeralOwner() != localZkCache.getZooKeeper().getSessionId()) {
                            LOG.info("Failed to acquire ownership of {} -- Already owned by broker {}",
                                    path, ownerData);
                        }
                        // Return the existing owner directly
                        future.complete(ownerData);
                    } else {
                        ...
                    }
                }).exceptionally(ex -> {
                    ...
                });
            } else {
                ...
            }
            return null;
        });

        return future;
    }

OwnershipCache loading logic

The logic here is relatively simple: serialize the local broker's connection information and write it to the zk path of this bundle.

    private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader<String, OwnedBundle> {

        @SuppressWarnings("deprecation")
        @Override
        public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Executor executor) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Acquiring zk lock on namespace {}", namespaceBundleZNode);
            }

            byte[] znodeContent;
            try {
                znodeContent = jsonMapper.writeValueAsBytes(selfOwnerInfo);
            } catch (JsonProcessingException e) {
                // Failed to serialize to JSON
                return FutureUtil.failedFuture(e);
            }

            CompletableFuture<OwnedBundle> future = new CompletableFuture<>();
            ZkUtils.asyncCreateFullPathOptimistic(localZkCache.getZooKeeper(), namespaceBundleZNode,
                    znodeContent, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
                        if (rc == KeeperException.Code.OK.intValue()) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode);
                            }
                            ownershipReadOnlyCache.invalidate(namespaceBundleZNode);
                            future.complete(new OwnedBundle(
                                    ServiceUnitZkUtils.suBundleFromPath(namespaceBundleZNode, bundleFactory)));
                        } else {
                            // Failed to acquire lock
                            future.completeExceptionally(KeeperException.create(rc));
                        }
                    }, null);

            return future;
        }
    }

Loading all topics under the bundle

At this point the broker has acquired ownership of the bundle. Now let's return to the logic mentioned earlier that loads all topics in it.

PulsarService.loadNamespaceTopics

    public void loadNamespaceTopics(NamespaceBundle bundle) {
        executor.submit(() -> {
            NamespaceName nsName = bundle.getNamespaceObject();
            List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
            long topicLoadStart = System.nanoTime();

            for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) {
                try {
                    TopicName topicName = TopicName.get(topic);
                    if (bundle.includes(topicName)) {
                        // This creates a Topic object and stores it in BrokerService.
                        // The ManagedLedger initialization involved is discussed later.
                        CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic);
                        if (future != null) {
                            persistentTopics.add(future);
                        }
                    }
                } ...
            }
            ...
            return null;
        });
    }
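The loop above keeps only the topics for which bundle.includes(topicName) is true. Given the 0x{lowerEndpoint}_0x{upperEndpoint} form of the bundle path, one way to picture that check is as a hash range test. The sketch below is an illustration under loud assumptions: `BundleRangeSketch` is a hypothetical class, CRC32 is only a stand-in for whatever hash Pulsar actually applies to the topic name, and the bound handling (lower inclusive, upper exclusive except at 0xffffffff) is assumed rather than taken from the article.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Toy hash-range membership test; not Pulsar's NamespaceBundle implementation. */
final class BundleRangeSketch {
    private static final long FULL_UPPER_BOUND = 0xffffffffL;

    final long lower; // inclusive
    final long upper; // exclusive, except when it is the top of the hash space (assumption)

    /** Parses a range written as 0x{lowerEndpoint}_0x{upperEndpoint}. */
    BundleRangeSketch(String range) {
        String[] parts = range.split("_");
        this.lower = Long.parseLong(parts[0].substring(2), 16);
        this.upper = Long.parseLong(parts[1].substring(2), 16);
    }

    /** Stand-in hash that maps a topic name to a value in 0..0xffffffff. */
    static long hash(String topic) {
        CRC32 crc = new CRC32();
        crc.update(topic.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    boolean includes(String topic) {
        long h = hash(topic);
        if (h < lower) {
            return false;
        }
        return h < upper || (upper == FULL_UPPER_BOUND && h == upper);
    }

    public static void main(String[] args) {
        String topic = "persistent://public/default/orders";
        System.out.println(new BundleRangeSketch("0x00000000_0x80000000").includes(topic));
        System.out.println(new BundleRangeSketch("0x80000000_0xffffffff").includes(topic));
    }
}
```

With the two ranges in `main` covering the whole hash space, every topic falls into exactly one of them, which is why a broker that owns one bundle can skip the topics that belong to the other.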

NamespaceService.getListOfPersistentTopics

This one is simpler.

It just reads all child nodes of /managed-ledgers/%s/persistent in zk and turns each child name into a full topic name.
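The path construction and name conversion can be tried in isolation with a plain list standing in for the zk children. This is a sketch under assumptions: `TopicListSketch` and its methods are hypothetical, and URL decoding is used here as a stand-in for Pulsar's Codec.decode.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy version of the topic listing step; not Pulsar's actual API. */
final class TopicListSketch {

    /** Builds the zk path whose children are the persistent topics of a namespace. */
    static String managedLedgerPath(String namespace) {
        return String.format("/managed-ledgers/%s/persistent", namespace);
    }

    /** Turns child node names into sorted, fully qualified topic names. */
    static List<String> toTopicNames(String namespace, List<String> znodes) {
        List<String> topics = new ArrayList<>();
        for (String znode : znodes) {
            topics.add(String.format("persistent://%s/%s", namespace, decode(znode)));
        }
        topics.sort(null); // natural String ordering
        return topics;
    }

    // Assumption: node names are URL-encoded topic names.
    private static String decode(String encoded) {
        try {
            return URLDecoder.decode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(managedLedgerPath("public/default"));
        System.out.println(toTopicNames("public/default", Arrays.asList("orders", "a%2Fb")));
    }
}
```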

    public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
        // For every topic there will be a managed ledger created.
        String path = String.format("/managed-ledgers/%s/persistent", namespaceName);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting children from managed-ledgers now: {}", path);
        }

        return pulsar.getLocalZkCacheService().managedLedgerListCache().getAsync(path)
                .thenApply(znodes -> {
                    List<String> topics = Lists.newArrayList();
                    for (String znode : znodes) {
                        topics.add(String.format("persistent://%s/%s", namespaceName, Codec.decode(znode)));
                    }
                    topics.sort(null);
                    return topics;
                });
    }

By now you should have a clearer picture of how Apache Pulsar processes a TopicLookup request. Tracing the same path through the source yourself is a good way to consolidate it.
