
How sharding-jdbc executes SQL


Many newcomers are not very clear about how sharding-jdbc executes SQL through JDBC. To help with that, this article explains the process in detail; anyone who needs it can learn from it, and hopefully you will gain something.

Memory limit mode: the premise of this mode is that ShardingSphere does not limit the number of database connections consumed by a single operation. If the actual SQL needs to operate on 200 tables in one database instance, a new database connection is created for each table and they are processed concurrently by multiple threads to maximize execution efficiency. When the SQL meets the conditions, streaming merging is preferred, to prevent memory overflow and avoid frequent garbage collection.

Connection limit mode: the premise of this mode is that ShardingSphere strictly controls the number of database connections consumed by a single operation. If the actual SQL needs to operate on 200 tables in one database instance, only a single database connection is created and the 200 tables are processed serially on it. If the shards of one operation are spread across different databases, multiple threads are still used to handle the different databases, but each database still gets only one connection per operation. This prevents a single request from occupying too many database connections. This mode always selects memory merging.
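The switch between the two modes is controlled by a single property, maxConnectionsSizePerQuery. A minimal configuration sketch follows (assuming the 4.x property key max.connections.size.per.query and the ShardingDataSourceFactory entry point; both may differ in your version):

import java.util.Properties;

public final class MaxConnectionsConfigSketch {

    public static Properties buildProperties() {
        Properties props = new Properties();
        // 1 (the default) allows at most one connection per data source per query,
        // which pushes large multi-table queries into connection limit mode;
        // a larger value permits memory limit mode with one connection per table.
        props.setProperty("max.connections.size.per.query", "1");
        return props;
    }
    // these properties are then passed when building the data source, e.g.
    // ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, buildProperties())
}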

Case: this article uses a simple query, SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id AND o.order_id = 2 AND o.user_id = 2, to analyse roughly how ShardingSphere (ss) executes SQL. According to the rewritten SQL for the shard, it should be executed as demo_ds_slave_0: SELECT * FROM t_order_0 i, t_order_item_0 o WHERE o.order_id = i.order_id AND o.order_id = 2 AND o.user_id = 2.
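For context, the application-side call that triggers everything analysed below is an ordinary JDBC query against the sharding data source. A minimal sketch, assuming dataSource is a sharding-jdbc DataSource built elsewhere and that t_order_item has an order_item_id column (both are illustrative assumptions):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import javax.sql.DataSource;

public final class OrderItemQueryExample {

    public static void queryOrderItems(final DataSource dataSource) throws Exception {
        String sql = "SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id AND o.order_id = ? AND o.user_id = ?";
        try (Connection connection = dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            preparedStatement.setLong(1, 2L); // order_id, used to pick the actual table t_order_0
            preparedStatement.setLong(2, 2L); // user_id, used to pick the actual database
            // executeQuery routes, rewrites and runs the real SQL shown above, then merges the results
            try (ResultSet resultSet = preparedStatement.executeQuery()) {
                while (resultSet.next()) {
                    System.out.println(resultSet.getLong("order_item_id"));
                }
            }
        }
    }
}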

Preparation stage

1. Initialize the executor: PreparedStatementExecutor#init builds and caches the Statement execution units

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    @Getter
    private final boolean returnGeneratedKeys;

    public PreparedStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability,
                                     final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
    }

    /**
     * Initialize executor.
     *
     * @param routeResult route result
     * @throws SQLException SQL exception
     */
    public void init(final SQLRouteResult routeResult) throws SQLException {
        setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement());
        // add the execute groups, i.e. the route units grouped by data source
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        // cache statements and parameters
        cacheStatements();
    }

    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
        // wrap the route units into Statement execution units
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {

            @Override
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            @Override
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

    @SuppressWarnings("MagicConstant")
    private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
        return returnGeneratedKeys
                ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }
    ...
}

2. Wrap the Statement execution units: getSqlExecutePrepareTemplate().getExecuteUnitGroups()

@RequiredArgsConstructor
public final class SQLExecutePrepareTemplate {

    private final int maxConnectionsSizePerQuery;

    /**
     * Get execute unit groups.
     *
     * @param routeUnits route units
     * @param callback SQL execute prepare callback
     * @return statement execute unit groups
     * @throws SQLException SQL exception
     */
    public Collection<ShardingExecuteGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        return getSynchronizedExecuteUnitGroups(routeUnits, callback);
    }

    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
            final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        // group the SQL units by data source, e.g.
        // demo_ds_0: [select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?]
        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(routeUnits);
        Collection<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            // add the sharding execute groups of each data source
            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
        }
        return result;
    }

    private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<RouteUnit> routeUnits) {
        Map<String, List<SQLUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1);
        for (RouteUnit each : routeUnits) {
            if (!result.containsKey(each.getDataSourceName())) {
                result.put(each.getDataSourceName(), new LinkedList<SQLUnit>());
            }
            result.get(each.getDataSourceName()).add(each.getSqlUnit());
        }
        return result;
    }

    private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
                                                                                 final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
        // Within the limit of maxConnectionsSizePerQuery: if one connection has to execute more than one SQL unit,
        // that connection cannot keep every result set open at once, so memory merging must be used;
        // if one connection executes exactly one SQL unit, its result set can stay open and streaming merging can be used.
        // TODO scenario: sharding tables without sharding databases, where one data source maps to many SQL units.
        // calculate the desired partition size
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery
                ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
        // partition the SQL units by that size, e.g. sqlUnits = [1, 2, 3, 4, 5] and desiredPartitionSize = 2 gives [[1, 2], [3, 4], [5]]
        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
        // maxConnectionsSizePerQuery is the maximum number of connections allowed per database for one query;
        // it decides the connection mode: CONNECTION_STRICTLY (connection limit) or MEMORY_STRICTLY (memory limit)
        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        // acquire one connection per partition
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
        int count = 0;
        // hand each partition of SQL units to its connection
        for (List<SQLUnit> each : sqlUnitPartitions) {
            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
        }
        return result;
    }

    private ShardingExecuteGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection,
                                                                          final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
        List<StatementExecuteUnit> result = new LinkedList<>();
        // iterate over the SQL units and let the callback create each statement execution unit
        for (SQLUnit each : sqlUnitGroup) {
            result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode));
        }
        // wrap them into a sharding execute group
        return new ShardingExecuteGroup<>(result);
    }
}
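To make the grouping arithmetic above concrete, here is a standalone sketch that reproduces the same calculation with Guava's Lists.partition, using the numbers from the comments (5 SQL units, maxConnectionsSizePerQuery = 3):

import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Lists;

public final class ConnectionModeSketch {

    public static void main(final String[] args) {
        int maxConnectionsSizePerQuery = 3;
        List<Integer> sqlUnits = Arrays.asList(1, 2, 3, 4, 5); // 5 SQL units routed to one data source
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery
                ? sqlUnits.size() / maxConnectionsSizePerQuery
                : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1); // ceil(5 / 3) = 2
        List<List<Integer>> partitions = Lists.partition(sqlUnits, desiredPartitionSize); // [[1, 2], [3, 4], [5]]
        // more SQL units than allowed connections: connection limit mode, memory merging;
        // otherwise memory limit mode, where streaming merging becomes possible
        String connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? "CONNECTION_STRICTLY" : "MEMORY_STRICTLY";
        System.out.println(desiredPartitionSize + " " + partitions + " " + connectionMode); // 2 [[1, 2], [3, 4], [5]] CONNECTION_STRICTLY
    }
}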

Execution phase

1. Execute the query SQL

public final class PreparedStatementExecutor extends AbstractStatementExecutor {
    ...
    /**
     * Execute query.
     *
     * @return result set list
     * @throws SQLException SQL exception
     */
    public List<QueryResult> executeQuery() throws SQLException {
        // record whether exceptions should be thrown on the current thread
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        // create the callback instance; its executeSQL is invoked from SQLExecuteCallback#execute
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {

            @Override
            protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);
    }
    ...
    protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
        List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
        // refresh the sharding metadata after execution, e.g. after creating or altering tables
        refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement);
        return result;
    }
    ...
}

2. Execute the groups through the thread pool and call back into the callback

@RequiredArgsConstructor
public abstract class SQLExecuteCallback<T> implements ShardingGroupExecuteCallback<StatementExecuteUnit, T> {

    // database type
    private final DatabaseType databaseType;

    // whether exceptions should be thrown
    private final boolean isExceptionThrown;

    @Override
    public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits,
                                       final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        Collection<T> result = new LinkedList<>();
        // iterate over the statement execution units and collect each returned result
        for (StatementExecuteUnit each : statementExecuteUnits) {
            result.add(execute0(each, isTrunkThread, shardingExecuteDataMap));
        }
        return result;
    }

    private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        // propagate the exception-throwing flag to the current thread
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        // get the data source metadata from the JDBC URL
        DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
        // SQL execution hook
        SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
        try {
            sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
            // execute the SQL
            T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
            sqlExecutionHook.finishSuccess();
            return result;
        } catch (final SQLException ex) {
            sqlExecutionHook.finishFailure(ex);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
    }

    protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException;
}
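The thread pool itself lives in SQLExecuteTemplate / ShardingExecuteEngine, which is not shown here: the first group is executed on the calling thread while the remaining groups are submitted to a pool, and every unit is funnelled through the callback above. A simplified, self-contained sketch of that idea (the types here are illustrative stand-ins, not ShardingSphere's API):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class GroupExecuteSketch {

    public interface GroupCallback<I, O> {
        Collection<O> execute(Collection<I> group) throws Exception;
    }

    private final ExecutorService executorService = Executors.newFixedThreadPool(8);

    public <I, O> List<O> executeGroups(final List<Collection<I>> groups, final GroupCallback<I, O> callback) throws Exception {
        List<O> result = new ArrayList<>();
        if (groups.isEmpty()) {
            return result;
        }
        // submit all but the first group to the thread pool ...
        List<Future<Collection<O>>> futures = new ArrayList<>();
        for (final Collection<I> each : groups.subList(1, groups.size())) {
            futures.add(executorService.submit((Callable<Collection<O>>) () -> callback.execute(each)));
        }
        // ... and run the first group on the calling thread
        result.addAll(callback.execute(groups.get(0)));
        // wait for the asynchronous groups and collect their results
        for (Future<Collection<O>> each : futures) {
            result.addAll(each.get());
        }
        return result;
    }
}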

3. Execute executeSQL: the executeSQL overridden in the callback from step 1 is invoked here, and the ResultSet is wrapped into a QueryResult

public final class PreparedStatementExecutor extends AbstractStatementExecutor {
    ...
    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
        PreparedStatement preparedStatement = (PreparedStatement) statement;
        ResultSet resultSet = preparedStatement.executeQuery();
        ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule();
        // cache the resultSet
        getResultSets().add(resultSet);
        // decide by ConnectionMode: MEMORY_STRICTLY uses the streaming StreamQueryResult,
        // otherwise the in-memory MemoryQueryResult is used
        return ConnectionMode.MEMORY_STRICTLY == connectionMode
                ? new StreamQueryResult(resultSet, shardingRule)
                : new MemoryQueryResult(resultSet, shardingRule);
    }
    ...
}
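To see what the two QueryResult styles mean in practice, here is a simplified sketch with plain JDBC (stand-ins for StreamQueryResult and MemoryQueryResult, not their real implementations):

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public final class QueryResultSketch {

    // memory style: drain the ResultSet immediately so the connection can serve the next SQL unit;
    // the whole result lives on the heap, which is why connection limit mode always merges in memory
    public static List<Object[]> loadIntoMemory(final ResultSet resultSet) throws SQLException {
        int columnCount = resultSet.getMetaData().getColumnCount();
        List<Object[]> rows = new ArrayList<>();
        while (resultSet.next()) {
            Object[] row = new Object[columnCount];
            for (int i = 0; i < columnCount; i++) {
                row[i] = resultSet.getObject(i + 1);
            }
            rows.add(row);
        }
        return rows;
    }

    // stream style: keep the cursor open and pull one row at a time while merging;
    // this requires holding the underlying connection until merging finishes, hence memory limit mode
    public static Object[] nextRow(final ResultSet resultSet) throws SQLException {
        if (!resultSet.next()) {
            return null;
        }
        int columnCount = resultSet.getMetaData().getColumnCount();
        Object[] row = new Object[columnCount];
        for (int i = 0; i < columnCount; i++) {
            row[i] = resultSet.getObject(i + 1);
        }
        return row;
    }
}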
