2025-04-04 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article shows how to write a custom Flume source and a custom Flume sink. The material is short and easy to follow, and we hope it clears up any questions you have as you work through it.
Custom source development:
A source collects logs and stores them in a channel.
Flume offers two source mechanisms: PollableSource (polling) and EventDrivenSource (event-driven).
With EventDrivenSource, you can start additional threads in the start() method and have them keep sending data to the channel. With PollableSource, Flume calls process() over and over, so you fetch and forward data inside your process() implementation.
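The event-driven variant described above can be illustrated without any Flume classes. The following is only a minimal sketch under stated assumptions: the class name EventDrivenSketch, the "event-N" payloads, and the use of a BlockingQueue as a stand-in for the channel are all hypothetical and are not part of the Flume API. It shows the shape of the idea: start() returns immediately while a worker thread keeps pushing data, and stop() shuts that thread down.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for an event-driven source. The BlockingQueue plays the
// role of the channel; none of these types come from Flume.
public class EventDrivenSketch {
    private final BlockingQueue<String> channel;
    private Thread worker;

    public EventDrivenSketch(BlockingQueue<String> channel) {
        this.channel = channel;
    }

    // start() returns immediately; the worker thread keeps producing events.
    public synchronized void start() {
        worker = new Thread(() -> {
            int n = 0;
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    channel.put("event-" + n++); // hypothetical payload
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                // Interrupted by stop(): restore the flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        }, "sketch-source-worker");
        worker.start();
    }

    // stop() interrupts the worker and waits for it to finish.
    public synchronized void stop() {
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public synchronized boolean isRunning() {
        return worker != null && worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        EventDrivenSketch source = new EventDrivenSketch(channel);
        source.start();
        System.out.println("first event: " + channel.take());
        source.stop();
    }
}
```

In a real source the worker would hand events to the channel processor instead of a queue, but the start/stop lifecycle would look much the same.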
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.source.AbstractSource;

    // Note: Flume 1.7 and later also require getBackOffSleepIncrement()
    // and getMaxBackOffSleepInterval() on a PollableSource.
    public class MySource extends AbstractSource implements Configurable, PollableSource {
        private String myProp;

        @Override
        public void configure(Context context) {
            String myProp = context.getString("myProp", "defaultValue");

            // Process the myProp value (e.g. validation, convert to another type, ...)

            // Store myProp for later retrieval by process() method
            this.myProp = myProp;
        }

        @Override
        public void start() {
            // Initialize the connection to the external client
            super.start(); // mark the source as started
        }

        @Override
        public void stop() {
            // Disconnect from external client and do any additional cleanup
            // (e.g. releasing resources or nulling-out field values)
            super.stop(); // mark the source as stopped
        }

        @Override
        public Status process() throws EventDeliveryException {
            Status status = null;

            try {
                // This try clause includes whatever Channel/Event operations you want to do

                // Receive new data (getSomeData() is a placeholder for your own code)
                Event e = getSomeData();

                // Store the Event into this Source's associated Channel(s);
                // the channel processor manages the transaction itself
                getChannelProcessor().processEvent(e);

                status = Status.READY;
            } catch (Throwable t) {
                // Log exception, handle individual exceptions as needed
                status = Status.BACKOFF;

                // re-throw all Errors
                if (t instanceof Error) {
                    throw (Error) t;
                }
            }
            return status;
        }
    }
Or, as an event-driven source:
    package org.apache.flume;

    import org.apache.flume.conf.Configurable;
    import org.apache.flume.source.AbstractSource;

    public class TailSource extends AbstractSource implements EventDrivenSource, Configurable {

        @Override
        public void configure(Context context) {
        }

        @Override
        public synchronized void start() {
            // Start the worker threads here, then mark the source as started
            super.start();
        }

        @Override
        public synchronized void stop() {
            // Stop the worker threads here, then mark the source as stopped
            super.stop();
        }
    }
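A custom source is referenced in the agent configuration by its fully qualified class name. The fragment below is a minimal sketch, assuming an agent named a1 and a MySource class placed in a hypothetical package com.example (neither name comes from the article). The myProp key is the one read by configure() in MySource; the memory channel and logger sink are Flume's built-in types, used here only so the source has somewhere to send events.

```properties
# Agent name "a1" and package "com.example" are assumptions for illustration
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Custom source: type is the fully qualified class name
a1.sources.r1.type = com.example.MySource
a1.sources.r1.myProp = someValue
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```

The compiled class also has to be on the agent's classpath for the type to resolve.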
Custom sink:
A sink pulls logs from a channel and processes them.
Flume calls process() again and again; you only need to take data from the channel inside process().
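The repeated calling of process() can be modeled in a few lines. This is only a simplified sketch, not Flume's actual runner code: PollingLoopSketch, its Status enum, the Processor interface, and the linear back-off step are hypothetical stand-ins chosen for illustration. The idea it shows is that a READY result lets the loop call process() again straight away, while a BACKOFF result makes the loop wait a little longer each time.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the loop that drives process(). The Status enum and
// Processor interface are hypothetical stand-ins, not Flume classes.
public class PollingLoopSketch {
    public enum Status { READY, BACKOFF }

    public interface Processor {
        Status process();
    }

    // Calls process() exactly maxCalls times, sleeping a little longer after
    // each consecutive BACKOFF. Returns how many calls reported READY.
    public static int run(Processor p, int maxCalls, long backoffStepMillis) {
        int ready = 0;
        int consecutiveBackoffs = 0;
        for (int i = 0; i < maxCalls; i++) {
            if (p.process() == Status.READY) {
                ready++;
                consecutiveBackoffs = 0;
            } else {
                consecutiveBackoffs++;
                try {
                    Thread.sleep(consecutiveBackoffs * backoffStepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break; // stop looping when interrupted
                }
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("a");
        channel.add("b");
        // process(): take one event if present, otherwise ask the loop to back off.
        Processor sink = () -> {
            String event = channel.poll();
            if (event == null) {
                return Status.BACKOFF;
            }
            System.out.println("delivered " + event);
            return Status.READY;
        };
        System.out.println("READY calls: " + run(sink, 5, 1));
    }
}
```

Returning BACKOFF when the channel is empty is what keeps such a loop from spinning at full speed with nothing to do.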
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;

    public class MySink extends AbstractSink implements Configurable {
        private String myProp;

        @Override
        public void configure(Context context) {
            String myProp = context.getString("myProp", "defaultValue");

            // Process the myProp value (e.g. validation)

            // Store myProp for later retrieval by process() method
            this.myProp = myProp;
        }

        @Override
        public void start() {
            // Initialize the connection to the external repository (e.g. HDFS) that
            // this Sink will forward Events to.
            super.start(); // mark the sink as started
        }

        @Override
        public void stop() {
            // Disconnect from the external repository and do any
            // additional cleanup (e.g. releasing resources or nulling-out
            // field values)
            super.stop(); // mark the sink as stopped
        }

        @Override
        public Status process() throws EventDeliveryException {
            Status status = null;

            // Start transaction
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try {
                // This try clause includes whatever Channel operations you want to do
                Event event = ch.take();

                // Send the Event to the external repository.
                // storeSomeData(event);

                txn.commit();
                // take() returns null when the channel is empty, so back off
                status = (event == null) ? Status.BACKOFF : Status.READY;
            } catch (Throwable t) {
                txn.rollback();

                // Log exception, handle individual exceptions as needed
                status = Status.BACKOFF;

                // re-throw all Errors
                if (t instanceof Error) {
                    throw (Error) t;
                }
            } finally {
                // Always close the transaction, whether it committed or rolled back
                txn.close();
            }
            return status;
        }
    }

That is all for "How to customize a Flume source and sink". Thank you for reading! We hope the article has given you a working understanding of the topic and that it proves helpful.