Today, I will talk to you about how to implement an incremental index and deliver the data to MQ (Kafka). Many people may not know much about this, so I have summarized the following content for you. I hope you can take something away from this article.
Implement incremental data indexing
We will process incremental data based on the binlog data object. Put simply, we build the incremental advertising data so that, in a later stage, it can be fed into the indexing service, turning incremental data into an incremental index.
Define an interface for delivering incremental data (it receives the binlog conversion object we defined in the previous section):
```java
/**
 * ISender - API definition for delivering incremental data
 *
 * @author Isaac.Zhang | Ruochu
 */
public interface ISender {
    void sender(MysqlRowData rowData);
}
```
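The MysqlRowData parameter is the binlog conversion object from the previous section and is not redefined here. As a reminder, here is a minimal sketch reconstructed only from the getters and setters used later in this article; the real class may carry more fields:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import lombok.Data;

/**
 * Sketch of the binlog conversion object, inferred from its usage below.
 */
@Data
public class MysqlRowData {
    private String tableName;                    // source table of the binlog event
    private String level;                        // data level ("2", "3", "4"), see AdDataLevel below
    private OperationTypeEnum operationTypeEnum; // converted from the binlog EventType
    private List<Map<String, String>> fieldValueMap = new ArrayList<>(); // column -> value, one map per row
}
```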
Create an incremental index listener
```java
/**
 * IncrementListener for incremental data monitoring
 *
 * @author Isaac.Zhang | Ruochu
 * @since 2019/6/27
 */
@Slf4j
@Component
public class IncrementListener implements Ilistener {

    private final AggregationListener aggregationListener;

    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    // Select the delivery implementation to inject by bean name
    @Resource(name = "indexSender")
    private ISender sender;

    /**
     * Marked with {@link PostConstruct}, so it initializes
     * immediately after the service starts and the bean is created.
     */
    @Override
    @PostConstruct
    public void register() {
        log.info("IncrementListener register db and table info.");
        Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
    }

    @Override
    public void onEvent(BinlogRowData eventData) {
        TableTemplate table = eventData.getTableTemplate();
        EventType eventType = eventData.getEventType();

        // Package into the final data object to be delivered
        MysqlRowData rowData = new MysqlRowData();
        rowData.setTableName(table.getTableName());
        rowData.setLevel(eventData.getTableTemplate().getLevel());

        // Convert EventType to OperationTypeEnum
        OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
        rowData.setOperationTypeEnum(operationType);

        // Get the list of fields corresponding to this operation from the template
        List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
        if (null == fieldList) {
            log.warn("{} not support for {}.", operationType, table.getTableName());
            return;
        }

        for (Map<String, String> afterMap : eventData.getAfter()) {
            Map<String, String> _afterMap = new HashMap<>();
            for (Map.Entry<String, String> entry : afterMap.entrySet()) {
                String colName = entry.getKey();
                String colValue = entry.getValue();
                _afterMap.put(colName, colValue);
            }
            rowData.getFieldValueMap().add(_afterMap);
        }

        sender.sender(rowData);
    }
}
```
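register() pulls its table-to-database mapping from Constant.table2db, which is defined elsewhere in the project. A hedged sketch of its likely shape, with placeholder database and table names (the actual names are not shown in this article):

```java
import java.util.HashMap;
import java.util.Map;

public class Constant {
    // table name -> database name; the entries here are hypothetical placeholders
    public static final Map<String, String> table2db = new HashMap<>();

    static {
        table2db.put("ad_plan", "ad_data");     // hypothetical
        table2db.put("ad_creative", "ad_data"); // hypothetical
    }
}
```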
Turn on binlog listening
First, configure the connection information for the database whose binlog we listen to:
```yaml
adconf:
  mysql:
    host: 127.0.0.1
    port: 3306
    username: root
    password: 12345678
    binlogName: ""
    position: -1 # start listening from the current position
```
Then write the configuration class:
```java
/**
 * BinlogConfig defines the configuration information for listening to binlog
 *
 * @author Isaac.Zhang | Ruochu
 */
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String binlogName;
    private Long position;
}
```
In the section where we implemented binlog listening, we wrote a custom client, CustomBinlogClient. The client must run as a separate thread and begin listening when the program starts. To launch it at startup we use a Spring Boot org.springframework.boot.CommandLineRunner; let's code it:
```java
@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {

    @Autowired
    private CustomBinlogClient binlogClient;

    @Override
    public void run(String... args) throws Exception {
        log.info("BinlogRunner is running...");
        binlogClient.connect();
    }
}
```
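CustomBinlogClient itself was built in the earlier binlog section and is not repeated here. As a reminder of roughly what connect() does, here is a hedged sketch based on the mysql-binlog-connector-java BinaryLogClient (the WriteRowsEventData and UpdateRowsEventData types in the log output below come from that library). The wrapper's field and class names are assumptions, as is the premise that AggregationListener implements BinaryLogClient.EventListener:

```java
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Hedged sketch of the binlog client wrapper; the real CustomBinlogClient
 * was implemented in the earlier listening section and may differ.
 */
@Component
public class CustomBinlogClientSketch {

    @Autowired
    private BinlogConfig config;

    @Autowired
    private AggregationListener listener; // assumed to implement BinaryLogClient.EventListener

    public void connect() {
        // connect() blocks, hence the dedicated thread
        new Thread(() -> {
            BinaryLogClient client = new BinaryLogClient(
                    config.getHost(), config.getPort(),
                    config.getUsername(), config.getPassword());
            // An empty binlogName / position of -1 means "start from the current position"
            if (!config.getBinlogName().isEmpty() && config.getPosition() != -1L) {
                client.setBinlogFilename(config.getBinlogName());
                client.setBinlogPosition(config.getPosition());
            }
            client.registerEventListener(listener);
            try {
                client.connect();
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }).start();
    }
}
```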
Incremental data delivery

During binlog listening, we saw that MySQL records for data fields such as int and String come through without problems, but time columns are formatted into a string such as: Fri Jun 21 15:07:53 CST 2019.
```
--Insert--
WriteRowsEventData{tableId=91, includedColumns={0,1,2,3,4,5,6,7}, rows=[
    [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
]}
--Update--
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0,1,2,3,4,5}, includedColumns={0,1,2,3,4,5}, rows=[
    {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019],
     after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
]}
```
For this time format, we need to pay attention to two points:

- CST: parsed naively, the value comes out 8 hours ahead of our time (China Standard Time, UTC+8:00), so the offset must be corrected.
- The date is an English-locale string, so it needs to be parsed with an explicit pattern.
Of course, we could also change this behavior by adjusting MySQL's date settings; here we parse the time format in code instead:
```java
/**
 * Parse a binlog date string such as "Thu Jun 27 08:00:00 CST 2019"
 */
public static Date parseBinlogString2Date(String dateString) {
    try {
        DateFormat dateFormat = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy", Locale.US);
        // Correct the 8-hour offset introduced by the CST ambiguity
        return DateUtils.addHours(dateFormat.parse(dateString), -8);
    } catch (ParseException ex) {
        log.error("parseString2Date error: {}", dateString);
        return null;
    }
}
```
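As a quick sanity check, here is a minimal, hedged example of calling the helper. It assumes the method lives in a CommonUtils class (which is how the IndexSender code below references it) and that the JVM default time zone is Asia/Shanghai:

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class ParseDemo {
    public static void main(String[] args) throws Exception {
        String raw = "Thu Jun 27 08:00:00 CST 2019";

        // Naive parse: with Locale.US the "CST" zone is resolved ambiguously,
        // leaving the instant 8 hours ahead of the value stored in MySQL.
        DateFormat fmt = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy", Locale.US);
        Date naive = fmt.parse(raw);
        System.out.println("naive:     " + naive);

        // Corrected parse via the helper above, which subtracts 8 hours.
        Date corrected = CommonUtils.parseBinlogString2Date(raw);
        System.out.println("corrected: " + corrected);
    }
}
```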
Because we define the index according to the hierarchical relationship between tables (level), and the coding standard does not allow magic numbers, we define an enum to express the data level:
```java
/**
 * AdDataLevel - advertising data level
 *
 * @author Isaac.Zhang | Ruochu
 */
@Getter
public enum AdDataLevel {

    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private String level;
    private String desc;

    AdDataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }
}
```
Implement data delivery
Because incremental data can be delivered to different destinations for different purposes, we defined the delivery interface com.sxzhongf.ad.sender.ISender earlier; now we implement a delivery class:
@ Slf4j@Component ("indexSender") public class IndexSender implements ISender {/ * deliver Binlog data * / @ Override public void sender (MysqlRowData rowData) {if (AdDataLevel.LEVEL2.getLevel (). Equals (rowData.getLevel () {Level2RowData (rowData);} else if (AdDataLevel.LEVEL3.getLevel (). Equals (rowData.getLevel () {Level3RowData (rowData) } else if (AdDataLevel.LEVEL4.getLevel (). Equals (rowData.getLevel () {Level4RowData (rowData);} else {log.error ("Binlog MysqlRowData error: {}", JSON.toJSONString (rowData)) }} private void Level2RowData (MysqlRowData rowData) {if (rowData.getTableName (). Equals (Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {List planTables = new ArrayList (); for (Map fieldValueMap: rowData.getFieldValueMap ()) {AdPlanTable planTable = new AdPlanTable () / / the second cycle mode of Map fieldValueMap.forEach ((k, v)-> {switch (k) {case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID: planTable.setPlanId (Long.valueOf (v)); break Case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID: planTable.setUserId (Long.valueOf (v)); break; case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS: planTable.setPlanStatus (Integer.valueOf (v)) Break; case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE: planTable.setStartDate (CommonUtils.parseBinlogString2Date (v)); break Case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE: planTable.setEndDate (CommonUtils.parseBinlogString2Date (v)); break;}}); planTables.add (planTable) } / / delivery promotion plan planTables.forEach (p-> AdLevelDataHandler.handleLevel2Index (p, rowData.getOperationTypeEnum ());} else if (rowData.getTableName (). Equals (Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {List creativeTables = new LinkedList (); rowData.getFieldValueMap () .forEach (afterMap-> {AdCreativeTable creativeTable = new AdCreativeTable ()) AfterMap.forEach ((k, v)-> {switch (k) {case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID: creativeTable.setAdId (Long.valueOf (v)); break Case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE: creativeTable.setType (Integer.valueOf (v)); break; case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE: creativeTable.setMaterialType (Integer.valueOf (v)) Break; case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT: creativeTable.setHeight (Integer.valueOf (v)); break Case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH: creativeTable.setWidth (Integer.valueOf (v)); break; case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS: creativeTable.setAuditStatus (Integer.valueOf (v)) Break; case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL: creativeTable.setAdUrl (v); break;}}); creativeTables.add (creativeTable);}) / / deliver Advertising creativity creativeTables.forEach (c-> AdLevelDataHandler.handleLevel2Index (c, rowData.getOperationTypeEnum ());}} private void Level3RowData (MysqlRowData rowData) {.} / * handle level 4 ads * / private void Level4RowData (MysqlRowData rowData) {.}}
Put incremental data into MQ (Kafka)
To make data delivery more flexible and to support needs such as statistics and analysis in other systems, let's implement a sender that publishes to a message queue. Other services can then subscribe to the MQ topic to receive the data.
First, configure the topic in the configuration file:

```yaml
adconf:
  kafka:
    topic: ad-search-mysql-data
```

```java
/**
 * KafkaSender delivers binlog incremental data to a Kafka message queue
 *
 * @author Isaac.Zhang | Ruochu
 * @since 2019/7/1
 */
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {

    @Value("${adconf.kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send data to the Kafka queue
     */
    @Override
    public void sender(MysqlRowData rowData) {
        kafkaTemplate.send(topic, JSON.toJSONString(rowData));
    }

    /**
     * Test consuming the Kafka message
     */
    @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
    public void processMysqlRowData(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMsg = Optional.ofNullable(record.value());
        if (kafkaMsg.isPresent()) {
            Object message = kafkaMsg.get();
            MysqlRowData rowData = JSON.parseObject(message.toString(), MysqlRowData.class);
            System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
            // sender.sender()
        }
    }
}
```
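Recall from IncrementListener that the delivery implementation is injected by bean name. To route incremental data to Kafka instead of the direct index sender, only that one injection point changes; a minimal sketch using the two @Component names defined above:

```java
// In IncrementListener: pick the delivery bean by name.
@Resource(name = "kafkaSender") // was: "indexSender"
private ISender sender;
```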
After reading the above, do you have a better understanding of how an incremental index is implemented and how the data is delivered to MQ (Kafka)? If you want to learn more, please keep following this series. Thank you for your support.