Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the TopicLookup request processing method?

2025-02-14 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

The main content of this article is to explain "what is the method of handling TopicLookup requests". Interested friends may wish to have a look. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn "what is the TopicLookup request processing method"?

Simple logical explanation

Determine the namespace by topic name

Find the bundle allocation information for this namespace

Confirm which bundle the topic belongs to according to the bundle assignment information

Confirm which broker is in charge of the bundle based on the bundle information, and return the address of the broker.

CommandLookup is mainly used to find out which broker is in charge of Topic.

The general client can query through the http protocol or the binary protocol.

Message CommandLookupTopic {/ / topic name required string topic = 1; / / Network layer request id required uint64 request_id = 2; optional bool authoritative = 3 [default = false]; / / TODO-Remove original_principal, original_auth_data, original_auth_method / / Original principal that was verified by / / a Pulsar proxy. Optional string original_principal = 4; / / Original auth role and auth Method that was passed / / to the proxy. Optional string original_auth_data = 5; optional string original_auth_method = 6; / / from which specified connection point optional string advertised_listener_name = 7;}

Look directly at the server code ServerCnx here.

Protected void handleLookup (CommandLookupTopic lookup) {final long requestId = lookup.getRequestId (); final boolean authoritative = lookup.isAuthoritative (); final String advertisedListenerName = lookup.hasAdvertisedListenerName ()? Lookup.getAdvertisedListenerName (): null; / / verify topic name TopicName topicName = validateTopicName (lookup.getTopic (), requestId, lookup); if (topicName = = null) {return;} / / where Semaphore is the current limiter final Semaphore lookupSemaphore = service.getLookupRequestSemaphore () requested by the server Lookup If (lookupSemaphore.tryAcquire ()) {.... IsTopicOperationAllowed (topicName, TopicOperation.LOOKUP) .thenApply (isAuthorized-> {/ / authenticated if (isAuthorized) {lookupTopicAsync (getBrokerService (). Pulsar (), topicName, authoritative, getPrincipal ()) GetAuthenticationData (), requestId, advertisedListenerName) .handle (lookupResponse, ex)-> {if (ex = = null) {ctx.writeAndFlush (lookupResponse) } else {.... } lookupSemaphore.release (); return null;});} else {.... }) .automatically (ex-> {.... });} else {/ / if an exception is sent, `CommandLookupTopicResponse` / / this is the new way to define binary messages / Wire format / / [TOTAL_SIZE] [CMD_SIZE] [CMD] ctx.writeAndFlush (newLookupErrorResponse (ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId)) }} TopicLookupBase.lookupTopicAsync

Org.apache.pulsar.broker.lookup.TopicLookupBase#lookupTopicAsync

This is a static method.

Main

Validation verifies the cluster, topic name, etc. (there is a logic for cross-cluster checking. Skip it first)

Lookup logic

The logic of the check is skipped here, and the actual core logic is on the following two lines.

LookupOptions options = LookupOptions.builder () .authorship (authoritative) .accounsedListenerName (advertisedListenerName) .loadTopicsInBundle (true) / / the condition here is true. Build (); pulsarService.getNamespaceService (). GetBrokerServiceUrlAsync (topicName, options)

The main logic is in NamespaceService. PulsarService can be thought of as a global object, any core logic object needed by pulsar.

(for example, NamspaceService,BrokerService,ConfigurationCacheService, etc.) you can get it from this object.

NamespaceService.getBrokerServiceUrlAsync

The main logic here is

Locate the namespace based on the passed topic name

Then confirm which NamespaceBundle the topic belongs to.

Then find the address of the bundle's owner broker according to the NamespaceBundle.

Public CompletableFuture getBrokerServiceUrlAsync (TopicName topic, LookupOptions options) {.... CompletableFuture future = getBundleAsync (topic) .thenCompose (bundle-> findBrokerServiceUrl (bundle, options));} public CompletableFuture getBundleAsync (TopicName topic) {return bundleFactory.getBundlesAsync (topic.getNamespaceObject ()) .thenApply (bundles-> bundles.findBundle (topic));}

The bundleFactory in this is actually an asynchronously loaded cache.

Let's look at the definition.

/ / org.apache.pulsar.common.naming.NamespaceBundleFactoryprivate final AsyncLoadingCache bundlesCache / / public NamespaceBundleFactory (PulsarService pulsar, HashFunction hashFunc) {/ / .this.bundlesCache = Caffeine.newBuilder () .recordStats () / record metric .buildAsync (/ / load the logic of cache (NamespaceName namespace, Executor executor)-> {String path = AdminResource.joinPath (LOCAL_POLICIES_ROOT, namespace.toString () .... CompletableFuture future = new CompletableFuture () / / Read the static bundle data from the policies pulsar .getLocalZkCacheService () / / get LocalZooKeeperCacheService .policiesCache () .getWithStatAsync (path) .thenAccept (result-> {/ / this is actually to find out whether the number of bundle is configured separately for this namespace. BundlesData bundlesData = result.map (Entry::getKey) .map (p-> p.bundles) .orElse (null) / / get namespaceBundle NamespaceBundles namespaceBundles = getBundles (namespace, bundlesData, result.map (Entry::getValue) .map (s-> s.getVersion ()) .orElse (- 1) via namespace;.... Future.complete (namespaceBundles);}) .automatically (ex-> {future.completeExceptionally (ex); return null;}); return future;}); / /.}

Let's briefly talk about the NamespaceBundles class, which holds all the NamespaceBundle of the Namespace, providing an aggregated view.

This class represents a hash ring, which is divided into several fragments according to the number of fragments configured.

Each broker will follow a certain algorithm to determine which part of the ring belongs to him.

Topic will also be assigned to this hash ring according to a certain algorithm.

This allows broker to determine which topic he is responsible for.

You can return the lookup request, and this process also triggers the topic loading process.

NamespaceBundles.findBundle

This function is to determine which NamespaceBundle the topic belongs to.

/ Map a segment from topic to hash ring, which is identified by NamespaceBundle as public NamespaceBundle findBundle (TopicName topicName) {checkArgument (this.nsname.equals (topicName.getNamespaceObject (); long hashCode = factory.getLongHashCode (topicName.toString ()); NamespaceBundle bundle = getBundle (hashCode); if (topicName.getDomain (). Equals (TopicDomain.non_persistent)) {bundle.setHasNonPersistentTopic (true);} return bundle }

At this point we can determine the information of the namespace and how many bundle the namespce is divided into.

And you can determine which namespacebundle the topic belongs to.

The next step is to find the responsible broker based on namespaceBundle.

NamespaceService.findBrokerServiceUrl

Here is to determine the broker according to namespacebundle

/ / this records the metadata information of a broker public class NamespaceEphemeralData {private String nativeUrl; private String nativeUrlTls; private String httpUrl; private String httpUrlTls; private boolean disabled; private Map advertisedListeners;} private CompletableFuture findBrokerServiceUrl (NamespaceBundle bundle, LookupOptions options) {ConcurrentOpenHashMap targetMap; return targetMap.computeIfAbsent (bundle, (k)-> {CompletableFuture future = new CompletableFuture () / / First check if we or someone else already owns the bundle ownershipCache.getOwnerAsync (bundle) .thenAccept (nsData-> {/ / nsData: Optional if (! nsData.isPresent ()) {/ / if this information is not found If (options.isReadOnly ()) {/ / Do not attempt to acquire ownership future.complete (Optional.empty ()) } else {/ / currently no one is in charge of this bundle trying to find the owner pulsar.getExecutor of this bundle (). Execute (()-> {searchForCandidateBroker (bundle, future, options);}) }} else if (nsData.get () .isDisabled ()) {/ / namespce is unload future.completeExceptionally (new IllegalStateException (String.format ("Namespace bundle% s is being unloaded", bundle) } else {/ / found the logic here, just splice the normal response directly. / / find the target future.complete (Optional.of (new LookupResult (nsData.get () (exception-> {...}); / / this targetMap is actually used to make a lock structure to avoid multiple loads. / / https://github.com/apache/pulsar/pull/1527 future.whenComplete ((r, t)-> pulsar.getExecutor (). Execute (()-> targetMap.remove (bundle); return future;});}

In this way, if the owner information for this topic exists in the cache, it can be returned directly.

At this point, I believe you have a deeper understanding of "what is the method of handling TopicLookup requests?" you might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report