How to implement Apache Pulsar binary Protocol

2025-02-14 Update From: SLTechnology News&Howtos


This article introduces how the Apache Pulsar binary protocol is implemented. It walks through the main commands, giving the protobuf definition of each and, where useful, the client or broker code that sends or handles it.

Pulsar defines its binary protocol with Protocol Buffers (protobuf).

Approximate message types (as of version 2.7):

    message BaseCommand {
        enum Type {
            CONNECT = 2;
            CONNECTED = 3;
            // consumer registration
            SUBSCRIBE = 4;
            // producer registration
            PRODUCER = 5;
            // write a message to the topic
            SEND = 6;
            // response to a successful write
            SEND_RECEIPT = 7;
            // response to a failed write
            SEND_ERROR = 8;
            // send a message to the consumer
            MESSAGE = 9;
            // acknowledge that a message was consumed successfully
            ACK = 10;
            // consumer requests messages
            FLOW = 11;
            UNSUBSCRIBE = 12;
            // a generic success response
            SUCCESS = 13;
            // a generic error response
            ERROR = 14;
            CLOSE_PRODUCER = 15;
            CLOSE_CONSUMER = 16;
            // response to PRODUCER
            PRODUCER_SUCCESS = 17;
            // network-layer keepalive
            PING = 18;
            PONG = 19;
            REDELIVER_UNACKNOWLEDGED_MESSAGES = 20;
            PARTITIONED_METADATA = 21;
            PARTITIONED_METADATA_RESPONSE = 22;
            LOOKUP = 23;
            LOOKUP_RESPONSE = 24;
            CONSUMER_STATS = 25;
            CONSUMER_STATS_RESPONSE = 26;
            REACHED_END_OF_TOPIC = 27;
            SEEK = 28;
            GET_LAST_MESSAGE_ID = 29;
            GET_LAST_MESSAGE_ID_RESPONSE = 30;
            ACTIVE_CONSUMER_CHANGE = 31;
            GET_TOPICS_OF_NAMESPACE = 32;
            GET_TOPICS_OF_NAMESPACE_RESPONSE = 33;
            GET_SCHEMA = 34;
            GET_SCHEMA_RESPONSE = 35;
            AUTH_CHALLENGE = 36;
            AUTH_RESPONSE = 37;
            ACK_RESPONSE = 38;
            GET_OR_CREATE_SCHEMA = 39;
            GET_OR_CREATE_SCHEMA_RESPONSE = 40;
            // transaction-related types (50-61) are easy to understand and omitted here
        }
        // ...
    }

CommandConnect

As soon as the client's channel to the server becomes active, the client sends a CONNECT request.

The request reports authentication data and the client's protocol version.

Once the server knows the client's version, it knows which features the client supports and can apply compatibility handling.

This is the equivalent of ApiVersionsRequest in Kafka.
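To make the compatibility handling concrete, here is a minimal sketch of gating features on the protocol version reported in CONNECT. The class name, the feature names and the version thresholds are all hypothetical, invented for illustration; they are not taken from the Pulsar source.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: derive the features a peer supports from the
// protocol version it reported in CONNECT. Names and thresholds are invented.
final class VersionGate {
    enum Feature {
        BATCHING(4),   // assumed minimum version, for illustration only
        CHUNKING(10),  // assumed
        ACK_RECEIPT(17); // assumed

        final int minVersion;

        Feature(int minVersion) {
            this.minVersion = minVersion;
        }
    }

    // Returns every feature whose minimum version is <= the reported version.
    static Set<Feature> supportedBy(int clientProtocolVersion) {
        Set<Feature> result = EnumSet.noneOf(Feature.class);
        for (Feature f : Feature.values()) {
            if (clientProtocolVersion >= f.minVersion) {
                result.add(f);
            }
        }
        return result;
    }
}
```

A server built this way would call `VersionGate.supportedBy(connect.getProtocolVersion())` once per connection and consult the resulting set before using a newer command.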

    // org.apache.pulsar.client.impl.ClientCnx
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(() -> checkRequestTimeout(),
                operationTimeoutMs, operationTimeoutMs, TimeUnit.MILLISECONDS);

        if (proxyToTargetBrokerAddress == null) {
            if (log.isDebugEnabled()) {
                log.debug("{} Connected to broker", ctx.channel());
            }
        } else {
            log.info("{} Connected through proxy to target broker at {}", ctx.channel(),
                    proxyToTargetBrokerAddress);
        }
        // Send CONNECT command
        ctx.writeAndFlush(newConnectCommand())
                .addListener(future -> {
                    if (future.isSuccess()) {
                        if (log.isDebugEnabled()) {
                            log.debug("Complete: {}", future.isSuccess());
                        }
                        state = State.SentConnectFrame;
                    } else {
                        log.warn("Error during handshake", future.cause());
                        ctx.close();
                    }
                });
    }

CommandConnected

This is the response to CommandConnect, although its name does not say so, which makes the two easy to mix up.

    // org.apache.pulsar.broker.service.ServerCnx
    protected void handleConnect(CommandConnect connect) {
        checkArgument(state == State.Start);

        if (log.isDebugEnabled()) {
            log.debug("Received CONNECT from {}, auth enabled: {}:"
                    + "has original principal = {}, original principal = {}",
                    remoteAddress,
                    service.isAuthenticationEnabled(),
                    connect.hasOriginalPrincipal(),
                    connect.getOriginalPrincipal());
        }

        String clientVersion = connect.getClientVersion();
        int clientProtocolVersion = connect.getProtocolVersion();
        features = new FeatureFlags();
        if (connect.hasFeatureFlags()) {
            features.copyFrom(connect.getFeatureFlags());
        }

        if (!service.isAuthenticationEnabled()) {
            completeConnect(clientProtocolVersion, clientVersion);
            return;
        }
        // ...
    }

CommandSubscribe

A consumer uses this RPC to register with the server.

The call starts at the last line of the ConsumerImpl constructor, which asks for a connection to the server. Once a Connection has been obtained, the connectionOpened callback is invoked; for a consumer, the callback sends this request to register the consumer's information.

Relative to the CommandConnect request above, this request is always sent after CommandConnect.
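The ordering constraint (CONNECT first, SUBSCRIBE only afterwards) can be sketched as a small state machine. The class, method and state names below are hypothetical and much simpler than the state handling in the real client.

```java
// Hypothetical sketch: SUBSCRIBE may only be sent once the CONNECT
// handshake has completed. Not the actual ClientCnx implementation.
final class HandshakeState {
    enum State { START, SENT_CONNECT, READY }

    private State state = State.START;

    // The client writes CONNECT as soon as the channel becomes active.
    void onConnectSent() {
        requireState(State.START);
        state = State.SENT_CONNECT;
    }

    // The server answered with CONNECTED.
    void onConnected() {
        requireState(State.SENT_CONNECT);
        state = State.READY;
    }

    // True only after the handshake, i.e. when SUBSCRIBE may be sent.
    boolean canSubscribe() {
        return state == State.READY;
    }

    private void requireState(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }
}
```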

    // org.apache.pulsar.client.impl.ConsumerImpl
    @Override
    public void connectionOpened(final ClientCnx cnx) {
        // ... a lot of parameter preparation above is omitted
        // Build the subscribe request
        ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
                priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
                conf.isReplicateSubscriptionState(),
                InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
                startMessageRollbackDuration, schemaInfo, createTopicIfDoesNotExist,
                conf.getKeySharedPolicy());
    }

Proto definition (see the comments):

    message CommandSubscribe {
        // The four subscription types
        enum SubType {
            Exclusive = 0;
            Shared = 1;
            Failover = 2;
            Key_Shared = 3;
        }
        // Topic name
        required string topic = 1;
        // Subscription name
        required string subscription = 2;
        // Subscription type
        required SubType subType = 3;
        // Identifies the consumer on this network connection
        required uint64 consumer_id = 4;
        // Network-layer request ID
        required uint64 request_id = 5;
        // Consumer name
        optional string consumer_name = 6;
        // Consumer priority; higher-priority consumers receive messages first
        optional int32 priority_level = 7;

        // Whether the subscription is durable
        // Signal whether the subscription should be backed by a
        // durable cursor or not
        optional bool durable = 8 [default = true];

        // If specified, the subscription will position the cursor
        // mark-delete position on the particular message id and
        // will send messages from that point
        optional MessageIdData start_message_id = 9;

        // Custom key/value tags attached by the consumer
        /// Add optional metadata key=value to this consumer
        repeated KeyValue metadata = 10;

        optional bool read_compacted = 11;

        optional Schema schema = 12;

        // Where consumption starts initially: latest or earliest
        enum InitialPosition {
            Latest = 0;
            Earliest = 1;
        }
        // Signal whether the subscription will initialize on latest
        // or not -- earliest
        optional InitialPosition initialPosition = 13 [default = Latest];

        // Geo-replication related; ignore for now
        // Mark the subscription as "replicated". Pulsar will make sure
        // to periodically sync the state of replicated subscriptions
        // across different clusters (when using geo-replication).
        optional bool replicate_subscription_state = 14;

        // If true, the subscribe operation will cause a topic to be
        // created if it does not exist already (and if topic auto-creation
        // is allowed by broker.
        // If false, the subscribe operation will fail if the topic
        // does not exist.
        optional bool force_topic_creation = 15 [default = true];

        // Resets the consumption position by time
        // If specified, the subscription will reset cursor's position back
        // to specified seconds and will send messages from that point
        optional uint64 start_message_rollback_duration_sec = 16 [default = 0];

        // Key_Shared mode; ignore for now
        optional KeySharedMeta keySharedMeta = 17;
    }

CommandProducer

This RPC is the producer-side counterpart of CommandSubscribe: a producer uses it to register with the server. It is likewise sent from connectionOpened, here org.apache.pulsar.client.impl.ProducerImpl.connectionOpened.

    /// Create a new Producer on a topic, assigning the given producer_id,
    /// all messages sent with this producer_id will be persisted on the topic
    message CommandProducer {
        // Topic
        required string topic = 1;
        required uint64 producer_id = 2;
        // Network-layer request ID
        required uint64 request_id = 3;

        /// If a producer name is specified, the name will be used,
        /// otherwise the broker will generate a unique name
        optional string producer_name = 4;

        // Whether writes are encrypted
        optional bool encrypted = 5 [default = false];

        // Metadata map
        /// Add optional metadata key=value to this producer
        repeated KeyValue metadata = 6;

        optional Schema schema = 7;

        // This should really be called producer_epoch
        // If producer reconnect to broker, the epoch of this producer will +1
        optional uint64 epoch = 8 [default = 0];

        // Indicate the name of the producer is generated or user provided
        // Use default true here is in order to be forward compatible with the client
        optional bool user_provided_producer_name = 9 [default = true];

        // One of three write (access) modes
        // Require that this producers will be the only producer allowed on the topic
        optional ProducerAccessMode producer_access_mode = 10 [default = Shared];

        // Topic epoch is used to fence off producers that reconnects after a new
        // exclusive producer has already taken over. This id is assigned by the
        // broker on the CommandProducerSuccess. The first time, the client will
        // leave it empty and then it will always carry the same epoch number on
        // the subsequent reconnections.
        optional uint64 topic_epoch = 11;
    }

    enum ProducerAccessMode {
        Shared = 0; // By default multiple producers can publish on a topic
        Exclusive = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected.
        WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access
    }

CommandProducerSuccess

This is the success response to a CommandProducer request.

    /// Response from CommandProducer
    message CommandProducerSuccess {
        // Network-layer request ID
        required uint64 request_id = 1;
        // Producer name
        required string producer_name = 2;

        // The last sequence id that was stored by this producer in the previous session
        // This will only be meaningful if deduplication has been enabled.
        optional int64 last_sequence_id = 3 [default = -1];
        optional bytes schema_version = 4;

        // The topic epoch assigned by the broker. This field will only be set if we
        // were requiring exclusive access when creating the producer.
        optional uint64 topic_epoch = 5;

        // Presumably related to ProducerAccessMode above; to be covered another time
        // If producer is not "ready", the client will avoid to timeout the request
        // for creating the producer. Instead it will wait indefinitely until it gets
        // a subsequent `CommandProducerSuccess` with `producer_ready==true`.
        optional bool producer_ready = 6 [default = true];
    }

CommandSend

A producer uses this RPC to send messages to the server.

Starting from org.apache.pulsar.client.impl.ProducerImpl.sendAsync, you can trace all the way down to the place where it is sent. After processing such as batching, encryption and chunking, a message is serialized into this request.

The serialization format is shown below. Here the BaseCommand is a CommandSend.

    // org.apache.pulsar.common.protocol.Commands
    private static ByteBufPair serializeCommandSendWithSize(BaseCommand cmd, ChecksumType checksumType,
            MessageMetadata msgMetadata, ByteBuf payload) {
        // Wire format
        // [TOTAL_SIZE] [CMD_SIZE] [CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
        // ...
    }

The protobuf definition below covers only the [CMD] part of this wire format.
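To show how the pieces of the wire format fit together, the sketch below builds and parses such a frame with plain JDK classes. It rests on several assumptions that should be checked against the Pulsar binary protocol documentation: the size fields are 4-byte big-endian integers, the magic number is the 2-byte value 0x0e01, and the checksum is a CRC32-C over everything that follows it. [CMD] and [METADATA] are treated as opaque byte arrays instead of real protobuf messages, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Hypothetical sketch of the frame layout
// [TOTAL_SIZE][CMD_SIZE][CMD][MAGIC_NUMBER][CHECKSUM][METADATA_SIZE][METADATA][PAYLOAD].
// Field widths, magic value and checksum algorithm are assumptions (see lead-in).
final class SendFrameSketch {
    static final short MAGIC = 0x0e01; // assumed magic number

    static byte[] encode(byte[] cmd, byte[] metadata, byte[] payload) {
        // Everything after the checksum field; this is what the checksum covers.
        ByteBuffer tail = ByteBuffer.allocate(4 + metadata.length + payload.length);
        tail.putInt(metadata.length).put(metadata).put(payload);
        CRC32C crc = new CRC32C();
        crc.update(tail.array(), 0, tail.capacity());

        // TOTAL_SIZE counts every byte that follows the TOTAL_SIZE field itself.
        int totalSize = 4 + cmd.length + 2 + 4 + tail.capacity();
        ByteBuffer frame = ByteBuffer.allocate(4 + totalSize);
        frame.putInt(totalSize);
        frame.putInt(cmd.length).put(cmd);
        frame.putShort(MAGIC);
        frame.putInt((int) crc.getValue());
        frame.put(tail.array());
        return frame.array();
    }

    // Validates sizes, magic number and checksum, then returns the payload bytes.
    static byte[] payloadOf(byte[] frameBytes) {
        ByteBuffer in = ByteBuffer.wrap(frameBytes);
        if (in.getInt() != in.remaining()) {
            throw new IllegalArgumentException("TOTAL_SIZE does not match frame length");
        }
        int cmdSize = in.getInt();
        in.position(in.position() + cmdSize); // skip the serialized command
        if (in.getShort() != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        int expected = in.getInt();
        CRC32C crc = new CRC32C();
        crc.update(frameBytes, in.position(), in.remaining());
        if ((int) crc.getValue() != expected) {
            throw new IllegalArgumentException("checksum mismatch");
        }
        int metadataSize = in.getInt();
        in.position(in.position() + metadataSize); // skip the metadata
        byte[] payload = new byte[in.remaining()];
        in.get(payload);
        return payload;
    }
}
```

Keeping the command outside the checksummed region means a broker can read the small protobuf header without touching the message body, which is consistent with the statement that the protobuf definition covers only [CMD].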

    message CommandSend {
        required uint64 producer_id = 1;
        required uint64 sequence_id = 2;
        optional int32 num_messages = 3 [default = 1];
        optional uint64 txnid_least_bits = 4 [default = 0];
        optional uint64 txnid_most_bits = 5 [default = 0];

        /// Add highest sequence id to support batch message with external sequence id
        optional uint64 highest_sequence_id = 6 [default = 0];
        optional bool is_chunk = 7 [default = false];
    }

CommandSendReceipt

This is the success response the server sends once it has persisted the message.

    message CommandSendReceipt {
        required uint64 producer_id = 1;
        // Used to preserve ordering
        required uint64 sequence_id = 2;
        optional MessageIdData message_id = 3;
        // Presumably used for deduplication
        optional uint64 highest_sequence_id = 4 [default = 0];
    }

    // The ID of the successfully written message; this structure is reused elsewhere
    message MessageIdData {
        required uint64 ledgerId = 1;
        required uint64 entryId = 2;
        optional int32 partition = 3 [default = -1];
        optional int32 batch_index = 4 [default = -1];
        repeated int64 ack_set = 5;
        optional int32 batch_size = 6;
    }

CommandSendError

This is the error response to CommandSend.

    message CommandSendError {
        required uint64 producer_id = 1;
        required uint64 sequence_id = 2;
        required ServerError error = 3;
        required string message = 4;
    }

CommandFlow

A consumer uses this command to tell the server how many more messages it can currently accept.

For each consumer in a subscription, the server records how many messages that consumer can currently accept.

When deciding which consumer to dispatch messages to, the server uses this number to check whether the consumer can take messages right now.

The one call site identified so far: after connectionOpened has successfully registered the subscription, a CommandFlow request is sent so that the server starts pushing messages.

Presumably the command is also sent whenever the consumer's receive queue has free space again.
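The permit bookkeeping described above can be sketched as follows. This is a simplified, hypothetical model (class and method names are invented) of a server that adds permits on FLOW and spends them on dispatch; the real dispatcher is considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of server-side permit bookkeeping per consumer.
final class PermitLedger {
    private final Map<Long, Integer> permits = new HashMap<>();

    // FLOW: the new permits are added on top of those granted earlier.
    void onFlow(long consumerId, int messagePermits) {
        permits.merge(consumerId, messagePermits, Integer::sum);
    }

    // Tries to dispatch `available` messages; returns how many may be sent.
    int dispatch(long consumerId, int available) {
        int granted = permits.getOrDefault(consumerId, 0);
        int sent = Math.min(granted, available);
        permits.put(consumerId, granted - sent);
        return sent;
    }

    // Permits the consumer still has left.
    int remaining(long consumerId) {
        return permits.getOrDefault(consumerId, 0);
    }
}
```

A consumer with zero remaining permits is skipped by the dispatcher until it sends another FLOW, which is how the client controls how many messages are pushed to it.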

    message CommandFlow {
        required uint64 consumer_id = 1;

        // Max number of messages to prefetch, in addition
        // of any number previously specified
        required uint32 messagePermits = 2;
    }

CommandMessage

With this command the server pushes messages to a consumer; the server sends it on its own initiative. (The logic lives in the dispatcher of the server-side subscription.)

The call site is org.apache.pulsar.broker.service.Consumer#sendMessages.

One level up, this method is called from org.apache.pulsar.broker.service.Dispatcher.

The format is the same as the wire format shown above: the command acts as an RPC header, followed by the message payload.

    // Wire format
    // [TOTAL_SIZE] [CMD_SIZE] [CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
    //
    // metadataAndPayload contains from magic-number to the payload included
    message CommandMessage {
        required uint64 consumer_id = 1;
        // ID of the message
        required MessageIdData message_id = 2;
        // How many times this message has been redelivered
        optional uint32 redelivery_count = 3 [default = 0];
        // Which parts of this message have already been acked
        repeated int64 ack_set = 4;
    }

CommandAck

This command acknowledges successful consumption. It can ack a single message.

It can also ack cumulatively (similar to Kafka).

To reduce the number of RPCs, the client batches acks as an optimization.

On the server side, handling an ack generally means updating the state held in the ManagedCursor and persisting the result.
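The difference between the two ack types can be sketched with a toy cursor. This is a hypothetical simplification: message positions are plain long values instead of (ledgerId, entryId) pairs, nothing is persisted, and the class is not the ManagedCursor API.

```java
import java.util.TreeSet;

// Hypothetical sketch: a cursor tracking cumulative and individual acks.
final class AckTracker {
    private long markDelete = -1; // every position <= markDelete is acked
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();

    // Cumulative ack: everything up to and including `position` is acked.
    void ackCumulative(long position) {
        if (position > markDelete) {
            markDelete = position;
            individuallyAcked.headSet(position, true).clear();
            advance();
        }
    }

    // Individual ack: only `position` is acked; holes before it may remain.
    void ackIndividual(long position) {
        if (position > markDelete) {
            individuallyAcked.add(position);
            advance();
        }
    }

    // Moves markDelete forward over positions that were acked individually.
    private void advance() {
        while (individuallyAcked.remove(markDelete + 1)) {
            markDelete++;
        }
    }

    long markDelete() {
        return markDelete;
    }

    boolean isAcked(long position) {
        return position <= markDelete || individuallyAcked.contains(position);
    }
}
```

Only `markDelete` plus the set of holes has to be stored, which is why a cumulative ack is cheap while many scattered individual acks cost more state.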

    message CommandAck {
        // Ack type: cumulative or individual
        enum AckType {
            Individual = 0;
            Cumulative = 1;
        }

        required uint64 consumer_id = 1;
        required AckType ack_type = 2;

        // The field is repeated, so acks can be batched
        // In case of individual acks, the client can pass a list of message ids
        repeated MessageIdData message_id = 3;

        // Acks can contain a flag to indicate the consumer
        // received an invalid message that got discarded
        // before being passed on to the application.
        enum ValidationError {
            UncompressedSizeCorruption = 0;
            DecompressionError = 1;
            ChecksumMismatch = 2;
            BatchDeSerializeError = 3;
            DecryptionError = 4;
        }

        // A message may also be acked in some error cases; the reason is recorded here
        optional ValidationError validation_error = 4;
        repeated KeyLongValue properties = 5;

        optional uint64 txnid_least_bits = 6 [default = 0];
        optional uint64 txnid_most_bits = 7 [default = 0];
        // Network-layer request ID
        optional uint64 request_id = 8;
    }

CommandRedeliverUnacknowledgedMessages

With this RPC the consumer tells the server which messages need to be redelivered.

    message CommandRedeliverUnacknowledgedMessages {
        required uint64 consumer_id = 1;
        repeated MessageIdData message_ids = 2;
    }

CommandSuccess & CommandError

These are generic responses that almost any request can use when it has no special fields to return.

This differs from Kafka, where each request type (identified by its ApiKey) has exactly one matching response type. In Pulsar, requests and responses do not correspond strictly one to one.
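Because the generic responses carry only a request_id, the client has to remember which request each id belongs to. A minimal sketch of that correlation, with hypothetical class and method names, could look like this:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: match generic SUCCESS / ERROR responses to the
// requests that caused them, using request_id as the correlation key.
final class PendingRequests {
    private final AtomicLong nextRequestId = new AtomicLong();
    private final Map<Long, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    // Allocates the id that the caller places in the request's request_id field.
    long newRequestId() {
        return nextRequestId.getAndIncrement();
    }

    // Remembers the request until a response with the same id arrives.
    CompletableFuture<Void> track(long requestId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // CommandSuccess received: complete the matching request, if any.
    void onSuccess(long requestId) {
        CompletableFuture<Void> future = pending.remove(requestId);
        if (future != null) {
            future.complete(null);
        }
    }

    // CommandError received: fail the matching request, if any.
    void onError(long requestId, String message) {
        CompletableFuture<Void> future = pending.remove(requestId);
        if (future != null) {
            future.completeExceptionally(new RuntimeException(message));
        }
    }
}
```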

    message CommandSuccess {
        required uint64 request_id = 1;
        optional Schema schema = 2;
    }

    message CommandError {
        required uint64 request_id = 1;
        required ServerError error = 2;
        required string message = 3;
    }

CommandPing & CommandPong

Both messages are empty. Their main purpose is application-layer keepalive on the TCP connection.

See org.apache.pulsar.common.protocol.PulsarHandler#handleKeepAliveTimeout.
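The keepalive idea can be sketched as a timer-driven check: if nothing was received for a whole interval, send a PING; if the peer stays silent for another interval, close the connection. The class below is a hypothetical simplification with invented names and explicit timestamps, not the logic of PulsarHandler.

```java
// Hypothetical sketch of application-layer keepalive with PING / PONG.
final class KeepAliveSketch {
    enum Action { NONE, SEND_PING, CLOSE }

    private final long intervalMs;
    private long lastReceivedMs;
    private boolean waitingForPong;

    KeepAliveSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.lastReceivedMs = nowMs;
    }

    // Any inbound command, PONG included, proves the peer is still alive.
    void onCommandReceived(long nowMs) {
        lastReceivedMs = nowMs;
        waitingForPong = false;
    }

    // Called periodically by a timer; decides what the connection should do.
    Action onTimer(long nowMs) {
        if (nowMs - lastReceivedMs < intervalMs) {
            return Action.NONE;
        }
        if (waitingForPong) {
            return Action.CLOSE; // the earlier PING was never answered
        }
        waitingForPong = true;
        return Action.SEND_PING;
    }
}
```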

    // Commands to probe the state of connection.
    // When either client or broker doesn't receive commands for certain
    // amount of time, they will send a Ping probe.
    message CommandPing {
    }
    message CommandPong {
    }
