How to Implement Flume Client Development

2025-03-26 Update From: SLTechnology News&Howtos

Many newcomers are unsure how to develop a Flume client. This article walks through it in detail: the basic RPC client, the failover client, and the load-balancing client.

Data is produced in many different ways in practice. Flume includes several built-in mechanisms for collecting data, but users often prefer to connect their applications to Flume directly. In that case, the user's own application connects to Flume through IPC or RPC and sends data to it.

I. RPC client interface

Flume's RpcClient interface encapsulates the RPC mechanism that Flume supports. An application can simply call the Flume Client SDK's append(Event) or appendBatch(List<Event>) method to send data without worrying about the details of the underlying message exchange. The required Event can be supplied by implementing the Event interface directly, by using a convenience implementation such as the SimpleEvent class, or by using EventBuilder's overloaded withBody() static helper methods.

Since Flume 1.4.0, Avro has been the default RPC protocol. NettyAvroRpcClient and ThriftRpcClient both implement the RpcClient interface. The client needs the host and port of the target Flume agent in order to create a client instance; the RpcClient is then used to send data to that agent.

The official documentation gives an Avro RPC client example, which is used here directly as the test case. The only change is that client.init("host.example.org", 41414) is replaced with client.init("192.168.233.128", 50000) so that the client connects to our own host.

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;

    public class MyApp {
        public static void main(String[] args) {
            MyRpcClientFacade client = new MyRpcClientFacade();
            // Initialize client with the remote Flume agent's host and port
            // client.init("host.example.org", 41414);
            client.init("192.168.233.128", 50000);

            // Send 10 events to the remote Flume agent. That agent should be
            // configured to listen with an AvroSource.
            String sampleData = "Hello Flume!";
            for (int i = 0; i < 10; i++) {
                client.sendDataToFlume(sampleData);
            }

            client.cleanUp();
        }
    }

    class MyRpcClientFacade {
        private RpcClient client;
        private String hostname;
        private int port;

        public void init(String hostname, int port) {
            // Set up the RPC connection
            this.hostname = hostname;
            this.port = port;
            this.client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }

        public void sendDataToFlume(String data) {
            // Create a Flume Event object that encapsulates the sample data
            Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

            // Send the event
            try {
                client.append(event);
            } catch (EventDeliveryException e) {
                // Clean up and recreate the client
                client.close();
                client = null;
                client = RpcClientFactory.getDefaultInstance(hostname, port);
                // Use the following method to create a thrift client (instead of the above line):
                // this.client = RpcClientFactory.getThriftInstance(hostname, port);
            }
        }

        public void cleanUp() {
            // Close the RPC connection
            client.close();
        }
    }

The code needs little explanation: it sends "Hello Flume!" to Flume 10 times. Remember to add the JAR files from the lib directory under the Flume installation home to the project so that the program runs properly.

The following is the agent configuration:

    # configuration file: avro_client_case20.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.port = 50000
    a1.sources.r1.host = 192.168.233.128
    a1.sources.r1.channels = c1

    # Describe the sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

Note that, as mentioned earlier, the receiving end needs an Avro Source or a Thrift Source listening on the port. So when configuring the agent, set a1.sources.r1.type to avro or thrift, matching the client type.

Start the agent with the following command:

    flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console

After the agent has started successfully, run the Java program in Eclipse, or package it and run it on the server.

Then check the console output in the terminal where the agent was started. You can see that all 10 events were received normally.

Note that the client is not limited to sending one event at a time with client.append(event). It can also send a list of events in a single call with client.appendBatch(List<Event>), that is, in batches. This is not demonstrated here.
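As a rough illustration of how an application might group messages before handing them to appendBatch, here is a minimal sketch that uses only the Java standard library. BatchSplitter and its split method are hypothetical names, not part of the Flume SDK; in a real client each message would first be wrapped in an Event (for example with EventBuilder.withBody) and each sub-list would then be passed to appendBatch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Flume SDK.
final class BatchSplitter {
    // Splits items into consecutive batches of at most batchSize elements,
    // preserving the original order.
    static <T> List<List<T>> split(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```

With five messages and a batch size of two, this yields three batches, the last one holding a single message.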

II. Failover Client

This class wraps the default Avro RPC client to provide failover handling. It takes a whitespace-separated list of host:port entries, each representing a Flume agent, which together form a failover group. The failover RPC client does not currently support thrift. If there is a communication problem with the currently selected host agent, the failover client automatically fails over to the next host in the group.

The following is the example from the official documentation:

    // Setup properties for the failover
    Properties props = new Properties();
    props.put("client.type", "default_failover");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h2 h3 h4");

    // host/port pair for each host alias
    String host1 = "host1.example.org:41414";
    String host2 = "host2.example.org:41414";
    String host3 = "host3.example.org:41414";
    props.put("hosts.h2", host1);
    props.put("hosts.h3", host2);
    props.put("hosts.h4", host3);

    // Create the client with failover properties
    RpcClient client = RpcClientFactory.getInstance(props);
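To make the alias indirection in these properties concrete, the sketch below resolves the "hosts" list into host:port addresses using only java.util.Properties. It illustrates the property layout, not Flume's own parsing code; HostAliasResolver and its validation rule (a single colon followed by a numeric port) are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the Flume SDK.
final class HostAliasResolver {
    // Returns the host:port values in the order the aliases appear in "hosts".
    static List<String> resolve(Properties props) {
        List<String> addresses = new ArrayList<>();
        String hosts = props.getProperty("hosts", "").trim();
        if (hosts.isEmpty()) {
            return addresses;
        }
        for (String alias : hosts.split("\\s+")) {
            // Each alias must have a matching "hosts.<alias>" entry.
            String address = props.getProperty("hosts." + alias);
            if (address == null || !address.matches("[^:\\s]+:\\d+")) {
                throw new IllegalArgumentException(
                        "Missing or malformed host:port for alias " + alias);
            }
            addresses.add(address);
        }
        return addresses;
    }
}
```

A check like this catches a mistyped address, such as one missing the colon, before any connection attempt is made.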

The following is the code used for the test:

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;
    import java.util.Properties;

    public class Failover_Client {
        public static void main(String[] args) {
            MyRpcClientFacade2 client = new MyRpcClientFacade2();
            // Initialize client with the failover group of Flume agents
            client.init();

            // Send 10 events to the remote Flume agent. That agent should be
            // configured to listen with an AvroSource.
            String sampleData = "Hello Flume!";
            for (int i = 0; i < 10; i++) {
                client.sendDataToFlume(sampleData);
            }

            client.cleanUp();
        }
    }

    class MyRpcClientFacade2 {
        private RpcClient client;
        private Properties props;

        public void init() {
            // Setup properties for the failover
            props = new Properties();
            props.put("client.type", "default_failover");

            // List of hosts (space-separated list of user-chosen host aliases)
            props.put("hosts", "h2 h3 h4");

            // host/port pair for each host alias
            String host1 = "192.168.233.128:50000";
            String host2 = "192.168.233.128:50001";
            String host3 = "192.168.233.128:50002";
            props.put("hosts.h2", host1);
            props.put("hosts.h3", host2);
            props.put("hosts.h4", host3);

            // Create the client with failover properties
            client = RpcClientFactory.getInstance(props);
        }

        public void sendDataToFlume(String data) {
            // Create a Flume Event object that encapsulates the sample data
            Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

            // Send the event
            try {
                client.append(event);
            } catch (EventDeliveryException e) {
                // Clean up and recreate the client with the same failover properties
                client.close();
                client = null;
                client = RpcClientFactory.getInstance(props);
            }
        }

        public void cleanUp() {
            // Close the RPC connection
            client.close();
        }
    }

This code configures three hosts for failover; for convenience they are simulated with three ports on the same host. The code still sends "Hello Flume!" 10 times to the first Flume agent. When the first agent fails, events go to the second agent, so failover proceeds in order.

The agent configuration is the same as before. Copy the configuration file twice:

    cp avro_client_case20.conf avro_client_case21.conf
    cp avro_client_case20.conf avro_client_case22.conf

Then edit avro_client_case21.conf and avro_client_case22.conf, setting a1.sources.r1.port = 50001 and a1.sources.r1.port = 50002 respectively.

Start the three agents, each in its own terminal:

    flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
    flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console
    flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console

After the agents have started successfully, run the Java program Failover_Client.java in Eclipse, or package it and run it on the server.

Then check the console output in the three agent terminals. The first agent received the data, while the other two received nothing.

Next, stop the first agent's process and run the client program again. This time the data goes to the second agent. When the second agent is also stopped, the data goes to the last one. This shows that failover moves through the agent hosts in order.
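The sequential behavior observed above can be modeled in a few lines. This is a simplified sketch using only the Java standard library, not the failover client's actual implementation; FailoverSketch, deliver and the trySend callback are hypothetical names, and a real client would attempt a network send where the predicate is evaluated.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of in-order failover, not Flume code.
final class FailoverSketch {
    // Tries each host in list order and returns the first one that accepts the event.
    static String deliver(List<String> hosts, Predicate<String> trySend) {
        for (String host : hosts) {
            if (trySend.test(host)) {
                return host;
            }
        }
        throw new IllegalStateException("No host in the failover group accepted the event");
    }
}
```

If the first host is marked as down, deliver returns the second host; if the first two are down, it returns the third, which matches the order seen in the test.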

III. LoadBalancing RPC client

The Flume Client SDK also supports an RpcClient that load-balances across multiple hosts. This type of client takes a whitespace-separated list of host:port entries, which together form a load-balancing group. The client can be configured with a load-balancing strategy that either selects one of the configured hosts at random or cycles through them in round-robin order. You can also write your own class that implements the LoadBalancingRpcClient$HostSelector interface to apply a custom selection order. In that case, the fully qualified class name of the custom class must be given as the value of the host-selector property. The load-balancing RPC client does not currently support thrift.

If backoff is enabled, the client temporarily blacklists a host that fails, excluding it from selection until a given timeout has elapsed. When the timeout elapses, if the host is still unresponsive, this counts as a consecutive failure and the timeout is increased exponentially, which avoids getting stuck in long waits on unresponsive hosts.

The maximum backoff time can be configured through the maxBackoff property, in milliseconds. The default maxBackoff is 30 seconds (specified in the OrderSelector class).
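The idea of an exponentially growing timeout capped by maxBackoff can be sketched as follows. This is an illustration only: the 1000 ms base delay and the doubling rule are assumptions chosen for the example, and the schedule Flume actually uses is defined in its OrderSelector class and may differ. BackoffSketch and backoffMillis are hypothetical names.

```java
// Hypothetical model of capped exponential backoff, not Flume code.
final class BackoffSketch {
    // Returns how long a host stays excluded after the given number of
    // consecutive failures: base, 2*base, 4*base, ... capped at maxBackoffMillis.
    static long backoffMillis(int consecutiveFailures, long baseMillis, long maxBackoffMillis) {
        if (consecutiveFailures < 1) {
            return 0L; // no failures, no exclusion
        }
        // Cap the shift so the doubling cannot overflow for typical base values.
        int shift = Math.min(consecutiveFailures - 1, 30);
        long delay = baseMillis << shift;
        return Math.min(delay, maxBackoffMillis);
    }
}
```

With an assumed base of 1000 ms and maxBackoff set to 10000, the delays for one to five consecutive failures are 1000, 2000, 4000, 8000 and 10000 ms; the cap is what keeps a long-dead host from being excluded indefinitely.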

Here is the example from the official documentation:

    // Setup properties for the load balancing
    Properties props = new Properties();
    props.put("client.type", "default_loadbalance");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h2 h3 h4");

    // host/port pair for each host alias
    String host1 = "host1.example.org:41414";
    String host2 = "host2.example.org:41414";
    String host3 = "host3.example.org:41414";
    props.put("hosts.h2", host1);
    props.put("hosts.h3", host2);
    props.put("hosts.h4", host3);

    props.put("host-selector", "random"); // For random host selection
    // props.put("host-selector", "round_robin"); // For round-robin host selection
    props.put("backoff", "true"); // Disabled by default.
    props.put("maxBackoff", "10000"); // Defaults to 0, which effectively becomes 30000 ms

    // Create the client with load balancing properties
    RpcClient client = RpcClientFactory.getInstance(props);

The following is the code used for the test:

    import java.nio.charset.Charset;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.util.Properties;

    public class Load_Client {
        public static void main(String[] args) {
            MyRpcClientFacade3 client = new MyRpcClientFacade3();
            // Initialize client with the load-balancing group of Flume agents
            client.init();

            // Send 10 events to the remote Flume agents. Those agents should be
            // configured to listen with an AvroSource.
            String sampleData = "Flume Load_Client";
            for (int i = 0; i < 10; i++) {
                client.sendDataToFlume(sampleData);
            }

            client.cleanUp();
        }
    }

    class MyRpcClientFacade3 {
        private RpcClient client;
        private Properties props;

        public void init() {
            // Setup properties for the load balancing
            props = new Properties();
            props.put("client.type", "default_loadbalance");

            // List of hosts (space-separated list of user-chosen host aliases)
            props.put("hosts", "h2 h3 h4");

            // host/port pair for each host alias
            String host1 = "192.168.233.128:50000";
            String host2 = "192.168.233.128:50001";
            String host3 = "192.168.233.128:50002";
            props.put("hosts.h2", host1);
            props.put("hosts.h3", host2);
            props.put("hosts.h4", host3);

            props.put("host-selector", "random"); // For random host selection
            // props.put("host-selector", "round_robin"); // For round-robin host selection
            props.put("backoff", "true"); // Disabled by default.
            props.put("maxBackoff", "10000"); // Defaults to 0, which effectively becomes 30000 ms

            // Create the client with load balancing properties
            client = RpcClientFactory.getInstance(props);
        }

        public void sendDataToFlume(String data) {
            // Create a Flume Event object that encapsulates the sample data
            Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

            // Send the event
            try {
                client.append(event);
            } catch (EventDeliveryException e) {
                // Clean up and recreate the client with the same load balancing properties
                client.close();
                client = null;
                client = RpcClientFactory.getInstance(props);
            }
        }

        public void cleanUp() {
            // Close the RPC connection
            client.close();
        }
    }

The random strategy, props.put("host-selector", "random"), is used here. For the test, reuse the three receiving agents configured earlier with avro_client_case20.conf, avro_client_case21.conf, and avro_client_case22.conf, and start them, each in its own terminal:

    flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
    flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console
    flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console

After the agents have started successfully, run the Java program Load_Client.java in Eclipse, or package it and run it on the server.

Then check the console output in the three agent terminals:

Host1 received two events.

Host2 received two events.

Host3 received six events.

Because host-selector is set to random in the test code, the program distributes the events randomly. Next, test the round_robin option.

In the program, change these lines:

    // props.put("host-selector", "random"); // For random host selection
    props.put("host-selector", "round_robin"); // For round-robin host selection

Then run the Java program again:

Host1 received four events.

Host2 received three events.

Host3 likewise received three events (screenshot not shown). With round-robin, events are handed to the hosts in order, one after another.
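The 4/3/3 split is what a plain in-order rotation over three hosts produces for 10 events. The sketch below models that rotation with the Java standard library only; it assumes selection starts at the first host, and RoundRobinSketch and distribute are hypothetical names rather than Flume classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of round-robin distribution, not Flume code.
final class RoundRobinSketch {
    // Counts how many of the given number of events each host receives
    // when hosts are selected in list order, wrapping around at the end.
    static Map<String, Integer> distribute(List<String> hosts, int events) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String host : hosts) {
            counts.put(host, 0);
        }
        for (int i = 0; i < events; i++) {
            String host = hosts.get(i % hosts.size());
            counts.merge(host, 1, Integer::sum);
        }
        return counts;
    }
}
```

For hosts Host1, Host2, Host3 and 10 events, the counts come out as 4, 3 and 3: the tenth event wraps around to the first host again.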
