In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "what services Apache Pulsar started". The explanation in this article is simple and clear, easy to learn and understand. Please follow the ideas of Xiaobian and go deep into it to study and learn "what services Apache Pulsar started" together!
1. Start the portal.
PulsarStandaloneStarter
In standalone mode, the following services are primarily started
PulsarService
PulsarAdmin
LocalBookeeperEnsemble
WorkerService
PulsarBrokerStarter.BrokerStarter
In normal mode, the following services are started
PulsarService
BookieServer
AutoRecoveryMain
StatsProvider
WorkerService
Just a few of these services.
WorkerService: Pulsar function related, can not be started
PulsarService: Main PulsarBroker related
Bookie Server: Bookkeeper
AutoRecoveryMain: Bookkeeper autorecovery
StatsProvider: Metric Exporter Similar Features
2. PulsarService
PulsarService.start
ProtocolHandlers
Support different protocol processing (kafka protocol)
localZookeeperConnectionProvider
Maintaining zk sessions and zk connections
startZkCacheService
LocalZooKeeperCache => LocalZooKeeperCacheService
GlobalZooKeeperCache => ConfigurationCacheService
BookkeeperClientFactory
Create Configuration Bookkeeper Client
managedLedgerClientFactory
Maintain a ManagedLedger client, borrow Bookkeeper Client
BrokerService
This is the main logic of the server. This is put at the back.
loadManager
Collect cluster machine load and balance load based on load
startNamespaceService
NameSpaceService, ResourceBundle for managing placement, and LoadManager related
schemaStorage
schemaRegistryService
The two above are schema-related.
defaultOffloader
LedgerOffloader, used to put cold data in Ledger (Bookkeeper) into other storage
WebService
webSocketService
http, websocket related
LeaderElectionService
It is related to LoadManager. If it is centralized, it is necessary to select a Leader to balance the load according to the cluster situation regularly.
transactionMetadataStoreService
transaction related
metricGenerator
metric correlation
WorkerService
Pulsar function correlation
3. BrokerServicepublic void start() throws Exception { // producer id Distributed generator this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath, pulsar.getConfiguration().getClusterName()); //Network Layer Configuration ServerBootstrap bootstrap = defaultServerBootstrap.clone(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false)); ... //bind port listenChannel = bootstrap.bind(addr).sync().channel(); ... // metric this.startStatsUpdater( serviceConfig.getStatsUpdateInitialDelayInSecs(), serviceConfig.getStatsUpdateFrequencyInSecs()); //a bunch of tasks that need to be performed regularly this.startInactivityMonitor(); //Start 3 schedule tasks to detect respectively // 1. Long term invalid topic // 2. A producer that has been inactive for a long time (de-correlating with message) // 3. Long-term invalid subscription this.startMessageExpiryMonitor(); this.startCompactionMonitor(); this.startMessagePublishBufferMonitor(); this.startConsumedLedgersMonitor(); this.startBacklogQuotaChecker(); this.updateBrokerPublisherThrottlingMaxRate(); this.startCheckReplicationPolicies(); // register listener to capture zk-latency ClientCnxnAspect.addListener(zkStatsListener); ClientCnxnAspect.registerExecutor(pulsar.getExecutor());4. PulsarChannelInitializer
Following the initialization method of netty, we directly look at ChannelInitializer, where we should process the request similarly to Kafka.
protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); ch.pipeline().addLast("flowController", new FlowControlHandler()); ServerCnx cnx = new ServerCnx(pulsar); ch.pipeline().addLast("handler", cnx); connections.put(ch.remoteAddress(), cnx); }5. ServerCnx
The role of this class is to target Kafka Apis and handle various Api requests
This class is actually a ChannelHandler
Inherited PulsarHandler (mainly responsible for keepalive logic of some connections)
PulsarHandler inherits PulsarDecoder (mainly responsible for serializing and deserializing Api requests)
PulsarDecoder is actually a ChannelInboundHandlerAdapter
PulsarAPi is actually generated by Pulsar.proto, where various Api definitions are written
Thank you for reading, the above is "Apache Pulsar started what services" content, after the study of this article, I believe that we have a deeper understanding of what services Apache Pulsar started, the specific use of the situation also needs to be verified. Here is, Xiaobian will push more articles related to knowledge points for everyone, welcome to pay attention!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.