Fescar distributed transaction implementation principle: an example analysis
This article introduces the implementation principles of fescar distributed transactions through an example analysis. The explanation is detailed and easy to follow, and should have some reference value; hopefully you will get something out of it. Let's take a look.
Project description
The code described in this post is fescar version 0.1.2-SNAPSHOT; its project structure and module implementations may change significantly in fescar's later iterations.
Fescar's TXC model
The figure above is the official fescar schematic of the TXC model (the big companies really do draw good diagrams). Combined with the schematic, we can see the full picture of how TXC is implemented. TXC is realized by three components, the three dark-yellow parts of the figure, whose roles are as follows:
TM: the global transaction manager. It runs on the service marked as the entry point of a fescar distributed transaction, opens the global transaction, and hands it over to the TC transaction controller for management.
TC: the transaction control center, which decides whether a global transaction commits or rolls back. This component is deployed and maintained independently; currently only a standalone version exists, with a clustered version planned for later iterations.
RM: the resource manager, mainly responsible for registering and reporting branch transactions and managing local transactions.
A brief description of the process: the initiating service opens a global transaction and registers it with the TC. When it invokes a collaborating service, that service's branch transaction first completes its phase-one commit or rollback, generates the undo_log needed for rollback, registers itself with the TC, reports its transaction status, and is merged into the global transaction of the same business. If nothing goes wrong, the next collaborating service is invoked in the same way. If any branch transaction of a collaborating service rolls back, the TC is notified, and the TC tells all branches that have completed their phase-one commit to roll back. If all branch transactions succeed, control eventually returns to the initiator of the global transaction, which also notifies the TC, and the TC tells all branches of the global transaction to delete their rollback logs. To solve write isolation and read isolation, a global lock managed by the TC is involved in this process.
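To make the flow concrete, here is a minimal sketch of what a global-transaction initiator looks like in this model. It is modeled loosely on the examples module; the annotation attributes and the service methods (purchase, deduct, create) are illustrative assumptions, not verbatim fescar code.

// Hedged sketch of a global-transaction initiator (TM side).
// @GlobalTransactional marks the global transaction boundary; attribute names are assumptions.
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
    // Each remote call below becomes a branch transaction registered with the TC.
    storageService.deduct(commodityCode, orderCount);
    orderService.create(userId, commodityCode, orderCount);
    // Any exception thrown here leads the TM to ask the TC to roll back
    // every branch that already completed its phase-one commit.
}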
The goal of this post is to dig into the code and see how these basic ideas are implemented. First we briefly describe each module's role based on the project structure, then explore the whole distributed transaction flow through the official examples module.
Analysis of project structure
After pulling the project down and opening it in an IDE, the directory structure is as follows. Let's first go over what each module implements.
Common: common components — shared helper classes, static variables, the extension-mechanism class loader, global exception definitions, and so on.
Config: the configuration loading and parsing module, providing the basic configuration interfaces. Currently only file-based configuration is implemented; configuration centers such as nacos are to follow.
Core: the core module, mainly encapsulating the RPC machinery used for TM, RM and TC communication.
Distribution: currently empty. Distribution is a high-performance queue, which should later be used for persisting transaction logs.
Dubbo: adapts fescar to the dubbo communication framework, using dubbo's filter mechanism to propagate global transaction information to the branches.
Examples: a simple demonstration module, explored below.
Rm-datasource: the resource management module. Compared with the core module, I think this one deserves the name "core" more: it proxies JDBC classes to parse SQL, generate rollback logs and coordinate local transactions.
Server: the TC component, which coordinates and manages global transactions, commits or rolls back global transactions, and maintains the global locks.
Spring: the spring integration module, mostly AOP logic. It is the entry point of the whole distributed transaction and the natural starting point for studying fescar.
Tm: the global transaction management module, which manages the boundaries of a global transaction; the global transaction's rollback point is controlled by this module.
Take a look at the effect through the [examples] module
Step 1: start the TC, i.e. the [server] module. Just run its main method directly; the default service port is 8091.
Step 2: back in the examples module, fill in the configuration files of the four services (order, business, account, warehouse), mainly the mysql data source and the zookeeper connection address. Note that the zk registry dubbo uses by default is not declared as a dependency, so a ClassNotFoundException is thrown at startup; the following dependencies need to be added:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
</dependency>

Step 3: set a breakpoint where BusinessServiceImpl simulates throwing an exception, start the four services OrderServiceImpl, StorageServiceImpl, AccountServiceImpl and BusinessServiceImpl in turn, and inspect the account_tbl table in the database while stopped at the breakpoint. Then release the breakpoint: the exception simulated in BusinessServiceImpl fires, the global transaction is rolled back, and the amount in account_tbl is restored to 999 yuan.
With that we have experienced fescar's transaction control in action; now let's see how it is done.
Fescar transaction process analysis
First, analyze the configuration file
This is an iron rule: whenever you integrate a technology or framework, the configuration file is the place to start. From the example above we know that a global transaction scanner instance is configured in the example module's configuration file, like this:
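The original snippet is not reproduced here, but as a rough sketch such a bean definition might look like the following, where the package name and the two constructor arguments (application id and transaction service group) are assumptions based on the 0.1.x examples:

<!-- Hedged sketch: register fescar's global transaction scanner with spring. -->
<bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
    <!-- application id -->
    <constructor-arg value="dubbo-demo-app"/>
    <!-- transaction service group -->
    <constructor-arg value="my_test_tx_group"/>
</bean>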
When the project starts, this scanner scans all beans (see the [spring] module for details) and weaves the invoke logic of GlobalTransactionalInterceptor around methods annotated with @GlobalTransactional. At application startup it also initializes the TM (TmRpcClient) and RM (RmRpcClient) instances, at which point the service is wired up to the TC transaction control center. Following the call chain down leads to the TM module's transaction template class TransactionalTemplate.
[TM] module starts a global transaction
The opening, commit and rollback of a global transaction are encapsulated in TransactionalTemplate, as shown in the following code:
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
    // 1. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 2. begin transaction
    try {
        tx.begin(business.timeout(), business.name());
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
    }

    Object rs = null;
    try {
        // Do Your Business
        rs = business.execute();
    } catch (Throwable ex) {
        // 3. any business exception, rollback.
        try {
            tx.rollback();
            // 3.1 Successfully rolled back
            throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
        } catch (TransactionException txe) {
            // 3.2 Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, ex);
        }
    }

    // 4. everything is fine, commit.
    try {
        tx.commit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
    }
    return rs;
}
The more detailed implementation is split across two classes in the [TM] module:
DefaultGlobalTransaction: performs the concrete begin, commit and rollback actions of a global transaction
DefaultTransactionManager: uses TmRpcClient to send instructions to the TC control center, such as opening a global transaction (GlobalBeginRequest), committing (GlobalCommitRequest), rolling back (GlobalRollbackRequest) and querying status (GlobalStatusRequest); a hedged sketch follows
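As an illustration of the second class, here is a minimal sketch of what opening a global transaction might look like; the syncCall helper and the exact signature are assumptions, not the verbatim 0.1.2-SNAPSHOT code:

// Hedged sketch: DefaultTransactionManager asks the TC to open a global
// transaction by sending a GlobalBeginRequest and reading the xid off the response.
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); // TmRpcClient under the hood
    return response.getXid();
}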
Those are the core points of the TM module. Once the TM has opened the global transaction, the next questions are how the global transaction ID (the xid) is propagated and how the RM component gets involved.
[Dubbo] module: delivery of the global transaction xid
First, xid propagation. It is already implemented for microservice architectures built on the dubbo framework, and it is easy to do the same for spring cloud, motan and the like: through the filter mechanism common to RPC frameworks, the xid is passed from the node that initiated the global transaction to the serving nodes, where it is bound to the current thread's context before the branch transaction executes its SQL. For fescar's implementation, see the [dubbo] module:
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID();
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) {
            // consumer side: attach the xid to the outgoing RPC
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            if (rpcXid != null) {
                // provider side: bind the received xid to the local thread context
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            if (bind) {
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}
In the code above, when rpcXid is not empty it is put into the ContextCore of RootContext. Digging a little deeper: ContextCore is an extensible interface whose current default implementation, ThreadLocalContextCore, keeps the current xid in a ThreadLocal. Here fescar provides an extension mechanism: the custom class loader EnhancedServiceLoader in the [common] module loads the service classes to be extended, so an extension class only needs the @LoadLevel annotation; declaring a higher priority via its order attribute makes it the effective implementation.
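For illustration, a minimal sketch of such an extension, assuming ContextCore exposes put/get/remove the way its ThreadLocal-based default does (the signatures and @LoadLevel attributes are assumptions):

// Hedged sketch of a custom ContextCore picked up by EnhancedServiceLoader;
// a higher order value is assumed to win over the default implementation.
@LoadLevel(name = "MapContextCore", order = 2)
public class MapContextCore implements ContextCore {

    private final ThreadLocal<Map<String, String>> store = ThreadLocal.withInitial(HashMap::new);

    @Override
    public String put(String key, String value) {
        return store.get().put(key, value);
    }

    @Override
    public String get(String key) {
        return store.get().get(key);
    }

    @Override
    public String remove(String key) {
        return store.get().remove(key);
    }
}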
[RM] module: the intervention of local resource management
Fescar uses the proxy pattern to wrap the local-transaction-related JDBC interfaces with proxy classes: the data source (DataSourceProxy), ConnectionProxy, StatementProxy and so on. This also shows up in the configuration file: to use fescar distributed transactions, we must configure the proxy data source that fescar provides, like this:
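The original configuration snippet is not shown here; a minimal sketch, assuming a druid data source underneath (bean ids and connection settings are illustrative):

<!-- Hedged sketch: the real data source... -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
    <property name="url" value="jdbc:mysql://127.0.0.1:3306/fescar"/>
    <property name="username" value="demo"/>
    <property name="password" value="demo"/>
</bean>

<!-- ...wrapped by fescar's proxy so the RM can intercept every JDBC operation -->
<bean id="dataSourceProxy" class="com.alibaba.fescar.rm.datasource.DataSourceProxy">
    <constructor-arg ref="dataSource"/>
</bean>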
Once the proxy data source is configured, DataSourceProxy gives us control over every local database operation. From the xid propagation above we already know that the xid lives in RootContext, so the following code is quite clear.
First, take a look at a piece of code from StatementProxy.
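The original listing did not survive extraction; below is a minimal sketch of the delegation it demonstrates, assuming a functional StatementCallback interface and an ExecuteTemplate overload like the one sketched in the next section (treat the names as approximate):

// Hedged sketch: StatementProxy intercepts the plain JDBC call and routes it
// through ExecuteTemplate instead of executing on the raw statement directly.
@Override
public int executeUpdate(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this,
        (statement, args) -> statement.executeUpdate((String) args[0]),
        sql);
}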
Then, look at the code in ExecuteTemplate.
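That listing is also absent from the extracted page, so here is a condensed sketch of its dispatch logic, reconstructed from the description below; the SQLVisitorFactory name and the exact generics are assumptions:

// Hedged sketch of ExecuteTemplate: outside a global transaction the original
// JDBC logic runs untouched; inside one, an executor is chosen by SQL type.
public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    return execute(null, statementProxy, statementCallback, args);
}

public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    if (!RootContext.inGlobalTransaction()) {
        // Not in a distributed transaction: plain local execution.
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }
    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(
            statementProxy.getTargetSQL(),
            statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor;
    if (sqlRecognizer == null) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        switch (sqlRecognizer.getSQLType()) {
            case INSERT:
                executor = new InsertExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case UPDATE:
                executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case DELETE:
                executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case SELECT_FOR_UPDATE:
                executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                break;
            default:
                executor = new PlainExecutor<>(statementProxy, statementCallback);
                break;
        }
    }
    try {
        return executor.execute(args);
    } catch (Throwable ex) {
        if (ex instanceof SQLException) {
            throw (SQLException) ex;
        }
        throw new SQLException(ex); // wrap anything unexpected
    }
}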
Like the transaction management template class TransactionalTemplate in the [TM] module, the critical proxy logic is encapsulated in the ExecuteTemplate template class. Because Statement is rewritten via the StatementProxy implementation, the execute logic of ExecuteTemplate is invoked whenever the original JDBC executeUpdate method runs. Before the SQL actually executes, it checks whether the current RootContext contains an xid, i.e. whether we are currently inside a globally distributed transaction. If not, the local transaction is used directly; if so, the RM adds the distributed-transaction logic. Depending on the SQL type, fescar wraps five different executors: UpdateExecutor, DeleteExecutor, InsertExecutor, SelectForUpdateExecutor and PlainExecutor. The structure is shown below:
PlainExecutor:
The native JDBC implementation, with no extra processing; used for ordinary select queries inside a global transaction.
UpdateExecutor, DeleteExecutor, InsertExecutor:
The three DML (update, delete, insert) executors. They mainly parse the SQL statement before and after it executes, and implement the following two abstract methods:
protected abstract TableRecords beforeImage() throws SQLException;

protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;
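To make the contract concrete, here is a minimal sketch of how a DML executor brackets the business SQL with these two images; the executeAutoCommitFalse and prepareUndoLog names are assumptions:

// Hedged sketch of the DML flow: snapshot before, run the SQL, snapshot after,
// then stage an undo-log entry on the connection for a possible rollback.
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    statementProxy.getConnectionProxy().prepareUndoLog(
        sqlRecognizer.getSQLType(), sqlRecognizer.getTableName(), beforeImage, afterImage);
    return result;
}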
During this process, the undo_log that enables the rollback operation is generated by parsing the SQL. It is currently stored in mysql, sharing the same transaction as the business SQL. The table is structured as follows:
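The schema screenshot did not survive extraction; the commonly used undo_log DDL looks roughly like this (a sketch that may differ in detail from the 0.1.x scripts):

-- Hedged sketch of the undo_log table used for AT-mode rollbacks.
CREATE TABLE `undo_log` (
  `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT,
  `branch_id`     BIGINT(20)   NOT NULL,
  `xid`           VARCHAR(100) NOT NULL,
  `rollback_info` LONGBLOB     NOT NULL, -- serialized before/after images
  `log_status`    INT(11)      NOT NULL,
  `log_created`   DATETIME     NOT NULL,
  `log_modified`  DATETIME     NOT NULL,
  `ext`           VARCHAR(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;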
The undo_log detail saved in rollback_info is of type longblob and is structured as follows:
{"branchId": 3958194, "sqlUndoLogs": [{"afterImage": {"rows": [{"fields": [{"keyType": "PrimaryKey" "name": "ID", "type": 4, "value": 10}, {"keyType": "NULL" "name": "COUNT", "type": 4, "value": 98}]] "tableName": "storage_tbl"}, "beforeImage": {"rows": [{"fields": [{"keyType": "PrimaryKey" "name": "ID", "type": 4, "value": 10}, {"keyType": "NULL" "name": "COUNT", "type": 4, "value": 100}]] "tableName": "storage_tbl"}, "sqlType": "UPDATE", "tableName": "storage_tbl"}], "xid": "192.168.7.77 virtual 8091 virtual 3958193"}
What is shown here is an update operation. The undo_log record is very detailed: it ties the branchId to the global transaction xid and records the table name of the data operation, the touched field names, and the record values before and after the SQL ran. For this record: tableName = storage_tbl; before the SQL, id=10 and count=100; after the SQL, id=10 and count=98. If the whole global transaction fails and a rollback is needed, a rollback statement can be generated from it:

update storage_tbl set count = 100 where id = 10

and executed to undo the change.
SelectForUpdateExecutor:
Over local transactions, fescar's AT mode supports the read-uncommitted isolation level by default, but through the SelectForUpdateExecutor it can provide read-committed. The code:
@Override
public Object doExecute(Object... args) throws Throwable {
    SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;
    Connection conn = statementProxy.getConnection();
    ResultSet rs = null;
    Savepoint sp = null;
    LockRetryController lockRetryController = new LockRetryController();
    boolean originalAutoCommit = conn.getAutoCommit();

    StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
    selectSQLAppender.append(getTableMeta().getPkName());
    selectSQLAppender.append(" FROM " + getTableMeta().getTableName());
    String whereCondition = null;
    ArrayList<Object> paramAppender = new ArrayList<>();
    if (statementProxy instanceof ParametersHolder) {
        whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
    } else {
        whereCondition = recognizer.getWhereCondition();
    }
    if (!StringUtils.isEmpty(whereCondition)) {
        selectSQLAppender.append(" WHERE " + whereCondition);
    }
    selectSQLAppender.append(" FOR UPDATE");
    String selectPKSQL = selectSQLAppender.toString();

    try {
        if (originalAutoCommit) {
            conn.setAutoCommit(false);
        }
        sp = conn.setSavepoint();
        rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

        while (true) {
            // Try to get global lock of those rows selected
            Statement stPK = null;
            PreparedStatement pstPK = null;
            ResultSet rsPK = null;
            try {
                if (paramAppender.isEmpty()) {
                    stPK = statementProxy.getConnection().createStatement();
                    rsPK = stPK.executeQuery(selectPKSQL);
                } else {
                    pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
                    for (int i = 0; i < paramAppender.size(); i++) {
                        pstPK.setObject(i + 1, paramAppender.get(i));
                    }
                    rsPK = pstPK.executeQuery();
                }

                TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
                statementProxy.getConnectionProxy().checkLock(selectPKRows);
                break;

            } catch (LockConflictException lce) {
                conn.rollback(sp);
                lockRetryController.sleep(lce);
            } finally {
                if (rsPK != null) {
                    rsPK.close();
                }
                if (stPK != null) {
                    stPK.close();
                }
                if (pstPK != null) {
                    pstPK.close();
                }
            }
        }
    } finally {
        if (sp != null) {
            conn.releaseSavepoint(sp);
        }
        if (originalAutoCommit) {
            conn.setAutoCommit(true);
        }
    }
    return rs;
}
The key lines are:
TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);
The lock keys are derived from the rows selected into selectPKRows; the executor then asks the TC controller whether those rows are globally locked and, if they are, retries until the lock is released, finally returning the query result.
Registration and reporting of branch transactions
Before the local transaction commits, fescar registers the branch transaction's information, as shown in the commit code of the ConnectionProxy class:
@Override
public void commit() throws SQLException {
    if (context.inGlobalTransaction()) {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }

        try {
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    } else {
        targetConnection.commit();
    }
}
From this code we can see that it first checks whether we are inside a global transaction; if not, it commits directly. If so, it first registers the branch transaction with the TC controller, which on the TC side involves acquiring the global lock for write isolation. Then the undo_log for the rollback operation is saved, the local transaction is actually committed, and finally the transaction status is reported to the TC controller. At this point, the branch's phase-one local transaction is complete.
[Server] module: coordinating the global transaction
In the server module, the class to focus on is DefaultCoordinator, the default implementation of the AbstractTCInboundHandler control processor. It implements the interfaces for opening, committing, rolling back and querying the status of global transactions, as well as registering, reporting and lock-checking branch transactions.
Back to the initial TransactionalTemplate: if the whole distributed transaction fails and needs to roll back, the TM first sends a rollback request to the TC; after the TC receives and parses it, the request is routed to the doGlobalRollback method of the default coordinator class. The code finally executed on the TC controller is as follows:
@Override
public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
        BranchStatus currentBranchStatus = branchSession.getStatus();
        if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
            continue;
        }
        try {
            BranchStatus branchStatus = resourceManagerInbound.branchRollback(
                XID.generateXID(branchSession.getTransactionId()),
                branchSession.getBranchId(),
                branchSession.getResourceId(),
                branchSession.getApplicationData());

            switch (branchStatus) {
                case PhaseTwo_Rollbacked:
                    globalSession.removeBranch(branchSession);
                    LOGGER.error("Successfully rolled back branch " + branchSession);
                    continue;
                case PhaseTwo_RollbackFailed_Unretryable:
                    GlobalStatus currentStatus = globalSession.getStatus();
                    if (currentStatus.name().startsWith("Timeout")) {
                        globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
                    } else {
                        globalSession.changeStatus(GlobalStatus.RollbackFailed);
                    }
                    globalSession.end();
                    LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId()
                        + "] since branch[" + branchSession.getBranchId() + "] rollback failed");
                    return;
                default:
                    LOGGER.info("Failed to rollback branch " + branchSession);
                    if (!retrying) {
                        queueToRetryRollback(globalSession);
                    }
                    return;
            }
        } catch (Exception ex) {
            LOGGER.info("Exception rollbacking branch " + branchSession, ex);
            if (!retrying) {
                queueToRetryRollback(globalSession);
                if (ex instanceof TransactionException) {
                    throw (TransactionException) ex;
                } else {
                    throw new TransactionException(ex);
                }
            }
        }
    }
    GlobalStatus currentStatus = globalSession.getStatus();
    if (currentStatus.name().startsWith("Timeout")) {
        globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
    } else {
        globalSession.changeStatus(GlobalStatus.Rollbacked);
    }
    globalSession.end();
}
As the code above shows, a rollback iterates over the branch transactions of the global session and notifies each branch to roll back. On the branch service, the request is first routed to the doBranchRollback method in RMHandlerAT, which then calls the RM's branchRollback method, as follows:
@Override
public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        UndoLogManager.undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}
Finally, the RM branch executes the undo method of UndoLogManager, which queries the rollback log from the database by xid and branchId and performs the data rollback; the whole process is completed synchronously. If the global transaction succeeds, the TC runs a similar coordination process, but clears the undo_log associated with the global transaction asynchronously. With that, the two phases of commit or rollback are done, and the control of a complete global transaction is finished.
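For a feel of what that undo step involves, here is a minimal sketch; the parser/executor factory names are modeled on later seata sources, and the exact 0.1.x signatures are assumptions:

// Hedged sketch of UndoLogManager.undo: load the undo_log rows for (xid, branchId),
// replay the before-images as compensating SQL, then delete the log — all in one
// local transaction so the rollback itself is atomic.
public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    try (Connection conn = dataSourceProxy.getPlainConnection()) {
        conn.setAutoCommit(false);
        try (PreparedStatement selectPST = conn.prepareStatement(
                "SELECT rollback_info FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE")) {
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            try (ResultSet rs = selectPST.executeQuery()) {
                while (rs.next()) {
                    // Deserialize the before/after images and run the compensating SQL.
                    BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance()
                        .decode(rs.getBytes("rollback_info"));
                    for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                        UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog)
                            .executeOn(conn);
                    }
                }
            }
        }
        // Remove the consumed undo_log rows within the same local transaction.
        try (PreparedStatement deletePST = conn.prepareStatement(
                "DELETE FROM undo_log WHERE branch_id = ? AND xid = ?")) {
            deletePST.setLong(1, branchId);
            deletePST.setString(2, xid);
            deletePST.executeUpdate();
        }
        conn.commit();
    } catch (SQLException e) {
        throw new TransactionException(TransactionExceptionCode.BranchRollbackFailed_Retriable,
            "undo failed, xid=" + xid + " branchId=" + branchId, e);
    }
}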
That wraps up this analysis of the fescar distributed transaction implementation principle. Thanks for reading!