diff --git a/all/pom.xml b/all/pom.xml index 0c10a348302..a32e859b001 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -325,6 +325,12 @@ ${project.version} + + org.apache.seata + seata-saga-annotation + ${project.version} + + org.springframework diff --git a/bom/pom.xml b/bom/pom.xml index 224c84d1f61..d7007908e1e 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -338,6 +338,11 @@ seata-saga-engine-store ${project.version} + + org.apache.seata + seata-saga-annotation + ${project.version} + org.apache.seata seata-sqlparser-core diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index b831f723a45..cd7e9d54299 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -6,6 +6,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6370](https://github.com/seata/seata/pull/6370)] seata saga decouple spring, optimize architecture. - [[#6205](https://github.com/apache/incubator-seata/pull/6205)] mock server - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] full support for states in the refactored state machine designer +- [[#5300](https://github.com/apache/incubator-seata/pull/5300)] support annotation usage in saga mode ### bugfix: - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] fix the TCC aspect exception handling process, do not wrapping the internal call exceptions diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 2997bbf7dde..7d61b32b095 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -6,6 +6,7 @@ - [[#6370](https://github.com/seata/seata/pull/6370)] seata saga spring接耦、架构优化。 - [[#6205](https://github.com/apache/incubator-seata/pull/6205)] 提供mock server - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] 支持新版本状态机设计器 +- [[#5300](https://github.com/apache/incubator-seata/pull/5300)] 支持saga模式的注解化使用方式 ### bugfix: - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] 修复tcc切面异常处理过程,不对内部调用异常做包装处理,直接向外抛出 diff --git a/compatible/src/main/java/io/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java b/compatible/src/main/java/io/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java index 25d19a588a7..eb13d16ebe0 100644 --- a/compatible/src/main/java/io/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java +++ b/compatible/src/main/java/io/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java @@ -16,6 +16,12 @@ */ package io.seata.rm.tcc.interceptor; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + import io.seata.core.context.RootContext; import io.seata.core.model.BranchType; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; @@ -27,12 +33,6 @@ import org.apache.seata.integration.tx.api.interceptor.TwoPhaseBusinessActionParam; import org.slf4j.MDC; -import java.lang.annotation.Annotation; -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - import static org.apache.seata.common.Constants.BEAN_NAME_SPRING_FENCE_CONFIG; public class TccActionInterceptorHandler extends org.apache.seata.rm.tcc.interceptor.TccActionInterceptorHandler { @@ -91,7 +91,7 @@ protected Object doInvoke(InvocationWrapper invocation) throws Throwable { } private TwoPhaseBusinessAction parseAnnotation(Method methodKey) throws NoSuchMethodException { - TwoPhaseBusinessAction result = convertIoSeata(parseAnnotationCache.computeIfAbsent(methodKey, method -> { + TwoPhaseBusinessAction result = convertIoSeata((org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction )parseAnnotationCache.computeIfAbsent(methodKey, method -> { TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class); if (businessAction == null && targetBean.getClass() != null) { Set> interfaceClasses = ReflectionUtil.getInterfaces(targetBean.getClass()); diff --git a/core/src/main/java/org/apache/seata/core/model/BranchType.java b/core/src/main/java/org/apache/seata/core/model/BranchType.java index f3a868f93d3..37ae5342b75 100644 --- a/core/src/main/java/org/apache/seata/core/model/BranchType.java +++ b/core/src/main/java/org/apache/seata/core/model/BranchType.java @@ -38,6 +38,11 @@ public enum BranchType { */ SAGA, + /** + * The SAGA_ANNOTATION. + */ + SAGA_ANNOTATION, + /** * The XA. */ diff --git a/saga/pom.xml b/saga/pom.xml index 68185fed444..0d05dbb5fa3 100644 --- a/saga/pom.xml +++ b/saga/pom.xml @@ -38,6 +38,7 @@ seata-saga-rm seata-saga-engine-store seata-saga-spring + seata-saga-annotation diff --git a/saga/seata-saga-annotation/pom.xml b/saga/seata-saga-annotation/pom.xml new file mode 100644 index 00000000000..5cb503a47ac --- /dev/null +++ b/saga/seata-saga-annotation/pom.xml @@ -0,0 +1,42 @@ + + + + + seata-saga + org.apache.seata + ${revision} + + 4.0.0 + seata-saga-annotation + seata-saga-annotation ${project.version} + saga annotation module for Seata built with Maven + + + + ${project.groupId} + seata-tcc + ${project.version} + + + + + \ No newline at end of file diff --git a/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/RMHandlerSagaAnnotation.java b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/RMHandlerSagaAnnotation.java new file mode 100644 index 00000000000..72c556e0351 --- /dev/null +++ b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/RMHandlerSagaAnnotation.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga.rm; + + +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.ResourceManager; +import org.apache.seata.core.protocol.transaction.BranchCommitRequest; +import org.apache.seata.core.protocol.transaction.BranchCommitResponse; +import org.apache.seata.rm.AbstractRMHandler; +import org.apache.seata.rm.DefaultResourceManager; + +/** + * The type Rm handler SAGA_ANNOTATION. + */ +public class RMHandlerSagaAnnotation extends AbstractRMHandler { + + + @Override + protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException { + //do nothing, impossible to reach here + } + + /** + * get SAGA resource manager + * + * @return the resource manager + */ + @Override + protected ResourceManager getResourceManager() { + return DefaultResourceManager.get().getResourceManager(BranchType.SAGA_ANNOTATION); + } + + @Override + public BranchType getBranchType() { + return BranchType.SAGA_ANNOTATION; + } + +} diff --git a/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/SagaAnnotationResource.java b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/SagaAnnotationResource.java new file mode 100644 index 00000000000..1375ece7546 --- /dev/null +++ b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/SagaAnnotationResource.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga.rm; + +import java.lang.reflect.Method; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.Resource; + +/** + * The type Saga annotation resource. + */ +public class SagaAnnotationResource implements Resource { + + private String resourceGroupId = "DEFAULT"; + + private String appName; + + private String actionName; + + private Object targetBean; + + private String compensationMethodName; + + private Method compensationMethod; + + private Class[] compensationArgsClasses; + + private String[] phaseTwoRollbackKeys; + + @Override + public String getResourceGroupId() { + return resourceGroupId; + } + + /** + * Sets resource group id. + * + * @param resourceGroupId the resource group id + */ + public void setResourceGroupId(String resourceGroupId) { + this.resourceGroupId = resourceGroupId; + } + + @Override + public String getResourceId() { + return actionName; + } + + @Override + public BranchType getBranchType() { + return BranchType.SAGA_ANNOTATION; + } + + /** + * Gets app name. + * + * @return the app name + */ + public String getAppName() { + return appName; + } + + /** + * Sets app name. + * + * @param appName the app name + */ + public void setAppName(String appName) { + this.appName = appName; + } + + /** + * Gets action name. + * + * @return the action name + */ + public String getActionName() { + return actionName; + } + + /** + * Sets action name. + * + * @param actionName the action name + */ + public void setActionName(String actionName) { + this.actionName = actionName; + } + + /** + * Gets target bean. + * + * @return the target bean + */ + public Object getTargetBean() { + return targetBean; + } + + /** + * Sets target bean. + * + * @param targetBean the target bean + */ + public void setTargetBean(Object targetBean) { + this.targetBean = targetBean; + } + + /** + * Gets compensation method. + * + * @return the rollback method + */ + public Method getCompensationMethod() { + return compensationMethod; + } + + /** + * Sets compensation method. + * + * @param compensationMethod the rollback method + */ + public void setCompensationMethod(Method compensationMethod) { + this.compensationMethod = compensationMethod; + } + + /** + * Gets compensation method name. + * + * @return the rollback method name + */ + public String getCompensationMethodName() { + return compensationMethodName; + } + + /** + * Sets compensation method name. + * + * @param compensationMethodName the rollback method name + */ + public void setCompensationMethodName(String compensationMethodName) { + this.compensationMethodName = compensationMethodName; + } + + /** + * get compensation method args + * + * @return class array + */ + public Class[] getCompensationArgsClasses() { + return compensationArgsClasses; + } + + /** + * set compensation method args + * + * @param compensationArgsClasses rollbackArgsClasses + */ + public void setCompensationArgsClasses(Class[] compensationArgsClasses) { + this.compensationArgsClasses = compensationArgsClasses; + } + + /** + * get compensation method args keys + * + * @return keys array + */ + public String[] getPhaseTwoRollbackKeys() { + return phaseTwoRollbackKeys; + } + + /** + * set compensation method args key + * + * @param phaseTwoRollbackKeys phaseTwoRollbackKeys + */ + public void setPhaseTwoRollbackKeys(String[] phaseTwoRollbackKeys) { + this.phaseTwoRollbackKeys = phaseTwoRollbackKeys; + } + + @Override + public int hashCode() { + return actionName.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof SagaAnnotationResource)) { + return false; + } + return this.actionName.equals(((SagaAnnotationResource) obj).actionName); + } + +} diff --git a/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/SagaAnnotationResourceManager.java b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/SagaAnnotationResourceManager.java new file mode 100644 index 00000000000..b170984706f --- /dev/null +++ b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/SagaAnnotationResourceManager.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga.rm; + +import java.lang.reflect.Method; +import org.apache.seata.core.model.BranchStatus; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.Resource; +import org.apache.seata.rm.tcc.TCCResourceManager; +import org.apache.seata.rm.tcc.api.BusinessActionContext; +import org.apache.seata.rm.tcc.api.BusinessActionContextUtil; + + +/** + * Saga annotation resource manager + */ +public class SagaAnnotationResourceManager extends TCCResourceManager { + + /** + * saga branch commit + * + * @param branchType + * @param xid Transaction id. + * @param branchId Branch id. + * @param resourceId Resource id. + * @param applicationData Application data bind with this branch. + * @return BranchStatus + */ + @Override + public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) { + //impossible to reach here + return BranchStatus.PhaseTwo_Committed; + } + + protected Object[] getTwoPhaseRollbackArgs(Resource resource, BusinessActionContext businessActionContext) { + String[] keys = ((SagaAnnotationResource) resource).getPhaseTwoRollbackKeys(); + Class[] argsRollbackClasses = ((SagaAnnotationResource) resource).getCompensationArgsClasses(); + return BusinessActionContextUtil.getTwoPhaseMethodParams(keys, argsRollbackClasses, businessActionContext); + } + + protected Object getTargetBean(Resource resource) { + Object targetBean = ((SagaAnnotationResource) resource).getTargetBean(); + return targetBean; + } + + protected Method getRollbackMethod(Resource resource) { + Method rollbackMethod = ((SagaAnnotationResource) resource).getCompensationMethod(); + return rollbackMethod; + } + + protected String resolveActionName(Resource resource) { + return ((SagaAnnotationResource) resource).getActionName(); + } + + @Override + public BranchType getBranchType() { + return BranchType.SAGA_ANNOTATION; + } +} \ No newline at end of file diff --git a/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/api/CompensationBusinessAction.java b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/api/CompensationBusinessAction.java new file mode 100644 index 00000000000..6c09ebb877c --- /dev/null +++ b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/api/CompensationBusinessAction.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga.rm.api; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.apache.seata.rm.tcc.api.BusinessActionContext; + + +/** + * Saga annotation. + * Define a saga interface, which added on the commit method, if occurs rollback, compensation will be called. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD}) +@Inherited +public @interface CompensationBusinessAction { + + /** + * saga bean name, must be unique + * + * @return the string + */ + String name(); + + /** + * compensation method name + * + * @return the string + */ + String compensationMethod() default "compensation"; + + /** + * delay branch report while sharing params to phase 2 to enhance performance + * + * @return isDelayReport + */ + boolean isDelayReport() default false; + + /** + * whether use fence (idempotent,non_rollback,suspend) + * + * @return the boolean + */ + boolean useFence() default false; + + /** + * compensation method's args + * + * @return the Class[] + */ + Class[] compensationArgsClasses() default {BusinessActionContext.class}; +} diff --git a/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/interceptor/SagaActionInterceptorHandler.java b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/interceptor/SagaActionInterceptorHandler.java new file mode 100644 index 00000000000..5eb00448b2b --- /dev/null +++ b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/interceptor/SagaActionInterceptorHandler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga.rm.interceptor; + +import java.lang.annotation.Annotation; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.seata.common.Constants; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.integration.tx.api.interceptor.TwoPhaseBusinessActionParam; +import org.apache.seata.rm.tcc.interceptor.TccActionInterceptorHandler; +import org.apache.seata.saga.rm.api.CompensationBusinessAction; + +/** + * saga-annotation invocationHandler, extended from TccActionInterceptorHandler. + */ +public class SagaActionInterceptorHandler extends TccActionInterceptorHandler { + + + public SagaActionInterceptorHandler(Object target, Set methodsToProxy) { + super(target, methodsToProxy); + } + + @Override + protected TwoPhaseBusinessActionParam createTwoPhaseBusinessActionParam(Annotation annotation) { + CompensationBusinessAction businessAction = (CompensationBusinessAction) annotation; + + TwoPhaseBusinessActionParam businessActionParam = new TwoPhaseBusinessActionParam(); + businessActionParam.setActionName(businessAction.name()); + businessActionParam.setDelayReport(businessAction.isDelayReport()); + businessActionParam.setUseCommonFence(businessAction.useFence()); + businessActionParam.setBranchType(BranchType.SAGA_ANNOTATION); + + Map businessActionContextMap = new HashMap<>(4); + businessActionContextMap.put(Constants.ROLLBACK_METHOD, businessAction.compensationMethod()); + businessActionContextMap.put(Constants.ACTION_NAME, businessAction.name()); + businessActionContextMap.put(Constants.USE_COMMON_FENCE, businessAction.useFence()); + businessActionParam.setBusinessActionContext(businessActionContextMap); + + return businessActionParam; + } + + @Override + protected BranchType getBranchType() { + return BranchType.SAGA_ANNOTATION; + } + + @Override + protected Class getAnnotationClass() { + return CompensationBusinessAction.class; + } +} diff --git a/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/interceptor/parser/SagaActionInterceptorParser.java b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/interceptor/parser/SagaActionInterceptorParser.java new file mode 100644 index 00000000000..897d87154a8 --- /dev/null +++ b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/interceptor/parser/SagaActionInterceptorParser.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga.rm.interceptor.parser; + +import java.lang.annotation.Annotation; +import java.util.Set; +import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler; +import org.apache.seata.rm.tcc.interceptor.parser.TccActionInterceptorParser; +import org.apache.seata.saga.rm.api.CompensationBusinessAction; +import org.apache.seata.saga.rm.interceptor.SagaActionInterceptorHandler; + +/** + * saga-annotation proxyInvocationHandler parser, extended from TccActionInterceptorParser, used to identify the saga annotation {@link CompensationBusinessAction} and return the proxy handler. + */ +public class SagaActionInterceptorParser extends TccActionInterceptorParser { + + @Override + protected ProxyInvocationHandler createProxyInvocationHandler(Object target, Set methodsToProxy) { + ProxyInvocationHandler proxyInvocationHandler = new SagaActionInterceptorHandler(target, methodsToProxy); + return proxyInvocationHandler; + } + + @Override + protected Class getAnnotationClass() { + return CompensationBusinessAction.class; + } +} diff --git a/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/parser/SagaRegisterResourceParser.java b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/parser/SagaRegisterResourceParser.java new file mode 100644 index 00000000000..cb149d57c4c --- /dev/null +++ b/saga/seata-saga-annotation/src/main/java/org/apache/seata/saga/rm/parser/SagaRegisterResourceParser.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga.rm.parser; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import org.apache.seata.core.model.Resource; +import org.apache.seata.rm.tcc.resource.parser.TccRegisterResourceParser; +import org.apache.seata.saga.rm.SagaAnnotationResource; +import org.apache.seata.saga.rm.api.CompensationBusinessAction; + +/** + * saga-annotation ResourceRegister parser, extended from TccRegisterResourceParser. + */ +public class SagaRegisterResourceParser extends TccRegisterResourceParser { + + @Override + protected Resource createResource(Object targetBean, Class serviceClass, Method method, Annotation annotation) throws NoSuchMethodException { + CompensationBusinessAction compensationBusinessAction = (CompensationBusinessAction) annotation; + SagaAnnotationResource sagaAnnotationResource = new SagaAnnotationResource(); + sagaAnnotationResource.setActionName(compensationBusinessAction.name()); + sagaAnnotationResource.setTargetBean(targetBean); + sagaAnnotationResource.setCompensationMethodName(compensationBusinessAction.compensationMethod()); + sagaAnnotationResource.setCompensationMethod(serviceClass.getMethod(compensationBusinessAction.compensationMethod(), compensationBusinessAction.compensationArgsClasses())); + sagaAnnotationResource.setCompensationArgsClasses(compensationBusinessAction.compensationArgsClasses()); + sagaAnnotationResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(sagaAnnotationResource.getCompensationMethod(), compensationBusinessAction.compensationArgsClasses())); + + return sagaAnnotationResource; + } + + @Override + protected Class getAnnotationClass() { + return CompensationBusinessAction.class; + } + +} diff --git a/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.core.model.ResourceManager b/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.core.model.ResourceManager new file mode 100644 index 00000000000..f717083de20 --- /dev/null +++ b/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.core.model.ResourceManager @@ -0,0 +1 @@ +org.apache.seata.saga.rm.SagaAnnotationResourceManager \ No newline at end of file diff --git a/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser b/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser new file mode 100644 index 00000000000..53582485de2 --- /dev/null +++ b/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser @@ -0,0 +1 @@ +org.apache.seata.saga.rm.interceptor.parser.SagaActionInterceptorParser \ No newline at end of file diff --git a/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.RegisterResourceParser b/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.RegisterResourceParser new file mode 100644 index 00000000000..3dbe12f4f84 --- /dev/null +++ b/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.RegisterResourceParser @@ -0,0 +1 @@ +org.apache.seata.saga.rm.parser.SagaRegisterResourceParser \ No newline at end of file diff --git a/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.rm.AbstractRMHandler b/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.rm.AbstractRMHandler new file mode 100644 index 00000000000..c4ada3af884 --- /dev/null +++ b/saga/seata-saga-annotation/src/main/resources/META-INF/services/org.apache.seata.rm.AbstractRMHandler @@ -0,0 +1 @@ +org.apache.seata.saga.rm.RMHandlerSagaAnnotation \ No newline at end of file diff --git a/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/BranchSessionMock.java b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/BranchSessionMock.java new file mode 100644 index 00000000000..e6afa4e9a38 --- /dev/null +++ b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/BranchSessionMock.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga; + + +import org.apache.seata.core.model.BranchType; + +public class BranchSessionMock { + + private String xid; + + private long branchId; + + private String resourceGroupId; + + private String resourceId; + + + private BranchType branchType; + + + private String applicationData; + + + public String getXid() { + return xid; + } + + public void setXid(String xid) { + this.xid = xid; + } + + public long getBranchId() { + return branchId; + } + + public void setBranchId(long branchId) { + this.branchId = branchId; + } + + public String getResourceGroupId() { + return resourceGroupId; + } + + public void setResourceGroupId(String resourceGroupId) { + this.resourceGroupId = resourceGroupId; + } + + public String getResourceId() { + return resourceId; + } + + public void setResourceId(String resourceId) { + this.resourceId = resourceId; + } + + public BranchType getBranchType() { + return branchType; + } + + public void setBranchType(BranchType branchType) { + this.branchType = branchType; + } + + public String getApplicationData() { + return applicationData; + } + + public void setApplicationData(String applicationData) { + this.applicationData = applicationData; + } +} diff --git a/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/NormalSagaAction.java b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/NormalSagaAction.java new file mode 100644 index 00000000000..ea41d6a35b0 --- /dev/null +++ b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/NormalSagaAction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga; + +import org.apache.seata.rm.tcc.api.BusinessActionContext; +import org.apache.seata.rm.tcc.api.BusinessActionContextParameter; +import org.apache.seata.rm.tcc.api.LocalTCC; +import org.apache.seata.saga.rm.api.CompensationBusinessAction; + +import java.util.List; + +/** + * The interface saga action. + */ +@LocalTCC +public interface NormalSagaAction { + + /** + * Prepare boolean. + * + * @param actionContext the action context + * @param a the a + * @param b the b + * @param sagaParam the saga param + * @return the boolean + */ + @CompensationBusinessAction(name = "sagaActionForTest", compensationMethod = "compensation", compensationArgsClasses = {BusinessActionContext.class, SagaParam.class}) + boolean commit(BusinessActionContext actionContext, + @BusinessActionContextParameter("a") int a, + @BusinessActionContextParameter(paramName = "b", index = 0) List b, + @BusinessActionContextParameter(isParamInProperty = true) SagaParam sagaParam); + + /** + * Rollback boolean. + * + * @param actionContext the action context + * @return the boolean + */ + boolean compensation(BusinessActionContext actionContext, @BusinessActionContextParameter("sagaParam") SagaParam param); +} diff --git a/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/NormalSagaActionImpl.java b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/NormalSagaActionImpl.java new file mode 100644 index 00000000000..354bdcfd62b --- /dev/null +++ b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/NormalSagaActionImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga; + +import java.util.List; +import org.apache.seata.rm.tcc.api.BusinessActionContext; + +/** + * + */ +public class NormalSagaActionImpl implements NormalSagaAction { + + private boolean isCommit; + + + @Override + public boolean commit(BusinessActionContext actionContext, int a, List b, SagaParam sagaParam) { + isCommit = true; + return a > 1; + } + + @Override + public boolean compensation(BusinessActionContext actionContext, SagaParam param) { + isCommit = false; + return true; + } + + public boolean isCommit() { + return isCommit; + } +} diff --git a/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/SagaParam.java b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/SagaParam.java new file mode 100644 index 00000000000..a0dc7111cf1 --- /dev/null +++ b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/SagaParam.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga; + + +import org.apache.seata.rm.tcc.api.BusinessActionContextParameter; + +/** + * The type param. + */ +public class SagaParam { + + /** + * The Num. + */ + protected int num; + + /** + * The Email. + */ + @BusinessActionContextParameter(paramName = "email") + protected String email; + + /** + * Instantiates a new param. + * + * @param num the num + * @param email the email + */ + public SagaParam(int num, String email) { + this.num = num; + this.email = email; + } + + /** + * Gets num. + * + * @return the num + */ + public int getNum() { + return num; + } + + /** + * Sets num. + * + * @param num the num + */ + public void setNum(int num) { + this.num = num; + } +} diff --git a/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/rm/interceptor/parser/SagaActionInterceptorParserTest.java b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/rm/interceptor/parser/SagaActionInterceptorParserTest.java new file mode 100644 index 00000000000..d8f63f62e85 --- /dev/null +++ b/saga/seata-saga-annotation/src/test/java/org/apache/seata/saga/rm/interceptor/parser/SagaActionInterceptorParserTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.saga.rm.interceptor.parser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.core.model.ResourceManager; +import org.apache.seata.core.model.TransactionManager; +import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler; +import org.apache.seata.integration.tx.api.util.ProxyUtil; +import org.apache.seata.rm.DefaultResourceManager; +import org.apache.seata.saga.BranchSessionMock; +import org.apache.seata.saga.NormalSagaAction; +import org.apache.seata.saga.NormalSagaActionImpl; +import org.apache.seata.saga.SagaParam; +import org.apache.seata.saga.rm.SagaAnnotationResourceManager; +import org.apache.seata.tm.TransactionManagerHolder; +import org.apache.seata.tm.api.GlobalTransaction; +import org.apache.seata.tm.api.GlobalTransactionContext; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * + */ +public class SagaActionInterceptorParserTest { + + public static String DEFAULT_XID = "default_xid"; + + + @BeforeAll + public static void init() throws IOException { + System.setProperty("config.type", "file"); + System.setProperty("config.file.name", "file.conf"); + System.setProperty("txServiceGroup", "default_tx_group"); + System.setProperty("service.vgroupMapping.default_tx_group", "default"); + } + + + @Test + void parserInterfaceToProxy() { + //given + NormalSagaActionImpl sagaAction = new NormalSagaActionImpl(); + + SagaActionInterceptorParser sagaActionInterceptorParser = new SagaActionInterceptorParser(); + + //when + ProxyInvocationHandler proxyInvocationHandler = sagaActionInterceptorParser.parserInterfaceToProxy(sagaAction, "sagaAction"); + + //then + Assertions.assertNotNull(proxyInvocationHandler); + + } + + @Test + public void testSagaAnnotation_should_commit() throws TransactionException { + //given + + DefaultResourceManager.get(); + DefaultResourceManager.mockResourceManager(BranchType.SAGA_ANNOTATION, resourceManager); + + TransactionManagerHolder.set(transactionManager); + + NormalSagaActionImpl sagaAction = new NormalSagaActionImpl(); + NormalSagaAction tccActionProxy = ProxyUtil.createProxy(sagaAction); + + SagaParam sagaParam = new SagaParam(2, "abc@163.com"); + List listB = Arrays.asList("b"); + + + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + + try { + tx.begin(60000, "testBiz"); + + boolean result = tccActionProxy.commit(null, 2, listB, sagaParam); + + Assertions.assertTrue(result); + + if (result) { + tx.commit(); + } else { + tx.rollback(); + } + } catch (Exception exx) { + tx.rollback(); + throw exx; + } + + Assertions.assertTrue(sagaAction.isCommit()); + + } + + @Test + public void testSagaAnnotation_should_rollback() throws TransactionException { + //given + + DefaultResourceManager.get(); + DefaultResourceManager.mockResourceManager(BranchType.SAGA_ANNOTATION, resourceManager); + + TransactionManagerHolder.set(transactionManager); + + NormalSagaActionImpl sagaAction = new NormalSagaActionImpl(); + NormalSagaAction tccActionProxy = ProxyUtil.createProxy(sagaAction); + + SagaParam sagaParam = new SagaParam(1, "abc@163.com"); + List listB = Arrays.asList("b"); + + + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + + try { + tx.begin(60000, "testBiz"); + + boolean result = tccActionProxy.commit(null, 1, listB, sagaParam); + + Assertions.assertFalse(result); + + if (result) { + tx.commit(); + } else { + tx.rollback(); + } + } catch (Exception exx) { + tx.rollback(); + throw exx; + } + + Assertions.assertFalse(sagaAction.isCommit()); + } + + private static Map> applicationDataMap = new ConcurrentHashMap<>(); + + + private static TransactionManager transactionManager = new TransactionManager() { + @Override + public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { + return DEFAULT_XID; + } + + @Override + public GlobalStatus commit(String xid) throws TransactionException { + return GlobalStatus.Committed; + } + + @Override + public GlobalStatus rollback(String xid) throws TransactionException { + + rollbackAll(xid); + + return GlobalStatus.Rollbacked; + } + + @Override + public GlobalStatus getStatus(String xid) throws TransactionException { + return GlobalStatus.Begin; + } + + @Override + public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException { + return globalStatus; + } + }; + + + private static ResourceManager resourceManager = new SagaAnnotationResourceManager() { + + @Override + public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException { + + long branchId = System.currentTimeMillis(); + + List branches = applicationDataMap.computeIfAbsent(xid, s -> new ArrayList<>()); + BranchSessionMock branchSessionMock = new BranchSessionMock(); + branchSessionMock.setXid(xid); + branchSessionMock.setBranchType(branchType); + branchSessionMock.setResourceId(resourceId); + branchSessionMock.setApplicationData(applicationData); + branchSessionMock.setBranchId(branchId); + + branches.add(branchSessionMock); + + return branchId; + } + }; + + + public static void rollbackAll(String xid) throws TransactionException { + + List branches = applicationDataMap.computeIfAbsent(xid, s -> new ArrayList<>()); + for (BranchSessionMock branch : branches) { + resourceManager.branchRollback(branch.getBranchType(), branch.getXid(), branch.getBranchId(), branch.getResourceId(), branch.getApplicationData()); + } + } + +} \ No newline at end of file diff --git a/saga/seata-saga-annotation/src/test/resources/file.conf b/saga/seata-saga-annotation/src/test/resources/file.conf new file mode 100644 index 00000000000..46c3e0401cc --- /dev/null +++ b/saga/seata-saga-annotation/src/test/resources/file.conf @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +service { + #transaction service group mapping + vgroupMapping.default_tx_group = "default" + #only support when registry.type=file, please don't set multiple addresses + default.grouplist = "127.0.0.1:8091" + #disable seata + disableGlobalTransaction = false +} \ No newline at end of file diff --git a/saga/seata-saga-annotation/src/test/resources/registry.conf b/saga/seata-saga-annotation/src/test/resources/registry.conf new file mode 100644 index 00000000000..5ad014bf55a --- /dev/null +++ b/saga/seata-saga-annotation/src/test/resources/registry.conf @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +registry { + # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa + type = "file" + + file { + name = "file.conf" + } +} + +config { + # file、nacos 、apollo、zk、consul、etcd3 + type = "file" + + file { + name = "file.conf" + } +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/seata/server/transaction/saga/SagaAnnotationCore.java b/server/src/main/java/org/apache/seata/server/transaction/saga/SagaAnnotationCore.java new file mode 100644 index 00000000000..62bd1f5f9e5 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/transaction/saga/SagaAnnotationCore.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.transaction.saga; + +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.BranchStatus; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.rpc.RemotingServer; +import org.apache.seata.server.coordinator.AbstractCore; +import org.apache.seata.server.session.BranchSession; +import org.apache.seata.server.session.GlobalSession; + +/** + * The type SAGA_ANNOTATION core. + */ +public class SagaAnnotationCore extends AbstractCore { + + public SagaAnnotationCore(RemotingServer remotingServer) { + super(remotingServer); + } + + @Override + public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException { + //SAGA_ANNOTATION branch type, just mock commit + return BranchStatus.PhaseTwo_Committed; + } + + @Override + public BranchType getHandleBranchType() { + return BranchType.SAGA_ANNOTATION; + } +} diff --git a/server/src/main/resources/META-INF/services/org.apache.seata.server.coordinator.AbstractCore b/server/src/main/resources/META-INF/services/org.apache.seata.server.coordinator.AbstractCore index d04b0aa1903..ec1ab54d8a3 100644 --- a/server/src/main/resources/META-INF/services/org.apache.seata.server.coordinator.AbstractCore +++ b/server/src/main/resources/META-INF/services/org.apache.seata.server.coordinator.AbstractCore @@ -1,4 +1,5 @@ org.apache.seata.server.transaction.at.ATCore org.apache.seata.server.transaction.tcc.TccCore org.apache.seata.server.transaction.saga.SagaCore -org.apache.seata.server.transaction.xa.XACore \ No newline at end of file +org.apache.seata.server.transaction.xa.XACore +org.apache.seata.server.transaction.saga.SagaAnnotationCore \ No newline at end of file diff --git a/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java b/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java index 83c1d70e5c9..d8747fcf836 100644 --- a/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java +++ b/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java @@ -43,7 +43,7 @@ public class TCCResourceManager extends AbstractResourceManager { /** * TCC resource cache */ - private Map tccResourceCache = new ConcurrentHashMap<>(); + private Map resourceCache = new ConcurrentHashMap<>(); /** * Instantiates a new Tcc resource manager. @@ -53,20 +53,19 @@ public TCCResourceManager() { } /** - * registry TCC resource + * registry resource * * @param resource The resource to be managed. */ @Override public void registerResource(Resource resource) { - TCCResource tccResource = (TCCResource)resource; - tccResourceCache.put(tccResource.getResourceId(), tccResource); - super.registerResource(tccResource); + resourceCache.put(resource.getResourceId(), resource); + super.registerResource(resource); } @Override public Map getManagedResources() { - return tccResourceCache; + return resourceCache; } /** @@ -83,7 +82,7 @@ public Map getManagedResources() { @Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { - TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId); + TCCResource tccResource = (TCCResource)resourceCache.get(resourceId); if (tccResource == null) { throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId)); } @@ -142,27 +141,27 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI @Override public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { - TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId); - if (tccResource == null) { - throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId)); + Resource resource = resourceCache.get(resourceId); + if (resource == null) { + throw new ShouldNeverHappenException(String.format("%s resource is not exist, resourceId: %s", getBranchType(), resourceId)); } - Object targetTCCBean = tccResource.getTargetBean(); - Method rollbackMethod = tccResource.getRollbackMethod(); + Object targetTCCBean = getTargetBean(resource); + Method rollbackMethod = getRollbackMethod(resource); if (targetTCCBean == null || rollbackMethod == null) { - throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId)); + throw new ShouldNeverHappenException(String.format("%s resource is not available, resourceId: %s", getBranchType(), resourceId)); } try { //BusinessActionContext BusinessActionContext businessActionContext = BusinessActionContextUtil.getBusinessActionContext(xid, branchId, resourceId, applicationData); - Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext); + Object[] args = this.getTwoPhaseRollbackArgs(resource, businessActionContext); Object ret; boolean result; // add idempotent and anti hanging if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_COMMON_FENCE))) { try { result = DefaultCommonFenceHandler.get().rollbackFence(rollbackMethod, targetTCCBean, xid, branchId, - args, tccResource.getActionName()); + args, resolveActionName(resource)); } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) { throw e.getCause(); } @@ -178,10 +177,10 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc result = true; } } - LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId); + LOGGER.info("{} resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", getBranchType(), result, xid, branchId, resourceId); return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable; } catch (Throwable t) { - String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid); + String msg = String.format("rollback %s resource error, resourceId: %s, xid: %s.", getBranchType(), resourceId, xid); LOGGER.error(msg, t); return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } @@ -201,16 +200,30 @@ private Object[] getTwoPhaseCommitArgs(TCCResource tccResource, BusinessActionCo /** * get phase two rollback method's args - * @param tccResource tccResource + * @param resource resource * @param businessActionContext businessActionContext * @return args */ - private Object[] getTwoPhaseRollbackArgs(TCCResource tccResource, BusinessActionContext businessActionContext) { - String[] keys = tccResource.getPhaseTwoRollbackKeys(); - Class[] argsRollbackClasses = tccResource.getRollbackArgsClasses(); + protected Object[] getTwoPhaseRollbackArgs(Resource resource, BusinessActionContext businessActionContext) { + String[] keys = ((TCCResource) resource).getPhaseTwoRollbackKeys(); + Class[] argsRollbackClasses = ((TCCResource) resource).getRollbackArgsClasses(); return BusinessActionContextUtil.getTwoPhaseMethodParams(keys, argsRollbackClasses, businessActionContext); } + protected Object getTargetBean(Resource resource) { + Object targetTCCBean = ((TCCResource) resource).getTargetBean(); + return targetTCCBean; + } + + protected Method getRollbackMethod(Resource resource) { + Method rollbackMethod = ((TCCResource) resource).getRollbackMethod(); + return rollbackMethod; + } + + protected String resolveActionName(Resource resource) { + return ((TCCResource) resource).getActionName(); + } + @Override public BranchType getBranchType() { return BranchType.TCC; diff --git a/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java b/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java index 82a7d6a7dc0..056ee895773 100644 --- a/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java +++ b/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java @@ -16,6 +16,7 @@ */ package org.apache.seata.rm.tcc.interceptor; +import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; @@ -39,10 +40,10 @@ import org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction; import org.slf4j.MDC; - import static org.apache.seata.common.ConfigurationKeys.TCC_ACTION_INTERCEPTOR_ORDER; import static org.apache.seata.common.Constants.BEAN_NAME_SPRING_FENCE_CONFIG; + public class TccActionInterceptorHandler extends AbstractProxyInvocationHandler { private static final int ORDER_NUM = ConfigurationFactory.getInstance().getInt(TCC_ACTION_INTERCEPTOR_ORDER, @@ -53,7 +54,7 @@ public class TccActionInterceptorHandler extends AbstractProxyInvocationHandler private Set methodsToProxy; protected Object targetBean; - protected Map parseAnnotationCache = new ConcurrentHashMap<>(); + protected Map parseAnnotationCache = new ConcurrentHashMap<>(); public TccActionInterceptorHandler(Object targetBean, Set methodsToProxy) { this.targetBean = targetBean; @@ -67,7 +68,7 @@ protected Object doInvoke(InvocationWrapper invocation) throws Throwable { return invocation.proceed(); } Method method = invocation.getMethod(); - TwoPhaseBusinessAction businessAction = parseAnnotation(method); + Annotation businessAction = parseAnnotation(method); //try method if (businessAction != null) { @@ -76,28 +77,17 @@ protected Object doInvoke(InvocationWrapper invocation) throws Throwable { //save the previous branchType BranchType previousBranchType = RootContext.getBranchType(); //if not TCC, bind TCC branchType - if (BranchType.TCC != previousBranchType) { - RootContext.bindBranchType(BranchType.TCC); + if (getBranchType() != previousBranchType) { + RootContext.bindBranchType(getBranchType()); } try { - TwoPhaseBusinessActionParam businessActionParam = new TwoPhaseBusinessActionParam(); - businessActionParam.setActionName(businessAction.name()); - businessActionParam.setDelayReport(businessAction.isDelayReport()); - businessActionParam.setUseCommonFence(businessAction.useTCCFence()); - businessActionParam.setBranchType(BranchType.TCC); - Map businessActionContextMap = new HashMap<>(4); - //the phase two method name - businessActionContextMap.put(Constants.COMMIT_METHOD, businessAction.commitMethod()); - businessActionContextMap.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod()); - businessActionContextMap.put(Constants.ACTION_NAME, businessAction.name()); - businessActionContextMap.put(Constants.USE_COMMON_FENCE, businessAction.useTCCFence()); - businessActionParam.setBusinessActionContext(businessActionContextMap); + TwoPhaseBusinessActionParam businessActionParam = createTwoPhaseBusinessActionParam(businessAction); //Handler the TCC Aspect, and return the business result return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessActionParam, invocation::proceed); } finally { //if not TCC, unbind branchType - if (BranchType.TCC != previousBranchType) { + if (getBranchType() != previousBranchType) { RootContext.unbindBranchType(); } //MDC remove branchId @@ -109,16 +99,16 @@ protected Object doInvoke(InvocationWrapper invocation) throws Throwable { return invocation.proceed(); } - private TwoPhaseBusinessAction parseAnnotation(Method methodKey) throws NoSuchMethodException { - TwoPhaseBusinessAction result = parseAnnotationCache.computeIfAbsent(methodKey, method -> { - TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class); + private Annotation parseAnnotation(Method methodKey) throws NoSuchMethodException { + Annotation result = parseAnnotationCache.computeIfAbsent(methodKey, method -> { + Annotation businessAction = method.getAnnotation(getAnnotationClass()); if (businessAction == null && targetBean.getClass() != null) { Set> interfaceClasses = ReflectionUtil.getInterfaces(targetBean.getClass()); if (interfaceClasses != null) { for (Class interClass : interfaceClasses) { try { Method m = interClass.getMethod(method.getName(), method.getParameterTypes()); - businessAction = m.getAnnotation(TwoPhaseBusinessAction.class); + businessAction = m.getAnnotation(getAnnotationClass()); if (businessAction != null) { // init common fence clean task if enable useTccFence initCommonFenceCleanTask(businessAction); @@ -140,12 +130,12 @@ private TwoPhaseBusinessAction parseAnnotation(Method methodKey) throws NoSuchMe * * @param twoPhaseBusinessAction the twoPhaseBusinessAction */ - private void initCommonFenceCleanTask(TwoPhaseBusinessAction twoPhaseBusinessAction) { + private void initCommonFenceCleanTask(Annotation twoPhaseBusinessAction) { CommonFenceConfig commonFenceConfig = (CommonFenceConfig) ObjectHolder.INSTANCE.getObject(BEAN_NAME_SPRING_FENCE_CONFIG); if (commonFenceConfig == null || commonFenceConfig.getInitialized().get()) { return; } - if (twoPhaseBusinessAction != null && twoPhaseBusinessAction.useTCCFence()) { + if (twoPhaseBusinessAction != null && parserCommonFenceConfig(twoPhaseBusinessAction)) { if (commonFenceConfig.getInitialized().compareAndSet(false, true)) { // init common fence clean task if enable useTccFence commonFenceConfig.init(); @@ -177,4 +167,38 @@ public int order() { public String type() { return InvocationHandlerType.TwoPhaseAnnotation.name(); } + + protected TwoPhaseBusinessActionParam createTwoPhaseBusinessActionParam(Annotation annotation) { + TwoPhaseBusinessAction businessAction = (TwoPhaseBusinessAction) annotation; + TwoPhaseBusinessActionParam businessActionParam = new TwoPhaseBusinessActionParam(); + businessActionParam.setActionName(businessAction.name()); + businessActionParam.setDelayReport(businessAction.isDelayReport()); + businessActionParam.setUseCommonFence(businessAction.useTCCFence()); + businessActionParam.setBranchType(getBranchType()); + Map businessActionContextMap = new HashMap<>(4); + //the phase two method name + businessActionContextMap.put(Constants.COMMIT_METHOD, businessAction.commitMethod()); + businessActionContextMap.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod()); + businessActionContextMap.put(Constants.ACTION_NAME, businessAction.name()); + businessActionContextMap.put(Constants.USE_COMMON_FENCE, businessAction.useTCCFence()); + businessActionParam.setBusinessActionContext(businessActionContextMap); + return businessActionParam; + } + + protected boolean parserCommonFenceConfig(Annotation annotation) { + if (annotation == null) { + return false; + } + TwoPhaseBusinessAction businessAction = (TwoPhaseBusinessAction) annotation; + return businessAction.useTCCFence(); + } + + protected BranchType getBranchType() { + return BranchType.TCC; + } + + protected Class getAnnotationClass() { + return TwoPhaseBusinessAction.class; + } + } diff --git a/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/parser/TccActionInterceptorParser.java b/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/parser/TccActionInterceptorParser.java index 740690b19db..b4fd3ca6d8f 100644 --- a/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/parser/TccActionInterceptorParser.java +++ b/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/parser/TccActionInterceptorParser.java @@ -16,6 +16,7 @@ */ package org.apache.seata.rm.tcc.interceptor.parser; +import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.Arrays; import java.util.Collections; @@ -40,13 +41,14 @@ public class TccActionInterceptorParser implements InterfaceParser { @Override public ProxyInvocationHandler parserInterfaceToProxy(Object target, String objectName) { // eliminate the bean without two phase annotation. - Set methodsToProxy = this.tccProxyTargetMethod(target); + Set methodsToProxy = this.parseProxyTargetMethod(target); if (methodsToProxy.isEmpty()) { return null; } // register resource and enhance with interceptor DefaultResourceRegisterParser.get().registerResource(target, objectName); - return new TccActionInterceptorHandler(target, methodsToProxy); + ProxyInvocationHandler proxyInvocationHandler = createProxyInvocationHandler(target, methodsToProxy); + return proxyInvocationHandler; } @Override @@ -66,7 +68,7 @@ public IfNeedEnhanceBean parseIfNeedEnhancement(Class beanClass) { * @return boolean boolean */ - private Set tccProxyTargetMethod(Object target) { + protected Set parseProxyTargetMethod(Object target) { Set methodsToProxy = new HashSet<>(); //check if it is TCC bean Class tccServiceClazz = target.getClass(); @@ -78,10 +80,9 @@ private Set tccProxyTargetMethod(Object target) { } } - TwoPhaseBusinessAction twoPhaseBusinessAction; + Class twoPhaseBusinessAction = getAnnotationClass(); for (Method method : methods) { - twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class); - if (twoPhaseBusinessAction != null) { + if (method.isAnnotationPresent(twoPhaseBusinessAction)) { methodsToProxy.add(method.getName()); } } @@ -92,4 +93,13 @@ private Set tccProxyTargetMethod(Object target) { // sofa:reference / dubbo:reference, AOP return methodsToProxy; } + + protected ProxyInvocationHandler createProxyInvocationHandler(Object target, Set methodsToProxy) { + ProxyInvocationHandler proxyInvocationHandler = new TccActionInterceptorHandler(target, methodsToProxy); + return proxyInvocationHandler; + } + + protected Class getAnnotationClass() { + return TwoPhaseBusinessAction.class; + } } diff --git a/tcc/src/main/java/org/apache/seata/rm/tcc/resource/parser/TccRegisterResourceParser.java b/tcc/src/main/java/org/apache/seata/rm/tcc/resource/parser/TccRegisterResourceParser.java index d95e54e9d36..34aba10b692 100644 --- a/tcc/src/main/java/org/apache/seata/rm/tcc/resource/parser/TccRegisterResourceParser.java +++ b/tcc/src/main/java/org/apache/seata/rm/tcc/resource/parser/TccRegisterResourceParser.java @@ -25,6 +25,7 @@ import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.util.ReflectionUtil; import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.model.Resource; import org.apache.seata.integration.tx.api.interceptor.ActionContextUtil; import org.apache.seata.integration.tx.api.interceptor.parser.RegisterResourceParser; import org.apache.seata.rm.DefaultResourceManager; @@ -52,31 +53,11 @@ public void registerResource(Object target, String beanName) { private void executeRegisterResource(Object target, Set methods, Class targetServiceClass) throws NoSuchMethodException { for (Method m : methods) { - TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class); - if (twoPhaseBusinessAction != null) { - TCCResource tccResource = new TCCResource(); - if (StringUtils.isBlank(twoPhaseBusinessAction.name())) { - throw new FrameworkException("TCC bean name cannot be null or empty"); - } - tccResource.setActionName(twoPhaseBusinessAction.name()); - tccResource.setTargetBean(target); - tccResource.setPrepareMethod(m); - tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod()); - tccResource.setCommitMethod(targetServiceClass.getMethod(twoPhaseBusinessAction.commitMethod(), - twoPhaseBusinessAction.commitArgsClasses())); - tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod()); - tccResource.setRollbackMethod(targetServiceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(), - twoPhaseBusinessAction.rollbackArgsClasses())); - // set argsClasses - tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses()); - tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses()); - // set phase two method's keys - tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(), - twoPhaseBusinessAction.commitArgsClasses())); - tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(), - twoPhaseBusinessAction.rollbackArgsClasses())); - //registry tcc resource - DefaultResourceManager.get().registerResource(tccResource); + Annotation annotation = m.getAnnotation(getAnnotationClass()); + if (annotation != null) { + Resource resource = createResource(target, targetServiceClass, m, annotation); + //registry resource + DefaultResourceManager.get().registerResource(resource); } } } @@ -107,4 +88,35 @@ protected String[] getTwoPhaseArgs(Method method, Class[] argsClasses) { return keys; } + + protected Resource createResource(Object target, Class targetServiceClass, Method method, Annotation annotation) throws NoSuchMethodException { + TwoPhaseBusinessAction twoPhaseBusinessAction = (TwoPhaseBusinessAction) annotation; + if (StringUtils.isBlank(twoPhaseBusinessAction.name())) { + throw new FrameworkException("TCC bean name cannot be null or empty"); + } + TCCResource tccResource = new TCCResource(); + tccResource.setActionName(twoPhaseBusinessAction.name()); + tccResource.setTargetBean(target); + tccResource.setPrepareMethod(method); + tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod()); + tccResource.setCommitMethod(targetServiceClass.getMethod(twoPhaseBusinessAction.commitMethod(), + twoPhaseBusinessAction.commitArgsClasses())); + tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod()); + tccResource.setRollbackMethod(targetServiceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(), + twoPhaseBusinessAction.rollbackArgsClasses())); + // set argsClasses + tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses()); + tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses()); + // set phase two method's keys + tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(), + twoPhaseBusinessAction.commitArgsClasses())); + tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(), + twoPhaseBusinessAction.rollbackArgsClasses())); + return tccResource; + } + + protected Class getAnnotationClass() { + return TwoPhaseBusinessAction.class; + } + }