

How to make Spark SQL support an update operation when writing to MySQL


Many readers are not familiar with how to make Spark SQL support an update operation when writing to MySQL, so this article walks through it step by step, with detailed content and clear steps; it should be a useful reference. Let's take a look.

The goal: in addition to the built-in Append, Overwrite, ErrorIfExists, and Ignore modes, support an update operation as well.

1. First understand the background

Spark provides an enumeration class, SaveMode, that defines the write modes available when interfacing with data sources.

Looking at the source code, it is clear that spark does not support update operations.
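For reference, the built-in modes are Append, Overwrite, ErrorIfExists, and Ignore. The CustomSaveMode used in the code later in this article is never shown in full, so the following is only a plausible sketch of how such an enumeration could look once Update is added:

sealed trait CustomSaveMode
object CustomSaveMode {
  // the four modes Spark already provides...
  case object Append extends CustomSaveMode
  case object Overwrite extends CustomSaveMode
  case object ErrorIfExists extends CustomSaveMode
  case object Ignore extends CustomSaveMode
  // ...plus the update mode this article adds
  case object Update extends CustomSaveMode
}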

2. How to make sparkSQL support update

The key point is to understand what Spark SQL normally does when it writes data to MySQL. The API looks roughly like this:

dataframe.write
  .format("sql.execution.customDatasource.jdbc")
  .option("jdbc.driver", "com.mysql.jdbc.Driver")
  .option("jdbc.url", "jdbc:mysql://localhost:3306/test?user=root&password=&useUnicode=true&characterEncoding=gbk&autoReconnect=true&failOverReadOnly=false")
  .option("jdbc.db", "test")
  .save()

Under the hood, Spark uses the JDBC dialect (JdbcDialect) to translate the data we want to insert into a statement such as:

insert into student (columns_1, columns_2, ...) values (?, ?, ...)

The SQL statement produced by the dialect is then submitted to MySQL via PreparedStatement's executeBatch(), which performs the batch insert.
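For readers less familiar with the JDBC side, here is a minimal, self-contained sketch of that PreparedStatement / executeBatch flow; the table, columns, rows, and connection string are made up for illustration:

import java.sql.DriverManager

object BatchInsertSketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8", "root", "")
    val stmt = conn.prepareStatement("INSERT INTO student (columns_1, columns_2) VALUES (?, ?)")
    try {
      Seq(("a", "b"), ("c", "d")).foreach { case (v1, v2) =>
        stmt.setString(1, v1) // bind the first placeholder
        stmt.setString(2, v2) // bind the second placeholder
        stmt.addBatch()       // queue the row
      }
      stmt.executeBatch()     // send all queued rows to MySQL in one round trip
    } finally {
      stmt.close()
      conn.close()
    }
  }
}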

So clearly, the statement above is pure insert code; it has none of the update behavior we want, such as:

UPDATE table_name SET field1 = new_value1, field2 = new_value2

However, MySQL supports a dedicated statement for exactly this:

INSERT INTO student (columns_1, columns_2) VALUES ('first field value', 'second field value') ON DUPLICATE KEY UPDATE columns_1 = 'ha ha', columns_2 = 'ha ha da'

Roughly, it means: if the row does not exist, insert it; if it already exists, perform an update instead.

Therefore, our goal is to have Spark SQL generate this kind of statement internally when it goes through JdbcDialect:

INSERT INTO table_name (columns_1, columns_2) VALUES ('first field value', 'second field value') ON DUPLICATE KEY UPDATE columns_1 = 'ha ha', columns_2 = 'ha ha'

3. Before transforming the source code, understand the overall code design and execution flow

It starts with:

dataframe.write

Calling the write method returns a DataFrameWriter.

That matters because DataFrameWriter is the entry class through which Spark SQL connects to external data sources; the format and option calls that follow register that information on the DataFrameWriter.
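A simplified paraphrase of that idea (this is not the real Spark source, just a sketch of the builder pattern DataFrameWriter follows; MiniWriter is a made-up name):

import scala.collection.mutable

class MiniWriter(rows: Seq[Map[String, Any]]) {
  private var source: String = _                                   // which data source class to use
  private val extraOptions = mutable.HashMap.empty[String, String] // the registration information

  def format(s: String): this.type = { source = s; this }
  def option(k: String, v: String): this.type = { extraOptions += (k -> v); this }

  def save(): Unit = {
    // The real save() builds a DataSource(className = source, options = extraOptions.toMap)
    // and then calls dataSource.write(mode, df).
    println(s"would write ${rows.size} rows via $source with options $extraOptions")
  }
}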

Then, once save() is invoked, the data is actually written out.

Next, take a look at the save() source code:

In the save() source code, the main work is to instantiate a DataSource and then call its write method to write the data.

When instantiating DataSource:

def save(): Unit = {
  assertNotBucketed("save")
  val dataSource = DataSource(
    df.sparkSession,
    className = source,                                     // package path of the custom data source
    partitionColumns = partitioningColumns.getOrElse(Nil),  // partition fields
    bucketSpec = getBucketSpec,                             // bucketing (for Hive)
    options = extraOptions.toMap)                           // the registered options
  // mode: the SaveMode used for inserting data; df: the data to be written
  dataSource.write(mode, df)
}

Then come the details of dataSource.write(mode, df). The overall logic is:

It pattern matches on providingClass.newInstance() and executes whichever branch matches.
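Condensed, that pattern match looks roughly like this (paraphrased from the Spark 2.x DataSource.write; branches are trimmed, so treat it as a sketch rather than the exact source):

def write(mode: SaveMode, data: DataFrame): BaseRelation = {
  providingClass.newInstance() match {
    // a JDBC-style source implements CreatableRelationProvider, so this branch is taken
    case dataSource: CreatableRelationProvider =>
      dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
    // ...other branches handle file-based sources, etc.
    case _ =>
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}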

Then take a look at what providingClass is:

After it resolves to the custom package path's DefaultSource, the program enters:

So when a database is the write target, execution goes through dataSource.createRelation. Following up the source code:

createRelation is declared on a trait, so execution goes to wherever that trait is implemented.

That trait is implemented in the custom package path's DefaultSource, and that is where we implement both data insertion and the update support.
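A minimal sketch of such a DefaultSource is shown below; the package comes from the format() string used earlier, while the option names and the stubbed write logic are assumptions, not a finished implementation:

package sql.execution.customDatasource.jdbc

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

class DefaultSource extends CreatableRelationProvider {
  override def createRelation(
      ctx: SQLContext,
      mode: SaveMode, // or a CustomSaveMode carrying Update
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // parameters holds what was registered via .option(...), e.g. jdbc.url / jdbc.driver / jdbc.db
    // A real implementation would open a JDBC connection here and run the
    // insert/update logic from section 4 (INSERT ... ON DUPLICATE KEY UPDATE).
    new BaseRelation {
      override val sqlContext: SQLContext = ctx
      override val schema: StructType = data.schema
    }
  }
}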

4. Transform the source code

Following the flow of the code, the final step of Spark SQL writing data to MySQL lands in the class: the custom package path's DefaultSource.

In other words, both Spark's normal insert (SaveMode) behavior and the update behavior need to be supported in this class.

To make Spark SQL support an update operation, the key is to add a judgment, for example:

if (isUpdate) {
  // sql statement: INSERT INTO student (columns_1, columns_2) VALUES ('first field value', 'second field value')
  //                ON DUPLICATE KEY UPDATE columns_1 = 'ha ha da', columns_2 = 'ha ha da';
} else {
  // insert into student (columns_1, columns_2, ...) values (?, ?, ...)
}

However, the Spark source that generates the SQL statement has no such judgment logic; it only ever generates:

INSERT INTO table (field_1, field_2, ...) VALUES (?, ?, ...)

So the first task is to make the generated statement support ON DUPLICATE KEY UPDATE.

A bold but straightforward design is to add the following judgment inside the insertStatement method:

def insertStatement(conn: Connection, savemode: CustomSaveMode, table: String,
    rddSchema: StructType, dialect: JdbcDialect): PreparedStatement = {
  val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  if (savemode == CustomSaveMode.Update) {
    // TODO: update mode -- duplicateSetting is the "column = ?" list built from the schema
    val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $duplicateSetting"
    conn.prepareStatement(sql)
  } else {
    val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
    conn.prepareStatement(sql)
  }
}

In this way we check the save mode passed in by the user, and if it is an update operation we return the corresponding SQL statement.

So, following the logic above, our code reads as follows:
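The screenshot of that code is not reproduced here; a sketch consistent with the target SQL is the following, where duplicateSetting is simply a "column = ?" pair per field (the helper body is an assumption, the rest mirrors the insertStatement above):

def insertStatement(
    conn: Connection,
    savemode: CustomSaveMode,
    table: String,
    rddSchema: StructType,
    dialect: JdbcDialect): PreparedStatement = {
  val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  val sql = if (savemode == CustomSaveMode.Update) {
    // one "column = ?" pair per field, producing the second half of the placeholders
    val duplicateSetting = rddSchema.fields
      .map(x => dialect.quoteIdentifier(x.name))
      .map(col => s"$col = ?")
      .mkString(",")
    s"INSERT INTO $table ($columns) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $duplicateSetting"
  } else {
    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
  }
  conn.prepareStatement(sql)
}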

So we get the corresponding sql statement.

However, this SQL statement alone is not enough, because Spark binds the values through JDBC's prepareStatement, and that involves the placeholder cursor.

That is, when JDBC walks through the statement's placeholders, the source code does this:

Take a look at makeSetter:
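The screenshot is omitted, but in the unmodified Spark source the value setter carries only one index; a paraphrased sketch of that original shape (two representative cases only):

object OriginalSetterShape {
  import java.sql.PreparedStatement
  import org.apache.spark.sql.Row

  // Before our change the setter receives only the position, no cursor offset:
  type JDBCValueSetter = (PreparedStatement, Row, Int) => Unit

  val setInt: JDBCValueSetter =
    (stmt, row, pos) => stmt.setInt(pos + 1, row.getInt(pos))
  val setString: JDBCValueSetter =
    (stmt, row, pos) => stmt.setString(pos + 1, row.getString(pos))
}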

The pitfall is this:

insert into table (field_1, field_2, field_3) values (?, ?, ?)

For that statement, the setter array returned by the current source code has length 3:

val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType).map(makeSetter(conn, dialect, _)).toArray

But if we now support the update operation, the statement becomes:

insert into table (field_1, field_2, field_3) values (?, ?, ?) ON DUPLICATE KEY UPDATE field_1 = ?, field_2 = ?, field_3 = ?

Obviously this statement has six ? placeholders, while the schema only provides three fields.

In that case the update half of the statement can never be bound, and the program throws an error.

So we need a way to tell the two cases apart, that is:

val numFields = if (isUpdate) rddSchema.fields.length * 2 else rddSchema.fields.length

row(0) row(1) row(2)                                        // index into the row
setter(0) setter(1) setter(2) setter(3) setter(4) setter(5) // index into the setters / placeholders

So the prepareStatement has twice as many placeholders as the row has fields, and the binding logic needs to follow that mapping.
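To make that mapping concrete, here is a small sketch (rowIndex is a hypothetical helper; isUpdate and rddSchema are assumed to be in scope, as in the surrounding code):

val cursorBegin = rddSchema.fields.length // where the ON DUPLICATE KEY UPDATE placeholders start
// Placeholder index i runs over [0, 2 * cursorBegin) when isUpdate, else [0, cursorBegin):
//   i < cursorBegin  -> the value comes from row(i)
//   i >= cursorBegin -> the value comes from row(i - cursorBegin)
def rowIndex(i: Int): Int = if (isUpdate && i >= cursorBegin) i - cursorBegin else i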

Before the transformation, the code was the standard Spark insert logic; after the transformation, it looks like this:

try {
  if (supportsTransactions) {
    conn.setAutoCommit(false) // Everything in the same db transaction.
    conn.setTransactionIsolation(finalIsolationLevel)
  }
  // val stmt = insertStatement(conn, table, rddSchema, dialect)
  // The newly built sql statement is used here, wrapped as a PreparedStatement
  val stmt = conn.prepareStatement(sqlStmt)
  println(sqlStmt)
  /**
   * MySQL supports this kind of operation:
   * INSERT INTO user_admin_t (_id, password) VALUES ('1', 'first-insert password')
   * INSERT INTO user_admin_t (_id, password) VALUES ('1', 'first-insert password')
   *   ON DUPLICATE KEY UPDATE _id = 'UpId', password = 'upPassword'
   * With the ON DUPLICATE KEY form, the number of placeholder cursors in the
   * prepareStatement is doubled; without an update operation the cursor simply
   * counts from 0, exactly like the original insert operation.
   */
  // makeSetter must also be adapted to the update operation, i.e. the cursor problem
  val isUpdate = saveMode == CustomSaveMode.Update
  val setters: Array[JDBCValueSetter] = isUpdate match {
    case true =>
      val setters: Array[JDBCValueSetter] =
        rddSchema.fields.map(_.dataType).map(makeSetter(conn, dialect, _)).toArray
      Array.fill(2)(setters).flatten
    case _ =>
      rddSchema.fields.map(_.dataType).map(makeSetter(conn, dialect, _)).toArray
  }
  val numFieldsLength = rddSchema.fields.length
  val numFields = isUpdate match {
    case true => numFieldsLength * 2
    case _ => numFieldsLength
  }
  val cursorBegin = numFields / 2
  try {
    var rowCount = 0
    while (iterator.hasNext) {
      val row = iterator.next()
      var i = 0
      while (i < numFields) {
        if (isUpdate) {
          // Check whether the current cursor has reached the ON DUPLICATE KEY UPDATE part
          i < cursorBegin match {
            // Not yet in the update part
            case true =>
              // if row.isNullAt is true, set a null value
              if (row.isNullAt(i)) {
                stmt.setNull(i + 1, nullTypes(i))
              } else {
                setters(i).apply(stmt, row, i, 0)
              }
            // Already in the update part
            case false =>
              if (row.isNullAt(i - cursorBegin)) {
                // pos - offset
                stmt.setNull(i + 1, nullTypes(i - cursorBegin))
              } else {
                setters(i).apply(stmt, row, i, cursorBegin)
              }
          }
        } else {
          if (row.isNullAt(i)) {
            stmt.setNull(i + 1, nullTypes(i))
          } else {
            setters(i).apply(stmt, row, i, 0)
          }
        }
        // advance the cursor
        i = i + 1
      }
      stmt.addBatch()
      rowCount += 1
      if (rowCount % batchSize == 0) {
        stmt.executeBatch()
        rowCount = 0
      }
    }
    if (rowCount > 0) {
      stmt.executeBatch()
    }
  } finally {
    stmt.close()
  }
  conn.commit()
  committed = true
  Iterator.empty
} catch {
  case e: SQLException =>
    val cause = e.getNextException
    if (cause != null && e.getCause != cause) {
      if (e.getCause == null) {
        e.initCause(cause)
      } else {
        e.addSuppressed(cause)
      }
    }
    throw e
} finally {
  if (!committed) {
    // The stage must fail.  We got here through an exception path, so
    // let the exception through unless rollback() or close() want to
    // tell the user about another problem.
    if (supportsTransactions) {
      conn.rollback()
    }
    conn.close()
  } else {
    // The stage must succeed.  We cannot propagate any exception close() might throw.
    try {
      conn.close()
    } catch {
      case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
    }
  }
}

// A `JDBCValueSetter` is responsible for setting a value from `Row` into a field of the
// `PreparedStatement`. The last two `Int` arguments are the placeholder position in the
// SQL statement and the cursor offset used to look the value up in the `Row`.
// PreparedStatement, Row, position, cursor
private type JDBCValueSetter = (PreparedStatement, Row, Int, Int) => Unit

private def makeSetter(
    conn: Connection,
    dialect: JdbcDialect,
    dataType: DataType): JDBCValueSetter = dataType match {
  case IntegerType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setInt(pos + 1, row.getInt(pos - cursor))
  case LongType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setLong(pos + 1, row.getLong(pos - cursor))
  case DoubleType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setDouble(pos + 1, row.getDouble(pos - cursor))
  case FloatType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setFloat(pos + 1, row.getFloat(pos - cursor))
  case ShortType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setInt(pos + 1, row.getShort(pos - cursor))
  case ByteType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setInt(pos + 1, row.getByte(pos - cursor))
  case BooleanType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setBoolean(pos + 1, row.getBoolean(pos - cursor))
  case StringType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      // println(row.getString(pos))
      stmt.setString(pos + 1, row.getString(pos - cursor))
  case BinaryType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos - cursor))
  case TimestampType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos - cursor))
  case DateType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos - cursor))
  case t: DecimalType =>
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      stmt.setBigDecimal(pos + 1, row.getDecimal(pos - cursor))
  case ArrayType(et, _) =>
    // remove type length parameters from end of type name
    val typeName = getJdbcType(et, dialect).databaseTypeDefinition
      .toLowerCase.split("\\(")(0)
    (stmt: PreparedStatement, row: Row, pos: Int, cursor: Int) =>
      val array = conn.createArrayOf(typeName, row.getSeq[AnyRef](pos - cursor).toArray)
      stmt.setArray(pos + 1, array)
  case _ =>
    (_: PreparedStatement, _: Row, pos: Int, cursor: Int) =>
      throw new IllegalArgumentException(s"Can't translate non-null value for field $pos")
}

The above is the content of "How to make Spark SQL support an update operation when writing to MySQL". I hope it gives you a working understanding of the approach and proves helpful. For more on related topics, please follow the industry information channel.

