In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article mainly explains the "Saga mode source method tutorial", interested friends may wish to have a look. The method introduced in this paper is simple, fast and practical. Next let the editor to take you to learn the "Saga mode source method tutorial" it!
State machine definition
Taking a typical e-commerce shopping process as an example, we define three services, order service (OrderServer), account service (AccountService) and inventory service (StorageService). Here we treat the order service as an aggregation service, that is, TM.
When an external order is placed, the order service first creates an order, then invokes the account service to deduct the amount, and finally invokes the inventory service to deduct inventory. The process is shown in the following figure:
The saga pattern of seata is based on the state machine, which needs a JSON file to control the state. The JSON file is defined as follows:
{"Name": "buyGoodsOnline", "Comment": "buy a goods on line, add order, deduct account, deduct storage", "StartState": "SaveOrder", "Version": "0.0.1", "States": {"SaveOrder": {"Type": "ServiceTask", "ServiceName": "orderSave" "ServiceMethod": "saveOrder", "CompensateState": "DeleteOrder", "Next": "ChoiceAccountState", "Input": ["$. [businessKey]", "$. [order]"] "Output": {"SaveOrderResult": "$. # root"}, "Status": {"# root = = true": "SU", "# root = = false": "FA" "$Exception {java.lang.Throwable}": "UN"}, "ChoiceAccountState": {"Type": "Choice", "Choices": [{"Expression": "[SaveOrderResult] = = true" "Next": "ReduceAccount"}], "Default": "Fail"}, "ReduceAccount": {"Type": "ServiceTask", "ServiceName": "accountService", "ServiceMethod": "decrease" "CompensateState": "CompensateReduceAccount", "Next": "ChoiceStorageState", "Input": ["$. [businessKey]", "$. [userId]", "$. [money]" {"throwException": "$. [mockReduceAccountFail]"}], "Output": {"ReduceAccountResult": "$. # root"}, "Status": {"# root = = true": "SU" "# root = = false": "FA", "$Exception {java.lang.Throwable}": "UN"}, "Catch": [{"Exceptions": ["java.lang.Throwable"] "Next": "CompensationTrigger"}]}, "ChoiceStorageState": {"Type": "Choice", "Choices": [{"Expression": "[ReduceAccountResult] = = true" "Next": "ReduceStorage"}], "Default": "Fail"}, "ReduceStorage": {"Type": "ServiceTask", "ServiceName": "storageService", "ServiceMethod": "decrease" "CompensateState": "CompensateReduceStorage", "Input": ["$. [businessKey]", "$. [productId]", "$. [count]" {"throwException": "$. [mockReduceStorageFail]"}], "Output": {"ReduceStorageResult": "$. # root"}, "Status": {"# root = = true": "SU" "# root = = false": "FA", "$Exception {java.lang.Throwable}": "UN"}, "Catch": [{"Exceptions": ["java.lang.Throwable"] "Next": "CompensationTrigger"}], "Next": "Succeed"}, "DeleteOrder": {"Type": "ServiceTask", "ServiceName": "orderSave", "ServiceMethod": "deleteOrder" "Input": ["$. [businessKey]", "$. [order]"]}, "CompensateReduceAccount": {"Type": "ServiceTask", "ServiceName": "accountService", "ServiceMethod": "compensateDecrease" "Input": ["$. [businessKey]", "$. [userId]", "$. [money]"]}, "CompensateReduceStorage": {"Type": "ServiceTask", "ServiceName": "storageService" "ServiceMethod": "compensateDecrease", "Input": ["$. [businessKey]", "$. [productId]", "$. [count]"]}, "CompensationTrigger": {"Type": "CompensationTrigger" "Next": "Fail", "Succeed": {"Type": "Succeed"}, "Fail": {"Type": "Fail", "ErrorCode": "PURCHASE_FAILED", "Message": "purchase failed"}
The state machine runs in TM, which is the order service we defined above. When the order service creates an order, you need to open a global transaction, and then you need to start the state machine. The code is as follows:
StateMachineEngine stateMachineEngine = (StateMachineEngine) ApplicationContextUtils.getApplicationContext (). GetBean ("stateMachineEngine"); Map startParams = new HashMap (3); String businessKey = String.valueOf (System.currentTimeMillis ()); startParams.put ("businessKey", businessKey); startParams.put ("order", order); startParams.put ("mockReduceAccountFail", "true"); startParams.put ("userId", order.getUserId ()); startParams.put ("money", order.getPayAmount ()); startParams.put ("productId", order.getProductId ()) StartParams.put ("count", order.getCount ()); / / sync test StateMachineInstance inst = stateMachineEngine.startWithBusinessKey ("buyGoodsOnline", null, businessKey, startParams)
As you can see, the buyGoodsOnline defined in the above code is the property value of name in the JSON file.
State machine initialization
So where is the stateMachineEngine bean defined in the order creation code above? A class StateMachineConfiguration is defined in the demo of the order service, with the following code:
Public class StateMachineConfiguration {@ Bean public ThreadPoolExecutorFactoryBean threadExecutor () {ThreadPoolExecutorFactoryBean threadExecutor = new ThreadPoolExecutorFactoryBean (); threadExecutor.setThreadNamePrefix ("SAGA_ASYNC_EXE_"); threadExecutor.setCorePoolSize (1); threadExecutor.setMaxPoolSize (20); return threadExecutor;} @ Bean public DbStateMachineConfig dbStateMachineConfig (ThreadPoolExecutorFactoryBean threadExecutor, DataSource hikariDataSource) throws IOException {DbStateMachineConfig dbStateMachineConfig = new DbStateMachineConfig (); dbStateMachineConfig.setDataSource (hikariDataSource) DbStateMachineConfig.setThreadPoolExecutor ((ThreadPoolExecutor) threadExecutor.getObject ()) / * the path to the json file is configured here. During initialization, TM parses the json file into a StateMachineImpl class. If the database does not save this state machine, it is stored in the database seata_state_machine_def table. * if the database has a record, take the latest record and register to StateMachineRepositoryImpl. * there are two registered Map, one is stateMachineMapByNameAndTenant. Key format is (stateMachineName + "_" + tenantId), * one is stateMachineMapById,key is stateMachine.getId () * see StateMachineRepositoryImpl class registryStateMachine method * this registered trigger method is in DefaultStateMachineConfig initialization method init (), this class is the parent class of DbStateMachineConfig * / dbStateMachineConfig.setResources (new PathMatchingResourcePatternResolver (). GetResources ("classpath*:statelang/*.json")) / / json file dbStateMachineConfig.setEnableAsync (true); dbStateMachineConfig.setApplicationId ("order-server"); dbStateMachineConfig.setTxServiceGroup ("my_test_tx_group"); return dbStateMachineConfig;} @ Bean public ProcessCtrlStateMachineEngine stateMachineEngine (DbStateMachineConfig dbStateMachineConfig) {ProcessCtrlStateMachineEngine stateMachineEngine = new ProcessCtrlStateMachineEngine (); stateMachineEngine.setStateMachineConfig (dbStateMachineConfig); return stateMachineEngine } @ Bean public StateMachineEngineHolder stateMachineEngineHolder (ProcessCtrlStateMachineEngine stateMachineEngine) {StateMachineEngineHolder stateMachineEngineHolder = new StateMachineEngineHolder (); stateMachineEngineHolder.setStateMachineEngine (stateMachineEngine); return stateMachineEngineHolder;}}
As you can see, we configured the json file of the state machine in DbStateMachineConfig, as well as applicationId and txServiceGroup. When DbStateMachineConfig is initialized, the init method of the subclass DefaultStateMachineConfig parses the json file into a state machine and registers it.
During the registration process, a record is inserted into the seata_state_machine_def table. The content field in the table stores the contents of our JOSON file. Other field value data is shown below:
Attachment: according to the previous JSON file, the StateMachineImpl tracked by our debug is as follows:
Id = null tenantId = null appName = "SEATA" name = "buyGoodsOnline" comment = "buy a goods on line, add order, deduct account Deduct storage "version =" 0.0.1 "startState =" SaveOrder "status = {StateMachine$Status@9135}" AC "recoverStrategy = true type =" STATE_LANG "content = null gmtCreate = null states = {LinkedHashMap@9137} size = 11" SaveOrder "- > {ServiceTaskStateImpl@9153}" ChoiceAccountState "- > {ChoiceStateImpl@9155}" ReduceAccount "- > {ServiceTaskStateImpl@9157}" ChoiceStorageState "- > {ChoiceStateImpl@9159}" ReduceStorage " -> {ServiceTaskStateImpl@9161} "DeleteOrder"-> {ServiceTaskStateImpl@9163} "CompensateReduceAccount"-> {ServiceTaskStateImpl@9165} "CompensateReduceStorage"-> {ServiceTaskStateImpl@9167} "CompensationTrigger"-> {CompensationTriggerStateImpl@9169} "Succeed"-> {SucceedEndStateImpl@9171} "Fail"-> {FailEndStateImpl@9173}
Start the state machine
In the code for creating the order in the first section, the startWithBusinessKey method starts the entire transaction, and this method also has an asynchronous mode startWithBusinessKeyAsync. Here we only analyze the synchronous mode, and the source code is as follows:
Public StateMachineInstance startWithBusinessKey (String stateMachineName, String tenantId, String businessKey, Map startParams) throws EngineExecutionException {return startInternal (stateMachineName, tenantId, businessKey, startParams, false, null) } private StateMachineInstance startInternal (String stateMachineName, String tenantId, String businessKey, Map startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {/ / omit part of the source code / / create a state machine instance / / default tenantId= "000001" StateMachineInstance instance = createMachineInstance (stateMachineName, tenantId, businessKey, startParams) / * ProcessType.STATE_LANG this enumeration has only one element * OPERATION_NAME_START = "start" * callback is null * getStateMachineConfig () returns DbStateMachineConfig * / ProcessContextBuilder contextBuilder = ProcessContextBuilder.create () .withProcessType (ProcessType.STATE_LANG) .withOperationName (DomainConstants.OPERATION_NAME_START) .withAsyncCallback (callback) .withInstruction (new StateInstruction (stateMachineName) TenantId) .withStateMachineInstance (instance) .withStateMachineConfig (getStateMachineConfig ()) .withStateMachineEngine (this) Map contextVariables; if (startParams! = null) {contextVariables = new ConcurrentHashMap (startParams.size ()); nullSafeCopy (startParams, contextVariables);} else {contextVariables = new ConcurrentHashMap ();} instance.setContext (contextVariables); / / assign startup parameters to context of state machine instance / / add parameters contextBuilder.withStateMachineContextVariables (contextVariables) to variables of ProcessContextImpl; contextBuilder.withIsAsyncExecution (async) / / the builder defined above creates a ProcessContextImpl ProcessContext processContext = contextBuilder.build (); / / this condition is true if (instance.getStateMachine (). IsPersist () & & stateMachineConfig.getStateLogStore ()! = null) {/ / record the state machine start state stateMachineConfig.getStateLogStore () .recordStateMachineStarted (instance, processContext) } if (StringUtils.isEmpty (instance.getId () {instance.setId (stateMachineConfig.getSeqGenerator (). Generate (DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));} if (async) {stateMachineConfig.getAsyncProcessCtrlEventPublisher () .publish (processContext) } else {/ / send messages to EventBus, where the consumer is ProcessCtrlEventConsumer, set stateMachineConfig.getProcessCtrlEventPublisher (). Publish (processContext);} return instance;} when DefaultStateMachineConfig initializes
In the above code, we can see that when the startup status is remembered, we mainly do two things, one is to record the state of the start of the state machine, and the other is to send a message to EventBus. Let's take a look at these two processes in detail.
Open a global transaction
In the above code analysis, there is a code that records the starting state of the state machine, as follows:
StateMachineConfig.getStateLogStore () .recordStateMachineStarted (instance processContext)
The recordStateMachineStarted method of class DbAndReportTcStateLogStore is called here. Let's take a look. The code is as follows:
Public void recordStateMachineStarted (StateMachineInstance machineInstance, ProcessContext context) {if (machineInstance! = null) {/ / if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction, / / use parent transaction instead. String parentId = machineInstance.getParentId (); if (StringUtils.hasLength (parentId)) {if (StringUtils.isEmpty (machineInstance.getId () {machineInstance.setId (parentId) }} else {/ / take this branch, because there is no configuration of sub-state machine / * the beginTransaction here starts the global transaction, * here you call TC to start the global transaction * / beginTransaction (machineInstance, context) } if (StringUtils.isEmpty (machineInstance.getId ()) & & seqGenerator! = null) {machineInstance.setId (seqGenerator.generate (DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));} / save to db / / dbType = "MySQL" machineInstance.setSerializedStartParams (paramsSerializer.serialize (machineInstance.getStartParams () ExecuteUpdate (stateLogStoreSqls.getRecordStateMachineStartedSql (dbType), STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT, machineInstance);}}
The above executeUpdate method can be seen in the subclass AbstractStore,debug under the executeUpdate method. The sql executed here is as follows:
INSERT INTO seata_state_machine_inst (id, machine_id, tenant_id, parent_id, gmt_started, business_key, start_params, is_running, status, gmt_updated) VALUES ('192.168.59.146) VALUES (' 192.168.59.146) 8091 VALUES 6585349147990016, '06a098cab53241ca7ed09433342e9f07seven,' 000001, null, '2020-10-31 17purl 1824.773,' 1604135904773,'{"@ type": "java.util.HashMap", "money": 50.00, "productId": 1L "_ business_key_": "1604135904773", "businessKey": "1604135904773", "count": 1, "mockReduceAccountFail": "true", "userId": 1L, "order": {"@ type": "io.seata.sample.entity.Order", "count": 1, "payAmount": 50, "productId": 1, "userId": 1}}', 1, 'RU',' 2020-10-31 1718 purve24.773')
As you can see, this global transaction is recorded in the table seata_state_machine_inst, which records the parameters that we start the state machine, and the state of the status record is "RU", that is, RUNNING.
Branch transaction processing
As we mentioned in the previous section, after starting the state machine, we sent a message to EventBus. The consumer of this message is ProcessCtrlEventConsumer. Let's take a look at the code for this class:
Public class ProcessCtrlEventConsumer implements EventConsumer {private ProcessController processController; @ Override public void process (ProcessContext event) throws FrameworkException {/ / where processController is ProcessControllerImpl processController.process (event);} @ Override public boolean accept (Class clazz) {return ProcessContext.class.isAssignableFrom (clazz);} public void setProcessController (ProcessController processController) {this.processController = processController;}}
The process method of the ProcessControllerImpl class has two processing logic, process and route, with the following code:
Public void process (ProcessContext context) throws FrameworkException {try {/ / where businessProcessor is CustomizeBusinessProcessor businessProcessor.process (context); businessProcessor.route (context);} catch (FrameworkException fex) {throw fex;} catch (Exception ex) {LOGGER.error ("Unknown exception occurred, context = {}", context, ex); throw new FrameworkException (ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError);}}
The processing logic here is a bit complicated. First, we have a UML class diagram. Following this diagram, we can clarify the calling logic of the code:
Let's first look at the process method in CustomizeBusinessProcessor:
Public void process (ProcessContext context) throws FrameworkException {/ * processType = {ProcessType@10310} "STATE_LANG" * code = "STATE_LANG" * message = "SEATA State Language" * name = "STATE_LANG" * ordinal = 0 * / ProcessType processType = matchProcessType (context) If (processType = = null) {if (LOGGER.isWarnEnabled ()) {LOGGER.warn ("Process type not found, context= {}", context);} throw new FrameworkException (FrameworkErrorCode.ProcessTypeNotFound);} ProcessHandler processor = processHandlers.get (processType.getCode ()); if (processor = = null) {LOGGER.error ("Cannot find process handler by type {}, context= {}", processType.getCode (), context) Throw new FrameworkException (FrameworkErrorCode.ProcessHandlerNotFound);} / / here is StateMachineProcessHandler processor.process (context);}
The code here is difficult to understand, so we will study it in four steps.
As a first step, let's take a look at the process method in the StateMachineProcessHandler class, which proxies the process method of ServiceTaskStateHandler. The code is as follows:
Public void process (ProcessContext context) throws FrameworkException {/ * instruction = {StateInstruction@11057} * stateName = null * stateMachineName = "buyGoodsOnline" * tenantId = "000001" * end = false * temporaryState = null * / StateInstruction instruction = context.getInstruction (StateInstruction.class); / / the state implementation class here is ServiceTaskStateImpl State state = instruction.getState (context); String stateType = state.getType () / / here the stateHandler implementation class is ServiceTaskStateHandler StateHandler stateHandler = stateHandlers.get (stateType); List interceptors = null; if (stateHandler instanceof InterceptableStateHandler) {/ / list has an element ServiceTaskHandlerInterceptor interceptors = ((InterceptableStateHandler) stateHandler) .getInterceptors ();} List executedInterceptors = null; Exception exception = null Try {if (interceptors! = null & & interceptors.size () > 0) {executedInterceptors = new ArrayList (interceptors.size ()); for (StateHandlerInterceptor interceptor: interceptors) {executedInterceptors.add (interceptor); interceptor.preProcess (context);}} stateHandler.process (context) } catch (Exception e) {exception = e; throw e;} finally {if (executedInterceptors! = null & & executedInterceptors.size () > 0) {for (int I = executedInterceptors.size ()-1; I > = 0; iMel -) {StateHandlerInterceptor interceptor = executedInterceptors.get (I); interceptor.postProcess (context, exception) }
From this method, we can see that the agent adds pre-and post-enhancements to stateHandler.process, the enhancement class is ServiceTaskHandlerInterceptor, and the pre-and post-enhancements call interceptor's preProcess and postProcess, respectively.
Second, let's take a look at enhanced logic. The preProcess and postProcess methods of ServiceTaskHandlerInterceptor, the code is as follows:
Public class ServiceTaskHandlerInterceptor implements StateHandlerInterceptor {/ / omit part of the code @ Override public void preProcess (ProcessContext context) throws EngineExecutionException {StateInstruction instruction = context.getInstruction (StateInstruction.class); StateMachineInstance stateMachineInstance = (StateMachineInstance) context.getVariable (DomainConstants.VAR_NAME_STATEMACHINE_INST); StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable (DomainConstants.VAR_NAME_STATEMACHINE_CONFIG) / / if timeout occurs, modify the state machine to FA if (EngineUtils.isTimeout (stateMachineInstance.getGmtUpdated (), stateMachineConfig.getTransOperationTimeout () {String message = "Saga Transaction [stateMachineInstanceId:" + stateMachineInstance.getId () + "] has timed out, stop execution now."; EngineUtils.failStateMachine (context, exception); throw exception } StateInstanceImpl stateInstance = new StateInstanceImpl (); Map contextVariables = (Map) context.getVariable (DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT); ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState (context); List serviceInputParams = null; Object isForCompensation = state.isForCompensation (); if (isForCompensation! = null & & (Boolean) isForCompensation) {CompensationHolder compensationHolder = CompensationHolder.getCurrent (context, true) StateInstance stateToBeCompensated = compensationHolder.getStatesNeedCompensation () .get (state.getName ()); if (stateToBeCompensated! = null) {stateToBeCompensated.setCompensationState (stateInstance); stateInstance.setStateIdCompensatedFor (stateToBeCompensated.getId ()) } else {LOGGER.error ("CompensationState [{}] has no state to compensate, maybe this is a bug.", state.getName ());} / / add compensation set CompensationHolder.getCurrent (context, true) .addForCompensationState (stateInstance.getName (), stateInstance) } / / omit part of the code stateInstance.setInputParams (serviceInputParams) If (stateMachineInstance.getStateMachine (). IsPersist () & & state.isPersist () & & stateMachineConfig.getStateLogStore ()! = null) {try {/ / record the status of a branch transaction RU to the database / * INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type) Is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for) * VALUES ('4fe5f602452c84ba5e88fd2ee9c13b35', '192.168.59.146 status 8091VV 6585349147990016', 'SaveOrder',' ServiceTask', '2020-10-31 1718 purse 40.84', 'orderSave', *' saveOrder', null, 1,'["1604135904773" {"@ type": "io.seata.sample.entity.Order", "count": 1, "payAmount": 50, "productId": 1, "userId": 1}]', 'RU', null, null, null) * / stateMachineConfig.getStateLogStore () .recordStateStarted (stateInstance, context) }} / / omit part of the code stateMachineInstance.putStateInstance (stateInstance.getId (), stateInstance); / / put the stateMap into StateMachineInstanceImpl for retry or transaction compensation ((HierarchicalProcessContext) context) .setVariableLocally (DomainConstants.VAR_NAME_STATE_INST, stateInstance) / / record the status and pass it to TaskStateRouter to judge the end of the global transaction} @ Override public void postProcess (ProcessContext context, Exception exp) throws EngineExecutionException {StateInstruction instruction = context.getInstruction (StateInstruction.class); ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState (context); StateMachineInstance stateMachineInstance = (StateMachineInstance) context.getVariable (DomainConstants.VAR_NAME_STATEMACHINE_INST) StateInstance stateInstance = (StateInstance) context.getVariable (DomainConstants.VAR_NAME_STATE_INST); if (stateInstance = = null | |! stateMachineInstance.isRunning ()) {LOGGER.warn ("StateMachineInstance [id:" + stateMachineInstance.getId () + "] is end. Stop running "); return;} StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable (DomainConstants.VAR_NAME_STATEMACHINE_CONFIG); if (exp = = null) {exp = (Exception) context.getVariable (DomainConstants.VAR_NAME_CURRENT_EXCEPTION);} stateInstance.setException (exp) / / set transaction status decideExecutionStatus (context, stateInstance, state, exp); / / omit partial code Map contextVariables = (Map) context.getVariable (DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT); / / omit partial code context.removeVariable (DomainConstants.VAR_NAME_OUTPUT_PARAMS); context.removeVariable (DomainConstants.VAR_NAME_INPUT_PARAMS) StateInstance.setGmtEnd (new Date ()) If (stateMachineInstance.getStateMachine (). IsPersist () & & state.isPersist () & & stateMachineConfig.getStateLogStore ()! = null) {/ / the status of the updated branch transaction is successful / * UPDATE seata_state_inst SET gmt_end = '2020-10-31 17-18 isPersist 49.919, excep = null, status =' SU' * output_params = 'true' WHERE id =' 4fe5f602452c84ba5e88fd2ee9c13b35' AND * machine_inst_id = '192.168.59.146 AND * machine_inst_id =' 1853497147990016'/ stateMachineConfig.getStateLogStore (). RecordStateFinished (stateInstance, context) } / / omit some codes}}
From this code, we can see that before the branch transaction is executed, a StateInstanceImpl assignment is encapsulated to the ProcessContext, and after the branch transaction is executed, the StateInstanceImpl is modified. The StateInstanceImpl has three functions:
The stateMap passed into StateMachineInstanceImpl is used for retry or transaction compensation
Records the execution of branch transactions and supports persistence to the seata_state_ insttable
Incoming TaskStateRouter is used to determine the end of a global transaction
In the third step, let's take a look at the proxied method stateHandler.process (context). The implementation class of stateHandler in the normal execution logic is ServiceTaskStateHandler. The code is as follows:
Public void process (ProcessContext context) throws EngineExecutionException {StateInstruction instruction = context.getInstruction (StateInstruction.class); ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState (context); StateInstance stateInstance = (StateInstance) context.getVariable (DomainConstants.VAR_NAME_STATE_INST); Object result Try {/ * the input here is defined in JSON, such as the ServiceTask of orderSave. The input is as follows: * 0 = "1608714480316" * 1 = {Order@11271} "Order (id=null, userId=1, productId=1, count=1, payAmount=50, status=null)" * JSON is defined as follows: * "Input": [* "$. [businessKey]" * "$. [order]" *] * / List input = (List) context.getVariable (DomainConstants.VAR_NAME_INPUT_PARAMS) / / Set the current task execution status to RU (Running) stateInstance.setStatus (ExecutionStatus.RU); / / set status if (state instanceof CompensateSubStateMachineState) {/ / Research on omitted sub-state machine} else {StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable (DomainConstants.VAR_NAME_STATEMACHINE_CONFIG) / / the state.getServiceType here is springBean ServiceInvoker serviceInvoker = stateMachineConfig.getServiceInvokerManager (). GetServiceInvoker (state.getServiceType ()); if (serviceInvoker = = null) {throw new EngineExecutionException ("No such ServiceInvoker [" + state.getServiceType () + "]", FrameworkErrorCode.ObjectNotExists) } if (serviceInvoker instanceof ApplicationContextAware) {((ApplicationContextAware) serviceInvoker) .setApplicationContext (stateMachineConfig.getApplicationContext ());} / / this triggers us to define methods in ServiceTask in JSON, such as saveOrder method result = serviceInvoker.invoke (state, input.toArray ()) in orderSave } if (LOGGER.isDebugEnabled ()) {LOGGER.debug ("
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.