低级知识传播者资料

本文主要介绍低级知识传播者资料 方法和在新技术下所面对的“挑战”,方便大家深入理解低级知识传播者资料 过程。本文也将分享低级知识传播者资料 所遇到的问题和应对策略,怎么解决怎么做的问题。
通过深入本文可以理解代码原理,进行代码文档的下载,也可以查看相应 Demo 部署效果。

曹工杂谈:分布式事务解决方案之基于本地消息表实现最终一致性

前言

为什么写这个?其实我这边的业务场景,严格来说,不算是典型的分布式事务,需求是这样说的:因为我这边负责的一个服务消费者consumer,是用户登录的入口;正常情况下,登录时候要走用户中心,这是个单独的服务;如果用户中心挂了,我这边自然是没法登录的。

现在的需求就是说,假设用户中心挂了,也要可以正常登录。因为我这个consumer其实也是缓存了用户的数据的,在本地登录也可以的,如果在我本地登录的话,我就得后续等用户中心恢复后,再把相关状态同步过去。

基于这样一个需求,我这边的实现方案是:

1.配置文件里维护一个开关,表示是否开启:故障转移模式。暂不考虑动态修改开关(如果要做,简单做就提供个接口来改;复杂做,就放到配置中心里,我们现在用的nacos,可以改了后推送到服务端)

2.如果开关是打开的,表示需要进行故障转移,则登录、退出登录等各种需要访问用户中心的请求,都存储到数据库中;数据库会有一张表,用来存放这类请求。大致如下:

 CREATE TABLE `cached_http_req_to_resend` (   `http_req_id` bigint(20) NOT NULL COMMENT '主键',   `req_type` tinyint(4) NOT NULL COMMENT '请求类型,1:推送待处置结果给第三方系统',   `third_sys_feign_name` varchar(30) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '第三方系统的名称,和feignClient的保持一致',   `http_req_body` varchar(4000) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '请求体',   `current_state` tinyint(4) DEFAULT NULL COMMENT '该请求当前状态,1:成功;2:失败;3:待处理;4:失败次数过多,放弃尝试',   `fail_count` tinyint(4) DEFAULT NULL COMMENT '截止目前,失败次数;超过指定次数后,将跳过该请求',   `success_time` datetime DEFAULT NULL COMMENT '请求成功发送的时间',   `create_time` datetime DEFAULT NULL COMMENT '创建时间',   `related_entity_id` bigint(21) DEFAULT NULL COMMENT '相关的实体的id,比如在推送待处置警情时,这个id为处警id',   PRIMARY KEY (`http_req_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci 

3.单独开一个schedule线程,定时去扫这个表,发现有需要处理的,就去重新发送请求就行了,成功了的,直接更新状态为success。

这个模式,其实就算是分布式事务中的:本地消息表方案了。

本地消息表,有一个注意的点,就是要把保存消息的操作和业务相关操作,放到同一个事务中,这样可以确保,业务成功了,消息肯定是落库了的,很可靠。然后再开启个定时任务,去扫描消息表即可。

我这边不是发消息,而是发请求,道理是类似的。

下面开始基于代码demo来讲解。

代码结构

这边就是简单的几个module,基于spring cloud开发了一个服务提供者和一个服务消费者。服务提供者对外暴露的接口,通过api.jar的形式,提供给消费者,这种算是强耦合了,有优点,也有缺点,这里就不讨论了。

消费者通过feign调用服务提供者。有人会问,不需要eureka这些东西吗,其实是可以不需要的,我们直接在ribbon的配置中,把服务对应的:ip和端口写死就完了。

低级知识传播者

我们这里就是,消费者访问服务提供者,正常情况下直接访问就行了;但我们这里,模拟的就是服务A访问不了的情况,所以会直接把请求落库,后续由定时线程去处理。

服务提供者-api

我们看看服务提供者api,里面仅有一个接口:

public interface FeignServiceA {     /**      *      * @return      */     @RequestMapping("/login")     public Message<LoginRespVO> login(@RequestBody LoginReqVO loginReqVO); }  

低级知识传播者

服务提供者的逻辑

低级知识传播者

其中,逻辑如下:

@RestController @Slf4j public class DemoController extends BaseController implements FeignServiceA {  	// 1     @Override     public Message<LoginRespVO> login(@RequestBody LoginReqVO loginReqVO) {         log.info("login  is ok,param:{}", loginReqVO);          LoginRespVO vo = new LoginRespVO();         vo.setUserName(loginReqVO.getUserName());         vo.setAge(11);         vo.setToken(UUID.randomUUID().toString());          return successResponse(vo);     } } 

这里1处就是提供了一个接口,接口里返回一点点信息。测试一下:

低级知识传播者

服务消费者之正常请求服务提供者

pom.xml中依赖服务提供者的api

<dependency>    <groupId>com.example</groupId>    <artifactId>service-provider-A-api</artifactId>    <version>0.0.1-SNAPSHOT</version> </dependency>  

feign client代码

我们需要写一个接口,继承其feign api。

@FeignClient(value = "SERVICE-A") public interface RpcServiceForServiceA extends FeignServiceA { } 

要调用的时候,怎么弄呢? 直接注入该接口,然后调用对应的方法就行了,这样就可以了。

@Autowired private RpcServiceForServiceA rpcServiceForServiceA;  Message<LoginRespVO> message = rpcServiceForServiceA.login(reqVO); 

但是,我们好像没有配置注册中心之类的东西,这个我们可以绕过,因为最终发起调用的是,ribbon这个组件。

ribbon提供了几个接口,其中一个,就是用来获取服务对应的实例列表。

这里要说的,就是下面这个接口:

package com.netflix.loadbalancer;  import java.util.List;  /**  * Interface that defines the methods sed to obtain the List of Servers  * @author stonse  *  * @param <T>  */ public interface ServerList<T extends Server> {      public List<T> getInitialListOfServers();          /**      * Return updated list of servers. This is called say every 30 secs      * (configurable) by the Loadbalancer's Ping cycle      *       */     public List<T> getUpdatedListOfServers();     }  

这个接口,有多个实现,ribbon自带了几个实现,然后eureka 、nacos的客户端,都自己进行了实现。

低级知识传播者

ribbon自带的实现中,有一个叫做:

public class ConfigurationBasedServerList extends AbstractServerList<Server>  {  	private IClientConfig clientConfig;     ...  	@Override 	public List<Server> getUpdatedListOfServers() {         // 1         String listOfServers = clientConfig.get("listOfServers");         return derive(listOfServers); 	} 

1处可以看到,它获取服务对应的实例,就是通过去配置文件里获取listOfServers这个key中配置的那些。

总之,最终我们向下面这样配置就行了:

SERVICE-A.ribbon.ReadTimeout=3000 SERVICE-A.ribbon.listOfServers=localhost:8082 SERVICE-A.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList 

这里的前缀,SERVICE-A和之前下面这个地方一致就行了:

@FeignClient(value = "SERVICE-A") public interface RpcServiceForServiceA extends FeignServiceA { } 

正常情况下,就说完了,直接调用就行,和httpclient调用没啥本质差别。只不过ribbon提供了负载均衡、重试等各种功能。

设计表结构,在使用故障转移模式时,保存请求

表结构我前面已经贴了,这里就展示下数据吧(可点击放大查看):

低级知识传播者

保存请求的代码很简单:

	@Override     public LoginRespVO login(LoginReqVO reqVO) {         boolean failOverModeOn = isFailOverModeOn();         /**          * 故障转移没有开启,则正常调用服务          */         if (!failOverModeOn) { 			...             return ...;         }          /**          * 1 使用本地数据进行服务,并将请求保存到数据库中          */         iCachedHttpReqToResendService.saveLoginReqWhenFailOver(reqVO);          /**          * 返回一个 dummy 数据          */         return new LoginRespVO();     } 

上面的1处,就会保存请求到数据库。

定时线程消费逻辑

概览

定时线程这边,我设计得比较复杂一点。因为实际场景中,上面的表中,会存储多个第三方服务的请求;比如service-A,service-B。

所以,这里的策略是:

低级知识传播者

简单来说,就是定时线程,拿到任务后,按照第三方服务的名字来进行group by操作,比如,要发送到service-A的请求放一起,按时间排好序;要发送给service-B的放一起,排好序。

然后找到service-A,service-B各自对应的处理器,然后把数据丢给这些处理器;处理器拿到后,就会放到阻塞队列里;

然后此时worker线程就会被阻塞队列给唤醒,唤醒后,就去开始处理这些请求,包括发起feign调用,并且更新结果到数据库中。

定时线程入口

@Scheduled(cron = "0/30 * * * * ?  ") public void sendCachedFeignReq() {     Thread.currentThread().setName("SendCachedFeignReqTask");     log.info("start sendCachedFeignReq");      /**      * 1、获取锁      */     boolean success = iCommonDistributedLockService.tryLock(DISTRIBUTED_LOCK_ENUM.SEND_CACHED_FEIGN_REQ_TO_REMOTE_SERVER.lockName, DISTRIBUTED_LOCK_ENUM.SEND_CACHED_FEIGN_REQ_TO_REMOTE_SERVER.expireDurationInSeconds);      /**      * 进行业务逻辑处理      */     iCachedHttpReqToResendService.processCachedFeignReqForLoginLogout(); 	...  } 

这里还加了个分布式锁的操作,用数据库实现的,还没经过充分测试,可能会有点小问题,不过不是重点。

下面看看业务逻辑:

@Override public void processCachedFeignReqForLoginLogout() {     // 1     String[] feignClients = {EFeignClient.SERVICE_A.getName()};     // 2     for (String feignClient : feignClients) {          /**          * 3 从数据库获取要发送到该服务的请求          */         List<CachedHttpReqToResend> recordsFromDb = getRecordsFromDb(feignClient);         if (CollectionUtils.isEmpty(recordsFromDb)) {             continue;         }                /**          * 4 根据feign client,找到对应的处理器          */         CachedHttpReqProcessor cachedHttpReqProcessor = cachedHttpReqProcessors.stream().filter(item -> item.support(feignClient)).findFirst().orElse(null);         if (cachedHttpReqProcessor == null) {             throw new RuntimeException();         }          /**          * 5  利用对应的处理器,处理该部分请求          */         cachedHttpReqProcessor.process(recordsFromDb);     } } 
  • 1,定义一个数组,数组中包括所有要处理的第三方系统
  • 2,遍历
  • 3,根据该serviceName,比如,根据service-A,去数据库查询对应的请求(这里可能和前面的图有点出入,以这里的代码为准)
  • 4,根据该service-A,找到对应的处理器
  • 5,利用第四步找到的处理器,来处理第三步中查到的数据

怎么找到service-A对应的处理器

我们先看看处理器这个接口:

public interface CachedHttpReqProcessor {     /**      * 该处理器是否支持处理该service      * @param feignClientName      * @return      */     boolean support(String feignClientName);       /**      * 具体的处理逻辑      * @param list      */     void process(Collection<CachedHttpReqToResend> list);      /**      * worker线程的名字      * @return      */     String getThreadName(); } 

然后看看针对service-A的处理器,是怎么实现的:

@Service public class CachedHttpReqProcessorForServiceA extends AbstractCachedHttpReqProcessor {    	// 1     @Override     public boolean support(String feignClientName) {         return Objects.equals(EFeignClient.SERVICE_A.getName(), feignClientName);     }      @Override     public String getThreadName() {         return "CachedHttpReqProcessorForServiceA";     } 

1处,判断传入的feign客户端,是否等于EFeignClient.SERVICE_A,如果是,说明找到了对应的处理器。

我们这里将这个service,注册为了bean;在有多个serviceA,serviceB的时候,就会有多个CachedHttpReqProcessor处理器。

我们在之前的上层入口那里,就注入了一个集合:

    @Autowired     private List<CachedHttpReqProcessor> cachedHttpReqProcessors;  

然后在筛选对应的处理器时,就是通过遍历这个集合,找到合适的处理器。

具体的,大家可以把代码拉下来看看。

CachedHttpReqProcessor的处理逻辑

对于serviceA,serviceB,service C,由于处理逻辑很大部分是相同的,我们这里提取了一个抽象类。

@Slf4j public abstract class AbstractCachedHttpReqProcessor implements CachedHttpReqProcessor {    private LinkedBlockingQueue<CachedHttpReqToResend> blockingQueue = new LinkedBlockingQueue<>(500);  private AtomicBoolean workerInited = new AtomicBoolean(false);  Thread workerThread;  @Override public void process(Collection<CachedHttpReqToResend> list) {     if (CollectionUtils.isEmpty(list)) {         return;     }      /**      * 1 直到有任务要处理时(该方法被调用时),才去初始化线程      */     if (workerInited.compareAndSet(false, true)) {         // 2         workerThread = new Thread(new InnerWorker());         workerThread.setDaemon(true);         workerThread.setName(getThreadName());         workerThread.start();     }      /**      * 放到阻塞队列里      */     blockingQueue.addAll(list); } 

我们这里1处,给每个处理器,定义了一个工作线程,且只在本方法被调用时,才去初始化该线程;为了防止并发,使用了AtomicBoolean,保证只会初始化一次。

2处,给线程设置了Runnable,它会负责实际的业务处理。

然后3处,直接把要处理的任务,丢到阻塞队列即可。

Worker的处理逻辑

任务已经是到了阻塞队列了,那么,谁去处理呢,就是worker了。如果大家忘了整体的设计,可以回去看看那张图。

public abstract boolean doProcess(Integer reqType, CachedHttpReqToResend cachedHttpReqToResend);  /**  * 从队列取数据;取到后,调用子类的方法去处理;  * 子类处理后,返回处理结果  * 根据结果,设置成功或者失败的状态  */ public  class InnerWorker implements Runnable {      @Override     public void run() {         while (true) {             // 1             boolean interrupted = Thread.currentThread().isInterrupted();             if (interrupted) {                 log.info("interrupted ,break out");                 break;             }            			// 2             CachedHttpReqToResend cachedHttpReqToResend;             try {                 cachedHttpReqToResend = blockingQueue.take();             } catch (InterruptedException e) {                 log.info("interrupted,e:{}", e);                 break;             } 			// 3             Integer reqType = cachedHttpReqToResend.getReqType();             if (reqType == null) {                 continue;             }              try {                 /**                  * 4 使用模板方法设计模式,交给子类去实现                  */                 boolean success = doProcess(reqType, cachedHttpReqToResend);  				// 5                 if (!success) {                     cachedHttpReqToResend.setFailCount(cachedHttpReqToResend.getFailCount() + 1);                 } else {              cachedHttpReqToResend.setCurrentState(CachedHttpReqToResend.CURRENT_STATE_SUCCESS);                     cachedHttpReqToResend.setSuccessTime(new Date());                 } 				// 6                 boolean count = iCachedHttpReqToResendService.updateById(cachedHttpReqToResend);                 if (count) {                     log.debug("update sucess");                 }             } catch (Throwable throwable) {                 log.error("e:{}", throwable);                 continue;             }          }      } } 
  • 1,判断是否被中断了,这样可以在程序关闭时,感知到;避免线程泄漏
  • 2,从阻塞队列中,获取任务
  • 3,判断请求类型是否为null,这个是必须要的
  • 4,使用模板方法设计模式,具体逻辑,具体怎么发请求,谁去发,交给子类实现
  • 5、6,根据结果,更新这条数据的状态。

子类中的具体逻辑

我们这里贴个全貌:

@Service @Slf4j public class CachedHttpReqProcessorForServiceA extends AbstractCachedHttpReqProcessor {     @Autowired     private FeignServiceA feignServiceA;      @Autowired     private ObjectMapper objectMapper;      @Override     public boolean support(String feignClientName) {         return Objects.equals(EFeignClient.SERVICE_A.getName(), feignClientName);     }      @Override     public String getThreadName() {         return "CachedHttpReqProcessorForServiceA";     }       /**      * 1 根据请求type字段,我们就知道是要发送哪一个请求      * @param reqType      * @param cachedHttpReqToResend      * @return      */     @Override     public boolean doProcess(Integer reqType, CachedHttpReqToResend cachedHttpReqToResend) {         switch (reqType) {             // 2             case CachedHttpReqToResend.REQ_TYPE_LOGIN_TO_SERVICE_A: {                 // 3                 String httpReqBody = cachedHttpReqToResend.getHttpReqBody();                 try {                     // 4                     LoginReqVO loginReqVO = objectMapper.readValue(httpReqBody, LoginReqVO.class);                     /**                      * 5 发起登录                      */                     Message<LoginRespVO> message = feignServiceA.login(loginReqVO);                     boolean success = FeignMsgUtils.isSuccess(message);                     return success;                 } catch (Throwable e) {                     log.error("e:{}", e);                     return false;                 }              }         }          return true;     } } 
  • 1,这个类就是实现了父类中的抽象方法,这里体现的就是模板方法设计模式
  • 2,根据请求type,判断要访问哪个接口
  • 3,4,将请求体进行反序列化
  • 5,发起请求,调用feign。

代码如何使用

具体的代码,我放在了:

https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/blockingqueue-consumer-producer

建表语句:

https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/blockingqueue-consumer-producer/consumer/sql

服务提供者A的访问入口:

curl -i -X POST     -H "Content-Type:application/json"     -d  '{    "userName": "zhangsan",   "password":"123" }'   'http://localhost:8082/login' 

服务消费者的application.properties中:

failover.mode=true 

这个为true时,就是故障转移模式,访问如下接口时,请求会落库

http://localhost:8081/login.do

为false的话,就会直接进行feign调用。

代码中的bug

其实这个代码是有bug的,因为我们是定时线程,假设每隔30s执行,那假设我一开始取了10条出来,假设全部放到队列了,阻塞队列此时有10条,假设worker处理特别慢,30s内也没执行完的话,定时线程会再次取出状态没更新的那个任务,又丢到队列里。

任务就被重复消费了。

大家可以想想怎么处理这个问题,通过这个bug,我也发现,blockingqueue是一种比较彻底的解耦方式,但是,我们这里的业务,解耦了吗,如果业务不是解耦的,用这个方式,其实是有点问题。

过两天我再更新这部分的方案,生产者和消费者,这里还是需要通信的,才能避免任务重复消费的问题。

总结

要实现一个本地消息表最终一致性方案,有一定开发量,而且我这里,消费过程中,强行引入了多线程和生产者、消费者模式,增加了部分复杂度。

不过,代码不就是要多折腾吗?

低级知识传播者资料部分资料来自网络,侵权毕设源码联系删除

区块链毕设网(www.qklbishe.com)全网最靠谱的原创区块链毕设代做网站
部分资料来自网络,侵权联系删除!
资源收费仅为搬运整理打赏费用,用户自愿支付 !
qklbishe.com区块链毕设代做网专注|以太坊fabric-计算机|java|毕业设计|代做平台 » 低级知识传播者资料

提供最优质的资源集合

立即查看 了解详情