2025-02-23 Update From: SLTechnology News&Howtos shulou
Shulou(Shulou.com)06/03 Report--
The TaildirSource class diagram is as follows (main classes only):
TaildirSource class
TaildirSource extends the AbstractSource class. The channelProcessor property of AbstractSource is responsible for submitting Events from the Source to the Channel component.
Using its configuration parameters, TaildirSource matches log files, picks up updates to them, and records the offset already read in a dedicated file (the position file).
configure() method:
1. Checks whether the configuration loaded from the configuration file is valid, for example that filegroups is defined and that each file group has a file path.
2. Initializes variables such as batchSize, skipToEnd, writePosInterval and idleTimeout.
batchSize defines the batch size used when sending Events to the Channel.
skipToEnd defines whether reading starts from the end of a file or from its beginning each time the program starts.
writePosInterval: TaildirSource records the read offset of every monitored file in the position file, and writePosInterval defines the interval at which the position file is updated.
idleTimeout: if a log file is not modified within the idleTimeout interval, the file is closed.
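To make the role of skipToEnd concrete, here is a minimal sketch of how a starting offset could be chosen for one file. StartOffsetSketch and startOffset are hypothetical names, not part of Flume, and the rule that an offset already recorded in the position file takes precedence over skipToEnd is an assumption of this sketch.

```java
/** Hypothetical helper (not Flume API): picks the offset at which tailing of one file starts. */
final class StartOffsetSketch {
    private StartOffsetSketch() {}

    /**
     * @param recordedPos offset found in the position file, or null if the file has no entry
     * @param fileLength  current length of the log file in bytes
     * @param skipToEnd   value of the skipToEnd setting
     * @return the byte offset from which reading should start
     */
    static long startOffset(Long recordedPos, long fileLength, boolean skipToEnd) {
        if (recordedPos != null) {
            // Assumption of this sketch: a recorded offset wins over skipToEnd.
            return recordedPos;
        }
        // No recorded offset: start at the end (skip old data) or at the beginning.
        return skipToEnd ? fileLength : 0L;
    }
}
```

For a 100-byte file with no recorded offset, this returns 100 when skipToEnd is true and 0 when it is false.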
start() method:
The ReliableTaildirEventReader object is created from the variables initialized in configure(), and two thread pools, idleFileChecker and positionWriter, are created to monitor the log files and to record the read offset of each log file, respectively.
idleFileChecker implements the Runnable interface. It iterates over all files monitored by the reader and checks whether a file's last modification time plus idleTimeout is less than the current time. If so, the log file has not been modified within idleTimeout and will be closed.
    private class idleFileCheckerRunnable implements Runnable {
      @Override
      public void run() {
        try {
          long now = System.currentTimeMillis();
          for (TailFile tf : reader.getTailFiles().values()) {
            // Idle: not updated within idleTimeout and still open
            if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) {
              idleInodes.add(tf.getInode());
            }
          }
        } catch (Throwable t) {
          logger.error("Uncaught exception in IdleFileChecker thread", t);
        }
      }
    }

The main job of positionWriter is to record the read offset of each log file in JSON format ("inode", inode, "pos", tf.getPos(), "file", tf.getPath()). The inode is specific to Linux systems; to collect logs on other systems (Windows and so on), the ReliableTaildirEventReader.getInode() method has to be modified. (Note: in the inode-based implementation on Linux, the read offset is keyed by inode, so renaming a file does not affect log reading. In my Windows version, the read offset is keyed by file name only, so renaming a file does affect log reading.) pos is the recorded read offset, and file is the path of the log file.
process() method:
The process method holds the main logic of the TaildirSource class. It gets each monitored log file, calls tailFileProcess to fetch the new data of each log file, and converts each record into an Event (for the details, see the readEvents method of ReliableTaildirEventReader).

    public Status process() {
      Status status = Status.READY;
      try {
        existingInodes.clear();
        existingInodes.addAll(reader.updateTailFiles());
        for (long inode : existingInodes) {
          TailFile tf = reader.getTailFiles().get(inode);
          if (tf.needTail()) {
            tailFileProcess(tf, true);
          }
        }
        closeTailFiles();
        try {
          TimeUnit.MILLISECONDS.sleep(retryInterval);
        } catch (InterruptedException e) {
          logger.info("Interrupted while sleeping");
        }
      } catch (Throwable t) {
        logger.error("Unable to tail files", t);
        status = Status.BACKOFF;
      }
      return status;
    }

ReliableTaildirEventReader class
When a ReliableTaildirEventReader object is constructed, it first checks that the required parameters are valid, then loads the position file to get the read offset last recorded for each file.
loadPositionFile(String filePath)
The code of this method is not pasted here. It mainly retrieves the read offset of each monitored log file.
Of the readEvents() overloads, the one below is the most important. It gets the offset of the current log file, calls TailFile.readEvents(numEvents, backoffWithoutNL, addByteOffset) to convert each line of the log file into a Flume Event, and then loops over the events to add header information to each one.

    public List<Event> readEvents(int numEvents, boolean backoffWithoutNL) throws IOException {
      if (!committed) {
        if (currentFile == null) {
          // Note: currentFile is null on this branch, so currentFile.getPath()
          // would itself throw a NullPointerException (kept as in the source).
          throw new IllegalStateException("current file does not exist. " + currentFile.getPath());
        }
        logger.info("Last read was never committed - resetting position");
        long lastPos = currentFile.getPos();
        currentFile.updateFilePos(lastPos);
      }
      List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
      if (events.isEmpty()) {
        return events;
      }
      Map<String, String> headers = currentFile.getHeaders();
      if (annotateFileName || (headers != null && !headers.isEmpty())) {
        for (Event event : events) {
          if (headers != null && !headers.isEmpty()) {
            event.getHeaders().putAll(headers);
          }
          if (annotateFileName) {
            event.getHeaders().put(fileNameHeader, currentFile.getPath());
          }
        }
      }
      committed = false;
      return events;
    }

The openFile(File file, Map<String, String> headers, long inode, long pos) method creates a TailFile object from the log file object, the headers, the inode and the offset pos.
TailFile class
TaildirSource handles each log file through the TailFile class, which wraps a RandomAccessFile together with attributes such as the read offset pos and the last update time lastUpdated.
RandomAccessFile fits the TaildirSource use case well. It supports random access to a file through seek(), so together with the read offset recorded in the position file it is easy to seek to that offset, read the log content from there onward, and write the new offset back to the position file.
readEvent(boolean backoffWithoutNL, boolean addByteOffset) method:
The figure below describes the call hierarchy of this method. Put simply, readEvent converts each log line into an Event message body, and the call chain ends in the readFile() method. The readLine() method is somewhat harder to follow, and I am still studying it.
    public LineResult readLine() throws IOException {
      LineResult lineResult = null;
      while (true) {
        if (bufferPos == NEED_READING) {
          if (raf.getFilePointer() < raf.length()) {
            // The file pointer is before the end of the file, so there is
            // still data between the pointer and the end of the file to read
            readFile();
          } else {
            if (oldBuffer.length > 0) {
              lineResult = new LineResult(false, oldBuffer);
              oldBuffer = new byte[0];
              setLineReadPos(lineReadPos + lineResult.line.length);
            }
            break;
          }
        }
        for (int i = bufferPos; i < buffer.length; i++) {
          if (buffer[i] == BYTE_NL) {
            int oldLen = oldBuffer.length;
            // Don't copy last byte(NEW_LINE)
            int lineLen = i - bufferPos;
            // For windows, check for CR
            if (i > 0 && buffer[i - 1] == BYTE_CR) {
              lineLen -= 1;
            } else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) {
              oldLen -= 1;
            }
            lineResult = new LineResult(true,
                concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen));
            setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1)));
            oldBuffer = new byte[0];
            if (i + 1 < buffer.length) {
              bufferPos = i + 1;
            } else {
              bufferPos = NEED_READING;
            }
            break;
          }
        }
        if (lineResult != null) {
          break;
        }
        // NEW_LINE not showed up at the end of the buffer
        oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length,
            buffer, bufferPos, buffer.length - bufferPos);
        bufferPos = NEED_READING;
      }
      return lineResult;
    }
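The carry-over logic in readLine() may be easier to follow in isolation. The following is a minimal sketch and not Flume code: ChunkedLineSplitter, feed and remainder are hypothetical names. It shows the same two ideas, namely keeping the bytes of an unfinished line between buffer reads (the role of oldBuffer) and dropping a CR that precedes the newline, even when the CR and the newline arrive in different chunks.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: splits a stream of byte chunks into lines. */
final class ChunkedLineSplitter {
    private static final byte NL = '\n';
    private static final byte CR = '\r';

    // Bytes of an unfinished line carried over from earlier chunks (like oldBuffer).
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Consumes one chunk and returns every line completed by it. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < chunk.length; i++) {
            if (chunk[i] == NL) {
                // Append the bytes before the newline, then emit the full line.
                pending.write(chunk, start, i - start);
                lines.add(takeLine());
                start = i + 1;
            }
        }
        // No newline at the end of the chunk: keep the tail for the next call.
        pending.write(chunk, start, chunk.length - start);
        return lines;
    }

    /** Returns and clears the unfinished tail (empty string if there is none). */
    String remainder() {
        String rest = new String(pending.toByteArray(), StandardCharsets.UTF_8);
        pending.reset();
        return rest;
    }

    private String takeLine() {
        byte[] bytes = pending.toByteArray();
        pending.reset();
        int len = bytes.length;
        if (len > 0 && bytes[len - 1] == CR) {
            len--; // Windows line ending: drop the CR before the newline
        }
        return new String(bytes, 0, len, StandardCharsets.UTF_8);
    }
}
```

Feeding the chunks "a\r" and then "\nbc\nd" yields no line for the first chunk, the lines "a" and "bc" for the second, and leaves "d" as the remainder.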
readFile() reads log file data into a buffer of at most BUFFER_SIZE bytes (8 KB by default).
    private void readFile() throws IOException {
      // Fewer than BUFFER_SIZE bytes left: size the buffer to what remains
      if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) {
        buffer = new byte[(int) (raf.length() - raf.getFilePointer())];
      } else {
        buffer = new byte[BUFFER_SIZE];
      }
      raf.read(buffer, 0, buffer.length);
      bufferPos = 0;
    }
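The combination of a saved offset and RandomAccessFile.seek() described in the TailFile section can be sketched as follows. This is an illustration under stated assumptions, not the Flume implementation: OffsetResumeSketch, Chunk and readFrom are hypothetical names, the file is reopened on every call for simplicity, and readFully() is used in place of read() so that the buffer is always filled completely.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

/** Hypothetical illustration: resume reading a log file from a saved offset. */
final class OffsetResumeSketch {
    private static final int BUFFER_SIZE = 8192; // 8 KB, as in the text above

    /** Result of one read: the bytes read and the offset to save for next time. */
    static final class Chunk {
        final byte[] data;
        final long nextOffset;

        Chunk(byte[] data, long nextOffset) {
            this.data = data;
            this.nextOffset = nextOffset;
        }
    }

    /** Reads at most BUFFER_SIZE bytes starting at the given offset. */
    static Chunk readFrom(File file, long offset) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            raf.seek(offset); // jump to where the previous read stopped
            long remaining = Math.max(0L, raf.length() - raf.getFilePointer());
            byte[] buffer = new byte[(int) Math.min(remaining, BUFFER_SIZE)];
            raf.readFully(buffer); // fills the whole buffer or throws EOFException
            // The file pointer now marks the first unread byte.
            return new Chunk(buffer, raf.getFilePointer());
        }
    }
}
```

A caller would store nextOffset (in TaildirSource this is the job of the position file) and pass it back on the next call, so only bytes appended since the previous read are returned.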