How to archive RDS and POLARDB data to X-Pack Spark for computation

2025-01-19 Update From: SLTechnology News&Howtos

Shulou (Shulou.com), 05/31

This article explains how to archive RDS and POLARDB tables to X-Pack Spark for computation, a situation that comes up often in real deployments.

X-Pack Spark provides complex analysis, stream processing, storage, and machine learning capabilities for the Redis, Cassandra, MongoDB, HBase, and RDS storage services by attaching external computing resources, so that users can better handle their data processing scenarios.

Steps for archiving RDS & POLARDB sub-tables to X-Pack Spark

Associate POLARDB with the Spark cluster in one click

POLARDB table storage

In the database 'test1', a new table is generated every 5 minutes. The tables are assumed here to be 'test1', 'test2', 'test3', and so on.

The table creation statement is as follows:

CREATE TABLE `test1` (
  `a` int(11) NOT NULL,
  `b` time DEFAULT NULL,
  `c` double DEFAULT NULL,
  PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
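The naming scheme above can be sketched as a small helper that lists the qualified source tables to archive. This is only an illustration: the function name `source_tables` and its parameters are hypothetical, and it assumes the tables are numbered consecutively from 1 as in the example.

```python
def source_tables(database="test1", prefix="test", count=3):
    """Return qualified table names such as 'test1.test1'.

    Hypothetical helper: assumes tables are named <prefix><n> with n
    counting up from 1, as in the example above.
    """
    return ["%s.%s%d" % (database, prefix, n) for n in range(1, count + 1)]


if __name__ == "__main__":
    for name in source_tables():
        print(name)
```

Each returned name is in the database.table form that a JDBC reader's "dbtable" option expects.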

Debugging the archive to Spark

X-Pack Spark provides an interactive query mode that lets you submit SQL, Python scripts, and Scala code directly in the console for debugging.

1. First, create an interactive query session and add the mysql-connector JAR package to it.

2. Create an interactive query.

Taking PySpark as an example, the demo archiving code is as follows:

# Drop any previous copy of the Spark table so the demo can be re-run.
spark.sql("drop table if exists sparktest").show()

# Create a Spark table with three partition levels: day, hour, and minute.
# The minute level stores the data of one specific 5-minute POLARDB table.
# The field types match those in POLARDB.
spark.sql("CREATE table sparktest (a int, b timestamp, c double, dt string, hh string, mm string) "
          "USING parquet PARTITIONED BY (dt, hh, mm)").show()

# In this example, POLARDB has a database test1 with three tables:
# test1, test2, and test3. They are traversed here, and each table is
# stored in one 5-minute partition of the Spark table.
# CREATE TABLE `test1` (
#   `a` int(11) NOT NULL,
#   `b` time DEFAULT NULL,
#   `c` double DEFAULT NULL,
#   PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4):
    # Construct the POLARDB table name.
    dbtable = "test1." + "test" + str(num)

    # Map the POLARDB table to a Spark DataFrame over JDBC.
    externalPolarDBTableNow = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \
        .option("dbtable", dbtable) \
        .option("user", "name") \
        .option("password", "xxx*") \
        .load()
    externalPolarDBTableNow.createOrReplaceTempView("polardbTableTemp")

    # Generate the partition values of the Spark table that this
    # POLARDB table is written to.
    (dtValue, hhValue, mmValue) = ("20191015", "13", str(5 * num))

    # Run the SQL that imports the data.
    spark.sql("insert into sparktest partition (dt=%s, hh=%s, mm=%s) "
              "select * from polardbTableTemp" % (dtValue, hhValue, mmValue)).show()

    # Drop the temporary Spark view that maps the POLARDB table.
    spark.catalog.dropTempView("polardbTableTemp")

# View the partitions and count the rows. This is mainly for testing and
# verification and can be removed from the actual run.
spark.sql("show partitions sparktest").show(1000, False)
spark.sql("select count(*) from sparktest").show()
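The string-building part of the loop can be checked without a Spark session. The sketch below isolates it so the generated statements can be printed and inspected before they are passed to spark.sql. The helper names `partition_values` and `build_insert_sql` are hypothetical, and quoting the partition values is an assumption made here so that they are always treated as strings; the demo itself passes them unquoted.

```python
def partition_values(num, day="20191015", hour="13"):
    """Partition values for source table number `num`.

    Follows the demo's rule: the minute partition is 5 * num.
    """
    return day, hour, str(5 * num)


def build_insert_sql(target, temp_view, dt, hh, mm):
    """Build the INSERT statement for one static partition.

    Assumption: partition values are quoted so they stay strings.
    """
    return (
        "insert into {target} partition (dt='{dt}', hh='{hh}', mm='{mm}') "
        "select * from {view}"
    ).format(target=target, dt=dt, hh=hh, mm=mm, view=temp_view)


if __name__ == "__main__":
    for num in range(1, 4):
        dt, hh, mm = partition_values(num)
        print(build_insert_sql("sparktest", "polardbTableTemp", dt, hh, mm))
```

Keeping the statement construction in a plain function makes it possible to unit test the partition mapping separately from the JDBC read.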

Running the archive job in production

Interactive queries are intended for ad hoc queries and debugging. Production jobs should be run as Spark jobs; see the documentation for usage. Take a PySpark job as an example:

/polardb/polardbArchiving.py is as follows:

# -*- coding: UTF-8 -*-
from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PolardbArchiving") \
        .enableHiveSupport() \
        .getOrCreate()

    # Drop any previous copy of the Spark table so the job can be re-run.
    spark.sql("drop table if exists sparktest").show()

    # Create a Spark table with three partition levels: day, hour, and minute.
    # The minute level stores the data of one specific 5-minute POLARDB table.
    # The field types match those in POLARDB.
    spark.sql("CREATE table sparktest (a int, b timestamp, c double, dt string, hh string, mm string) "
              "USING parquet PARTITIONED BY (dt, hh, mm)").show()

    # In this example, POLARDB has a database test1 with three tables:
    # test1, test2, and test3. They are traversed here, and each table is
    # stored in one 5-minute partition of the Spark table.
    # CREATE TABLE `test1` (
    #   `a` int(11) NOT NULL,
    #   `b` time DEFAULT NULL,
    #   `c` double DEFAULT NULL,
    #   PRIMARY KEY (`a`)
    # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    for num in range(1, 4):
        # Construct the POLARDB table name.
        dbtable = "test1." + "test" + str(num)
        # The source listing is cut off at this point. The rest of the loop
        # body follows the interactive demo: read the table over JDBC,
        # insert it into the matching partition, and drop the temporary view.
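In a scheduled production run, the partition values would normally come from the time window being archived rather than from the fixed "20191015" and "13" used in the demo. A minimal sketch of that calculation follows, assuming 5-minute windows. The helper name `partition_for` is hypothetical, and the zero-padded minute value ("05") is a choice made here that differs from the demo, which writes unpadded values such as "5".

```python
from datetime import datetime


def partition_for(ts, window_minutes=5):
    """Return (dt, hh, mm) for the window that contains `ts`.

    Hypothetical helper: the minute is floored to the start of its
    window and zero-padded, which is an assumption of this sketch.
    """
    floored_minute = ts.minute - ts.minute % window_minutes
    return ts.strftime("%Y%m%d"), ts.strftime("%H"), "%02d" % floored_minute


if __name__ == "__main__":
    print(partition_for(datetime(2019, 10, 15, 13, 7, 30)))
```

Zero-padded values sort correctly as strings, but whichever format is chosen should be used for every run so that the same window always maps to the same partition.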
