Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How does flume-ng customize the interceptor

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "how to customize the interceptor in flume-ng". Interested friends may wish to have a look at it. The method introduced in this paper is simple, fast and practical. Now let the editor take you to learn "how to customize the interceptor for flume-ng"!

The code is as follows:

Package com.wy.flume.interceptor;import java.util.List;import java.util.Map;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.commons.lang.StringUtils;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.google.common.base.Charsets Import com.google.common.base.Preconditions;import com.google.common.base.Throwables;import com.google.common.collect.Lists;public class RegexExtractorHeaderInterceptor implements Interceptor {static final String REGEX = "regex"; static final String SERIALIZERS = "serializers"; static final String EXTRACTOR_HEADER = "extractorHeader"; static final boolean DEFAULT_EXTRACTOR_HEADER = false; static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey"; private static final Logger logger = LoggerFactory .getLogger (RegexExtractorHeaderInterceptor.class) Private final Pattern regex; private final List serializers; private final boolean extractorHeader; private final String extractorHeaderKey; private RegexExtractorHeaderInterceptor (Pattern regex, List serializers,boolean extractorHeader, String extractorHeaderKey) {this.regex = regex; this.serializers = serializers; this.extractorHeader = extractorHeader; this.extractorHeaderKey = extractorHeaderKey;} @ Override public void initialize () {/ / NO-OP... } @ Override public void close () {/ / NO-OP... } @ Override public Event intercept (Event event) {String extractorHeaderVal; if (extractorHeader) {extractorHeaderVal = event.getHeaders () .get (extractorHeaderKey);} else {extractorHeaderVal = new String (event.getBody (), Charsets.UTF_8);} Matcher matcher = regex.matcher (extractorHeaderVal); Map headers = event.getHeaders () If (matcher.find ()) {for (int group = 0, count = matcher.groupCount (); group

< count; group++) { int groupIndex = group + 1; if (groupIndex >

Serializers.size () {if (logger.isDebugEnabled ()) {logger.debug ("Skipping group {} to {} due to missing serializer", group, count);} break;} NameAndSerializer serializer = serializers.get (group) If (logger.isDebugEnabled ()) {logger.debug ("Serializing {} using {}", serializer.headerName, serializer.serializer);} headers.put (serializer.headerName, serializer.serializer.serialize (matcher.group (groupIndex);}} return event } @ Override public List intercept (List events) {List intercepted = Lists.newArrayListWithCapacity (events.size ()); for (Event event: events) {Event interceptedEvent = intercept (event); if (interceptedEvent! = null) {intercepted.add (interceptedEvent);}} return intercepted;} public static class Builder implements Interceptor.Builder {private Pattern regex; private List serializerList Private boolean extractorHeader; private String extractorHeaderKey; private final RegexExtractorInterceptorPassThroughSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer (); @ Override public void configure (Context context) {String regexString = context.getString (REGEX); Preconditions.checkArgument (! StringUtils.isEmpty (regexString), "Must supply a valid regex string"); regex = Pattern.compile (regexString); regex.pattern () Regex.matcher ("") .groupCount (); configureSerializers (context); extractorHeader = context.getBoolean (EXTRACTOR_HEADER,DEFAULT_EXTRACTOR_HEADER); if (extractorHeader) {extractorHeaderKey = context.getString (EXTRACTOR_HEADER_KEY); Preconditions.checkArgument (! StringUtils.isEmpty (extractorHeaderKey), "header key must") } private void configureSerializers (Context context) {String serializerListStr = context.getString (SERIALIZERS); Preconditions.checkArgument (! StringUtils.isEmpty (serializerListStr), "Must supply at least one name and serializer"); String [] serializerNames = serializerListStr.split ("\\ s +"); Context serializerContexts = new Context (context.getSubProperties (SERIALIZERS + ".")) SerializerList = Lists.newArrayListWithCapacity (serializerNames.length); for (String serializerName: serializerNames) {Context serializerContext = new Context (serializerContexts.getSubProperties (serializerName + "."); String type = serializerContext.getString ("type", "DEFAULT"); String name = serializerContext.getString ("name"); Preconditions.checkArgument (! StringUtils.isEmpty (name), "Supplied name cannot be empty.") If ("DEFAULT" .equals (type)) {serializerList.add (new NameAndSerializer (name, defaultSerializer));} else {serializerList.add (new NameAndSerializer (name, getCustomSerializer (type, serializerContext) } private RegexExtractorInterceptorSerializer getCustomSerializer (String clazzName, Context context) {try {RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class .forName (clazzName) .newInstance (); serializer.configure (context); return serializer;} catch (Exception e) {logger.error ("Could not instantiate event serializer.", e) Throwables.propagate (e);} return defaultSerializer;} @ Override public Interceptor build () {Preconditions.checkArgument (regex! = null, "Regex pattern was misconfigured"); Preconditions.checkArgument (serializerList.size () > 0, "Must supply a valid group match id list"); return new RegexExtractorHeaderInterceptor (regex, serializerList, extractorHeader, extractorHeaderKey) }} static class NameAndSerializer {private final String headerName; private final RegexExtractorInterceptorSerializer serializer; public NameAndSerializer (String headerName, RegexExtractorInterceptorSerializer serializer) {this.headerName = headerName; this.serializer = serializer;}

Apply configuration:

Hdp2.sources.s1.interceptors = i2

Hdp2.sources.s1.interceptors.i2.type = com.wy.flume.interceptor.RegexExtractorHeaderInterceptor$Builder

Hdp2.sources.s1.interceptors.i2.regex = ([^ _] +) _ (\ d {8}). *

Hdp2.sources.s1.interceptors.i2.extractorHeader = true

Hdp2.sources.s1.interceptors.i2.extractorHeaderKey = basename

Hdp2.sources.s1.interceptors.i2.serializers = S1 S2

Hdp2.sources.s1.interceptors.i2.serializers.s1.name = log_type

Hdp2.sources.s1.interceptors.i2.serializers.s2.name = log_day

At this point, I believe you have a deeper understanding of "flume-ng how to customize the interceptor", might as well come to the actual operation of it! Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report