Today I will walk you through how to analyze the Kafka connector source code used with Flink. Many people are not very familiar with this topic, so to help you understand it better, I have summarized the following material; I hope you get something out of this article.
Recently I have been working on Flink SQL, and the goal of the first phase is to handle consuming from and writing to Kafka. Since some readers are not clear on how the pieces fit together, today we will analyze the inheritance hierarchy of the connector package in detail.
The Flink connector source code is as follows:
public class KafkaTableSourceFactory implements StreamTableSourceFactory {

    private ConcurrentHashMap<String, KafkaTableSource> kafkaTableSources = new ConcurrentHashMap<>();

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE);
        context.put(CONNECTOR_PROPERTY_VERSION(), String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION));
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add(KafkaConnectorDescriptor.DATABASE_KEY);
        properties.add(KafkaConnectorDescriptor.TABLE_KEY);
        return properties;
    }

    @Override
    public StreamTableSource createStreamTableSource(Map<String, String> properties) {
        // Cache the table source to avoid rebuilding it every time the factory is triggered.
        KafkaTableSource kafkaTableSource;
        String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY);
        String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY);
        if (!kafkaTableSources.containsKey(dataBase + table)) {
            Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder();
            kafkaTableSource = builder
                    .cluster(dataBase)
                    .subject(table)
                    .build();
            kafkaTableSources.put(dataBase + table, kafkaTableSource);
        } else {
            kafkaTableSource = kafkaTableSources.get(dataBase + table);
        }
        return kafkaTableSource;
    }
}

The custom table source extends Flink's KafkaTableSource:

class Kafka08PBTableSource protected (topic: String,
                                      properties: Properties,
                                      schema: TableSchema,
                                      typeInformation: TypeInformation[Row],
                                      paramMap: util.LinkedHashMap[String, AnyRef],
                                      entryClass: String)
  extends KafkaTableSource(schema, topic, properties, new PBRowDeserializationSchema(typeInformation, paramMap, entryClass)) {

  override def createKafkaConsumer(topic: String,
                                   properties: Properties,
                                   deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = {
    this.setStartupMode(StartupMode.EARLIEST)
    new FlinkKafkaConsumer08(topic, deserializationSchema, properties).setStartFromEarliest()
  }
}
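To make the factory's behavior a little more concrete, here is a minimal usage sketch in Java. It only exercises the caching logic in createStreamTableSource; the property values "my_cluster" and "my_topic" are made-up placeholders, and KafkaTableSourceFactory / KafkaConnectorDescriptor are assumed to come from the project's own package.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.sources.StreamTableSource;

public class KafkaTableSourceFactoryDemo {
    public static void main(String[] args) {
        // Only the two keys declared in supportedProperties() are needed here.
        Map<String, String> properties = new HashMap<>();
        properties.put(KafkaConnectorDescriptor.DATABASE_KEY, "my_cluster"); // placeholder value
        properties.put(KafkaConnectorDescriptor.TABLE_KEY, "my_topic");      // placeholder value

        KafkaTableSourceFactory factory = new KafkaTableSourceFactory();

        // The first call builds a Kafka08UDMPBTableSource via its Builder and
        // caches it under the key dataBase + table.
        StreamTableSource first = factory.createStreamTableSource(properties);
        // A second call with the same database/table returns the cached instance.
        StreamTableSource second = factory.createStreamTableSource(properties);

        System.out.println(first == second); // prints true
    }
}

The ConcurrentHashMap cache means repeated planning of the same database/table pair reuses one source instance instead of rebuilding the Kafka consumer configuration each time.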
The following are the user-defined Kafka sink class, the custom serialization/deserialization schema, and the format factory:
class Kafka08UDMPBTableSink(topic: String,
                            properties: Properties,
                            partitioner: Optional[FlinkKafkaPartitioner[Row]],
                            paramMap: util.LinkedHashMap[String, AnyRef],
                            serializationSchema: SerializationSchema[Row],
                            fieldNames: Array[String],
                            fieldTypes: Array[TypeInformation[_]])
  extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) {

  override def createKafkaProducer(topic: String,
                                   properties: Properties,
                                   serializationSchema: SerializationSchema[Row],
                                   partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row] = {
    new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row]))
  }

  override def createSerializationSchema(rowSchema: RowTypeInfo) = serializationSchema

  override def createCopy = new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes)

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = {
    super.configure(this.fieldNames, this.fieldTypes)
  }

  override def getFieldNames: Array[String] = this.fieldNames

  /** Returns the types of the table fields. */
  override def getFieldTypes: Array[TypeInformation[_]] = this.fieldTypes

  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner)
    dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
  }
}

public class TrackRowDeserializationSchema implements SerializationSchema, DeserializationSchema {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private TypeInformation typeInfo = null;

    private LinkedHashMap paraMap;
    private String inSchema;
    private String outSchema;
    private String inClass;
    private String outClass;
}

public class TrackRowFormatFactory extends TableFormatFactoryBase
        implements SerializationSchemaFactory, DeserializationSchemaFactory {

    public TrackRowFormatFactory() {
        super(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
    }

    public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) {
        super(type, version, supportsSchemaDerivation);
    }

    @Override
    protected List<String> supportedFormatProperties() {
        final List<String> properties = new ArrayList<>();
        properties.add(TrackValidator.FORMAT_IN_SCHEMA);
        properties.add(TrackValidator.FORMAT_IN_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_SCHEMA);
        properties.add(TrackValidator.FORMAT_TYPE_INFORMATION);
        properties.add(TrackValidator.FORMAT_TYPE_VALUE);
        return properties;
    }
}
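One detail the classes above rely on but do not show is how Flink finds them. In the Flink 1.x TableFactory stack that this code targets, factories are discovered through Java's ServiceLoader: the fully qualified class names must be listed, one per line, in a classpath resource named META-INF/services/org.apache.flink.table.factories.TableFactory. Below is a small, hedged sketch of how that discovery can be checked with TableFactoryService; it assumes the registration file is in place and that "connector.type" and "connector.property-version" are the keys produced by CONNECTOR_TYPE() and CONNECTOR_PROPERTY_VERSION() in requiredContext().

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;

public class FactoryDiscoveryDemo {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // Context properties matched against requiredContext().
        properties.put("connector.type", KafkaConnectorDescriptor.CONNECTOR_TYPE);
        properties.put("connector.property-version",
                String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION));
        // Connector properties matched against supportedProperties().
        properties.put(KafkaConnectorDescriptor.DATABASE_KEY, "my_cluster"); // placeholder value
        properties.put(KafkaConnectorDescriptor.TABLE_KEY, "my_topic");      // placeholder value

        // TableFactoryService scans all factories registered via the service
        // file, matches requiredContext()/supportedProperties() against the
        // property map, and returns the single matching factory.
        StreamTableSourceFactory factory =
                TableFactoryService.find(StreamTableSourceFactory.class, properties);

        System.out.println(factory.getClass().getName()); // expected: KafkaTableSourceFactory
    }
}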
After reading the above, do you have a better understanding of how to analyze the Kafka connector source code in Flink? If you want to learn more, please follow the industry information channel. Thank you for your support.