2025-03-26 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
How to develop a Flume client: many newcomers are unclear about this, so this article walks through it in detail. Hopefully it is useful to anyone who needs it.
Data is produced in many different ways in practice. Flume includes some built-in mechanisms for collecting data, but users often prefer to connect their application to Flume directly. In that case the user's own application connects to Flume through IPC or RPC and sends data to it.
I. RPC client interface
Flume's RpcClient interface encapsulates the RPC mechanism that Flume supports. A user application can simply call append(Event) or appendBatch(List<Event>) from the Flume Client SDK to send data, without worrying about the details of the underlying message exchange. The application can supply the required Event either by implementing the Event interface directly, for example through the simple and convenient SimpleEvent class, or by using EventBuilder's overloaded withBody() static helper methods.
Since Flume 1.4.0, Avro has been the default RPC protocol. NettyAvroRpcClient and ThriftRpcClient both implement the RpcClient interface. To create a client instance you need the host and port of the target Flume agent; the resulting RpcClient is then used to send data to that agent.
The official website gives an example of an Avro RPC client, which is used here directly as the test example. The only change is that client.init("host.example.org", 41414) becomes client.init("192.168.233.128", 50000) so that it connects to our host.
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class MyApp {
  public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    // client.init("host.example.org", 41414);
    client.init("192.168.233.128", 50000);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade {
  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
This code needs little explanation: it sends "Hello Flume!" to Flume 10 times. Remember to add the jar files from the lib directory under the Flume installation home to the project so that the program can run.
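One detail worth noticing in sendDataToFlume: when an EventDeliveryException is caught, the client is recreated but the failed event is not sent again. The sketch below shows one way an application might add a bounded retry around a delivery call. It is a minimal illustration in plain Java, not part of the Flume SDK: the Delivery interface and the RetryingSender class are hypothetical names standing in for a call such as client.append(event).

```java
// Hypothetical stand-in for a delivery call such as RpcClient.append(event).
interface Delivery {
    void send(String data) throws Exception;
}

class RetryingSender {
    /**
     * Tries to deliver data up to maxAttempts times.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int sendWithRetry(Delivery delivery, String data, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                delivery.send(data);
                return attempt;
            } catch (Exception e) {
                // In the facade above, this is the point where the client
                // would be closed and recreated before the next attempt.
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated agent that rejects the first two attempts, then accepts.
        Delivery flaky = d -> {
            if (++calls[0] < 3) {
                throw new Exception("agent unreachable");
            }
        };
        System.out.println("succeeded on attempt " + sendWithRetry(flaky, "Hello Flume!", 5));
    }
}
```

Capping the number of attempts keeps the sender from looping forever when the agent stays down; what to do with an event that still fails (log it, buffer it, drop it) is left to the application.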
The following is the agent configuration:
# configuration file: avro_client_case20.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.port = 50000
# Listen address of the Avro source (the Flume User Guide names this property "bind")
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Note that, as mentioned earlier, the receiving end needs an Avro source or a Thrift source listening on the port. So when configuring the agent, set a1.sources.r1.type to avro or thrift.
# Run the command
flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
After the agent starts successfully, run the Java program in Eclipse, or package it and run it on the server.
# Check the console output in the terminal where the agent was started
You can see that all 10 events were delivered normally.
One more note: client.append(event) in the code above sends a single event. The client can also send a List<Event> in one call through appendBatch, that is, send events in batches. That is not demonstrated here.
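Since batch sending is not demonstrated, here is a small sketch of the application-side half of it: grouping payloads into batches of a bounded size. It uses only the Java standard library so that it runs on its own; BatchSplitter and its partition method are hypothetical helper names, and the comment marks where each batch would be converted to Flume events and handed to appendBatch.

```java
import java.util.ArrayList;
import java.util.List;

class BatchSplitter {
    /** Splits items into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            lines.add("Hello Flume! #" + i);
        }
        for (List<String> batch : partition(lines, 4)) {
            // With the Flume SDK, each string here would be wrapped with
            // EventBuilder.withBody(...) and the resulting List<Event>
            // passed to client.appendBatch(...).
            System.out.println(batch.size() + " events in batch");
        }
    }
}
```

Whether manual grouping is needed depends on the application; it is mainly useful when you want to control how many events are at stake in a single failed call.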
II. Failover Client
This class wraps the default Avro RPC client to give clients failover handling. The hosts property takes a space-separated list of aliases, each mapped to the host:port of a Flume agent, and together these agents form a failover group. The Failover RPC client does not currently support Thrift. If there is a communication problem with the currently selected agent, the failover client automatically switches to the next host in the group.
The following is the example from the official website:
// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h2 h3 h4");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h2", host1);
props.put("hosts.h3", host2);
props.put("hosts.h4", host3);

// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);
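When the list of agents comes from configuration rather than being hard-coded, the same properties can be generated in a loop. The following is a minimal sketch using only java.util: FailoverProps and its build method are hypothetical names, and the aliases h1, h2, ... are generated purely for illustration, since aliases are user-chosen. The property keys (client.type, hosts, hosts.<alias>) are the ones used in the example above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class FailoverProps {
    /** Builds failover client properties from host:port addresses, in priority order. */
    static Properties build(List<String> addresses) {
        Properties props = new Properties();
        props.setProperty("client.type", "default_failover");
        StringBuilder aliases = new StringBuilder();
        for (int i = 0; i < addresses.size(); i++) {
            String alias = "h" + (i + 1); // generated alias, chosen for this sketch
            if (i > 0) {
                aliases.append(' ');
            }
            aliases.append(alias);
            props.setProperty("hosts." + alias, addresses.get(i));
        }
        // Space-separated alias list, as in the example above.
        props.setProperty("hosts", aliases.toString());
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(Arrays.asList(
                "192.168.233.128:50000",
                "192.168.233.128:50001",
                "192.168.233.128:50002"));
        // The result would be passed to RpcClientFactory.getInstance(props).
        System.out.println(props.getProperty("hosts"));
        System.out.println(props.getProperty("hosts.h1"));
    }
}
```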
The following is the example used for testing:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
import java.util.Properties;

public class Failover_Client {
  public static void main(String[] args) {
    MyRpcClientFacade2 client = new MyRpcClientFacade2();
    // Initialize client with the failover group of remote Flume agents
    client.init();

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade2 {
  private RpcClient client;
  // Kept so the client can be recreated with the same settings after a failure
  private Properties props;

  public void init() {
    // Setup properties for the failover
    props = new Properties();
    props.put("client.type", "default_failover");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h2 h3 h4");

    // host/port pair for each host alias
    String host1 = "192.168.233.128:50000";
    String host2 = "192.168.233.128:50001";
    String host3 = "192.168.233.128:50002";
    props.put("hosts.h2", host1);
    props.put("hosts.h3", host2);
    props.put("hosts.h4", host3);

    // create the client with failover properties
    client = RpcClientFactory.getInstance(props);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client with the same failover properties
      // (the original called getDefaultInstance(hostname, port), but hostname
      // and port are never set in this class)
      client.close();
      client = null;
      client = RpcClientFactory.getInstance(props);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
This code sets up three hosts for failover; to keep things simple, they are simulated with three ports on the same host. The code still sends "Hello Flume!" 10 times to the first Flume agent, and when the first agent fails, the events go to the second agent, failing over in order.
The agent configuration is the same as before. Copy the configuration file twice:
cp avro_client_case20.conf avro_client_case21.conf
cp avro_client_case20.conf avro_client_case22.conf
Then modify avro_client_case21.conf and avro_client_case22.conf, setting
a1.sources.r1.port = 50001 and a1.sources.r1.port = 50002 respectively.
# Run the commands
flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console
After the agents start successfully, run the Java program Failover_Client.java in Eclipse, or package it and run it on the server.
# Check the console output in the three terminals where the agents were started
The first agent's terminal received the data, while the other two terminals received nothing.
Next, stop the process in the first terminal and run the client program again: this time the data goes to the second terminal. When the second terminal is also stopped, the data goes to the last terminal. This shows that failover moves through the agent hosts in order.
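The behavior observed in this test can be modeled in a few lines. The sketch below is only a model of what the test shows (the first reachable host in list order gets the data), not Flume's actual implementation; FailoverOrder and pick are hypothetical names, and the set of "down" hosts stands in for agents whose process has been stopped.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class FailoverOrder {
    /** Returns the first host, in list order, that is not marked as down; null if all are down. */
    static String pick(List<String> hosts, Set<String> down) {
        for (String host : hosts) {
            if (!down.contains(host)) {
                return host;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList(
                "192.168.233.128:50000",
                "192.168.233.128:50001",
                "192.168.233.128:50002");
        Set<String> down = new HashSet<>();

        System.out.println(pick(hosts, down)); // all agents up
        down.add("192.168.233.128:50000");     // first agent stopped
        System.out.println(pick(hosts, down));
        down.add("192.168.233.128:50001");     // second agent stopped
        System.out.println(pick(hosts, down));
    }
}
```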
III. LoadBalancing RPC client
The Flume Client SDK also supports an RpcClient that load-balances across multiple hosts. This type of client takes a space-separated list of hosts, each given as host:port, which together form a load-balancing group. The client can be configured with a load-balancing policy that either selects one of the configured hosts at random or cycles through them in round-robin order. You can also write your own class that implements the LoadBalancingRpcClient$HostSelector interface to apply a custom selection order. In that case, the fully qualified name of the custom class must be given as the value of the host-selector property. The LoadBalancing RPC client does not currently support Thrift.
If backoff is enabled, the client temporarily blacklists hosts that fail, so a failed host is excluded from selection until a given timeout has passed. When the timeout elapses, if the host still does not respond, this counts as a consecutive failure and the timeout grows exponentially, which avoids getting stuck in long waits on unresponsive hosts.
The maximum backoff time can be configured through the maxBackoff property, in milliseconds. By default, maxBackoff is 30 seconds (specified in the OrderSelector class).
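To make the exponential growth and the cap concrete, here is a small arithmetic sketch. It is an illustration under stated assumptions, not Flume's code: BackoffSketch and backoffMillis are hypothetical names, and the 1000 ms starting delay is an assumed value chosen for the example. Only the doubling per consecutive failure and the maxBackoff ceiling come from the description above.

```java
class BackoffSketch {
    /**
     * Delay before a failed host is tried again:
     * baseMillis doubled for each consecutive failure after the first,
     * never exceeding maxBackoffMillis.
     */
    static long backoffMillis(int consecutiveFailures, long baseMillis, long maxBackoffMillis) {
        if (consecutiveFailures <= 0) {
            return 0;
        }
        long delay = baseMillis;
        for (int i = 1; i < consecutiveFailures; i++) {
            // Stop doubling once the next step would pass the ceiling
            // (this also avoids overflow for large failure counts).
            if (delay > maxBackoffMillis / 2) {
                return maxBackoffMillis;
            }
            delay *= 2;
        }
        return Math.min(delay, maxBackoffMillis);
    }

    public static void main(String[] args) {
        long base = 1000;        // assumed starting delay for this sketch
        long maxBackoff = 30000; // the 30-second default mentioned above
        for (int failures = 1; failures <= 7; failures++) {
            System.out.println(failures + " failure(s): "
                    + backoffMillis(failures, base, maxBackoff) + " ms");
        }
    }
}
```

With these assumed numbers the delay doubles from 1000 ms up to 16000 ms and then stays at the 30000 ms ceiling, which is the point of maxBackoff: an unresponsive host is never excluded for longer than that limit before being tried again.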
Here is the example from the official website:
// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h2 h3 h4");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h2", host1);
props.put("hosts.h3", host2);
props.put("hosts.h4", host3);

props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host selection
props.put("backoff", "true"); // Disabled by default.
props.put("maxBackoff", "10000"); // Defaults to 0, which effectively becomes 30000 ms

// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);
The following is the example used for testing:
import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.util.Properties;

public class Load_Client {
  public static void main(String[] args) {
    MyRpcClientFacade3 client = new MyRpcClientFacade3();
    // Initialize client with the load-balancing group of remote Flume agents
    client.init();

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Flume Load_Client";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade3 {
  private RpcClient client;
  // Kept so the client can be recreated with the same settings after a failure
  private Properties props;

  public void init() {
    props = new Properties();
    props.put("client.type", "default_loadbalance");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h2 h3 h4");

    // host/port pair for each host alias
    String host1 = "192.168.233.128:50000";
    String host2 = "192.168.233.128:50001";
    String host3 = "192.168.233.128:50002";
    props.put("hosts.h2", host1);
    props.put("hosts.h3", host2);
    props.put("hosts.h4", host3);

    props.put("host-selector", "random"); // For random host selection
    // props.put("host-selector", "round_robin"); // For round-robin host selection
    props.put("backoff", "true"); // Disabled by default.
    props.put("maxBackoff", "10000"); // Defaults to 0, which effectively becomes 30000 ms

    // Create the client with load balancing properties
    client = RpcClientFactory.getInstance(props);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client with the same load-balancing properties
      // (the original called getDefaultInstance(hostname, port), but hostname
      // and port are never set in this class)
      client.close();
      client = null;
      client = RpcClientFactory.getInstance(props);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
The random load-balancing policy, props.put("host-selector", "random"), is used here. For the test, reuse the three receiving agents configured earlier with avro_client_case20.conf, avro_client_case21.conf, and avro_client_case22.conf, and start them.
# Run the commands
flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console
After the agents start successfully, run the Java program Load_Client.java in Eclipse, or package it and run it on the server.
# Check the console output in the three terminals where the agents were started
Host1 received two events.
Host2 received two events.
Host3 received six events.
In this example host-selector is set to random, so the program distributes the events randomly. Next, test the round_robin option.
In the program, change these lines:
// props.put("host-selector", "random"); // For random host selection
props.put("host-selector", "round_robin"); // For round-robin host selection
Then run the Java program again.
Host1 received four events.
Host2 received three events.
Host3 likewise received three events; the screenshot is not shown here. Round-robin hands events to the hosts in turn.
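The 4, 3, 3 split is what strict rotation gives for 10 events over 3 hosts. The short sketch below reproduces that count in plain Java; it is a model of round-robin assignment only, not Flume's selector, and RoundRobinCount and distribute are hypothetical names.

```java
import java.util.Arrays;

class RoundRobinCount {
    /** Counts how many of the given events each host receives under strict rotation. */
    static int[] distribute(int events, int hosts) {
        int[] counts = new int[hosts];
        for (int i = 0; i < events; i++) {
            counts[i % hosts]++; // event i goes to host (i mod hosts)
        }
        return counts;
    }

    public static void main(String[] args) {
        // 10 events across 3 hosts, as in the test above
        System.out.println(Arrays.toString(distribute(10, 3))); // prints [4, 3, 3]
    }
}
```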