2025-03-26 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article simulates the TAILDIR source of Flume 1.7 with a custom source. The source controls the read position of the tailed file manually and records it in an offset file, so that data already read is not consumed again when Flume restarts after a crash.
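To make the idea concrete outside of Flume, here is a minimal sketch of offset-checkpointed reading that uses only the JDK. The class name `OffsetTailDemo`, the method `readNewLines`, and the temporary file names are hypothetical and chosen for illustration. The sketch assumes UTF-8 content and a single reader, and it is not the TAILDIR implementation.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class OffsetTailDemo {

    /** Reads the lines added since the previous call and saves the new offset in posFile. */
    public static List<String> readNewLines(Path logFile, Path posFile) throws IOException {
        // Restore the saved offset, or start from 0 if nothing has been recorded yet
        long offset = 0L;
        if (Files.exists(posFile)) {
            String saved = new String(Files.readAllBytes(posFile), StandardCharsets.UTF_8).trim();
            if (!saved.isEmpty()) {
                offset = Long.parseLong(saved);
            }
        }

        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(logFile.toFile(), "r")) {
            raf.seek(offset); // skip everything consumed in earlier runs
            String raw;
            while ((raw = raf.readLine()) != null) {
                // readLine() yields one char per byte; re-decode the bytes as UTF-8 (assumed)
                lines.add(new String(raw.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8));
            }
            offset = raf.getFilePointer();
        }

        // Persist the position so that a restart resumes from here
        Files.write(posFile, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("access", ".log");
        Path pos = Files.createTempFile("access", ".pos");

        Files.write(log, "line1\nline2\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(readNewLines(log, pos)); // prints [line1, line2]

        // Simulate a restart: only data appended after the saved offset is returned
        Files.write(log, "line3\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        System.out.println(readNewLines(log, pos)); // prints [line3]
    }
}
```

Each call behaves like a fresh process start: it trusts only the offset file, which is what lets a restarted reader continue where the previous one stopped.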
The code below is for reference only. In production, use the built-in TAILDIR source to read file contents directly. To also read files in the subdirectories of a directory, you can use the project published on GitHub: https://github.com/qwurey/flume-source-taildir-recursive
package com.fwmagic.flume.source;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Description: Custom source.
 * 1. Reads a file in a specified directory, such as nginx's access.log.
 * 2. Before reading, checks whether the offset file exists and creates it if not.
 * 3. Writes the position reached after each read to the offset file, to prevent
 *    repeated consumption when Flume restarts.
 * 4. How to write a custom source? Refer to ExecSource:
 *    (1) get the custom configuration properties
 *    (2) create a thread pool; data is sent to the channel through channelProcessor
 *    (3) submit the task to the thread pool (start the task)
 *    Task content:
 *    (1) read the offset file (create it if missing), get the offset,
 *        and move the read pointer to that offset
 *    (2) read the log file, wrap each line in an Event, and send it to the channel
 *    (3) get the offset after reading and update the offset file
 *    (4) in stop(), shut down the thread pool and call super.stop()
 * @Date: Created 2018-8-19
 */
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable {

    /** File to tail */
    private String filePath;
    /** File that records the read offset */
    private String posiFile;
    /** How long to wait (ms) when the tailed file has no new content */
    private Long interval;
    /** Character set used to read and write the files */
    private String charset;
    /** Task that reads the file content */
    private FileRunner fileRunner;
    /** Thread pool */
    private ExecutorService executor;

    private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);

    /**
     * Initializes the source from the agent configuration.
     *
     * @param context source configuration
     */
    @Override
    public void configure(Context context) {
        filePath = context.getString("filePath");
        posiFile = context.getString("posiFile");
        interval = context.getLong("interval", 2000L);
        charset = context.getString("charset", "UTF-8");
    }

    @Override
    public synchronized void start() {
        // Single worker thread that tails the log file
        executor = Executors.newSingleThreadExecutor();
        // Events are sent to the channel through the ChannelProcessor
        ChannelProcessor channelProcessor = super.getChannelProcessor();
        fileRunner = new FileRunner(filePath, posiFile, interval, charset, channelProcessor);
        executor.submit(fileRunner);
        super.start();
    }

    @Override
    public synchronized void stop() {
        fileRunner.setFlag(Boolean.FALSE);
        // Without shutdown() the executor never reports isTerminated()
        executor.shutdown();
        while (!executor.isTerminated()) {
            logger.debug("waiting for exec executor service to stop");
            try {
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for executor service to stop. Just exiting.");
                Thread.currentThread().interrupt();
                break;
            }
        }
        super.stop();
    }

    public static class FileRunner implements Runnable {
        private Long interval;
        private String charset;
        private Long offset = 0L;
        private File pFile;
        private RandomAccessFile raf;
        private ChannelProcessor channelProcessor;
        // volatile so that the change made by stop() is visible to the worker thread
        private volatile Boolean flag = Boolean.TRUE;

        public void setFlag(Boolean flag) {
            this.flag = flag;
        }

        public FileRunner(String filePath, String posiFile, Long interval, String charset,
                          ChannelProcessor channelProcessor) {
            this.interval = interval;
            this.charset = charset;
            this.channelProcessor = channelProcessor;

            // 1. Create the offset file if it does not exist yet
            pFile = new File(posiFile);
            if (!pFile.exists()) {
                try {
                    pFile.createNewFile();
                } catch (IOException e) {
                    logger.error("create position file error!", e);
                }
            }

            try {
                // 2. Read the content of the offset file
                String offsetStr = FileUtils.readFileToString(pFile, this.charset);
                // 3. If an offset was recorded, convert it to a Long
                if (StringUtils.isNotBlank(offsetStr)) {
                    offset = Long.parseLong(offsetStr.trim());
                }
                // 4. Open the log file and jump to the recorded offset
                raf = new RandomAccessFile(filePath, "r");
                raf.seek(offset);
            } catch (IOException e) {
                logger.error("read position file error!", e);
            }
        }

        @Override
        public void run() {
            if (raf == null) {
                // The constructor could not open the log file; nothing to tail
                return;
            }
            // Tail the file until stop() clears the flag
            while (flag) {
                try {
                    String line = raf.readLine();
                    if (line == null) {
                        // No new content yet: wait before polling again
                        Thread.sleep(interval);
                        continue;
                    }
                    if (StringUtils.isNotBlank(line)) {
                        // readLine() maps each byte to one char, so recover the raw bytes
                        // via ISO-8859-1 and decode them with the configured charset
                        line = new String(line.getBytes("ISO-8859-1"), charset);
                        // Wrap the line in an Event and send it to the channel
                        Event event = EventBuilder.withBody(line.getBytes(charset));
                        channelProcessor.processEvent(event);
                    }
                    // Record the new offset in the offset file
                    offset = raf.getFilePointer();
                    FileUtils.writeStringToFile(pFile, offset.toString(), charset);
                } catch (InterruptedException e) {
                    logger.error("thread sleep error", e);
                    Thread.currentThread().interrupt();
                    break;
                } catch (IOException e) {
                    logger.error("read log file error!", e);
                }
            }
            try {
                raf.close();
            } catch (IOException e) {
                logger.error("close log file error!", e);
            }
        }
    }
}
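The `run` loop above passes each line through `getBytes("ISO-8859-1")` before building the event. The reason is that `RandomAccessFile.readLine()` turns every byte into one char without decoding, so multi-byte text comes back garbled until the raw bytes are recovered and decoded properly. The following standalone sketch shows the effect. The class name `ReadLineDecodeDemo` and the helper `readFirstLine` are hypothetical, and the sample file is assumed to be UTF-8.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ReadLineDecodeDemo {

    /** Returns the first line twice: as readLine() delivers it, and re-decoded as UTF-8. */
    public static String[] readFirstLine(Path file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            String raw = raf.readLine();
            // ISO-8859-1 maps chars 0-255 back to the identical byte values
            byte[] bytes = raw.getBytes(StandardCharsets.ISO_8859_1);
            String decoded = new String(bytes, StandardCharsets.UTF_8);
            return new String[] {raw, decoded};
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("decode", ".log");
        // Two CJK characters, three UTF-8 bytes each
        String text = "\u65e5\u5fd7";
        Files.write(file, (text + "\n").getBytes(StandardCharsets.UTF_8));

        String[] result = readFirstLine(file);
        System.out.println(result[0].length());     // prints 6: one char per byte
        System.out.println(result[1].equals(text)); // prints true
    }
}
```

Decoding with the configured charset instead of a hard-coded one keeps the source usable for logs that are not UTF-8.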