

Parsing JSON-Format Data with Flink SQL


1. Flink version: 1.7.2

2. Dependencies

The project is built with Maven, so add the following dependencies to pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.8</version>
</dependency>
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.10.1</version>
</dependency>

3. Google Protobuf message definition

3.1 Message definition

Response.proto file (compile it with protoc, e.g. protoc --java_out=src/main/java Response.proto, to generate the Java message classes used below):

Syntax = "proto3"; package com.google.protos;// search response message SearchResponse {uint64 search_time = 1; uint32 code = 2; Result results = 3;} / / search result message Result {string id = 1; repeated Item items = 2;} / / search result item message Item {string id = 1; string name = 2; string title = 3; string url = 4; uint64 publish_time = 5; float score = 6 / / recommendation or similar weighted score}

An example message, containing a nested object results and an array field items:

{"search_time": 1553650604, "code": 1553650604, "results": {"id": "449", "items": [{"id": "47", "name": "name47", "title": "title 47" "url": "https://www.google.com.hk/item-47"," publish_time ": 1552884870," score ": 96.03}, {" id ":" 2 "," name ":" name2 "," title ":" title 2 " "url": "https://www.google.com.hk/item-2"," publish_time ": 1552978902," score ": 16.06}, {" id ":" 60 "," name ":" name60 "," title ":" title 60 " "url": "https://www.google.com.hk/item-60"," publish_time ": 1553444982," score ": 62.58}, {" id ":" 67 "," name ":" name67 "," title ":" title 67 " "url": "https://www.google.com.hk/item-67"," publish_time ": 1553522957," score ": 12.17}, {" id ":" 15 "," name ":" name15 "," title ":" title 15 " "url": "https://www.google.com.hk/item-15"," publish_time ": 1553525421," score ": 32.36}, {" id ":" 53 "," name ":" name53 "," title ":" title 53 " "url": "https://www.google.com.hk/item-53"," publish_time ": 1553109227," score ": 52.13}, {" id ":" 70 "," name ":" name70 "," title ":" title 70 " "url": "https://www.google.com.hk/item-70"," publish_time ": 1552781921," score ": 1.72}, {" id ":" 53 "," name ":" name53 "," title ":" title 53 " "url": "https://www.google.com.hk/item-53"," publish_time ": 1553229003," score ": 5.31}, {" id ":" 30 "," name ":" name30 "," title ":" title 30 " "url": "https://www.google.com.hk/item-30"," publish_time ": 1553282629," score ": 26.51}, {" id ":" 36 "," name ":" name36 "," title ":" title 36 " "url": "https://www.google.com.hk/item-36"," publish_time ": 1552665833," score ": 48.76}]}} 3.2 Kakfa Producer issues random response Json string import com.google.protos.GoogleProtobuf.* Import com.googlecode.protobuf.format.JsonFormat;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.text.DecimalFormat;import java.time.Instant;import java.util.Properties;import java.util.Random;import java.util.concurrent.TimeUnit / * @ author lynn * @ ClassName com.lynn.kafka.SearchResponsePublisher * @ Description TODO * @ Date 19-3-26 8:17 * @ Version 1.0 * * / public class SearchResponsePublisher {private static final Logger LOG = LoggerFactory.getLogger (SearchResponsePublisher.class); public String randomMessage (int results) {Random random = new Random (); DecimalFormat fmt = new DecimalFormat ("# # 0.00"); SearchResponse.Builder response = SearchResponse.newBuilder () Response.setSearchTime (Instant.now (). GetEpochSecond ()) .setCode (random.nextBoolean ()? 200 random.nextBoolean 404); Result.Builder result = Result.newBuilder () .setId ("" + random.nextInt (1000)); for (int I = 0; I

4. Source code (Java)

4.1 Import packages

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.PrintTableSink;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

4.2 Source code

// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

Kafka kafka = new Kafka().version("0.11")
        .topic(sourceTopic)
        .startFromEarliest()
        .property("bootstrap.servers", brokers)
        .property("group.id", "res")
        .property("session.timeout.ms", "30000")
        .sinkPartitionerFixed();

tableEnv.connect(kafka)
        .withFormat(new Json()
                .failOnMissingField(false)
                .deriveSchema())
        .withSchema(new Schema()
                .field("search_time", Types.LONG())
                .field("code", Types.INT())
                .field("results", Types.ROW(
                        new String[]{"id", "items"},
                        new TypeInformation[]{
                                Types.STRING(),
                                ObjectArrayTypeInfo.getInfoFor(Row[].class,
                                        // or: Array.newInstance(Row.class, 10).getClass()
                                        Types.ROW(
                                                new String[]{"id", "name", "title", "url", "publish_time", "score"},
                                                new TypeInformation[]{Types.STRING(), Types.STRING(), Types.STRING(),
                                                        Types.STRING(), Types.LONG(), Types.FLOAT()}))})))
        .inAppendMode()
        .registerTableSource("tb_json");

// items[1] .. items[10]: array subscripts start at 1
String sql4 = "select search_time, code, results.id as result_id, items[1].name as item_1_name, items[2].id as item_2_id\n"
        + "from tb_json";
Table table4 = tableEnv.sqlQuery(sql4);
tableEnv.registerTable("tb_item_2", table4);
LOG.info("------------------print {} schema------------------", "tb_item_2");
table4.printSchema();

tableEnv.registerTableSink("console4",
        new String[]{"f0", "f1", "f2", "f3", "f4"},
        new TypeInformation[]{Types.LONG(), Types.INT(), Types.STRING(), Types.STRING(), Types.STRING()},
        new PrintTableSink());
table4.insertInto("console4");

// execute program
env.execute("Flink Table Json Engine");
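As an aside before the SQL walkthrough below: array access in Flink SQL is not limited to fixed subscripts. The following untested sketch reuses the tableEnv and tb_json table registered above and Flink's documented CARDINALITY function to report how many items each response carries (the variable names sqlCount and tableCount are mine; the unqualified items reference mirrors sql4 above):

// hedged sketch: assumes the tb_json table registered in section 4.2
String sqlCount = "select search_time, results.id as result_id, CARDINALITY(items) as item_count\n"
        + "from tb_json";
Table tableCount = tableEnv.sqlQuery(sqlCount);
tableCount.printSchema();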
= "select search_time, code, results.id as result_id, items[1].name as item_1_name, items[2].id as item_2_id\n" + "from tb_json"; Table table4 = tableEnv.sqlQuery(sql4); tableEnv.registerTable("tb_item_2", table4); LOG.info("------------------print {} schema------------------", "tb_item_2"); table4.printSchema(); tableEnv.registerTableSink("console4", new String[]{"f0", "f1", "f2", "f3", "f4"}, new TypeInformation[]{ Types.LONG(),Types.INT(), Types.STRING(), Types.STRING(), Types.STRING() }, new PrintTableSink()); table4.insertInto("console4"); // execute program env.execute("Flink Table Json Engine");4.3 SQL语句select search_time, code, results.id as result_id, //嵌套json子字段 items[1].name as item_1_name, //数组对象子字段,数组下标从1开始 items[2].id as item_2_idfrom tb_json   嵌套字段可以通过.连接符直接获取,而数组元素可以通过[下标]获取,下标从1开始,与Java中数组下标从0开始不同. 4.3 Schema定义   按照Json对象的嵌套以及数组格式进行定义,即无需将每个字段展平进行定义,将嵌套字段定义为Row类型,数组类型定义为ObjectArrayTypeInfo或BasicArrayTypeInfo, ObjectArrayTypeInfo的第一个参数为数组类型,如示例中Row[].class 或Array.newInstance(Row.class, 10).getClass()方式获取class. 4.4 经测试发现flink-json*.jar中的代码问题: convert方法中的类型判断使用==,可能时由于flink版本的原因引起的==运算符没有重载.因此将此运算符替换为.equals()方法. JsonRowDeserializationSchema.java private Object convert(JsonNode node, TypeInformation info) { if (Types.VOID.equals(info) || node.isNull()) { return null; } else if (Types.BOOLEAN.equals(info)) { return node.asBoolean(); } else if (Types.STRING.equals(info)) { return node.asText(); } else if (Types.BIG_DEC.equals(info)) { return node.decimalValue(); } else if (Types.BIG_INT.equals(info)) { return node.bigIntegerValue(); } else if(Types.LONG.equals(info)){ return node.longValue(); } else if(Types.INT.equals(info)){ return node.intValue(); } else if(Types.FLOAT.equals(info)){ return node.floatValue(); } else if(Types.DOUBLE.equals(info)){ return node.doubleValue(); } else if (Types.SQL_DATE.equals(info)) { return Date.valueOf(node.asText()); } else if (Types.SQL_TIME.equals(info)) { // according to RFC 3339 every full-time must have a timezone; // until we have full timezone support, we only support UTC; // users can parse their time as string as a workaround final String time = node.asText(); if (time.indexOf('Z') < 0 || time.indexOf('.') >

= 0) {throw new IllegalStateException ("Invalid time format. Only a time in UTC timezone without milliseconds is supported yet. "+" Format: HH:mm:ss'Z' ");} return Time.valueOf (time.substring (0, time.length ()-1));} else if (Types.SQL_TIMESTAMP.equals (info)) {/ / according to RFC 3339 every date-time must have a timezone; / / until we have full timezone support, we only support UTC / / users can parse their time as string as a workaround final String timestamp = node.asText (); if (timestamp.indexOf ('Z'))

< 0) { throw new IllegalStateException( "Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. " + "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); } return Timestamp.valueOf(timestamp.substring(0, timestamp.length() - 1).replace('T', ' ')); } else if (info instanceof RowTypeInfo) { return convertRow(node, (RowTypeInfo) info); } else if (info instanceof ObjectArrayTypeInfo) { return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo()); } else if (info instanceof BasicArrayTypeInfo) { return convertObjectArray(node, ((BasicArrayTypeInfo) info).getComponentInfo()); } else if (info instanceof PrimitiveArrayTypeInfo && ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { return convertByteArray(node); } else { // for types that were specified without JSON schema // e.g. POJOs try { return objectMapper.treeToValue(node, info.getTypeClass()); } catch (JsonProcessingException e) { throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node); } } } JsonRowSerializationSchema.java private JsonNode convert(ContainerNode container, JsonNode reuse, TypeInformation info, Object object) { if (Types.VOID.equals(info) || object == null) { return container.nullNode(); } else if (Types.BOOLEAN.equals(info)) { return container.booleanNode((Boolean) object); } else if (Types.STRING.equals(info)) { return container.textNode((String) object); } else if (Types.BIG_DEC.equals(info)) { // convert decimal if necessary if (object instanceof BigDecimal) { return container.numberNode((BigDecimal) object); } return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue())); } else if (Types.BIG_INT.equals(info)) { // convert integer if necessary if (object instanceof BigInteger) { return container.numberNode((BigInteger) object); } return container.numberNode(BigInteger.valueOf(((Number) object).longValue())); } else if(Types.LONG.equals(info)){ if(object instanceof Long){ return container.numberNode((Long) object); } return container.numberNode(Long.valueOf(((Number) object).longValue())); } else if(Types.INT.equals(info)){ if(object instanceof Integer){ return container.numberNode((Integer) object); } return container.numberNode(Integer.valueOf(((Number) object).intValue())); } else if(Types.FLOAT.equals(info)){ if(object instanceof Float){ return container.numberNode((Float) object); } return container.numberNode(Float.valueOf(((Number) object).floatValue())); } else if(Types.DOUBLE.equals(info)){ if(object instanceof Double){ return container.numberNode((Double) object); } return container.numberNode(Double.valueOf(((Number) object).doubleValue())); } else if (Types.SQL_DATE.equals(info)) { return container.textNode(object.toString()); } else if (Types.SQL_TIME.equals(info)) { final Time time = (Time) object; // strip milliseconds if possible if (time.getTime() % 1000 >

0) {return container.textNode (timeFormatWithMillis.format (time));} return container.textNode (timeFormat.format (time));} else if (Types.SQL_TIMESTAMP.equals (info)) {return container.textNode (timestampFormat.format ((Timestamp) object) } else if (info instanceof RowTypeInfo) {if (reuse! = null & & reuse instanceof ObjectNode) {return convertRow ((ObjectNode) reuse, (RowTypeInfo) info, (Row) object);} else {return convertRow (null, (RowTypeInfo) info, (Row) object) } else if (info instanceof ObjectArrayTypeInfo) {if (reuse! = null & & reuse instanceof ArrayNode) {return convertObjectArray ((ArrayNode) reuse, ((ObjectArrayTypeInfo) info). GetComponentInfo (), (Object []) object);} else {return convertObjectArray (null, ((ObjectArrayTypeInfo) info). GetComponentInfo (), (Object []) object) } else if (info instanceof BasicArrayTypeInfo) {if (reuse! = null & & reuse instanceof ArrayNode) {return convertObjectArray ((ArrayNode) reuse, ((BasicArrayTypeInfo) info). GetComponentInfo (), (Object []) object);} else {return convertObjectArray (null, ((BasicArrayTypeInfo) info). GetComponentInfo (), (Object []) object) }} else if (info instanceof PrimitiveArrayTypeInfo & & ((PrimitiveArrayTypeInfo) info). GetComponentType () = = Types.BYTE) {return container.binaryNode ((byte []) object);} else {/ / for types that were specified without JSON schema / / e.g. POJOs try {return mapper.valueToTree (object) } catch (IllegalArgumentException e) {throw new IllegalStateException ("Unsupported type information'" + info + "'for object:" + object, e);}} 4.5 submit jar package to the cluster to run

Add a file:

resources/META-INF/services/org.apache.flink.table.factories.TableFactory

org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory

When the job is packaged, the identically named service files from the kafka-connector jar and the json jar overwrite each other, so the contents of both files must be kept by merging them into one file, as above. (An alternative worth noting: a fat-jar build with the maven-shade-plugin can merge META-INF/services files automatically via its ServicesResourceTransformer.)

5. Appendix: PrintTableSink source code

Adapted from the Alibaba blink branch.

Scala:

BatchCompatibleStreamTableSink.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.table.sinks

import org.apache.flink.table.api._
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}

/**
 * Defines an external [[TableSink]] to emit a batch [[Table]],
 * for compatibility with the stream connector plugin.
 */
trait BatchCompatibleStreamTableSink[T] extends TableSink[T] {

  /** Emits the DataStream. */
  def emitBoundedStream(boundedStream: DataStream[T]): DataStreamSink[_]
}

PrintTableSink.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.table.sinks

import java.lang.{Boolean => JBool}
import java.util.TimeZone
import java.util.{Date => JDate}
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
import org.apache.flink.table.runtime.functions.DateTimeFunctions
import org.apache.flink.types.Row
import org.apache.flink.util.StringUtils

/**
 * A simple [[TableSink]] to output data to the console.
 */
class PrintTableSink() extends TableSinkBase[JTuple2[JBool, Row]]
  with BatchCompatibleStreamTableSink[JTuple2[JBool, Row]]
  with UpsertStreamTableSink[Row] {

  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]) = {
    val sink: PrintSinkFunction = new PrintSinkFunction
    dataStream.addSink(sink).name(sink.toString)
  }

  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = new PrintTableSink()

  override def setKeyFields(keys: Array[String]): Unit = {}

  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {}

  // override def getRecordType: DataType = DataTypes.createRowType(getFieldTypes, getFieldNames)
  override def getRecordType: TypeInformation[Row] = {
    new RowTypeInfo(getFieldTypes, getFieldNames)
  }

  /** Emits the DataStream. */
  override def emitBoundedStream(boundedStream: DataStream[JTuple2[JBool, Row]]) = {
    val sink: PrintSinkFunction = new PrintSinkFunction()
    boundedStream.addSink(sink).name(sink.toString)
  }
}

/**
 * Implementation of the SinkFunction writing every tuple to the standard output.
 */
class PrintSinkFunction() extends RichSinkFunction[JTuple2[JBool, Row]] {
  private var prefix: String = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    prefix = "task-" + (context.getIndexOfThisSubtask + 1) + ">"
  }

  override def invoke(in: JTuple2[JBool, Row]): Unit = {
    val sb = new StringBuilder
    val row = in.f1
    for (i <- 0 until row.getArity) {
      if (i > 0) sb.append(",")
      val f = row.getField(i)
      if (f.isInstanceOf[Date]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd"))
      } else if (f.isInstanceOf[Time]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "HH:mm:ss"))
      } else if (f.isInstanceOf[Timestamp]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd HH:mm:ss.SSS"))
      } else {
        sb.append(StringUtils.arrayAwareToString(f))
      }
    }
    if (in.f0) {
      System.out.println(prefix + "(+)" + sb.toString())
    } else {
      System.out.println(prefix + "(-)" + sb.toString())
    }
  }

  override def close(): Unit = {
    this.prefix = ""
  }

  override def toString: String = "Print to System.out"
}
