diff --git a/pom.xml b/pom.xml index b93c6d794..840e7f088 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.codingapi tx-lcn - 4.1.0 + 4.2.0-SNAPSHOT pom tx-lcn @@ -32,7 +32,7 @@ 1.7 3.6.0 - 4.1.0 + 4.2.0-SNAPSHOT diff --git a/transaction-dubbo/pom.xml b/transaction-dubbo/pom.xml index 29e9a600d..064d77314 100644 --- a/transaction-dubbo/pom.xml +++ b/transaction-dubbo/pom.xml @@ -8,7 +8,7 @@ com.codingapi tx-lcn - 4.1.0 + 4.2.0-SNAPSHOT com.codingapi diff --git a/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/filter/TransactionFilter.java b/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/filter/TransactionFilter.java index 2a6ad0a26..362f9facf 100644 --- a/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/filter/TransactionFilter.java +++ b/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/filter/TransactionFilter.java @@ -1,5 +1,7 @@ package com.codingapi.tx.dubbo.filter; +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.extension.Activate; import com.alibaba.dubbo.rpc.*; import com.codingapi.tx.aop.bean.TxTransactionLocal; import org.slf4j.Logger; @@ -8,6 +10,7 @@ /** * Created by lorne on 2017/6/30. */ +@Activate(group = Constants.CONSUMER, order = -10001) public class TransactionFilter implements Filter { @@ -23,6 +26,7 @@ public Result invoke(Invoker invoker, Invocation invocation) throws RpcExcept if(txTransactionLocal!=null){ RpcContext.getContext().setAttachment("tx-group",groupId); + RpcContext.getContext().setAttachment("tx-mode",txTransactionLocal.getMode().name()); } return invoker.invoke(invocation); diff --git a/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/interceptor/TransactionAspect.java b/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/interceptor/TransactionAspect.java index da7a70141..963c5bcb3 100644 --- a/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/interceptor/TransactionAspect.java +++ b/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/interceptor/TransactionAspect.java @@ -1,5 +1,6 @@ package com.codingapi.tx.dubbo.interceptor; +import com.codingapi.tx.Constants; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; @@ -45,7 +46,7 @@ public Object around(ProceedingJoinPoint point)throws Throwable{ @Override public int getOrder() { - return HIGHEST_PRECEDENCE; + return Constants.ASPECT_ORDER; } diff --git a/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/interceptor/TxManagerInterceptor.java b/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/interceptor/TxManagerInterceptor.java index 2e684e77b..bc1fe84a8 100644 --- a/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/interceptor/TxManagerInterceptor.java +++ b/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/interceptor/TxManagerInterceptor.java @@ -22,10 +22,12 @@ public class TxManagerInterceptor { public Object around(ProceedingJoinPoint point) throws Throwable { String groupId = null; + String mode = null; try { groupId = RpcContext.getContext().getAttachment("tx-group"); + mode = RpcContext.getContext().getAttachment("tx-mode"); }catch (Exception e){} - return aspectBeforeService.around(groupId,point); + return aspectBeforeService.around(groupId,point,mode); } } diff --git a/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/listener/TransactionSocketListener.java b/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/listener/TransactionSocketListener.java index 817edf9b0..6dbdd7078 100644 --- a/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/listener/TransactionSocketListener.java +++ b/transaction-dubbo/src/main/java/com/codingapi/tx/dubbo/listener/TransactionSocketListener.java @@ -20,7 +20,15 @@ public class TransactionSocketListener implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext event) throws BeansException { - initService.start(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + // 若连接不上txmanager start()方法将阻塞 + initService.start(); + } + }); + thread.setName("TxInit-thread"); + thread.start(); } } diff --git a/transaction-motan/pom.xml b/transaction-motan/pom.xml index 9bdcbfa7b..a922488a5 100644 --- a/transaction-motan/pom.xml +++ b/transaction-motan/pom.xml @@ -8,7 +8,7 @@ com.codingapi tx-lcn - 4.1.0 + 4.2.0-SNAPSHOT com.codingapi diff --git a/transaction-motan/src/main/java/com/codingapi/tx/motan/interceptor/TransactionAspect.java b/transaction-motan/src/main/java/com/codingapi/tx/motan/interceptor/TransactionAspect.java index 74574604d..0d91a2e9f 100644 --- a/transaction-motan/src/main/java/com/codingapi/tx/motan/interceptor/TransactionAspect.java +++ b/transaction-motan/src/main/java/com/codingapi/tx/motan/interceptor/TransactionAspect.java @@ -1,5 +1,6 @@ package com.codingapi.tx.motan.interceptor; +import com.codingapi.tx.Constants; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; @@ -43,7 +44,7 @@ public Object around(ProceedingJoinPoint point)throws Throwable{ @Override public int getOrder() { - return HIGHEST_PRECEDENCE; + return Constants.ASPECT_ORDER; } diff --git a/transaction-motan/src/main/java/com/codingapi/tx/motan/interceptor/TxManagerInterceptor.java b/transaction-motan/src/main/java/com/codingapi/tx/motan/interceptor/TxManagerInterceptor.java index 3e4b1cbd2..20f71e3df 100644 --- a/transaction-motan/src/main/java/com/codingapi/tx/motan/interceptor/TxManagerInterceptor.java +++ b/transaction-motan/src/main/java/com/codingapi/tx/motan/interceptor/TxManagerInterceptor.java @@ -23,10 +23,12 @@ public class TxManagerInterceptor { public Object around(ProceedingJoinPoint point) throws Throwable { String groupId = null; + String mode = null; try { groupId = (String) RpcContext.getContext().getAttribute("tx-group"); + mode = (String) RpcContext.getContext().getAttribute("tx-mode"); } catch (Exception e) { } - return aspectBeforeService.around(groupId, point); + return aspectBeforeService.around(groupId, point, mode); } } diff --git a/transaction-motan/src/main/java/com/codingapi/tx/motan/listener/TransactionSocketListener.java b/transaction-motan/src/main/java/com/codingapi/tx/motan/listener/TransactionSocketListener.java index b0ae2bff1..4a3f9fc5f 100644 --- a/transaction-motan/src/main/java/com/codingapi/tx/motan/listener/TransactionSocketListener.java +++ b/transaction-motan/src/main/java/com/codingapi/tx/motan/listener/TransactionSocketListener.java @@ -20,7 +20,15 @@ public class TransactionSocketListener implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext event) throws BeansException { - initService.start(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + // 若连接不上txmanager start()方法将阻塞 + initService.start(); + } + }); + thread.setName("TxInit-thread"); + thread.start(); } } diff --git a/transaction-springcloud/pom.xml b/transaction-springcloud/pom.xml index 5342e94ae..d5f04dcfe 100644 --- a/transaction-springcloud/pom.xml +++ b/transaction-springcloud/pom.xml @@ -6,7 +6,7 @@ com.codingapi tx-lcn - 4.1.0 + 4.2.0-SNAPSHOT com.codingapi diff --git a/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/interceptor/TransactionAspect.java b/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/interceptor/TransactionAspect.java index db649e382..1b33e9c6b 100644 --- a/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/interceptor/TransactionAspect.java +++ b/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/interceptor/TransactionAspect.java @@ -1,5 +1,6 @@ package com.codingapi.tx.springcloud.interceptor; +import com.codingapi.tx.Constants; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; @@ -43,7 +44,7 @@ public Object around(ProceedingJoinPoint point)throws Throwable{ @Override public int getOrder() { - return HIGHEST_PRECEDENCE; + return Constants.ASPECT_ORDER; } diff --git a/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/interceptor/TxManagerInterceptor.java b/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/interceptor/TxManagerInterceptor.java index 497175330..1898d7498 100644 --- a/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/interceptor/TxManagerInterceptor.java +++ b/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/interceptor/TxManagerInterceptor.java @@ -22,11 +22,13 @@ public class TxManagerInterceptor { public Object around(ProceedingJoinPoint point) throws Throwable { String groupId = null; + String mode = null; try { RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes(); HttpServletRequest request = requestAttributes == null ? null : ((ServletRequestAttributes) requestAttributes).getRequest(); groupId = request == null ? null : request.getHeader("tx-group"); + mode = request == null ? null : request.getHeader("tx-mode"); }catch (Exception e){} - return aspectBeforeService.around(groupId, point); + return aspectBeforeService.around(groupId, point, mode); } } diff --git a/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/listener/ServerListener.java b/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/listener/ServerListener.java index a8a77fb9a..c6dab85f1 100644 --- a/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/listener/ServerListener.java +++ b/transaction-springcloud/src/main/java/com/codingapi/tx/springcloud/listener/ServerListener.java @@ -24,7 +24,15 @@ public void onApplicationEvent(EmbeddedServletContainerInitializedEvent event) { logger.info("onApplicationEvent -> onApplicationEvent. "+event.getEmbeddedServletContainer()); this.serverPort = event.getEmbeddedServletContainer().getPort(); - initService.start(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + // 若连接不上txmanager start()方法将阻塞 + initService.start(); + } + }); + thread.setName("TxInit-thread"); + thread.start(); } public int getPort() { diff --git a/tx-client/pom.xml b/tx-client/pom.xml index 08574c890..7f206d268 100644 --- a/tx-client/pom.xml +++ b/tx-client/pom.xml @@ -6,7 +6,7 @@ com.codingapi tx-lcn - 4.1.0 + 4.2.0-SNAPSHOT com.codingapi diff --git a/tx-client/src/main/java/com/codingapi/tx/Constants.java b/tx-client/src/main/java/com/codingapi/tx/Constants.java index c7c8fb771..b4650b9b0 100644 --- a/tx-client/src/main/java/com/codingapi/tx/Constants.java +++ b/tx-client/src/main/java/com/codingapi/tx/Constants.java @@ -15,4 +15,12 @@ public class Constants { */ public static TxServer txServer; + /** + * 主切面的 order值 + * 主切面一定要在 @Transaction 切面的外层(ASPECT_ORDER 小于 标签中的order ) + * 主切面需要能接受到异常。接收到异常才会触发回滚 + * 这意味着自定义的切面若catch了异常且不向外传递,那么这个切面需要在主切面的外层(自定义切面order 小于 ASPECT_ORDER) + */ + public static final int ASPECT_ORDER = 1000; + } diff --git a/tx-client/src/main/java/com/codingapi/tx/annotation/TxTransaction.java b/tx-client/src/main/java/com/codingapi/tx/annotation/TxTransaction.java index 6540ddc45..4584b13ad 100644 --- a/tx-client/src/main/java/com/codingapi/tx/annotation/TxTransaction.java +++ b/tx-client/src/main/java/com/codingapi/tx/annotation/TxTransaction.java @@ -32,4 +32,17 @@ */ Class[] noRollbackFor() default {}; + /** + * 事务模式 仅在事务发起方配置有效 + * @return + */ + TxTransactionMode mode() default TxTransactionMode.TX_MODE_LCN; + + /** + * 标示本服务是否是只读 + * 若为true : 不会加入事务组; Connection 不会被 Wrap; 事务信息能正常传递 + * 在本服务无DB操作或仅有查询时请配置 true 将提高性能 + * 若应用都没有DB配置,此配置无意义不用设值 + */ + boolean readOnly() default false; } diff --git a/tx-client/src/main/java/com/codingapi/tx/annotation/TxTransactionMode.java b/tx-client/src/main/java/com/codingapi/tx/annotation/TxTransactionMode.java new file mode 100644 index 000000000..1a549edea --- /dev/null +++ b/tx-client/src/main/java/com/codingapi/tx/annotation/TxTransactionMode.java @@ -0,0 +1,22 @@ +package com.codingapi.tx.annotation; + +/** + * + * + * @author caican + * 2018/7/24 + */ +public enum TxTransactionMode { + /** LCN 模式 */ + TX_MODE_LCN("LCN 模式,2阶段提交 读提交"), + + /** TXC 模式 */ + TX_MODE_TXC("TXC 模式,未提交读(READ UNCOMMITTED)"); + + + private String description; + + TxTransactionMode(String description) { + this.description = description; + } +} diff --git a/tx-client/src/main/java/com/codingapi/tx/aop/bean/TxTransactionInfo.java b/tx-client/src/main/java/com/codingapi/tx/aop/bean/TxTransactionInfo.java index 7c4c87c41..6d229b286 100644 --- a/tx-client/src/main/java/com/codingapi/tx/aop/bean/TxTransactionInfo.java +++ b/tx-client/src/main/java/com/codingapi/tx/aop/bean/TxTransactionInfo.java @@ -1,6 +1,7 @@ package com.codingapi.tx.aop.bean; import com.codingapi.tx.annotation.TxTransaction; +import com.codingapi.tx.annotation.TxTransactionMode; import com.codingapi.tx.model.TransactionInvocation; @@ -24,6 +25,7 @@ public class TxTransactionInfo { private TransactionInvocation invocation; + private TxTransactionMode mode; public TxTransactionInfo(TxTransaction transaction, TxTransactionLocal txTransactionLocal, TransactionInvocation invocation, String txGroupId) { this.transaction = transaction; @@ -32,6 +34,13 @@ public TxTransactionInfo(TxTransaction transaction, TxTransactionLocal txTransac this.invocation = invocation; } + public TxTransactionMode getMode() { + return mode; + } + + public void setMode(TxTransactionMode mode) { + this.mode = mode; + } public TxTransaction getTransaction() { return transaction; diff --git a/tx-client/src/main/java/com/codingapi/tx/aop/bean/TxTransactionLocal.java b/tx-client/src/main/java/com/codingapi/tx/aop/bean/TxTransactionLocal.java index d5998d716..e134b5ea8 100644 --- a/tx-client/src/main/java/com/codingapi/tx/aop/bean/TxTransactionLocal.java +++ b/tx-client/src/main/java/com/codingapi/tx/aop/bean/TxTransactionLocal.java @@ -1,6 +1,7 @@ package com.codingapi.tx.aop.bean; import com.alibaba.fastjson.JSONObject; +import com.codingapi.tx.annotation.TxTransactionMode; import com.codingapi.tx.framework.utils.SocketManager; import com.codingapi.tx.model.Request; import org.apache.commons.lang.StringUtils; @@ -48,6 +49,8 @@ public class TxTransactionLocal { private boolean readOnly = false; + private TxTransactionMode mode; + public boolean isHasIsGroup() { return hasIsGroup; } @@ -106,6 +109,13 @@ public static void setCurrent(TxTransactionLocal current) { currentLocal.set(current); } + public TxTransactionMode getMode() { + return mode; + } + + public void setMode(TxTransactionMode mode) { + this.mode = mode; + } public void putLoadBalance(String key, String data){ cacheModelInfo.put(key,data); @@ -157,4 +167,14 @@ public void setReadOnly(boolean readOnly) { this.readOnly = readOnly; } + public static boolean isInTxcTransaction() { + TxTransactionLocal local = current(); + if (local != null + && local.mode != null + && local.mode == TxTransactionMode.TX_MODE_TXC + && !local.isReadOnly()) { + return true; + } + return false; + } } diff --git a/tx-client/src/main/java/com/codingapi/tx/aop/service/AspectBeforeService.java b/tx-client/src/main/java/com/codingapi/tx/aop/service/AspectBeforeService.java index 419fbba93..f8dc69d7d 100644 --- a/tx-client/src/main/java/com/codingapi/tx/aop/service/AspectBeforeService.java +++ b/tx-client/src/main/java/com/codingapi/tx/aop/service/AspectBeforeService.java @@ -7,5 +7,5 @@ */ public interface AspectBeforeService { - Object around(String groupId, ProceedingJoinPoint point) throws Throwable; + Object around(String groupId, ProceedingJoinPoint point, String mode) throws Throwable; } diff --git a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/AspectBeforeServiceImpl.java b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/AspectBeforeServiceImpl.java index 098e911d5..7bc860b2f 100644 --- a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/AspectBeforeServiceImpl.java +++ b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/AspectBeforeServiceImpl.java @@ -1,6 +1,7 @@ package com.codingapi.tx.aop.service.impl; import com.codingapi.tx.annotation.TxTransaction; +import com.codingapi.tx.annotation.TxTransactionMode; import com.codingapi.tx.aop.bean.TxTransactionInfo; import com.codingapi.tx.aop.bean.TxTransactionLocal; import com.codingapi.tx.aop.service.AspectBeforeService; @@ -28,8 +29,8 @@ public class AspectBeforeServiceImpl implements AspectBeforeService { private Logger logger = LoggerFactory.getLogger(AspectBeforeServiceImpl.class); - - public Object around(String groupId, ProceedingJoinPoint point) throws Throwable { + @Override + public Object around(String groupId, ProceedingJoinPoint point, String mode) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); @@ -46,6 +47,11 @@ public Object around(String groupId, ProceedingJoinPoint point) throws Throwable TransactionInvocation invocation = new TransactionInvocation(clazz, thisMethod.getName(), thisMethod.toString(), args, method.getParameterTypes()); TxTransactionInfo info = new TxTransactionInfo(transaction,txTransactionLocal,invocation,groupId); + try { + info.setMode(TxTransactionMode.valueOf(mode)); + } catch (Exception e) { + info.setMode(TxTransactionMode.TX_MODE_LCN); + } TransactionServer server = transactionServerFactoryService.createTransactionServer(info); diff --git a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TransactionServerFactoryServiceImpl.java b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TransactionServerFactoryServiceImpl.java index 229c89e92..8d1d06c98 100644 --- a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TransactionServerFactoryServiceImpl.java +++ b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TransactionServerFactoryServiceImpl.java @@ -5,8 +5,11 @@ import com.codingapi.tx.aop.service.TransactionServer; import com.codingapi.tx.aop.service.TransactionServerFactoryService; import com.codingapi.tx.datasource.ILCNTransactionControl; +import com.codingapi.tx.framework.utils.SocketManager; import com.codingapi.tx.netty.service.NettyService; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -18,6 +21,7 @@ @Service public class TransactionServerFactoryServiceImpl implements TransactionServerFactoryService { + private Logger logger = LoggerFactory.getLogger(TransactionServerFactoryServiceImpl.class); @Autowired private TransactionServer txStartTransactionServer; @@ -40,16 +44,21 @@ public class TransactionServerFactoryServiceImpl implements TransactionServerFac public TransactionServer createTransactionServer(TxTransactionInfo info) throws Throwable { + if (!SocketManager.getInstance().isNetState()) { + logger.warn("tx-manager not connected."); + return txDefaultTransactionServer; + } /*********分布式事务处理逻辑*开始***********/ /** 尽当Transaction注解不为空,其他都为空时。表示分布式事务开始启动 **/ if (info.getTransaction() != null && info.getTransaction().isStart() && info.getTxTransactionLocal() == null && StringUtils.isEmpty(info.getTxGroupId())) { //检查socket通讯是否正常 (当启动事务的主业务方法执行完以后,再执行其他业务方法时将进入txInServiceTransactionServer业务处理) - if (nettyService.checkState()) { + if (SocketManager.getInstance().isNetState()) { return txStartTransactionServer; } else { - throw new Exception("tx-manager not connected ,please check tx-manager server "); + logger.warn("tx-manager not connected."); + return txDefaultTransactionServer; } } @@ -57,18 +66,20 @@ public TransactionServer createTransactionServer(TxTransactionInfo info) throws /** 分布式事务已经开启,业务进行中 **/ if (info.getTxTransactionLocal() != null || StringUtils.isNotEmpty(info.getTxGroupId())) { //检查socket通讯是否正常 (第一次执行时启动txRunningTransactionServer的业务处理控制,然后嵌套调用其他事务的业务方法时都并到txInServiceTransactionServer业务处理下) - if (nettyService.checkState()) { + if (SocketManager.getInstance().isNetState()) { if (info.getTxTransactionLocal() != null) { return txDefaultTransactionServer; } else { - if(!transactionControl.isNoTransactionOperation()) { //有事务业务的操作 - return txRunningTransactionServer; - }else { + if(transactionControl.isNoTransactionOperation() // 表示整个应用没有获取过DB连接 + || info.getTransaction().readOnly()) { //无事务业务的操作 return txRunningNoTransactionServer; + }else { + return txRunningTransactionServer; } } } else { - throw new Exception("tx-manager not connected ,please check tx-manager server "); + logger.warn("tx-manager not connected."); + return txDefaultTransactionServer; } } /*********分布式事务处理逻辑*结束***********/ diff --git a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxRunningNoTransactionServerImpl.java b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxRunningNoTransactionServerImpl.java index a26372a49..b38579a0e 100644 --- a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxRunningNoTransactionServerImpl.java +++ b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxRunningNoTransactionServerImpl.java @@ -25,7 +25,7 @@ public Object execute(final ProceedingJoinPoint point, final TxTransactionInfo i String kid = KidUtils.generateShortUuid(); String txGroupId = info.getTxGroupId(); - logger.debug("--->begin no db transaction, groupId: " + txGroupId); + logger.debug("--->begin readonly transaction, groupId: " + txGroupId); long t1 = System.currentTimeMillis(); @@ -34,6 +34,8 @@ public Object execute(final ProceedingJoinPoint point, final TxTransactionInfo i txTransactionLocal.setHasStart(false); txTransactionLocal.setKid(kid); txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime()); + txTransactionLocal.setMode(info.getMode()); + txTransactionLocal.setReadOnly(true); TxTransactionLocal.setCurrent(txTransactionLocal); try { @@ -43,7 +45,7 @@ public Object execute(final ProceedingJoinPoint point, final TxTransactionInfo i } finally { TxTransactionLocal.setCurrent(null); long t2 = System.currentTimeMillis(); - logger.debug("<---end no db transaction,groupId:" + txGroupId+",execute time:"+(t2-t1)); + logger.debug("<---end readonly transaction,groupId:" + txGroupId+",execute time:"+(t2-t1)); } } diff --git a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxRunningTransactionServerImpl.java b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxRunningTransactionServerImpl.java index b2a606bf7..24e81e454 100644 --- a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxRunningTransactionServerImpl.java +++ b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxRunningTransactionServerImpl.java @@ -54,6 +54,7 @@ public Object execute(final ProceedingJoinPoint point, final TxTransactionInfo i txTransactionLocal.setKid(kid); txTransactionLocal.setHasIsGroup(isHasIsGroup); txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime()); + txTransactionLocal.setMode(info.getMode()); TxTransactionLocal.setCurrent(txTransactionLocal); @@ -94,6 +95,19 @@ public Object execute(final ProceedingJoinPoint point, final TxTransactionInfo i return res; } catch (Throwable e) { + // 这里处理以下情况:当 point.proceed() 业务代码中 db事务正常提交,开始等待,后续处理发生异常。 + // 由于没有加入事务组,不会收到通知。这里唤醒并回滚 + if(!isHasIsGroup) { + String type = txTransactionLocal.getType(); + TxTask waitTask = TaskGroupManager.getInstance().getTask(kid, type); + // 有一定几率不能唤醒: wait的代码是在另一个线程,有可能线程还没执行到wait,先执行到了这里 + // TODO 要不要 sleep 1毫秒 + logger.warn("wake the waitTask: {}", (waitTask != null && waitTask.isAwait())); + if (waitTask != null && waitTask.isAwait()) { + waitTask.setState(-1); + waitTask.signalTask(); + } + } throw e; } finally { TxTransactionLocal.setCurrent(null); diff --git a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxStartTransactionServerImpl.java b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxStartTransactionServerImpl.java index ba79c537b..69d90bf07 100644 --- a/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxStartTransactionServerImpl.java +++ b/tx-client/src/main/java/com/codingapi/tx/aop/service/impl/TxStartTransactionServerImpl.java @@ -51,6 +51,8 @@ public Object execute(ProceedingJoinPoint point,final TxTransactionInfo info) th txTransactionLocal.setGroupId(groupId); txTransactionLocal.setHasStart(true); txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime()); + txTransactionLocal.setMode(info.getTransaction().mode()); + txTransactionLocal.setReadOnly(info.getTransaction().readOnly()); TxTransactionLocal.setCurrent(txTransactionLocal); diff --git a/tx-client/src/main/java/com/codingapi/tx/control/service/impl/TransactionControlServiceImpl.java b/tx-client/src/main/java/com/codingapi/tx/control/service/impl/TransactionControlServiceImpl.java index 85fd9fc4d..47021636e 100644 --- a/tx-client/src/main/java/com/codingapi/tx/control/service/impl/TransactionControlServiceImpl.java +++ b/tx-client/src/main/java/com/codingapi/tx/control/service/impl/TransactionControlServiceImpl.java @@ -47,6 +47,6 @@ public void notifyTransactionMsg(ChannelHandlerContext ctx,JSONObject resObj, St SocketUtils.sendMsg(ctx, data.toString()); - logger.debug("send notify data ->" + data.toString()); + logger.info("send notify data ->" + data.toString()); } } diff --git a/tx-client/src/main/java/com/codingapi/tx/datasource/AbstractResourceProxy.java b/tx-client/src/main/java/com/codingapi/tx/datasource/AbstractResourceProxy.java index bcfe99551..69d21f1b4 100644 --- a/tx-client/src/main/java/com/codingapi/tx/datasource/AbstractResourceProxy.java +++ b/tx-client/src/main/java/com/codingapi/tx/datasource/AbstractResourceProxy.java @@ -1,6 +1,7 @@ package com.codingapi.tx.datasource; +import com.codingapi.tx.annotation.TxTransactionMode; import com.codingapi.tx.aop.bean.TxTransactionLocal; import com.codingapi.tx.datasource.service.DataSourceService; import com.lorne.core.framework.utils.task.Task; @@ -64,6 +65,8 @@ public void close(ILCNResource connection) { protected abstract C createLcnConnection(C connection, TxTransactionLocal txTransactionLocal); + protected abstract C createTxcConnection(C connection, TxTransactionLocal txTransactionLocal); + protected abstract void initDbType(); @@ -76,6 +79,10 @@ protected ILCNResource loadConnection(){ logger.debug("loadConnection -> null !"); return null; } + if (txTransactionLocal.isReadOnly()) { + logger.debug("readonly tx don't reuse connection."); + return null; + } //是否获取旧连接的条件:同一个模块下被多次调用时第一次的事务操作 ILCNResource old = pools.get(txTransactionLocal.getGroupId()); @@ -95,6 +102,11 @@ protected ILCNResource loadConnection(){ private C createConnection(TxTransactionLocal txTransactionLocal, C connection){ + if (txTransactionLocal.getMode() != null + && txTransactionLocal.getMode() == TxTransactionMode.TX_MODE_TXC) { + // txc 模式下没有maxCount的限制 直接创建 + return createTxcConnection(connection, txTransactionLocal); + } if (nowCount == maxCount) { for (int i = 0; i < maxWaitTime; i++) { for(int j=0;j<100;j++){ @@ -124,7 +136,8 @@ protected C initLCNConnection(C connection) { C lcnConnection = connection; TxTransactionLocal txTransactionLocal = TxTransactionLocal.current(); - if (txTransactionLocal != null&&!txTransactionLocal.isHasConnection()) { + if (txTransactionLocal != null&&!txTransactionLocal.isHasConnection() + && !txTransactionLocal.isReadOnly()) { logger.debug("lcn datasource transaction control "); diff --git a/tx-client/src/main/java/com/codingapi/tx/framework/thread/NamedThreadFactory.java b/tx-client/src/main/java/com/codingapi/tx/framework/thread/NamedThreadFactory.java new file mode 100644 index 000000000..13bc4abbc --- /dev/null +++ b/tx-client/src/main/java/com/codingapi/tx/framework/thread/NamedThreadFactory.java @@ -0,0 +1,55 @@ +package com.codingapi.tx.framework.thread; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * [类描述] + * + * @author caican + * 17/9/12 + */ +public class NamedThreadFactory implements ThreadFactory +{ + private static final AtomicInteger POOL_SEQ = new AtomicInteger(1); + + private final AtomicInteger mThreadNum = new AtomicInteger(1); + + private final String mPrefix; + + private final boolean mDaemo; + + private final ThreadGroup mGroup; + + public NamedThreadFactory() + { + this("pool-" + POOL_SEQ.getAndIncrement(),false); + } + + public NamedThreadFactory(String prefix) + { + this(prefix,false); + } + + public NamedThreadFactory(String prefix, boolean daemo) + { + mPrefix = prefix + "-thread-"; + mDaemo = daemo; + SecurityManager s = System.getSecurityManager(); + mGroup = ( s == null ) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); + } + + @Override + public Thread newThread(Runnable runnable) + { + String name = mPrefix + mThreadNum.getAndIncrement(); + Thread ret = new Thread(mGroup,runnable,name,0); + ret.setDaemon(mDaemo); + return ret; + } + + public ThreadGroup getThreadGroup() + { + return mGroup; + } +} diff --git a/tx-client/src/main/java/com/codingapi/tx/framework/utils/SocketManager.java b/tx-client/src/main/java/com/codingapi/tx/framework/utils/SocketManager.java index 836073148..fa870bfff 100644 --- a/tx-client/src/main/java/com/codingapi/tx/framework/utils/SocketManager.java +++ b/tx-client/src/main/java/com/codingapi/tx/framework/utils/SocketManager.java @@ -1,5 +1,6 @@ package com.codingapi.tx.framework.utils; +import com.codingapi.tx.framework.thread.NamedThreadFactory; import com.codingapi.tx.model.Request; import com.lorne.core.framework.utils.task.ConditionUtils; import com.lorne.core.framework.utils.task.IBack; @@ -34,7 +35,7 @@ public class SocketManager { private static SocketManager manager = null; - private ExecutorService threadPool = Executors.newFixedThreadPool(max_size); + private ExecutorService threadPool = Executors.newFixedThreadPool(max_size, new NamedThreadFactory("sender")); private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(max_size); diff --git a/tx-client/src/main/java/com/codingapi/tx/netty/service/impl/NettyControlServiceImpl.java b/tx-client/src/main/java/com/codingapi/tx/netty/service/impl/NettyControlServiceImpl.java index 052a647b8..77b963f9e 100644 --- a/tx-client/src/main/java/com/codingapi/tx/netty/service/impl/NettyControlServiceImpl.java +++ b/tx-client/src/main/java/com/codingapi/tx/netty/service/impl/NettyControlServiceImpl.java @@ -13,6 +13,8 @@ import com.lorne.core.framework.utils.task.Task; import io.netty.channel.ChannelHandlerContext; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -22,6 +24,7 @@ @Service public class NettyControlServiceImpl implements NettyControlService { + private Logger logger = LoggerFactory.getLogger(NettyControlServiceImpl.class); @Autowired private NettyService nettyService; @@ -71,12 +74,15 @@ public void executeService(final ChannelHandlerContext ctx,final String json) { JSONObject resObj = JSONObject.parseObject(json); if (resObj.containsKey("a")) { // tm发送数据给tx模块的处理指令 - + logger.info("receive cmd -> {}", json); transactionControlService.notifyTransactionMsg(ctx,resObj,json); }else{ //tx发送数据给tm的响应返回数据 String key = resObj.getString("k"); + if (!"h".equals(key)) { + logger.info("receive response -> {}", json); + } responseMsg(key,resObj); } } diff --git a/tx-client/src/main/java/com/codingapi/tx/netty/service/impl/NettyServiceImpl.java b/tx-client/src/main/java/com/codingapi/tx/netty/service/impl/NettyServiceImpl.java index 599e38878..d633fed26 100644 --- a/tx-client/src/main/java/com/codingapi/tx/netty/service/impl/NettyServiceImpl.java +++ b/tx-client/src/main/java/com/codingapi/tx/netty/service/impl/NettyServiceImpl.java @@ -1,6 +1,7 @@ package com.codingapi.tx.netty.service.impl; import com.codingapi.tx.Constants; +import com.codingapi.tx.framework.thread.NamedThreadFactory; import com.codingapi.tx.framework.utils.SocketManager; import com.codingapi.tx.netty.handler.TransactionHandler; import com.codingapi.tx.netty.service.NettyControlService; @@ -47,7 +48,7 @@ public class NettyServiceImpl implements NettyService ,DisposableBean { private Logger logger = LoggerFactory.getLogger(NettyServiceImpl.class); - private ExecutorService threadPool = Executors.newFixedThreadPool(100); + private ExecutorService threadPool = Executors.newFixedThreadPool(100,new NamedThreadFactory("receiver")); @Override public synchronized void start() { diff --git a/tx-manager/pom.xml b/tx-manager/pom.xml index 4d163f987..a0c5f052a 100644 --- a/tx-manager/pom.xml +++ b/tx-manager/pom.xml @@ -5,7 +5,7 @@ com.codingapi tx-manager - 4.1.0 + 4.2.0-SNAPSHOT jar tx-manager @@ -27,6 +27,9 @@ 19.0 Dalston.SR1 + true + true + diff --git a/tx-manager/src/main/resources/application.properties b/tx-manager/src/main/resources/application.properties index 0fd633269..b12f29015 100644 --- a/tx-manager/src/main/resources/application.properties +++ b/tx-manager/src/main/resources/application.properties @@ -1,7 +1,7 @@ #######################################txmanager-start################################################# #服务端口 -server.port=8899 +server.port=7000 #tx-manager不得修改 spring.application.name=tx-manager @@ -16,7 +16,7 @@ spring.resources.static-locations=classpath:/static/ #spring.cloud.zookeeper.discovery.preferIpAddress = true #eureka 地址 -eureka.client.service-url.defaultZone=http://127.0.0.1:8761/eureka/ +eureka.client.service-url.defaultZone=http://127.0.0.1:7000/eureka/ eureka.instance.prefer-ip-address=true #######################################redis-start################################################# @@ -30,7 +30,7 @@ eureka.instance.prefer-ip-address=true ##redis 单点环境配置 #redis #redis主机地址 -spring.redis.host=127.0.0.1 +spring.redis.host=172.26.8.150 #redis主机端口 spring.redis.port=6379 #redis链接密码 diff --git a/tx-plugins-db/pom.xml b/tx-plugins-db/pom.xml index 747907e61..836925d8e 100644 --- a/tx-plugins-db/pom.xml +++ b/tx-plugins-db/pom.xml @@ -6,7 +6,7 @@ com.codingapi tx-lcn - 4.1.0 + 4.2.0-SNAPSHOT com.codingapi @@ -25,6 +25,16 @@ tx-client ${lcn.last.version} + + com.alibaba + druid + 1.0.19 + + + com.fasterxml.jackson.core + jackson-databind + 2.8.10 + diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/AbstractTransactionThread.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/AbstractTransactionThread.java index 75f10f82a..697f8cf5f 100644 --- a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/AbstractTransactionThread.java +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/AbstractTransactionThread.java @@ -1,5 +1,6 @@ package com.codingapi.tx.datasource.relational; +import com.codingapi.tx.aop.bean.TxTransactionLocal; import com.codingapi.tx.framework.thread.HookRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +25,7 @@ protected void startRunnable(){ Runnable runnable = new HookRunnable() { @Override public void run0() { + TxTransactionLocal.setCurrent(null); try { transaction(); } catch (Exception e) { diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNDBConnection.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNDBConnection.java index 8ded34e57..6a7f231b2 100644 --- a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNDBConnection.java +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNDBConnection.java @@ -157,7 +157,7 @@ public void run() { } }, maxOutTime); - System.out.println("transaction is wait for TxManager notify, groupId : " + getGroupId()); + logger.info("transaction is wait for TxManager notify, groupId {}", getGroupId()); waitTask.awaitTask(); @@ -173,10 +173,10 @@ public void run() { rollbackConnection(); } - System.out.println("lcn transaction over, res -> groupId:"+getGroupId()+" and state is "+(rs==1?"commit":"rollback")); + logger.info("lcn transaction over, res -> groupId:"+getGroupId()+" and state is "+(rs==1?"commit":"rollback")); }catch (SQLException e){ - System.out.println("lcn transaction over,but connection is closed, res -> groupId:"+getGroupId()); + logger.info("lcn transaction over,but connection is closed, res -> groupId:"+getGroupId()); waitTask.setState(TaskState.connectionError.getCode()); } diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNStartConnection.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNStartConnection.java index 861e43d89..536cb5b29 100644 --- a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNStartConnection.java +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNStartConnection.java @@ -147,7 +147,7 @@ public void transaction()throws SQLException{ return; } - System.out.println(" start transaction is wait for TxManager notify, groupId : " + getGroupId()); + logger.info(" start transaction is wait for TxManager notify, groupId : " + getGroupId()); waitTask.awaitTask(); @@ -169,7 +169,7 @@ public void transaction()throws SQLException{ } else { rollbackConnection(); } - System.out.println(" lcn start transaction over, res -> groupId:"+getGroupId()+" and state is "+(rs==1?"commit":"rollback")); + logger.info(" lcn start transaction over, res -> groupId:"+getGroupId()+" and state is "+(rs==1?"commit":"rollback")); }catch (SQLException e){ diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNTransactionDataSource.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNTransactionDataSource.java index 2bbf22bba..270f9fafc 100644 --- a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNTransactionDataSource.java +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/LCNTransactionDataSource.java @@ -4,8 +4,11 @@ import com.codingapi.tx.aop.bean.TxTransactionLocal; import com.codingapi.tx.datasource.AbstractResourceProxy; import com.codingapi.tx.datasource.ILCNConnection; +import com.codingapi.tx.datasource.relational.txc.TxcDBConnection; +import com.codingapi.tx.datasource.relational.txc.rollback.TxcRollbackService; import org.aspectj.lang.ProceedingJoinPoint; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.sql.Connection; @@ -23,7 +26,8 @@ public class LCNTransactionDataSource extends AbstractResourceProxy" + txTransactionLocal.getGroupId()); + return txc; + } + @Override protected void initDbType() { diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/AbstractTxcConnection.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/AbstractTxcConnection.java new file mode 100644 index 000000000..1d5cb4202 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/AbstractTxcConnection.java @@ -0,0 +1,415 @@ +package com.codingapi.tx.datasource.relational.txc; + +import com.codingapi.tx.aop.bean.TxTransactionLocal; +import com.codingapi.tx.datasource.relational.AbstractTransactionThread; +import com.codingapi.tx.datasource.relational.LCNConnection; +import com.codingapi.tx.datasource.relational.txc.parser.TxcRuntimeContext; +import com.codingapi.tx.datasource.relational.txc.rollback.TxcRollbackService; +import com.codingapi.tx.datasource.service.DataSourceService; +import com.codingapi.tx.framework.task.TaskGroup; +import com.codingapi.tx.framework.task.TaskGroupManager; +import com.codingapi.tx.framework.task.TxTask; +import com.codingapi.tx.framework.thread.HookRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + + +/** + * created at 20171116 + * @author caisirius + */ +public abstract class AbstractTxcConnection extends AbstractTransactionThread + implements LCNConnection,TxcRuntimeContextService { + + private static final Logger logger = LoggerFactory.getLogger(AbstractTxcConnection.class); + + private boolean readOnly = false; + + volatile int state = 1; + + private Connection connection; + + DataSourceService dataSourceService; + + TxTask waitTask; + + int maxOutTime; + + String groupId; + + TxcRuntimeContext txcRuntimeContext; + + TxcRollbackService txcRollbackService; + + @Override + public TxcRuntimeContext getTxcRuntimeContext() { + if (txcRuntimeContext == null) { + this.txcRuntimeContext = new TxcRuntimeContext(); + txcRuntimeContext.setGroupId(groupId); + txcRuntimeContext.setBranchId(waitTask.getKey()); + + } + return txcRuntimeContext; + } + + @Override + public String getGroupId() { + return groupId; + } + + @Override + public TxTask getWaitTask() { + return waitTask; + } + + + + public AbstractTxcConnection(Connection connection, TxTransactionLocal transactionLocal, + DataSourceService dataSourceService, + TxcRollbackService txcRollbackService) { + readOnly = transactionLocal.isReadOnly(); + this.connection = connection; + this.dataSourceService = dataSourceService; + this.txcRollbackService = txcRollbackService; + + groupId = transactionLocal.getGroupId(); + maxOutTime = transactionLocal.getMaxTimeOut(); + + TaskGroup taskGroup; + if (transactionLocal.getKid() == null) { + logger.info("this is txc start-connection"); + taskGroup = TaskGroupManager.getInstance().createTask(groupId, transactionLocal.getType()); + } else { + taskGroup = TaskGroupManager.getInstance().createTask(transactionLocal.getKid(), transactionLocal.getType()); + } + waitTask = taskGroup.getCurrent(); + } + + // commit() 里要做工作 + @Override + public void commit() throws SQLException { + logger.info("commit"); + + connection.commit(); + + state = 1; + } + + @Override + public void rollback() throws SQLException { + connection.rollback(); + state = 0; + } + + @Override + public void close() throws SQLException { + connection.close(); + // 只有提交才需要 开启线程等待 + if (readOnly || state == 0) { + closeConnection(); + return; + } + startRunnable(); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + connection.setAutoCommit(false); + } + + + // TODO Statement CallableStatement 也应该自定义!! + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return connection.prepareCall(sql); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return connection.prepareCall(sql, resultSetType, resultSetConcurrency); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return connection.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + /*****default*******/ + @Override + public String nativeSQL(String sql) throws SQLException { + return connection.nativeSQL(sql); + } + + + @Override + public boolean getAutoCommit() throws SQLException { + return connection.getAutoCommit(); + } + + + @Override + public boolean isClosed() throws SQLException { + return connection.isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return connection.getMetaData(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + if(readOnly) { + this.readOnly = readOnly; + logger.debug("setReadOnly - >" + readOnly); + connection.setReadOnly(readOnly); + TxTransactionLocal txTransactionLocal = TxTransactionLocal.current(); + txTransactionLocal.setReadOnly(readOnly); + } + } + + @Override + public boolean isReadOnly() throws SQLException { + return connection.isReadOnly(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + connection.setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return connection.getCatalog(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + connection.setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return connection.getTransactionIsolation(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return connection.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + connection.clearWarnings(); + } + + @Override + public Map> getTypeMap() throws SQLException { + return connection.getTypeMap(); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + connection.setTypeMap(map); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + connection.setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return connection.getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return connection.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return connection.setSavepoint(name); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + connection.rollback(savepoint); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + connection.releaseSavepoint(savepoint); + } + + @Override + public Clob createClob() throws SQLException { + return connection.createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return connection.createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return connection.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return connection.createSQLXML(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return connection.isValid(timeout); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + connection.setClientInfo(name, value); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + connection.setClientInfo(properties); + } + + @Override + public String getClientInfo(String name) throws SQLException { + return connection.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return connection.getClientInfo(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return connection.createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return connection.createStruct(typeName, attributes); + } + + @Override + public void setSchema(String schema) throws SQLException { + connection.setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return connection.getSchema(); + } + + @Override + public void abort(Executor executor) throws SQLException { + connection.abort(executor); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + connection.setNetworkTimeout(executor, milliseconds); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return connection.getNetworkTimeout(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + return connection.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return connection.isWrapperFor(iface); + } + + /***** wrap *******/ + + @Override + public Statement createStatement() throws SQLException { + Statement statement = connection.createStatement(); + return new TxcStatement(statement , this); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + Statement statement = connection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); + return new TxcStatement(statement , this); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + Statement statement = connection.createStatement(resultSetType, resultSetConcurrency); + return new TxcStatement(statement , this); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + // 这里返回自定义的 PreparedStatement + PreparedStatement localPreparedStatement = connection.prepareStatement(sql); + return new TxcPreparedStatement(localPreparedStatement, this, sql); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws + SQLException { + PreparedStatement localPreparedStatement = connection.prepareStatement(sql, resultSetType, resultSetConcurrency); + + return new TxcPreparedStatement(localPreparedStatement, this, sql); + + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + PreparedStatement localPreparedStatement = connection.prepareStatement(sql, autoGeneratedKeys); + return new TxcPreparedStatement(localPreparedStatement, this, sql); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + PreparedStatement localPreparedStatement = connection.prepareStatement(sql, columnIndexes); + return new TxcPreparedStatement(localPreparedStatement, this, sql); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + PreparedStatement localPreparedStatement = connection.prepareStatement(sql, columnNames); + return new TxcPreparedStatement(localPreparedStatement, this, sql); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + PreparedStatement localPreparedStatement = connection.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + return new TxcPreparedStatement(localPreparedStatement, this, sql); + } + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/ColumnInfo.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/ColumnInfo.java new file mode 100644 index 000000000..06f8fd42a --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/ColumnInfo.java @@ -0,0 +1,79 @@ +package com.codingapi.tx.datasource.relational.txc; + +/** + * [类描述] + * + * @author caican + * @date 17/12/23 + */ +public class ColumnInfo { + private String tableName; + private String columnName; + private int type; + /** + * -1 : no key + * 0: PRI 主键索引 + * 1: UNI 唯一索引 + * 2: MUL 普通索引(联合索引) + */ + private int keyType; + private boolean isAllowNull; + private String defaultValue; + private String extra; + + public int getKeyType() { + return keyType; + } + + public void setKeyType(int keyType) { + this.keyType = keyType; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getColumnName() { + return columnName; + } + + public void setColumnName(String columnName) { + this.columnName = columnName; + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + public boolean isAllowNull() { + return isAllowNull; + } + + public void setAllowNull(boolean allowNull) { + isAllowNull = allowNull; + } + + public String getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + } + + public String getExtra() { + return extra; + } + + public void setExtra(String extra) { + this.extra = extra; + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/ITxcStatement.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/ITxcStatement.java new file mode 100644 index 000000000..0fa72f2d1 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/ITxcStatement.java @@ -0,0 +1,21 @@ +package com.codingapi.tx.datasource.relational.txc; + +import java.sql.Statement; + +/** + * [类描述] + * + * @author caican + * @date 17/12/4 + */ +public interface ITxcStatement extends Statement { + /** 返回执行的SQL + * @return SQL语句 + */ + String getSql(); + + Statement getStatement(); + + AbstractTxcConnection getTxcDBConnection(); + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/IndexInfo.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/IndexInfo.java new file mode 100644 index 000000000..2822dae85 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/IndexInfo.java @@ -0,0 +1,10 @@ +package com.codingapi.tx.datasource.relational.txc; + +/** + * [类描述] + * + * @author caican + * @date 17/12/23 + */ +public class IndexInfo { +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TableMetaInfo.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TableMetaInfo.java new file mode 100644 index 000000000..731b71c0f --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TableMetaInfo.java @@ -0,0 +1,123 @@ +package com.codingapi.tx.datasource.relational.txc; + +import java.util.HashMap; +import java.util.Map; + +/** + * [表描述] + * + * @author caican + * @date 17/12/23 + */ +public class TableMetaInfo { + /** + * SchemaName (if empty, set to CatalogName) + */ + private String schemaName; + /** + * TableName + */ + private String tableName; + + /** + * 列信息 + */ + private Map columnInfoMap = new HashMap<>(); + /** + * 索引 + */ + private Map indexInfoMap = new HashMap<>(); + + + public TableMetaInfo() { + } + + /** + * @param name + * @return + */ + public ColumnInfo getColumnByName(String name) { + String str = name.toUpperCase(); + ColumnInfo ret = this.columnInfoMap.get(str); + if (ret == null) { + if (name.charAt(0) == '`') { + ret = this.columnInfoMap.get(str.substring(1, name.length() - 1)); + } else { + ret = this.columnInfoMap.get("`" + str + "`"); + } + } + return ret; + } + + + public Map getPrimaryKey() { + HashMap ret = new HashMap<>(); + + for (Map.Entry entry : this.columnInfoMap.entrySet()) { + ColumnInfo columnInfo = entry.getValue(); + if (columnInfo.getKeyType() == 0) { + ret.put(entry.getKey(), columnInfo); + } + } + + if (ret.size() > 1) { + throw new RuntimeException("multi pks not support yet."); + } + return ret; + } + + public String getPrimaryKeyName() { + Map primaryKey = getPrimaryKey(); + + if (primaryKey.entrySet().size() > 1) { + throw new RuntimeException("multi pks not support yet."); + + } + + Map.Entry next = primaryKey.entrySet().iterator().next(); + return next.getKey(); + } + + public String getAutoIncrementPrimaryKey() { + + for (Map.Entry entry : this.columnInfoMap.entrySet()) { + ColumnInfo columnInfo = entry.getValue(); + if (columnInfo.getKeyType() == 0 && columnInfo.getExtra().equals("auto_increment")) { + return entry.getKey(); + } + } + return null; + } + + public String getSchemaName() { + return schemaName; + } + + public void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public Map getColumnInfoMap() { + return columnInfoMap; + } + + public void setColumnInfoMap(Map columnInfoMap) { + this.columnInfoMap = columnInfoMap; + } + + public Map getIndexInfoMap() { + return indexInfoMap; + } + + public void setIndexInfoMap(Map indexInfoMap) { + this.indexInfoMap = indexInfoMap; + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TableMetaUtils.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TableMetaUtils.java new file mode 100644 index 000000000..c82e13866 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TableMetaUtils.java @@ -0,0 +1,144 @@ +package com.codingapi.tx.datasource.relational.txc; + +import com.codingapi.tx.aop.bean.TxTransactionLocal; +import com.lorne.core.framework.utils.config.ConfigUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; + +/** + * [类描述] + * + * @author caican + * @date 17/12/23 + */ +public class TableMetaUtils { + + private static final Logger logger = LoggerFactory.getLogger(TableMetaUtils.class); + + private static final ConcurrentHashMap tableMetaInfoCache = + new ConcurrentHashMap<>(); + //TODO 定期让缓存失效 + + private static String dbType; + + + public static TableMetaInfo getTableMetaInfo(Connection connection, String tableName) { + if (StringUtils.isEmpty(tableName)) { + throw new RuntimeException("TableMeta cannot fetched without tableName"); + } + if (connection == null) { + throw new RuntimeException("TableMeta cannot fetched without Connection"); + } + String databaseName = null; + try { + databaseName = getDbNameFromUrl(connection.getMetaData().getURL()); + } catch (Exception e) { } + if (StringUtils.isEmpty(databaseName)) { + databaseName = "NULL"; + } + TableMetaInfo ret; + String fullTableName = databaseName + "." + tableName; + + // 先从本地缓存拿 + ret = tableMetaInfoCache.get(fullTableName); + if (ret == null) { + try { + logger.info("meta is null, fetch schema of " + tableName); + ret = fetchSchema(connection, tableName); + ret.setTableName(tableName); + ret.setSchemaName(databaseName); + + tableMetaInfoCache.putIfAbsent(fullTableName, ret); + } catch (SQLException e) { + logger.error("tableMeta error", e); + } + } + if (ret == null) { + throw new RuntimeException(String.format("[groupId:%s]get tablemeta failed", + TxTransactionLocal.current().getGroupId())); + } + + return ret; + } + + public static String getDbNameFromUrl (String url) { + if (StringUtils.isEmpty(url)) { + return null; + } + int start = 0; + boolean isThreeFound = true; // 要找到3个 '/' + for (int i=0; i<3;i++) { + int index = url.indexOf('/', start+1); + if (index == -1) { + isThreeFound = false; + break; + } + start = index; + } + if (! isThreeFound) { + return null; + } + + int index = url.indexOf('?', start); + String ret; + if (index == -1) { + ret = url.substring(start +1); + } else { + ret = url.substring(start + 1, index); + } + return ret; + } + + private static TableMetaInfo fetchSchema(Connection connection, String tableName) + throws SQLException { + // TODO 支持 mysql以外的 + + return fetchSchemaMysql(connection, tableName); + } + + private static TableMetaInfo fetchSchemaMysql(Connection connection, String tableName) + throws SQLException { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery("desc " + tableName); + TableMetaInfo tableMetaInfo = new TableMetaInfo(); + tableMetaInfo.setColumnInfoMap(new HashMap()); + while (resultSet.next()) { + ColumnInfo columnInfo = new ColumnInfo(); + columnInfo.setTableName(tableName); + columnInfo.setColumnName(resultSet.getString("Field")); + boolean isAllowNull = true; + if ("NO".equalsIgnoreCase(resultSet.getString("Null"))) { + isAllowNull = false; + } + columnInfo.setAllowNull(isAllowNull); + String key = resultSet.getString("Key"); + int iKey = -1; + if ("PRI".equalsIgnoreCase(key)) { + iKey = 0; + } else if ("UNI".equalsIgnoreCase(key)) { + iKey = 1; + } else if ("MUL".equalsIgnoreCase(key)) { + iKey = 2; + } + columnInfo.setKeyType(iKey);// 这个最重要 + columnInfo.setExtra(resultSet.getString("Extra")); + + //TODO + // resultSet.getString("Type"); +// columnInfo.setType(); + + tableMetaInfo.getColumnInfoMap().put(columnInfo.getColumnName(), columnInfo); + } + return tableMetaInfo; + } + + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcDBConnection.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcDBConnection.java new file mode 100644 index 000000000..2d039dedd --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcDBConnection.java @@ -0,0 +1,91 @@ +package com.codingapi.tx.datasource.relational.txc; + +import com.alibaba.fastjson.JSON; +import com.codingapi.tx.aop.bean.TxTransactionLocal; +import com.codingapi.tx.datasource.relational.txc.parser.CommitInfo; +import com.codingapi.tx.datasource.relational.txc.rollback.TxcRollbackService; +import com.codingapi.tx.datasource.service.DataSourceService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +/** + * create by caisirius on 2017/11/28 + */ + +public class TxcDBConnection extends AbstractTxcConnection { + private Logger logger = LoggerFactory.getLogger(TxcDBConnection.class); + + public TxcDBConnection(Connection connection, TxTransactionLocal txTransactionLocal, + DataSourceService dataSourceService, + TxcRollbackService txcRollbackService) { + super(connection, txTransactionLocal, dataSourceService, txcRollbackService); + } + + @Override + public void transaction() { + if (waitTask == null) { + logger.warn("waitTask is null"); + return; + } + + // start 结束就是全部事务的结束表示,考虑start挂掉的情况 + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + logger.warn("txc自动回滚->" + getGroupId()); + dataSourceService.schedule(getGroupId(), waitTask); + } + }, maxOutTime); + + logger.info("waiting for TxManager notify, groupId {}, timeout {}", getGroupId(), maxOutTime); + waitTask.awaitTask(); + + timer.cancel(); + + int rs = waitTask.getState(); + + logger.info("lcn txc transaction over, groupId {} and state is {}",getGroupId(),(rs==1?"commit":"rollback")); + // 提交 + if (rs == 1) { + // do nothing + } else { + try { + rollbackConnection(); + } catch (Exception e) { + logger.error("rollback error", e); + } + } + + waitTask.remove(); + } + + @Override + protected void closeConnection() throws SQLException { + + if (waitTask != null) { + if (!waitTask.isRemove()) { + waitTask.remove(); + } + } + } + + @Override + protected void rollbackConnection() throws SQLException { + logger.info("doTxcRollback kid:{},context:{}", waitTask.getKey() + , JSON.toJSONString(txcRuntimeContext)); + List commitInfos = txcRuntimeContext.getInfo(); + + // 逆序回滚 + for (int i = commitInfos.size() - 1; i >= 0; i--) { + txcRollbackService.rollback(commitInfos.get(i)); + } + } + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcPreparedStatement.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcPreparedStatement.java new file mode 100644 index 000000000..9e20512ce --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcPreparedStatement.java @@ -0,0 +1,427 @@ +package com.codingapi.tx.datasource.relational.txc; + +import com.codingapi.tx.datasource.relational.txc.parser.ExecutePaser; +import com.codingapi.tx.datasource.relational.txc.parser.SQLType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; + +/** + * [类描述] + * + * @author caican + * @date 17/11/28 + * @title [confluence页面的title] + */ +public class TxcPreparedStatement extends TxcStatement implements PreparedStatement { + + private static final Logger logger = LoggerFactory.getLogger(TxcPreparedStatement.class); + + public TxcPreparedStatement(PreparedStatement localPreparedStatement, AbstractTxcConnection abstractTxcConnection, + String sql) { + super(localPreparedStatement, abstractTxcConnection); + this.sql = sql; + } + + /** + * 把这个参数缓存下来 有什么用? + */ + private List paramsList = new ArrayList<>(); + + public int getParameterCount() throws SQLException { + return getParameterMetaData().getParameterCount(); + } + + public synchronized List getParamsList() { + return this.paramsList; + } + + private void addParam(int paramInt, Object paramObject1, Object paramObject2, Object paramObject3) + throws SQLException { + List paramsList = getParamsList(); + paramsList.add(paramObject1); + } + + private void addParam(int paramInt, Object paramObject1, Object paramObject2) throws SQLException { + List paramsList = getParamsList(); + paramsList.add(paramObject1); + } + + private void addParam(int paramInt, Object paramObject) throws SQLException { + List paramsList = getParamsList(); + paramsList.add(paramObject); + } + + @Override + public void addBatch() throws SQLException { + ((PreparedStatement) this.statement).addBatch(); + } + + @Override + public void clearParameters() throws SQLException { + ((PreparedStatement) this.statement).clearParameters(); + } + + @Override + public boolean execute() throws SQLException { + if (isInTxcTransaction()) { + SQLType sqlType = ExecutePaser.parse(this); + boolean execute = ((PreparedStatement) this.statement).execute(); + ExecutePaser.after(this, sqlType); + return execute; + } + + return ((PreparedStatement) this.statement).execute(); + } + + @Override + public ResultSet executeQuery() throws SQLException { + // 查询语句也要包装么? + return ((PreparedStatement) this.statement).executeQuery(); + } + + @Override + public int[] executeBatch() throws SQLException { + if (isInTxcTransaction()) { + ExecutePaser.parse(this); + } + return this.statement.executeBatch(); + } + + @Override + public int executeUpdate() throws SQLException { + if (isInTxcTransaction()) { + SQLType sqlType = ExecutePaser.parse(this); + int num = ((PreparedStatement) this.statement).executeUpdate(); + ExecutePaser.after(this, sqlType); + return num; + } + + return ((PreparedStatement) this.statement).executeUpdate(); + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + return ((PreparedStatement) this.statement).getMetaData(); + } + + @Override + public ParameterMetaData getParameterMetaData() throws SQLException { + return ((PreparedStatement) this.statement).getParameterMetaData(); + } + + @Override + public void setArray(int paramInt, Array paramArray) throws SQLException { + addParam(paramInt, paramArray); + ((PreparedStatement) this.statement).setArray(paramInt, paramArray); + } + + @Override + public void setAsciiStream(int paramInt, InputStream paramInputStream) throws SQLException { + addParam(paramInt, paramInputStream); + ((PreparedStatement) this.statement).setAsciiStream(paramInt, paramInputStream); + } + + @Override + public void setAsciiStream(int paramInt1, InputStream paramInputStream, int paramInt2) throws SQLException { + addParam(paramInt1, paramInputStream, paramInt2); + ((PreparedStatement) this.statement).setAsciiStream(paramInt1, paramInputStream, paramInt2); + } + + @Override + public void setAsciiStream(int paramInt, InputStream paramInputStream, long paramLong) throws SQLException { + addParam(paramInt, paramInputStream, paramLong); + ((PreparedStatement) this.statement).setAsciiStream(paramInt, paramInputStream, paramLong); + } + + @Override + public void setBigDecimal(int paramInt, BigDecimal paramBigDecimal) throws SQLException { + addParam(paramInt, paramBigDecimal); + ((PreparedStatement) this.statement).setBigDecimal(paramInt, paramBigDecimal); + } + + @Override + public void setBinaryStream(int paramInt, InputStream paramInputStream) throws SQLException { + addParam(paramInt, paramInputStream); + ((PreparedStatement) this.statement).setBinaryStream(paramInt, paramInputStream); + } + + @Override + public void setBinaryStream(int paramInt1, InputStream paramInputStream, int paramInt2) throws SQLException { + addParam(paramInt1, paramInputStream, paramInt2); + ((PreparedStatement) this.statement).setBinaryStream(paramInt1, paramInputStream, paramInt2); + } + + @Override + public void setBinaryStream(int paramInt, InputStream paramInputStream, long paramLong) throws SQLException { + addParam(paramInt, paramInputStream, paramLong); + ((PreparedStatement) this.statement).setBinaryStream(paramInt, paramInputStream, paramLong); + } + + @Override + public void setBlob(int paramInt, Blob paramBlob) throws SQLException { + addParam(paramInt, paramBlob); + ((PreparedStatement) this.statement).setBlob(paramInt, paramBlob); + } + + @Override + public void setBlob(int paramInt, InputStream paramInputStream) throws SQLException { + addParam(paramInt, paramInputStream); + ((PreparedStatement) this.statement).setBlob(paramInt, paramInputStream); + } + + @Override + public void setBlob(int paramInt, InputStream paramInputStream, long paramLong) throws SQLException { + addParam(paramInt, paramInputStream, paramLong); + ((PreparedStatement) this.statement).setBlob(paramInt, paramInputStream, paramLong); + } + + @Override + public void setBoolean(int paramInt, boolean paramBoolean) throws SQLException { + addParam(paramInt, paramBoolean); + ((PreparedStatement) this.statement).setBoolean(paramInt, paramBoolean); + } + + @Override + public void setByte(int paramInt, byte paramByte) throws SQLException { + addParam(paramInt, paramByte); + ((PreparedStatement) this.statement).setByte(paramInt, paramByte); + } + + @Override + public void setBytes(int paramInt, byte[] paramArrayOfByte) throws SQLException { + addParam(paramInt, paramArrayOfByte); + ((PreparedStatement) this.statement).setBytes(paramInt, paramArrayOfByte); + } + + @Override + public void setCharacterStream(int paramInt, Reader paramReader) throws SQLException { + addParam(paramInt, paramReader); + ((PreparedStatement) this.statement).setCharacterStream(paramInt, paramReader); + } + + @Override + public void setCharacterStream(int paramInt1, Reader paramReader, int paramInt2) throws SQLException { + addParam(paramInt1, paramReader, paramInt2); + ((PreparedStatement) this.statement).setCharacterStream(paramInt1, paramReader, paramInt2); + } + + @Override + public void setCharacterStream(int paramInt, Reader paramReader, long paramLong) throws SQLException { + addParam(paramInt, paramReader, paramLong); + ((PreparedStatement) this.statement).setCharacterStream(paramInt, paramReader, paramLong); + } + + @Override + public void setClob(int paramInt, Clob paramClob) throws SQLException { + addParam(paramInt, paramClob); + ((PreparedStatement) this.statement).setClob(paramInt, paramClob); + } + + @Override + public void setClob(int paramInt, Reader paramReader) throws SQLException { + addParam(paramInt, paramReader); + ((PreparedStatement) this.statement).setClob(paramInt, paramReader); + } + + @Override + public void setClob(int paramInt, Reader paramReader, long paramLong) throws SQLException { + addParam(paramInt, paramReader, paramLong); + ((PreparedStatement) this.statement).setClob(paramInt, paramReader, paramLong); + } + + @Override + public void setDate(int paramInt, Date paramDate) throws SQLException { + addParam(paramInt, paramDate); + ((PreparedStatement) this.statement).setDate(paramInt, paramDate); + } + + @Override + public void setDate(int paramInt, Date paramDate, Calendar paramCalendar) throws SQLException { + addParam(paramInt, paramDate, paramCalendar); + ((PreparedStatement) this.statement).setDate(paramInt, paramDate, paramCalendar); + } + + @Override + public void setDouble(int paramInt, double paramDouble) throws SQLException { + addParam(paramInt, paramDouble); + ((PreparedStatement) this.statement).setDouble(paramInt, paramDouble); + } + + @Override + public void setFloat(int paramInt, float paramFloat) throws SQLException { + addParam(paramInt, paramFloat); + ((PreparedStatement) this.statement).setFloat(paramInt, paramFloat); + } + + @Override + public void setInt(int paramInt1, int paramInt2) throws SQLException { + addParam(paramInt1, paramInt2); + ((PreparedStatement) this.statement).setInt(paramInt1, paramInt2); + } + + @Override + public void setLong(int paramInt, long paramLong) throws SQLException { + addParam(paramInt, paramLong); + ((PreparedStatement) this.statement).setLong(paramInt, paramLong); + } + + @Override + public void setNCharacterStream(int paramInt, Reader paramReader) throws SQLException { + addParam(paramInt, paramReader); + ((PreparedStatement) this.statement).setNCharacterStream(paramInt, paramReader); + } + + @Override + public void setNCharacterStream(int paramInt, Reader paramReader, long paramLong) throws SQLException { + addParam(paramInt, paramReader, paramLong); + ((PreparedStatement) this.statement).setNCharacterStream(paramInt, paramReader, paramLong); + } + + @Override + public void setNClob(int paramInt, NClob paramNClob) throws SQLException { + addParam(paramInt, paramNClob); + ((PreparedStatement) this.statement).setNClob(paramInt, paramNClob); + } + + @Override + public void setNClob(int paramInt, Reader paramReader) throws SQLException { + addParam(paramInt, paramReader); + ((PreparedStatement) this.statement).setNClob(paramInt, paramReader); + } + + @Override + public void setNClob(int paramInt, Reader paramReader, long paramLong) throws SQLException { + addParam(paramInt, paramReader, paramLong); + ((PreparedStatement) this.statement).setNClob(paramInt, paramReader, paramLong); + } + + @Override + public void setNString(int paramInt, String paramString) throws SQLException { + addParam(paramInt, paramString); + ((PreparedStatement) this.statement).setNString(paramInt, paramString); + } + + @Override + public void setNull(int paramInt1, int paramInt2) throws SQLException { + // addParam(paramInt1, g.addParam()); + ((PreparedStatement) this.statement).setNull(paramInt1, paramInt2); + } + + @Override + public void setNull(int paramInt1, int paramInt2, String paramString) throws SQLException { + // addParam(paramInt1, Integer.valueOf(paramInt2), g.addParam()); + ((PreparedStatement) this.statement).setNull(paramInt1, paramInt2, paramString); + } + + @Override + public void setObject(int paramInt, Object paramObject) throws SQLException { + addParam(paramInt, paramObject); + ((PreparedStatement) this.statement).setObject(paramInt, paramObject); + } + + @Override + public void setObject(int paramInt1, Object paramObject, int paramInt2) throws SQLException { + addParam(paramInt1, paramObject, paramInt2); + ((PreparedStatement) this.statement).setObject(paramInt1, paramObject, paramInt2); + } + + @Override + public void setObject(int paramInt1, Object paramObject, int paramInt2, int paramInt3) throws SQLException { + addParam(paramInt1, paramObject, paramInt2, paramInt3); + ((PreparedStatement) this.statement).setObject(paramInt1, paramObject, paramInt2, paramInt3); + } + + @Override + public void setRef(int paramInt, Ref paramRef) throws SQLException { + addParam(paramInt, paramRef); + ((PreparedStatement) this.statement).setRef(paramInt, paramRef); + } + + @Override + public void setRowId(int paramInt, RowId paramRowId) throws SQLException { + addParam(paramInt, paramRowId); + ((PreparedStatement) this.statement).setRowId(paramInt, paramRowId); + } + + @Override + public void setSQLXML(int paramInt, SQLXML paramSQLXML) throws SQLException { + addParam(paramInt, paramSQLXML); + ((PreparedStatement) this.statement).setSQLXML(paramInt, paramSQLXML); + } + + @Override + public void setShort(int paramInt, short paramShort) throws SQLException { + addParam(paramInt, paramShort); + ((PreparedStatement) this.statement).setShort(paramInt, paramShort); + } + + @Override + public void setString(int paramInt, String paramString) throws SQLException { + addParam(paramInt, paramString); + ((PreparedStatement) this.statement).setString(paramInt, paramString); + } + + @Override + public void setTime(int paramInt, Time paramTime) throws SQLException { + addParam(paramInt, paramTime); + ((PreparedStatement) this.statement).setTime(paramInt, paramTime); + } + + @Override + public void setTime(int paramInt, Time paramTime, Calendar paramCalendar) throws SQLException { + addParam(paramInt, paramTime, paramCalendar); + ((PreparedStatement) this.statement).setTime(paramInt, paramTime); + } + + @Override + public void setTimestamp(int paramInt, Timestamp paramTimestamp) throws SQLException { + // addParam(paramInt, new Timestamp(paramTimestamp.getTime() - paramTimestamp.getTime() % 1000)); + paramTimestamp.setNanos(0); + addParam(paramInt, paramTimestamp); + + ((PreparedStatement) this.statement).setTimestamp(paramInt, paramTimestamp); + } + + @Override + public void setTimestamp(int paramInt, Timestamp paramTimestamp, Calendar paramCalendar) throws SQLException { + paramTimestamp.setNanos(0); + addParam(paramInt, paramTimestamp, paramCalendar); + ((PreparedStatement) this.statement).setTimestamp(paramInt, paramTimestamp, paramCalendar); + } + + @Override + public void setURL(int paramInt, URL paramURL) throws SQLException { + addParam(paramInt, paramURL); + ((PreparedStatement) this.statement).setURL(paramInt, paramURL); + } + + @Override + public void setUnicodeStream(int paramInt1, InputStream paramInputStream, int paramInt2) throws SQLException { + addParam(paramInt1, paramInputStream, paramInt2); + ((PreparedStatement) this.statement).setUnicodeStream(paramInt1, paramInputStream, paramInt2); + } + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcRuntimeContextService.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcRuntimeContextService.java new file mode 100644 index 000000000..822ee6683 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcRuntimeContextService.java @@ -0,0 +1,14 @@ +package com.codingapi.tx.datasource.relational.txc; + + +import com.codingapi.tx.datasource.relational.txc.parser.TxcRuntimeContext; + +/** + * @author jsy. + * 17/12/7. + */ +public interface TxcRuntimeContextService { + + TxcRuntimeContext getTxcRuntimeContext(); + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcSqlExecutor.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcSqlExecutor.java new file mode 100644 index 000000000..97bcd4067 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcSqlExecutor.java @@ -0,0 +1,34 @@ +package com.codingapi.tx.datasource.relational.txc; + + +import com.codingapi.tx.datasource.relational.txc.parser.ResultConvertUtils; +import com.codingapi.tx.datasource.relational.txc.parser.SQLType; +import com.codingapi.tx.datasource.relational.txc.parser.TxcLine; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/26. + */ +public class TxcSqlExecutor { + + public static List executeQuery(String sql, Connection connection) throws SQLException { + ResultSet resultSet = connection.prepareStatement(sql).executeQuery(); + return ResultConvertUtils.convertWithPrimary(resultSet, null, SQLType.SELECT); + } + + public static List executeQuery(PreparedStatement preparedStatement) throws SQLException { + ResultSet resultSet = preparedStatement.executeQuery(); + return ResultConvertUtils.convertWithPrimary(resultSet, null, SQLType.SELECT); + } + + public static void execute(String sql, Connection connection) throws SQLException { + connection.prepareStatement(sql).execute(); + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcStatement.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcStatement.java new file mode 100644 index 000000000..c9c961880 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/TxcStatement.java @@ -0,0 +1,340 @@ +package com.codingapi.tx.datasource.relational.txc; + + +import com.codingapi.tx.aop.bean.TxTransactionLocal; +import com.codingapi.tx.datasource.relational.txc.parser.ExecutePaser; +import com.codingapi.tx.datasource.relational.txc.parser.SQLType; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; + +/** + * [类描述] + * + * @author caican + * @date 17/11/28 + * @title [confluence页面的title] + */ +public class TxcStatement implements ITxcStatement { + + protected Statement statement; + protected AbstractTxcConnection abstractTxcConnection; + + protected String sql; + + public TxcStatement(Statement localStatement, AbstractTxcConnection abstractTxcConnection) { + this.statement = localStatement; + this.abstractTxcConnection = abstractTxcConnection; + } + + @Override + public String getSql() { + return sql; + } + + @Override + public Statement getStatement() { + return statement; + } + + private void setSql(String sql) { + this.sql = sql; + } + + /** + * @return txc 是否开启 + */ + protected boolean isInTxcTransaction(){ + return TxTransactionLocal.isInTxcTransaction(); + } + + @Override + public boolean execute(String paramString) throws SQLException { + setSql(paramString); + if (isInTxcTransaction()) { + SQLType sqlType = ExecutePaser.parse(this); + boolean execute = this.statement.execute(paramString); + ExecutePaser.after(this, sqlType); + return execute; + } + + return this.statement.execute(paramString); + } + + @Override + public boolean execute(String paramString, int paramInt) throws SQLException { + setSql(paramString); + if (isInTxcTransaction()) { + SQLType sqlType = ExecutePaser.parse(this); + boolean execute = this.statement.execute(paramString, paramInt); + ExecutePaser.after(this, sqlType); + return execute; + } + + return this.statement.execute(paramString, paramInt); + } + + @Override + public boolean execute(String paramString, int[] paramArrayOfInt) throws SQLException { + setSql(paramString); + if (isInTxcTransaction()) { + SQLType sqlType = ExecutePaser.parse(this); + boolean execute = this.statement.execute(paramString, paramArrayOfInt); + ExecutePaser.after(this, sqlType); + return execute; + } + + return this.statement.execute(paramString, paramArrayOfInt); + } + + @Override + public boolean execute(String paramString, String[] paramArrayOfString) throws SQLException { + setSql(paramString); + if (isInTxcTransaction()) { + SQLType sqlType = ExecutePaser.parse(this); + boolean execute = this.statement.execute(paramString, paramArrayOfString); + ExecutePaser.after(this, sqlType); + return execute; + } + + return this.statement.execute(paramString, paramArrayOfString); + } + + @Override + public int[] executeBatch() throws SQLException { + if (! isInTxcTransaction()) { + return this.statement.executeBatch(); + } + throw new RuntimeException("Unsupported"); + } + + @Override + public ResultSet executeQuery(String paramString) throws SQLException { + setSql(paramString); + return this.statement.executeQuery(paramString); + } + + @Override + public int executeUpdate(String paramString) throws SQLException { + setSql(paramString); + if (isInTxcTransaction()) { + SQLType sqlType = ExecutePaser.parse(this); + int execute = this.statement.executeUpdate(paramString); + ExecutePaser.after(this, sqlType); + return execute; + } + + return this.statement.executeUpdate(paramString); + } + + @Override + public int executeUpdate(String paramString, int paramInt) throws SQLException { + setSql(paramString); + if (isInTxcTransaction()) { + ExecutePaser.parse(this); + + } + return this.statement.executeUpdate(paramString, paramInt); + } + + @Override + public int executeUpdate(String paramString, int[] paramArrayOfInt) throws SQLException { + setSql(paramString); + if (isInTxcTransaction()) { + ExecutePaser.parse(this); + } + return this.statement.executeUpdate(paramString, paramArrayOfInt); + } + + @Override + public int executeUpdate(String paramString, String[] paramArrayOfString) throws SQLException { + setSql(paramString); + if (isInTxcTransaction()) { + ExecutePaser.parse(this); + + } + return this.statement.executeUpdate(paramString, paramArrayOfString); + } + + // 不支持 + @Override + public void closeOnCompletion() + { + throw new RuntimeException("Unsupported"); + } + + @Override + public boolean isCloseOnCompletion() + { + throw new RuntimeException("Unsupported"); + } + + // 完全不动直接代理 + @Override + public void cancel() throws SQLException { + this.statement.cancel(); + } + + @Override + public void clearWarnings() throws SQLException { + this.statement.clearWarnings(); + } + + @Override + public void close() throws SQLException { + this.statement.close(); + } + + @Override + public void addBatch(String paramString) throws SQLException { + this.statement.addBatch(paramString); + } + + @Override + public void clearBatch() throws SQLException { + this.statement.clearBatch(); + } + + @Override + public Connection getConnection() throws SQLException { + return this.statement.getConnection(); + } + + @Override + public int getFetchDirection() throws SQLException { + return this.statement.getFetchDirection(); + } + + @Override + public int getFetchSize() throws SQLException { + return this.statement.getFetchSize(); + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + return this.statement.getGeneratedKeys(); + } + + @Override + public int getMaxFieldSize() throws SQLException { + return this.statement.getMaxFieldSize(); + } + + @Override + public int getMaxRows() throws SQLException { + return this.statement.getMaxRows(); + } + + @Override + public boolean getMoreResults() throws SQLException { + return this.statement.getMoreResults(); + } + + @Override + public boolean getMoreResults(int paramInt) throws SQLException { + return this.statement.getMoreResults(paramInt); + } + + @Override + public int getQueryTimeout() throws SQLException { + return this.statement.getQueryTimeout(); + } + + @Override + public ResultSet getResultSet() throws SQLException { + return this.statement.getResultSet(); + } + + @Override + public int getResultSetConcurrency() throws SQLException { + return this.statement.getResultSetConcurrency(); + } + + @Override + public int getResultSetHoldability() throws SQLException { + return this.statement.getResultSetHoldability(); + } + + @Override + public int getResultSetType() throws SQLException { + return this.statement.getResultSetType(); + } + + @Override + public int getUpdateCount() throws SQLException { + return this.statement.getUpdateCount(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return this.statement.getWarnings(); + } + + @Override + public boolean isClosed() throws SQLException { + return this.statement.isClosed(); + } + + @Override + public boolean isPoolable() throws SQLException { + return this.statement.isPoolable(); + } + + @Override + public void setCursorName(String paramString) throws SQLException { + this.statement.setCursorName(paramString); + } + + @Override + public void setEscapeProcessing(boolean paramBoolean) throws SQLException { + this.statement.setEscapeProcessing(paramBoolean); + } + + @Override + public void setFetchDirection(int paramInt) throws SQLException { + this.statement.setFetchDirection(paramInt); + } + + @Override + public void setFetchSize(int paramInt) throws SQLException { + this.statement.setFetchSize(paramInt); + } + + @Override + public void setMaxFieldSize(int paramInt) throws SQLException { + this.statement.setMaxFieldSize(paramInt); + } + + @Override + public void setMaxRows(int paramInt) throws SQLException { + this.statement.setMaxRows(paramInt); + } + + @Override + public void setPoolable(boolean paramBoolean) throws SQLException { + this.statement.setPoolable(paramBoolean); + } + + @Override + public void setQueryTimeout(int paramInt) throws SQLException { + this.statement.setQueryTimeout(paramInt); + } + + @Override + public boolean isWrapperFor(Class paramClass) throws SQLException { + return this.statement.isWrapperFor(paramClass); + } + + @Override + public T unwrap(Class paramClass) throws SQLException { + return this.statement.unwrap(paramClass); + } + + @Override + public AbstractTxcConnection getTxcDBConnection() { + return abstractTxcConnection; + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/AbstractParser.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/AbstractParser.java new file mode 100644 index 000000000..ad554d033 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/AbstractParser.java @@ -0,0 +1,187 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.codingapi.tx.datasource.relational.txc.TableMetaUtils; +import com.codingapi.tx.datasource.relational.txc.TxcPreparedStatement; +import com.codingapi.tx.datasource.relational.txc.TxcStatement; +import org.apache.commons.collections.CollectionUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/11. + */ + +public abstract class AbstractParser{ + + public CommitInfo parse(TxcStatement txcStatement) throws SQLException { + CommitInfo commitInfo = new CommitInfo(); + + String sql = txcStatement.getSql(); + T sqlParseStatement = (T) new MySqlStatementParser(sql).parseStatement(); + + //设置sqltype + commitInfo.setSqlType(getSqlType()); + + //设置where + commitInfo.setWhere(getWhere(sqlParseStatement)); + + //sql + commitInfo.setSql(sql); + + + + if (txcStatement instanceof TxcPreparedStatement) { + commitInfo.setSqlParams(((TxcPreparedStatement) txcStatement).getParamsList()); + commitInfo.setWhereParams( + getWhereParams(((TxcPreparedStatement) txcStatement).getParamsList(), sqlParseStatement)); + } + + //解析之前的值 + commitInfo.setOriginalValue(getOriginValue(commitInfo.getWhereParams(), sqlParseStatement, txcStatement.getConnection())); + + //解析之后的值 + commitInfo.setPresentValue(getPresentValue(commitInfo.getSqlParams(), sqlParseStatement)); + + return commitInfo; + + + + } + + protected abstract List getWhereParams(List sqlParamsList, T parseSqlStatement); + + protected abstract String getWhere(T parseSqlStatement); + + //从当前sql取出值 + public abstract TxcTable getPresentValue(List sqlParamsList, T parseSqlStatement); + + //从数据库取出值 + public TxcTable getOriginValue(List whereParamsList, T parseSqlStatement, Connection connection) + throws SQLException { + TxcTable txcTable = new TxcTable(); + txcTable.setTableName(getTableName(parseSqlStatement)); + + // 组装sql + String primaryKeyName = TableMetaUtils + .getTableMetaInfo(connection, getTableName(parseSqlStatement)).getPrimaryKeyName(); + String selectSql = selectSql(parseSqlStatement, primaryKeyName); + + PreparedStatement preparedStatement = connection.prepareStatement(selectSql); + + if (CollectionUtils.isNotEmpty(whereParamsList)) { + // 设置条件 + for (int i = 1; i <= whereParamsList.size(); i++) { + preparedStatement.setObject(i, whereParamsList.get(i - 1)); + } + } + + // 执行查询sql + ResultSet resultSet = preparedStatement.executeQuery(); + + List txcLines = ResultConvertUtils.convertWithPrimary(resultSet, primaryKeyName, getSqlType()); + + // convert + txcTable.setLine(txcLines); + return txcTable; + } + + + public abstract SQLType getSqlType(); + + + +// +// public int getTypeByClass(Object x) { +// if (x == null) { +// return Types.OTHER; +// } +// +// Class clazz = x.getClass(); +// if (clazz == Byte.class) { +// return Types.TINYINT; +// } +// +// if (clazz == Short.class) { +// return Types.SMALLINT; +// } +// +// if (clazz == Integer.class) { +// return Types.INTEGER; +// } +// +// if (clazz == Long.class) { +// +// return Types.BIGINT; +// } +// +// if (clazz == String.class) { +// return Types.VARCHAR; +// } +// +// if (clazz == BigDecimal.class) { +// return Types.DECIMAL; +// } +// +// if (clazz == Float.class) { +// return Types.FLOAT; +// } +// +// if (clazz == Double.class) { +// return Types.DOUBLE; +// } +// +// if (clazz == java.sql.Date.class || clazz == java.util.Date.class) { +// return Types.DATE; +// } +// +// if (clazz == java.sql.Timestamp.class) { +// return Types.TIMESTAMP; +// } +// +// if (clazz == java.sql.Time.class) { +// return Types.TIME; +// } +// +// if (clazz == Boolean.class) { +// return Types.BOOLEAN; +// } +// +// if (clazz == byte[].class) { +// return JdbcParameter.TYPE.BYTES; +// } +// +// if (x instanceof InputStream) { +// return JdbcParameter.TYPE.BinaryInputStream; +// } +// +// if (x instanceof Reader) { +// return JdbcParameter.TYPE.CharacterInputStream; +// } +// +// if (x instanceof Clob) { +// return Types.CLOB; +// } +// +// if (x instanceof NClob) { +// return Types.NCLOB; +// } +// +// if (x instanceof Blob) { +// return Types.BLOB; +// } +// return Types.OTHER; +// +// } + + protected abstract String selectSql(T parseSqlStatement, String primaryKeyName); + + protected abstract String getTableName(T parseSqlStatement); + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/CommitInfo.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/CommitInfo.java new file mode 100644 index 000000000..497bd04d3 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/CommitInfo.java @@ -0,0 +1,106 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import com.google.common.collect.Lists; + +import java.util.List; + + +public class CommitInfo implements Cloneable { + /** + * 更新之前 的行数据 + */ + private TxcTable originalValue = new TxcTable(); + /** + * 更新后的行数据 + */ + private TxcTable presentValue = new TxcTable(); + + /** + * Where 条件 + */ + private String where = ""; + + private List whereParams = Lists.newArrayList(); + /** + * 更新类型 UPDTAE or DELETE or insert + */ + private SQLType sqlType = null; + /** + * 业务执行sql + */ + private String sql = ""; + + /** + * sql的属性值,对应PreparedStatement存在 + */ + private List sqlParams = Lists.newArrayList(); + + private String schemaName; + + + + public TxcTable getOriginalValue() { + return originalValue; + } + + public void setOriginalValue(TxcTable originalValue) { + this.originalValue = originalValue; + } + + public TxcTable getPresentValue() { + return presentValue; + } + + public void setPresentValue(TxcTable presentValue) { + this.presentValue = presentValue; + } + + public String getWhere() { + return where; + } + + public void setWhere(String where) { + this.where = where; + } + + public List getWhereParams() { + return whereParams; + } + + public void setWhereParams(List whereParams) { + this.whereParams = whereParams; + } + + public SQLType getSqlType() { + return sqlType; + } + + public void setSqlType(SQLType sqlType) { + this.sqlType = sqlType; + } + + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public List getSqlParams() { + return sqlParams; + } + + public void setSqlParams(List sqlParams) { + this.sqlParams = sqlParams; + } + + public String getSchemaName() { + return schemaName; + } + + public void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/DeleteParser.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/DeleteParser.java new file mode 100644 index 000000000..9dcdb1a2e --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/DeleteParser.java @@ -0,0 +1,71 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlDeleteStatement; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.ListUtils; + +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/22. + */ +public class DeleteParser extends AbstractParser { + private static DeleteParser instance = null; + + public static DeleteParser getInstance() { + if (instance == null) { + synchronized (DeleteParser.class) { + if (instance == null) { + instance = new DeleteParser(); + } + } + } + return instance; + } + + @Override + protected List getWhereParams(List sqlParamsList, MySqlDeleteStatement parseSqlStatement) { + if (CollectionUtils.isNotEmpty(sqlParamsList)) { + return sqlParamsList; + } + return ListUtils.EMPTY_LIST; + } + + @Override + protected String getWhere(MySqlDeleteStatement parseSqlStatement) { + return SqlUtils.toSQLString(parseSqlStatement.getWhere()); + } + + @Override + public TxcTable getPresentValue(List sqlParamsList, MySqlDeleteStatement parseSqlStatement) { + return null; + } + + + @Override + public SQLType getSqlType() { + return SQLType.DELETE; + } + + @Override + protected String selectSql(MySqlDeleteStatement mySqlUpdateStatement, String primaryKeyName) { + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("SELECT * "); + + // + stringBuffer.append(" from ").append(mySqlUpdateStatement.getTableName().getSimpleName()).append(" where "); + // + stringBuffer.append(SqlUtils.toSQLString(mySqlUpdateStatement.getWhere())); + return stringBuffer.toString(); + } + + @Override + protected String getTableName(MySqlDeleteStatement parseSqlStatement) { + return parseSqlStatement.getTableName().getSimpleName(); + } + + + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/ExecutePaser.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/ExecutePaser.java new file mode 100644 index 000000000..fca522e22 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/ExecutePaser.java @@ -0,0 +1,126 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlDeleteStatement; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateStatement; +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.codingapi.tx.datasource.relational.txc.TableMetaInfo; +import com.codingapi.tx.datasource.relational.txc.TableMetaUtils; +import com.codingapi.tx.datasource.relational.txc.TxcStatement; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +/** + * [类描述] + * + * @author caican + * @date 17/11/28 + * @title [confluence页面的title] + */ +public class ExecutePaser { + + private static final Logger logger = LoggerFactory.getLogger(ExecutePaser.class); + + + + public static SQLType parse(TxcStatement txcStatement) { + long start = System.currentTimeMillis(); + SQLType sqlType = SQLType.SELECT; + try { + TxcRuntimeContext txcRuntimeContext = txcStatement.getTxcDBConnection().getTxcRuntimeContext(); + + //解析sql + String sql = txcStatement.getSql(); + SQLStatement sqlParseStatement = new MySqlStatementParser(sql).parseStatement(); + + CommitInfo commitInfo = null; + if (sqlParseStatement instanceof MySqlUpdateStatement) { + commitInfo = UpdateParser.getInstance().parse(txcStatement); + txcRuntimeContext.getInfo().add(commitInfo); + sqlType = SQLType.UPDATE; + } else if(sqlParseStatement instanceof MySqlInsertStatement){ + commitInfo = InsertParser.getInstance().parse(txcStatement); + txcRuntimeContext.getInfo().add(commitInfo); + sqlType = SQLType.INSERT; + } else if(sqlParseStatement instanceof MySqlDeleteStatement) { + commitInfo = DeleteParser.getInstance().parse(txcStatement); + txcRuntimeContext.getInfo().add(commitInfo); + sqlType = SQLType.DELETE; + } + + if (commitInfo != null && commitInfo.getSchemaName() == null) { + String dbName = TableMetaUtils.getDbNameFromUrl(txcStatement.getConnection().getMetaData().getURL()); + commitInfo.setSchemaName(dbName); + } + + } catch (Exception e) { + logger.error("parse sql error", e); + } finally { + long cost = System.currentTimeMillis() - start; + if (sqlType != SQLType.SELECT || cost > 50) { + logger.info("解析 sql:{}, cost:{}ms", txcStatement.getSql(), cost); + } + } + return sqlType; + } + + public static void after(TxcStatement txcStatement, SQLType sqlType) { + + try { + if (sqlType == SQLType.INSERT) { + TxcRuntimeContext txcRuntimeContext = txcStatement.getTxcDBConnection().getTxcRuntimeContext(); + List commitInfos = txcRuntimeContext.getInfo(); + if (commitInfos.size() == 0) { + return; + } + + CommitInfo commitInfo = commitInfos.get(commitInfos.size() - 1); + + List line = commitInfo.getPresentValue().getLine(); + if (line.size() > 1) { + logger.error("不支持多条插入sql"); + return; + } + + TxcLine txcLine = line.get(0); + + setPrimaryValue(txcStatement, commitInfo, txcLine); + + } + } catch (SQLException e) { + logger.error("execute parser after error", e); + } + } + + private static void setPrimaryValue(TxcStatement txcStatement, CommitInfo commitInfo, + TxcLine txcLine) throws SQLException { + TableMetaInfo tableMetaInfo = TableMetaUtils + .getTableMetaInfo(txcStatement.getConnection(), commitInfo.getPresentValue().getTableName()); + String autoIncrementPrimaryKey = tableMetaInfo.getAutoIncrementPrimaryKey(); + if (StringUtils.isBlank(autoIncrementPrimaryKey)) { + String primaryKeyName = tableMetaInfo.getPrimaryKeyName(); + txcLine.setPrimaryKey(primaryKeyName); + + for (TxcField txcField : txcLine.getFields()) { + if (txcField.getName().equals(primaryKeyName)) { + txcLine.setPrimaryValue(txcField.getValue()); + return; + } + } + + } else { + txcLine.setPrimaryKey(autoIncrementPrimaryKey); + ResultSet resultSet = txcStatement.getConnection().prepareStatement("select last_insert_id() as id") + .executeQuery(); + while (resultSet.next()) { + txcLine.setPrimaryValue(resultSet.getObject("id")); + } + } + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/InsertParser.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/InsertParser.java new file mode 100644 index 000000000..734521bed --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/InsertParser.java @@ -0,0 +1,94 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.statement.SQLInsertStatement; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement; +import org.apache.commons.collections.CollectionUtils; + +import java.sql.Connection; +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/22. + */ +public class InsertParser extends AbstractParser{ + + private static InsertParser instance = null; + + public static InsertParser getInstance() { + if (instance == null) { + synchronized (InsertParser.class) { + if (instance == null) { + instance = new InsertParser(); + } + } + } + return instance; + } + @Override + protected List getWhereParams(List sqlParamsList, + MySqlInsertStatement parseSqlStatement) { + return null; + } + + @Override + protected String getWhere(MySqlInsertStatement parseSqlStatement) { + return null; + } + + @Override + public TxcTable getPresentValue(List sqlParamsList, MySqlInsertStatement parseSqlStatement) { + + TxcTable txcTable = new TxcTable(); + txcTable.setTableName(parseSqlStatement.getTableName().getSimpleName()); + List line = txcTable.getLine(); + + List valuesList = parseSqlStatement.getValuesList(); + List columns = parseSqlStatement.getColumns(); + + + for (SQLInsertStatement.ValuesClause valuesClause : valuesList) { + List values = valuesClause.getValues(); + TxcLine txcLine = new TxcLine(); + for (int i = 0; i < columns.size(); i++) { + TxcField txcField = new TxcField(); + txcField.setName(SqlUtils.toSQLString(columns.get(i)).replace("\'", "").replace("`", "").trim()); + if (CollectionUtils.isNotEmpty(sqlParamsList)) { + txcField.setValue(sqlParamsList.get(i)); + } else { + txcField.setValue(SqlUtils.toSQLString(values.get(i))); + } + txcLine.getFields().add(txcField); + } + line.add(txcLine); + } + + + + return txcTable; + } + + @Override + public TxcTable getOriginValue(List whereParamsList, MySqlInsertStatement parseSqlStatement, + Connection connection) { + return null; + } + + + @Override + public SQLType getSqlType() { + return SQLType.INSERT; + } + + @Override + protected String selectSql(MySqlInsertStatement parseSqlStatement, String primaryKeyName) { + throw new RuntimeException("不支持的类型"); + } + + @Override + protected String getTableName(MySqlInsertStatement parseSqlStatement) { + return parseSqlStatement.getTableName().getSimpleName(); + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/ResultConvertUtils.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/ResultConvertUtils.java new file mode 100644 index 000000000..a97d8adee --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/ResultConvertUtils.java @@ -0,0 +1,150 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import com.google.common.collect.Lists; +import org.apache.commons.lang.StringUtils; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/26. + */ +public class ResultConvertUtils { + + + + public static List convertWithPrimary(ResultSet resultSet, String primaryKeyName, SQLType sqlType) throws SQLException { + List txcLines = Lists.newArrayList(); + ResultSetMetaData metaData = resultSet.getMetaData(); + + + while (resultSet.next()) { + TxcLine txcLine = new TxcLine(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + + if (i == metaData.getColumnCount() && StringUtils.equalsIgnoreCase(metaData.getColumnName(i), primaryKeyName) && sqlType == SQLType.UPDATE) { + + txcLine.setPrimaryKey(metaData.getColumnName(i)); + txcLine.setPrimaryValue(getDataByType(i, metaData.getColumnType(i), resultSet)); + } else { + TxcField txcField = new TxcField(); + + txcField.setName(metaData.getColumnName(i)); + txcField.setType(metaData.getColumnType(i)); + txcField.setValue(getDataByType(i, metaData.getColumnType(i), resultSet)); + + + txcLine.getFields().add(txcField); + } + + } + txcLines.add(txcLine); + + } + return txcLines; + } + + //https://www.cis.upenn.edu/~bcpierce/courses/629/jdkdocs/guide/jdbc/getstart/mapping.doc.html + private static Object getDataByType(int index, int columnType, ResultSet resultSet) throws SQLException { + + if (columnType == Types.BIT) { + return resultSet.getByte(index); + } + + if (columnType == Types.TINYINT) { + return resultSet.getByte(index); + + } + + if (columnType == Types.SMALLINT) { + return resultSet.getShort(index); + } + + + if (columnType == Types.INTEGER) { + return resultSet.getInt(index); + } + + if (columnType == Types.BIGINT) { + return resultSet.getLong(index); + } + + if (columnType == Types.FLOAT) { + return resultSet.getFloat(index); + + } + + if (columnType == Types.DOUBLE) { + return resultSet.getDouble(index); + + } + + if (columnType == Types.NUMERIC) { + return resultSet.getInt(index); + + } + + if (columnType == Types.DECIMAL) { + return resultSet.getBigDecimal(index); + } + + + if (columnType == Types.CHAR) { + return resultSet.getString(index); +// return resultSet.getCharacterStream(index); + } + + if (columnType == Types.VARCHAR) { + return resultSet.getString(index); + } + + + if (columnType == Types.LONGNVARCHAR) { + return resultSet.getString(index); + } + + if (columnType == Types.DATE) { + return resultSet.getDate(index); + } + + if (columnType == Types.TIME) { + return resultSet.getTime(index); + } + + if (columnType == Types.NCHAR) { + return resultSet.getNString(index); +// return resultSet.getNCharacterStream(index); + } + + if (columnType == Types.NVARCHAR) { + return resultSet.getNString(index); + } + + if (columnType == Types.OTHER) { + return resultSet.getObject(index); + } + + if (columnType == Types.BLOB) { + return resultSet.getBlob(index); + } + + if (columnType == Types.BOOLEAN) { + return resultSet.getBoolean(index); + } + + + if (columnType == Types.ARRAY) { + return resultSet.getArray(index); + } + + if (columnType == Types.TIMESTAMP) { + return resultSet.getTimestamp(index); + } + return resultSet.getObject(index); + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/SQLType.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/SQLType.java new file mode 100644 index 000000000..8f38be110 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/SQLType.java @@ -0,0 +1,12 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +/** + * + */ +public enum SQLType { + SELECT(), + UPDATE(), + INSERT(), + DELETE(), + UNKNOW(); +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/SqlUtils.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/SqlUtils.java new file mode 100644 index 000000000..9f53384c4 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/SqlUtils.java @@ -0,0 +1,18 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import com.alibaba.druid.sql.SQLUtils; +import com.alibaba.druid.sql.ast.SQLObject; + +/** + * @author jsy. + * @title + * @time 17/12/15. + */ +public class SqlUtils { + + private static final String dbType = "mysql"; + + public static String toSQLString(SQLObject sqlObject) { + return SQLUtils.toSQLString(sqlObject); + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcField.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcField.java new file mode 100644 index 000000000..466b67a6a --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcField.java @@ -0,0 +1,50 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + + +import com.fasterxml.jackson.annotation.JsonIgnore; + +/** + * @author caisirius + */ +public class TxcField { + private String name; + + // DiffUtils 比对时忽略此字段 + @JsonIgnore + private int type; + private Object value; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + public String getSqlName() { + return "`" + name + "`"; + } + + + @Override + public String toString() { + return String.format("[%s,%s]", new Object[]{this.name, String.valueOf(this.value)}); + } +} \ No newline at end of file diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcLine.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcLine.java new file mode 100644 index 000000000..70d29de9a --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcLine.java @@ -0,0 +1,46 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author caisirius + */ +public class TxcLine { + + private List fields = new ArrayList(); + + // DiffUtils 比对时忽略此字段 + @JsonIgnore + private Object primaryKey; + + @JsonIgnore + private Object primaryValue; + + public List getFields() { + return fields; + } + + public void setFields(List fields) { + this.fields = fields; + } + + public Object getPrimaryKey() { + return primaryKey; + } + + public void setPrimaryKey(Object primaryKey) { + this.primaryKey = primaryKey; + } + + public Object getPrimaryValue() { + return primaryValue; + } + + public void setPrimaryValue(Object primaryValue) { + this.primaryValue = primaryValue; + } +} \ No newline at end of file diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcRuntimeContext.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcRuntimeContext.java new file mode 100644 index 000000000..54a069a10 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcRuntimeContext.java @@ -0,0 +1,73 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author caisirius + */ +public class TxcRuntimeContext { + private static final Logger logger = LoggerFactory.getLogger(TxcRuntimeContext.class); + + /** + * 事务组Id 对应于txc的 xid + */ + public String groupId; + /** + * 分支事务Id lcn里叫 kid + */ + public String branchId; + /** + * 提交信息 + */ + private List info = new ArrayList(); + + public int status; + /** + * 分支所在IP + */ + public String server; + + public List getInfo() { + return info; + } + + public void setInfo(List info) { + this.info = info; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public String getBranchId() { + return branchId; + } + + public void setBranchId(String branchId) { + this.branchId = branchId; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } +} \ No newline at end of file diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcTable.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcTable.java new file mode 100644 index 000000000..570ce4317 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/TxcTable.java @@ -0,0 +1,74 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author caisirius + */ +public class TxcTable { + + public String schemaName; + public String tableName; + public String alias; + + private List line = new ArrayList(); + + + @Override + public String toString() { + StringBuilder localStringBuilder = new StringBuilder(); + for (int i = 0; i < this.line.size(); i++) { + + for (TxcField field : this.line.get(i).getFields()) { + + switch (field.getType()) { + case -15: + case -9: + case -6: + case -5: + case 1: + case 2: + case 4: + case 12: + case 2003: + localStringBuilder.append(field.getValue()).append(','); + default: + } + } + } + return localStringBuilder.toString(); + } + + public String getSchemaName() { + return schemaName; + } + + public void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getAlias() { + return alias; + } + + public void setAlias(String alias) { + this.alias = alias; + } + + public List getLine() { + return line; + } + + public void setLine(List line) { + this.line = line; + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/UpdateParser.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/UpdateParser.java new file mode 100644 index 000000000..0bba4a28f --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/parser/UpdateParser.java @@ -0,0 +1,126 @@ +package com.codingapi.tx.datasource.relational.txc.parser; + +import com.alibaba.druid.sql.ast.expr.SQLValuableExpr; +import com.alibaba.druid.sql.ast.expr.SQLVariantRefExpr; +import com.alibaba.druid.sql.ast.statement.SQLUpdateSetItem; +import com.alibaba.druid.sql.ast.statement.SQLUpdateStatement; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.ListUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/11. + */ +public class UpdateParser extends AbstractParser { + private static final Logger logger = LoggerFactory.getLogger(UpdateParser.class); + + + private static UpdateParser instance = null; + + public static UpdateParser getInstance() { + if (instance == null) { + synchronized (UpdateParser.class) { + if (instance == null) { + instance = new UpdateParser(); + } + } + } + return instance; + } + + @Override + protected List getWhereParams(List sqlParamsList, SQLUpdateStatement parseSqlStatement) { + if (CollectionUtils.isNotEmpty(sqlParamsList)) { + int size = 0; + + for (SQLUpdateSetItem sqlUpdateSetItem :parseSqlStatement.getItems()) { + if (sqlUpdateSetItem.getValue() instanceof SQLVariantRefExpr) { + size++; + } + } + return sqlParamsList.subList(size, sqlParamsList.size()); + } + return ListUtils.EMPTY_LIST; + + } + + + @Override + protected String getWhere(SQLUpdateStatement parseSqlStatement) { + return SqlUtils.toSQLString(parseSqlStatement.getWhere()); + } + + @Override + public TxcTable getPresentValue(List sqlParamsList, SQLUpdateStatement parseSqlStatement) { + + TxcTable txcTable = new TxcTable(); + txcTable.setTableName(parseSqlStatement.getTableName().getSimpleName()); + + TxcLine txcLine = new TxcLine(); + List items = parseSqlStatement.getItems(); + + int variantExpr = 0; + for (int i = 0; i < items.size(); i++) { + SQLUpdateSetItem sqlUpdateSetItem = items.get(i); + TxcField txcField = new TxcField(); + String cloumnName = SqlUtils.toSQLString(sqlUpdateSetItem.getColumn()).replace("\'", "").replace("`", "").trim(); + txcField.setName(cloumnName); + if (sqlUpdateSetItem.getValue() instanceof SQLVariantRefExpr) { + txcField.setValue(sqlParamsList.get(variantExpr++)); + } else if (sqlUpdateSetItem.getValue() instanceof SQLValuableExpr){ + txcField.setValue(SqlUtils.toSQLString(items.get(i).getValue())); + } else { + logger.info("不支持复杂的sql,{}", sqlUpdateSetItem.getClass().toString()); + throw new RuntimeException("不支持复杂的sql"); + } + + txcLine.getFields().add(txcField); + } + txcTable.getLine().add(txcLine); + + return txcTable; + } + + + + + @Override + public SQLType getSqlType() { + return SQLType.UPDATE; + } + + + @Override + protected String selectSql(SQLUpdateStatement mySqlUpdateStatement, String primaryKeyName) { + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("SELECT "); + + + List items = mySqlUpdateStatement.getItems(); + for (SQLUpdateSetItem sqlUpdateSetItem : items) { + + stringBuffer.append(SqlUtils.toSQLString(sqlUpdateSetItem.getColumn())).append(","); + + } + + stringBuffer.append(primaryKeyName); + + + + stringBuffer.append(" from ").append(mySqlUpdateStatement.getTableName().getSimpleName()).append(" where "); + + stringBuffer.append(SqlUtils.toSQLString(mySqlUpdateStatement.getWhere())); + return stringBuffer.toString(); + } + + @Override + protected String getTableName(SQLUpdateStatement parseSqlStatement) { + return parseSqlStatement.getTableName().getSimpleName(); + } + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/AbstractRollback.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/AbstractRollback.java new file mode 100644 index 000000000..974c9a8bd --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/AbstractRollback.java @@ -0,0 +1,43 @@ +package com.codingapi.tx.datasource.relational.txc.rollback; + +import com.codingapi.tx.datasource.relational.txc.parser.CommitInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/26. + */ +public abstract class AbstractRollback { + private Logger logger = LoggerFactory.getLogger(AbstractRollback.class); + + + public void rollback(CommitInfo commitInfo, Connection connection) throws SQLException { + + //check + boolean flag = canRollback(commitInfo, connection); + + + //rollback + if (flag) { + logger.info("rollback for sql:{}", commitInfo.getSql()); + List preparedStatements = assembleRollbackSql(commitInfo, connection); + + for (PreparedStatement preparedStatement : preparedStatements) { + preparedStatement.execute(); + } + logger.info("rollback sql success"); + } + } + + protected abstract List assembleRollbackSql(CommitInfo commitInfo, Connection connection) + throws SQLException; + + protected abstract boolean canRollback(CommitInfo commitInfo, Connection connection) throws SQLException; +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/DeleteRollback.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/DeleteRollback.java new file mode 100644 index 000000000..2a7f63251 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/DeleteRollback.java @@ -0,0 +1,91 @@ +package com.codingapi.tx.datasource.relational.txc.rollback; + +import com.codingapi.tx.datasource.relational.txc.parser.CommitInfo; +import com.codingapi.tx.datasource.relational.txc.parser.TxcField; +import com.codingapi.tx.datasource.relational.txc.parser.TxcLine; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/26. + */ +public class DeleteRollback extends AbstractRollback { + + private Logger logger = LoggerFactory.getLogger(DeleteRollback.class); + + private static DeleteRollback instance = null; + + public static DeleteRollback getInstance() { + if (instance == null) { + synchronized (DeleteRollback.class) { + if (instance == null) { + instance = new DeleteRollback(); + } + } + } + return instance; + } + + @Override + protected List assembleRollbackSql(CommitInfo commitInfo, Connection connection) + throws SQLException { + + ArrayList preparedStatements = Lists.newArrayList(); + String tableName = commitInfo.getOriginalValue().getTableName(); + + for (TxcLine txcLine : commitInfo.getOriginalValue().getLine()) { + List txcFields = txcLine.getFields(); + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("insert into ").append(tableName).append("("); + + for (int i = 0; i < txcFields.size(); i++) { + if (i == txcFields.size() - 1) { + stringBuilder.append(txcFields.get(i).getSqlName()).append(")"); + } else { + stringBuilder.append(txcFields.get(i).getSqlName()).append(","); + } + } + stringBuilder.append(" value ").append("("); + + for (int i = 0; i < txcFields.size(); i++) { + if (i == txcFields.size() - 1) { + stringBuilder.append("?").append(")"); + } else { + stringBuilder.append("?").append(","); + } + } + String sql = stringBuilder.toString(); + PreparedStatement preparedStatement = connection.prepareStatement(sql); + + for (int i = 1; i <= txcFields.size(); i++) { + preparedStatement.setObject(i, txcFields.get(i - 1).getValue()); + } + preparedStatements.add(preparedStatement); + } + + + return preparedStatements; + } + + @Override + protected boolean canRollback(CommitInfo commitInfo, Connection connection) throws SQLException { + + + if ( commitInfo.getOriginalValue().getLine().size() == 0) { + logger.error("未新影响行数,不回滚"); + return false; + } + + return true; + + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/DiffUtils.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/DiffUtils.java new file mode 100644 index 000000000..7a93961bc --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/DiffUtils.java @@ -0,0 +1,108 @@ +package com.codingapi.tx.datasource.relational.txc.rollback; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; + +import java.io.IOException; +import java.io.StringReader; +import java.math.BigDecimal; + +/** + * @author jsy. + * @title + * @time 17/12/14. + */ + +public class DiffUtils { + private static final ObjectMapper objectMapper = new ObjectMapper(); + + static { + objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); + objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true); + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addSerializer(BigDecimal.class, new JsonSerializer() { + @Override + public void serialize(BigDecimal value, JsonGenerator gen, SerializerProvider serializers) + throws IOException, JsonProcessingException { + gen.writeString(value.setScale(2).toString()); + } + }); + + simpleModule.addSerializer(StringReader.class, new JsonSerializer() { + @Override + public void serialize(StringReader value, JsonGenerator gen, SerializerProvider serializers) + throws IOException, JsonProcessingException { + gen.writeString(read(value)); + } + }); + + objectMapper.registerModule(simpleModule); + } + + public static boolean diff(Object oldDifDto, Object curDifDto) { + try { + String old = objectMapper.writeValueAsString(oldDifDto); + + String cur = objectMapper.writeValueAsString(curDifDto); + JsonNode oldJsonNode = objectMapper.readTree(old); + JsonNode curJsonNode = objectMapper.readTree(cur); + + if (oldJsonNode.equals(curJsonNode)) { + return true; + } + + return false; + } catch (Exception e) { + return false; + } + + } + + + public static ObjectMapper getObjectMapper() { + return objectMapper; + } + + + public static String read(StringReader stringReader) { + StringBuilder stringBuilder = new StringBuilder(); + try { + stringReader.reset(); + int c; + while ((c = stringReader.read()) != -1) { + stringBuilder.append((char)c); + } + return stringBuilder.toString(); + } catch (IOException e) { + return ""; + } + } + + public static void main(String[] args) { +// BigDecimal bigDecimal = new BigDecimal("50.00"); +// BigDecimal bigDecimal1 = new BigDecimal("50"); +// +// try { +// String s = objectMapper.writeValueAsString(bigDecimal); +// String s1 = objectMapper.writeValueAsString(bigDecimal1); +// System.out.println(s); +// System.out.println(s1); +// } catch (JsonProcessingException e) { +// e.printStackTrace(); +// } + + StringReader test = new StringReader("a:2:{s:20:\"php_serialize_option\";s:1:\" \";s:9:\"orderdata\";a:1:{i:0;a:22:{s:11:\"refund_type\";s:1:\"0\";s:8:\"dateline\";s:1:\"0\";s:11:\"mk_order_id\";s:4:\"null\";s:5:\"stype\";s:2:\"16\";s:11:\"sl_nickname\";s:10:\"zhigb_0016\";s:3:\"num\";s:1:\"1\";s:5:\"ptype\";s:2:\"23\";s:5:\"title\";s:55:\"lqq田酞递址秽梳猜欣勾翅#169779075稿件中标\";s:10:\"sl_user_id\";s:8:\"19182259\";s:7:\"link_id\";s:1:\"0\";s:11:\"offer_price\";s:3:\"0.0\";s:5:\"mtype\";s:1:\"0\";s:12:\"product_pkid\";s:9:\"169779075\";s:7:\"data_id\";s:8:\"90567713\";s:7:\"user_id\";s:8:\"19182244\";s:5:\"price\";s:4:\"50.0\";s:11:\"refund_time\";s:1:\"0\";s:8:\"nickname\";s:18:\"靖哥哥的店铺\";s:13:\"refund_amount\";s:3:\"0.0\";s:8:\"order_id\";s:8:\"90567198\";s:8:\"at_price\";s:4:\"50.0\";s:12:\"refund_state\";s:1:\"0\";}}}"); + String read = read(test); + + + System.out.println(read); + } + +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/InsertRollback.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/InsertRollback.java new file mode 100644 index 000000000..c222d2fd6 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/InsertRollback.java @@ -0,0 +1,111 @@ +package com.codingapi.tx.datasource.relational.txc.rollback; + +import com.codingapi.tx.datasource.relational.txc.TxcSqlExecutor; +import com.codingapi.tx.datasource.relational.txc.parser.CommitInfo; +import com.codingapi.tx.datasource.relational.txc.parser.TxcField; +import com.codingapi.tx.datasource.relational.txc.parser.TxcLine; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/25. + */ +public class InsertRollback extends AbstractRollback{ + + private Logger logger = LoggerFactory.getLogger(InsertRollback.class); + + private static InsertRollback instance = null; + + public static InsertRollback getInstance() { + if (instance == null) { + synchronized (InsertRollback.class) { + if (instance == null) { + instance = new InsertRollback(); + } + } + } + return instance; + } + + + @Override + protected List assembleRollbackSql(CommitInfo commitInfo, Connection connection) + throws SQLException { + + TxcLine txcLine = commitInfo.getPresentValue().getLine().get(0); + String tableName = commitInfo.getPresentValue().getTableName(); + String sql = "delete from " + tableName + " where " + txcLine.getPrimaryKey() + "= ?"; + PreparedStatement preparedStatement = connection.prepareStatement(sql); + + preparedStatement.setObject(1, txcLine.getPrimaryValue()); + return Lists.newArrayList(preparedStatement); + } + + + private PreparedStatement assembleQuerySql(TxcLine txcLine, String tableName, Connection connection) + throws SQLException { + List txcFields = txcLine.getFields(); + //查询db数据 + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("select "); + + for (int j = 0; j < txcFields.size(); j++) { + if (j != txcFields.size() - 1) { + stringBuffer.append(txcFields.get(j).getSqlName()).append(","); + } else { + stringBuffer.append(txcFields.get(j).getSqlName()); + } + } + + stringBuffer.append(" from ").append(tableName).append(" where ").append(txcLine.getPrimaryKey()).append("= ?"); + + String sql = stringBuffer.toString(); + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setObject(1, txcLine.getPrimaryValue()); + return preparedStatement; + } + + + + @Override + protected boolean canRollback(CommitInfo commitInfo, Connection connection) throws SQLException { + List txcLines = commitInfo.getPresentValue().getLine(); + + if (txcLines.size() > 1) { + logger.error("insert操作,影响行数大于1,不支持回滚"); + return false; + } + + if ( txcLines.size() == 0) { + logger.error("未新影响行数,不回滚"); + return false; + } + + TxcLine txcLine = txcLines.get(0); + + // 查询db数据 + PreparedStatement preparedStatement = assembleQuerySql(txcLine, commitInfo.getPresentValue().getTableName(), connection); + List dbValue = TxcSqlExecutor.executeQuery(preparedStatement); + + boolean diff = DiffUtils.diff(txcLines, dbValue); + if (!diff) { + try { + logger.error("数据不一致,不支持回滚操作, before:{},after:{}", + DiffUtils.getObjectMapper().writeValueAsString(txcLines), + DiffUtils.getObjectMapper().writeValueAsString(dbValue)); + } catch (Exception e) { + logger.error("error", e); + } + return false; + } + return true; + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/TxcRollbackDataSource.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/TxcRollbackDataSource.java new file mode 100644 index 000000000..cb1c4accf --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/TxcRollbackDataSource.java @@ -0,0 +1,34 @@ +package com.codingapi.tx.datasource.relational.txc.rollback; + +import org.springframework.stereotype.Component; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +/** + * @author jsy. + * @title + * @time 17/12/26. + */ +@Component +public class TxcRollbackDataSource { + + private Map dataSourceMap = new HashMap<>(); + + public void setDataSourceMap(Map dataSourceMap) { + this.dataSourceMap = dataSourceMap; + } + + public Connection getConnectionByDbName(String dbName) throws SQLException { + + DataSource dataSource = dataSourceMap.get(dbName); + if (dataSource == null) { + throw new SQLException("datasource do not exist, name: " + dbName); + } + + return dataSourceMap.get(dbName).getConnection(); + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/TxcRollbackService.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/TxcRollbackService.java new file mode 100644 index 000000000..d4a30480d --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/TxcRollbackService.java @@ -0,0 +1,18 @@ +package com.codingapi.tx.datasource.relational.txc.rollback; + + +import com.codingapi.tx.datasource.relational.txc.parser.CommitInfo; + +/** + * @author jsy. + * @title + * @time 17/12/22. + */ +public interface TxcRollbackService { + + + /**执行回滚 + * @param commitInfo + */ + void rollback(CommitInfo commitInfo); +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/TxcRollbackServiceImpl.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/TxcRollbackServiceImpl.java new file mode 100644 index 000000000..e2f651e13 --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/TxcRollbackServiceImpl.java @@ -0,0 +1,56 @@ +package com.codingapi.tx.datasource.relational.txc.rollback; + +import com.codingapi.tx.datasource.relational.txc.parser.CommitInfo; +import com.codingapi.tx.datasource.relational.txc.parser.SQLType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.sql.Connection; +import java.sql.SQLException; + +/** + * @author jsy. + * @title + * @time 17/12/22. + */ +@Component +public class TxcRollbackServiceImpl implements TxcRollbackService { + + private Logger logger = LoggerFactory.getLogger(TxcRollbackServiceImpl.class); + + @Autowired + private TxcRollbackDataSource rollbackDataSource; + + @Override + public void rollback(CommitInfo commitInfo) { + // 每次需要新获取一个连接 + Connection connection = null; + try { + connection = rollbackDataSource.getConnectionByDbName(commitInfo.getSchemaName()); + if (commitInfo.getSqlType() == SQLType.UPDATE) { + UpdateRollback.getInstance().rollback(commitInfo, connection); + } + + if (commitInfo.getSqlType() == SQLType.INSERT) { + InsertRollback.getInstance().rollback(commitInfo, connection); + } + + if (commitInfo.getSqlType() == SQLType.DELETE) { + DeleteRollback.getInstance().rollback(commitInfo, connection); + } + } catch (Exception e) { + logger.error("rollback error, sql:{}", commitInfo.getSql(), e); + } finally { + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + logger.error("close error", e); + } + } + + } +} diff --git a/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/UpdateRollback.java b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/UpdateRollback.java new file mode 100644 index 000000000..8dab1a16c --- /dev/null +++ b/tx-plugins-db/src/main/java/com/codingapi/tx/datasource/relational/txc/rollback/UpdateRollback.java @@ -0,0 +1,111 @@ +package com.codingapi.tx.datasource.relational.txc.rollback; + +import com.alibaba.druid.sql.ast.statement.SQLUpdateStatement; +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.codingapi.tx.datasource.relational.txc.parser.CommitInfo; +import com.codingapi.tx.datasource.relational.txc.parser.TxcField; +import com.codingapi.tx.datasource.relational.txc.parser.TxcLine; +import com.codingapi.tx.datasource.relational.txc.parser.TxcTable; +import com.codingapi.tx.datasource.relational.txc.parser.UpdateParser; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author jsy. + * @title + * @time 17/12/14. + */ + +public class UpdateRollback extends AbstractRollback{ + private Logger logger = LoggerFactory.getLogger(UpdateRollback.class); + + + private static UpdateRollback instance = null; + + public static UpdateRollback getInstance() { + if (instance == null) { + synchronized (UpdateRollback.class) { + if (instance == null) { + instance = new UpdateRollback(); + } + } + } + return instance; + } + + + + + @Override + protected List assembleRollbackSql(CommitInfo commitInfo, Connection connection) + throws SQLException { + + ArrayList preparedStatements = Lists.newArrayList(); + String tableName = commitInfo.getOriginalValue().getTableName(); + + for (TxcLine txcLine : commitInfo.getOriginalValue().getLine()) { + List txcFields = txcLine.getFields(); + + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("update ").append(tableName).append(" ").append("set "); + + for (int i = 0; i < txcFields.size(); i++) { + if (i == txcFields.size() - 1) { + stringBuilder.append(txcFields.get(i).getSqlName()).append("=").append("?"); + } else { + stringBuilder.append(txcFields.get(i).getSqlName()).append("=").append("?").append(","); + } + } + String sql = stringBuilder.append(" where ").append(txcLine.getPrimaryKey()).append("=?").toString(); + PreparedStatement preparedStatement = connection.prepareStatement(sql); + + for (int j = 1; j <= txcFields.size(); j++) { + preparedStatement.setObject(j, txcFields.get(j - 1).getValue()); + } + + preparedStatement.setObject(1 + txcFields.size(), txcLine.getPrimaryValue()); + preparedStatements.add(preparedStatement); + } + + return preparedStatements; + + } + + @Override + protected boolean canRollback(CommitInfo commitInfo, Connection connection) throws SQLException { + String sql = commitInfo.getSql(); + SQLUpdateStatement sqlParseStatement = (SQLUpdateStatement) new MySqlStatementParser(sql).parseStatement(); + + TxcTable dbValue = UpdateParser.getInstance() + .getOriginValue(commitInfo.getWhereParams(), sqlParseStatement, connection); + + if ( commitInfo.getOriginalValue().getLine().size() == 0) { + logger.error("未影响行数,不回滚"); + return false; + } + + for (TxcLine txcLine : dbValue.getLine()) { + boolean diff = DiffUtils.diff(commitInfo.getPresentValue().getLine().get(0), txcLine); + if (!diff) { + try { + logger.error("数据不一致,不支持回滚操作, before:{},after:{}", + DiffUtils.getObjectMapper().writeValueAsString(commitInfo.getPresentValue().getLine().get(0)), + DiffUtils.getObjectMapper().writeValueAsString(txcLine)); + } catch (Exception e) { + logger.error("error", e); + } + return false; + } + } + + return true; + } + +}