The HDFS Sink component is mainly composed of three classes: HDFSEventSink, BucketWriter, and HDFSWriter.
HDFSEventSink checks whether the sink's configuration is valid, takes events from the Channel, and determines each event's corresponding BucketWriter by parsing the event's header information.
BucketWriter is responsible for rolling (generating) files on the HDFS side according to conditions such as rollCount and rollSize, and each BucketWriter applies the configured file data format and serialization in the same way.
HDFSWriter is an interface with three concrete implementations: HDFSSequenceFile, HDFSDataStream, and HDFSCompressedDataStream.
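For orientation, the shape of the HDFSWriter abstraction is roughly the following (a simplified outline for illustration, not the exact Flume interface; signatures are abbreviated):

import java.io.IOException;
import org.apache.flume.Event;

// Simplified outline of the HDFSWriter abstraction (illustrative only).
// Each implementation (HDFSSequenceFile, HDFSDataStream, HDFSCompressedDataStream)
// knows how to open a file on HDFS, append one Flume Event, sync buffered data, and close.
public interface HDFSWriter {
  void open(String filePath) throws IOException;  // open/create the target HDFS file
  void append(Event e) throws IOException;        // serialize and write one event
  void sync() throws IOException;                 // flush buffered data to HDFS
  void close() throws IOException;                // close the underlying stream
}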
Key class diagram of the HDFSSink classes
HDFSEventSink class
Before walking through HDFSEventSink, you should be familiar with its configuration parameters (see the Flume HDFS Sink configuration parameter description).
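As a reference point, a minimal agent configuration for the HDFS sink might look like the following (the agent, channel, and sink names a1, c1, k1 and the concrete values are only illustrative):

# Illustrative Flume agent configuration for an HDFS sink (names and values are examples only)
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 10000
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.maxOpenFiles = 5000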
1. The configure() method reads filePath, fileName, and other settings from the configuration file (again, see the Flume HDFS Sink configuration parameter description).
2. The start() method initializes the fixed-size thread pool callTimeoutPool, the scheduled thread pool timedRollerPool, and sfWriters, and starts sinkCounter:
callTimeoutPool: a fixed-size thread pool on which HDFS operations are executed so that they can be bounded by callTimeout.
timedRollerPool: a scheduled thread pool whose tasks include retrying the renaming of HDFS files (retryInterval), rolling files on a time basis (rollInterval), and closing idle files (idleTimeout).
sfWriters: effectively an implementation of LinkedHashMap; by overriding removeEldestEntry, the least recently used writer is evicted, so sfWriters never holds more than maxOpenFiles open files (see the sketch after this list).
sinkCounter: counters for the sink component's monitoring metrics.
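As a rough illustration of the sfWriters mechanism, a LinkedHashMap in access order with removeEldestEntry overridden behaves like an LRU cache of bounded size (a minimal sketch with illustrative names; the real Flume map additionally closes the evicted BucketWriter):

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU sketch of the idea behind sfWriters (illustrative names, not the Flume source).
class WriterCache<V> extends LinkedHashMap<String, V> {
  private final int maxOpenFiles;

  WriterCache(int maxOpenFiles) {
    super(16, 0.75f, true);            // accessOrder = true gives LRU iteration order
    this.maxOpenFiles = maxOpenFiles;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
    return size() > maxOpenFiles;      // evict the least recently used entry when over the cap
  }
}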
3. The process() method contains the most important logic in HDFSEventSink (key points are noted in the code comments below).
process() first obtains the Channel.
It then takes events from the Channel in a loop of up to batchSize iterations, parses each event's header, and determines the event's destination HDFS path and file name.
Each event may map to a different BucketWriter and HDFSWriter, and each event is appended to its corresponding writer.
When the number of events reaches batchSize, the writers are flushed and the transaction is committed.
BucketWriter is responsible for how files are rolled (generated) and for the file format and serialization logic.
The concrete HDFSWriter implementations correspond to the file types "SequenceFile", "DataStream", and "CompressedStream"; the user selects the implementation through the hdfs.fileType parameter, as sketched below.
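A simplified illustration of that lookup (only a sketch; the real code lives in Flume's writer factory and may differ in details):

import java.io.IOException;

// Illustrative sketch of how hdfs.fileType selects a concrete HDFSWriter
// (not the exact Flume factory; class names follow those mentioned above).
class WriterFactorySketch {
  static HDFSWriter getWriter(String fileType) throws IOException {
    switch (fileType) {
      case "SequenceFile":     return new HDFSSequenceFile();
      case "DataStream":       return new HDFSDataStream();
      case "CompressedStream": return new HDFSCompressedDataStream();
      default: throw new IOException("File type " + fileType + " not supported");
    }
  }
}

The full process() method of HDFSEventSink follows; the key steps are noted in the comments.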
public Status process() throws EventDeliveryException {
  Channel channel = getChannel(); // call the parent getChannel() method to obtain the Channel connected to this Sink
  Transaction transaction = channel.getTransaction(); // each batch is committed inside one transaction
  transaction.begin();
  try {
    Set<BucketWriter> writers = new LinkedHashSet<BucketWriter>();
    int txnEventCount = 0;
    for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
      Event event = channel.take(); // take an event from the Channel
      if (event == null) { // no new events, so stop before reaching batchSize
        break;
      }

      // reconstruct the path name by substituting place holders
      // the configuration may contain %-placeholders, e.g. "a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S"
      // resolve those placeholders to build realPath and realName
      String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);
      String realName = BucketPath.escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);

      String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
      BucketWriter bucketWriter;
      HDFSWriter hdfsWriter = null;
      WriterCallback closeCallback = new WriterCallback() {
        @Override
        public void run(String bucketPath) {
          LOG.info("Writer callback called.");
          synchronized (sfWritersLock) {
            // sfWriters is an LRU map of at most maxOpenFiles entries, capping the number of open files
            sfWriters.remove(bucketPath);
          }
        }
      };
      synchronized (sfWritersLock) {
        bucketWriter = sfWriters.get(lookupPath);
        // we haven't seen this file yet, so open it and cache the handle
        if (bucketWriter == null) {
          // obtain a writer for the configured file type: "SequenceFile", "DataStream" or "CompressedStream"
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
              lookupPath, hdfsWriter, closeCallback);
          sfWriters.put(lookupPath, bucketWriter);
        }
      }

      // Write the data to HDFS
      try {
        bucketWriter.append(event);
      } catch (BucketClosedException ex) {
        LOG.info("Bucket was closed while trying to append, " +
            "reinitializing bucket and writing event.");
        hdfsWriter = writerFactory.getWriter(fileType);
        bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
        synchronized (sfWritersLock) {
          sfWriters.put(lookupPath, bucketWriter);
        }
        bucketWriter.append(event);
      }

      // track the buckets getting written in this transaction
      if (!writers.contains(bucketWriter)) {
        writers.add(bucketWriter);
      }
    }

    if (txnEventCount == 0) {
      sinkCounter.incrementBatchEmptyCount();
    } else if (txnEventCount == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    } else {
      sinkCounter.incrementBatchUnderflowCount();
    }

    // flush all pending buckets before committing the transaction
    for (BucketWriter bucketWriter : writers) {
      bucketWriter.flush();
    }

    transaction.commit();

    if (txnEventCount < 1) {
      return Status.BACKOFF;
    } else {
      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
      return Status.READY;
    }
  } catch (IOException eIO) {
    transaction.rollback();
    LOG.warn("HDFS IO error", eIO);
    return Status.BACKOFF;
  } catch (Throwable th) {
    transaction.rollback();
    LOG.error("process failed", th);
    if (th instanceof Error) {
      throw (Error) th;
    } else {
      throw new EventDeliveryException(th);
    }
  } finally {
    transaction.close();
  }
}

BucketWriter flush() method:
BucketWriter maintains a batchCounter; when batchCounter is not 0, doFlush() is performed. doFlush() serializes the events in the batch and flushes the output stream, which is what finally writes the events into HDFS.
If the user sets the idleTimeout parameter to a non-zero value, then after doFlush() a task is added to the scheduled thread pool; it closes the HDFSWriter output object currently connected to HDFS after an interval of idleTimeout, and this delayed task is assigned to the idleFuture variable.

BucketWriter append() method:
The flush() description above introduced the idleFuture variable. Before append() does its work, it first checks whether the idleFuture task has finished; if it has not, append() waits up to callTimeout for it to complete before continuing. This prevents data from still being appended to HDFS while the HDFSWriter is being closed, i.e. the writer being closed halfway through an append.
Next, before the actual append, it checks whether an HDFSWriter is currently available for the append operation; if not, open() is called.
Each time an event is appended to HDFS, the rollCount and rollSize parameters are checked; when either condition is met, the temporary file is renamed (rolled) into the final HDFS file, a new HDFSWriter is opened, and events are appended to it. When the number of events reaches batchSize, flush() is performed.

public synchronized void append(final Event event)
    throws IOException, InterruptedException {
  checkAndThrowInterruptedException();

  // idleFuture is a ScheduledFuture whose job is to close the current HDFSWriter;
  // before appending an event we must make sure it has finished running,
  // otherwise the writer could be closed halfway through an append
  if (idleFuture != null) {
    idleFuture.cancel(false);
    // There is still a small race condition - if the idleFuture is already
    // running, interrupting it can cause HDFS close operation to throw -
    // so we cannot interrupt it while running. If the future could not be
    // cancelled, it is already running - wait for it to finish before
    // attempting to write.
    if (!idleFuture.isDone()) {
      try {
        idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
      } catch (TimeoutException ex) {
        LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
            " file close may have failed", ex);
      } catch (Exception ex) {
        LOG.warn("Error while trying to cancel closing of idle file. ", ex);
      }
    }
    idleFuture = null;
  }

  // If the bucket writer was closed due to roll timeout or idle timeout,
  // force a new bucket writer to be created. Roll count and roll size will
  // just reuse this one
  if (!isOpen) {
    if (closed) {
      throw new BucketClosedException("This bucket writer was closed and " +
          "this handle is thus no longer valid");
    }
    open();
  }

  // check the rollCount and rollSize parameters to decide whether to roll a new file
  if (shouldRotate()) {
    boolean doRotate = true;

    if (isUnderReplicated) {
      if (maxConsecUnderReplRotations > 0 &&
          consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
        doRotate = false;
        if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
          LOG.error("Hit max consecutive under-replication rotations ({}); " +
              "will not continue rolling files under this path due to " +
              "under-replication", maxConsecUnderReplRotations);
        }
      } else {
        LOG.warn("Block Under-replication detected. Rotating file.");
      }
      consecutiveUnderReplRotateCount++;
    } else {
      consecutiveUnderReplRotateCount = 0;
    }

    if (doRotate) {
      close();
      open();
    }
  }

  // write the event
  try {
    sinkCounter.incrementEventDrainAttemptCount(); // sinkCounter tracks monitoring metrics
    callWithTimeout(new CallRunner<Void>() {
      @Override
      public Void call() throws Exception {
        writer.append(event); // writer is the HDFSWriter created from the hdfs.fileType parameter
        return null;
      }
    });
  } catch (IOException e) {
    LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
        bucketPath + ") and rethrowing exception.", e.getMessage());
    try {
      close(true);
    } catch (IOException e2) {
      LOG.warn("Caught IOException while closing file (" +
          bucketPath + "). Exception follows.", e2);
    }
    throw e;
  }

  // update statistics
  processSize += event.getBody().length;
  eventCounter++;
  batchCounter++;

  if (batchCounter == batchSize) {
    flush();
  }
}
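For completeness, the roll check performed by append() boils down to comparing the per-file counters against rollCount and rollSize, roughly as follows (a simplified sketch of what shouldRotate() decides; the real method also takes block under-replication into account, as the code above shows):

// Simplified sketch of the rollCount/rollSize decision (illustrative, not the exact Flume source).
private boolean shouldRotate() {
  boolean doRotate = false;
  if (rollCount > 0 && rollCount <= eventCounter) {  // enough events written into this file
    doRotate = true;
  }
  if (rollSize > 0 && rollSize <= processSize) {     // file has reached the size threshold
    doRotate = true;
  }
  return doRotate;
}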