安然写字的地方

关山难越,谁悲失路之人?萍水相逢,尽是他乡之客

0%

vert.x

vert.x 已经提供了很便利的 NetClinet,NetServer,这个EventBus Bridge提供的核心能力到底是什么呢?

Server和Client之间采用这种方式进行通信可以让Server端的代码更加清晰明了
Client和Server之间的交互模式也更简单一些:

  • 对于request-response,server简单地message.reply即可
  • 对于某些广播消息,client向多个server实例广播,或者是server向多个client广播,又或某些client也想接收另外的client向server的广播都非常简单和直观。
  • 通过注册地址,client也可以收到感兴趣的消息,不论是server发的,还是client发的

这里有个例子
https://github.com/foxgem/how-to/tree/master/vertx-tcp-eventbridge/src/main/groovy/foxgem

  • client向多个bridge send,则只有一个bridge可以收到
  • client向多个bridge publish,则都能收到
  • server和client之间的request-response
  • client注册要接受某地址的消息

vert.x 的EventBus有单机和cluster,bridge和cluster之间的区别又是什么?

EventBus 主要面向的是 verticles(类似actor)间的消息通信,cluster是为了将 EventBus 抽象为分布式的,同时提供了分布式网络下共识的能力

bridge 我理解则是通过跨进程的交互方式,让eventbus成为跨语言通信的 polyglot

EventBus虽然有tcp bridge,但是本意是用做verticles之间的通信用的

并不是用作客户端和服务器端之间的通信用的,正确的通信姿势应该是用vert.x的各种clients & servers

详细使用代码参见测试用例:
https://github.com/vert-x3/vertx-tcp-eventbus-bridge/blob/master/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeTest.java

参考

如何理解事务中的一致性

单机事务一致性的定义

可以理解单机一致性是:应用系统从一个正确的状态到另一个正确的状态,而ACID就是说事务能够通过AID来保证这个C的过程,C是目的,AID都是方法,ACID中的一致性是数据库保证所有的约束没有被打破

关于分布式事务的文章

关于分布式事务开源框架

补充资料

大型SOA架构体系里的数据一致性问题 
http://www.360doc.com/content/17/0212/09/13792507_628391906.shtml

从CRUD编程切换到事件溯源和区块链编程
http://www.jdon.com/49396

事件概念正在重塑分布式系统的未来
http://www.jdon.com/49368

什么是事件溯源Event Sourcing?
http://www.jdon.com/48501
https://github.com/baihui212/tsharding

保障数据一致性的6种方案
http://www.360doc.com/content/16/0419/22/31851752_552140045.shtml

常规方案

git rebase 方案

1
git commit --amend --author="anruence <anruence@gmail.com>"

官方方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
git filter-branch --env-filter '
WRONG_EMAIL="anran.war@alibaba-inc.com"
NEW_NAME="anruence"
NEW_EMAIL="anruence@gmail.com"

if [ "$GIT_COMMITTER_EMAIL" = "$WRONG_EMAIL" ]
then
export GIT_COMMITTER_NAME="$NEW_NAME"
export GIT_COMMITTER_EMAIL="$NEW_EMAIL"
fi
if [ "$GIT_AUTHOR_EMAIL" = "$WRONG_EMAIL" ]
then
export GIT_AUTHOR_NAME="$NEW_NAME"
export GIT_AUTHOR_EMAIL="$NEW_EMAIL"
fi
' --tag-name-filter cat -- --branches --tags

作用

Arthas 是Alibaba开源的Java诊断工具,深受开发者喜爱

当你遇到以下类似问题而束手无策时,Arthas可以帮助你解决:

这个类从哪个 jar 包加载的?为什么会报各种类相关的 Exception?
我改的代码为什么没有执行到?难道是我没 commit?分支搞错了?
遇到问题无法在线上debug,难道只能通过加日志打印入参出参再重新发布吗?
线上遇到某个用户的数据处理有问题,但线上同样无法debug,线下无法重现!
是否有一个全局视角来查看系统的运行状况?
有什么办法可以监控到JVM的实时运行状态?
接口链路耗时的统计

1
2
3
sc
watch
trace

官方使用文档

领域驱动设计

领域驱动本身是针对复杂系统设计的软件工程方法,它在战略层面有三个重要的点,一是聚焦业务核心价值,二是统一语言,三是业务领域性划分。

聚焦业务核心价值,也是领域驱动战略层面最主要的目标;统一语言是基于领域划分的,也就是在一个领域限界上下文中统一语言。通常情况下在微服务会对应到一个限界上下文。在战术层目标主要是在设计系统时,需要具备高内聚、低耦合、易扩展、易维护四个要点。战术层面主要是为了保证空泛的战略的无法落地,为落地提供了一系列具体的开发模式。这也是代码即架构的一种体现。

  1. 如何识别聚合与聚合根?

明确含义:一个Bounded Context(界定的上下文)可能包含多个聚合,每个聚合都有一个根实体,叫做聚合根;

识别顺序:先找出哪些实体可能是聚合根,再逐个分析每个聚合根的边界,即该聚合根应该聚合哪些实体或值对象;最后再划分Bounded Context;

聚合边界确定法则:根据不变性约束规则(Invariant)。不变性规则有两类:1)聚合边界内必须具有哪些信息,如果没有这些信息就不能称为一个有效的聚合;2)聚合内的某些对象的状态必须满足某个业务规则;

如何识别聚合与聚合根?

明确含义:一个Bounded Context(界定的上下文)可能包含多个聚合,每个聚合都有一个根实体,叫做聚合根;

识别顺序:先找出哪些实体可能是聚合根,再逐个分析每个聚合根的边界,即该聚合根应该聚合哪些实体或值对象;最后再划分Bounded Context;

聚合边界确定法则:根据不变性约束规则(Invariant)。不变性规则有两类:
1)聚合边界内必须具有哪些信息,如果没有这些信息就不能称为一个有效的聚合;
2)聚合内的某些对象的状态必须满足某个业务规则;

例子分析1:订单模型

Order(一 个订单)必须有对应的客户信息,否则就不能称为一个有效的Order;同理,Order对OrderLineItem有不变性约束,Order也必须至少有一个OrderLineItem(一条订单明细),否 则就不能称为一个有效的Order;另外,Order中的任何OrderLineItem的数量都不能为0,否则认为该OrderLineItem是无效 的,同时可以推理出Order也可能是无效的。因为如果允许一个OrderLineItem的数量为0的话,就意味着可能会出现所有 OrderLineItem的数量都为0,这就导致整个Order的总价为0,这是没有任何意义的,是不允许的,从而导致Order无效;所以,必须要求 Order中所有的OrderLineItem的数量都不能为0;那么现在可以确定的是Order必须包含一些OrderLineItem,那么应该是通 过引用的方式还是ID关联的方式来表达这种包含关系呢?这就需要引出另外一个问题,那就是先要分析出是OrderLineItem是否是一个独立的聚合 根。回答了这个问题,那么根据上面的规则就知道应该用对象引用还是用ID关联了。那么OrderLineItem是否是一个独立的聚合根呢?因为聚合根意 味着是某个聚合的根,而聚合有代表着某个上下文边界,而一个上下文边界又代表着某个独立的业务场景,这个业务场景操作的唯一对象总是该上下文边界内的聚合 根。想到这里,我们就可以想想,有没有什么场景是会绕开订单直接对某个订单明细进行操作的。也就是在这种情况下,我们 是以OrderLineItem为主体,完全是在面向OrderLineItem在做业务操作。有这种业务场景吗?没有,我们对 OrderLineItem的所有的操作都是以Order为出发点,我们总是会面向整个Order在做业务操作,比如向Order中增加明细,修改 Order的某个明细对应的商品的购买数量,从Order中移除某个明细,等等类似操作,我们从来不会从OrderlineItem为出发点去执行一些业 务操作;另外,从生命周期的角度去理解,那么OrderLineItem离开Order没有任何存在的意义,也就是说OrderLineItem的生命周 期是从属于Order的。所以,我们可以很确信的回答,OrderLineItem是一个实体。

例子分析2:帖子与回复的模型,做个对比,以便更好地理解。

不变性分析:帖子和回复之间有不变性规则吗?似乎我们只知道一点是肯定的,那就是帖子和回复之间的关系,1:N的关系;除了这个之外,我们看不到任何其他的 不变性规则。那么这个1:N的对象关系是一种不变性规则吗?不是!首先,一个帖子可以没有任何回复,帖子也不对它的回复有任何规则约束,它甚至都不知道自 己有多少个回复;再次,发表了一个回复和帖子也没有任何关系;其次,发表回复对帖子没有任何改变;从业务场景的角度去分析,我们有发表帖子的场景,有发表 回复的场景。当在发表回复的时候,是以回复为主体的,帖子只是这个回复里所包含的必要信息,用于说明这个回复是对哪个帖子的回复。这些都说明帖子和回复之 间找不出任何不变性约束的规则;因为帖子和回复都有各自独立的业务场景的需要,所以可以很容易理解它们都是独立的聚合根;那也很容易知道该如何建立他们之 间的关联了,但是我们要尽量减少关联,所以只保留回复对帖子的关联即可;帖子没有任何必要去保存一个回复的ID的列表;那么你可能会说,当我删除一个帖子 后,回复应该是没有存在的意义的呀?不对,不是没有存在的意义,而是删除了帖子后导致了回复对帖子的关联信息的缺失,导致数据不一致。这是因为帖子和回复之间有一种必然的联系(1:N),回复一定会有一个对应的帖子;但是回复有其自己的生命周期,不应该随着帖子的删除而级联删除。这种情况下,如果你删除了 帖子,就导致回复也成为了一条无效的数据;所以,我们绝对不允许删除任何聚合根,因为一旦你删除了聚合根,那就意味着与该聚合根相关的其他任何聚合根都会 有外键引用缺失的问题,会导致整个领域模型数据的不一致;所以,永远都不要删除聚合根;

容易误解的例子

拿博客评论举个例子:
比如3个Entity:User, Blog CommentOfBlog
可以理解为一个User发布了若干Blog,一个Blog又有若干CommentOfBlog,所以User就可以认为是聚会根了吗?
其实不是得,如果这样去思考,那整个系统只需要User一个聚合根就可以了,因为所有的东西都是User录入的。
我实际上也没否定聚合或聚合根的概念。我只是觉得聚合的概念过去单一,实际上我们对象之间的关系有很多种,有弱关联,聚合,组合。我现在更多思考的是一个对象是否可以独立?然后会从两个相关对象的各自的角度进行分别思考,最后得出对象应该独立还是从属于另外一个对象;
以User,Blog(你应该是指博客随笔或文章吧?),CommentOfBlog为例:User一定可以独立,这点你也没异议;Blog呢?首先User和Blog是一对多的关系,User可以发表多篇blog,但这不表明user离开blog就不能活了,或者说是否user离开blog就没意义了?显然不是。因此可以推理出user不需要聚合blog;从blog的角度分析,虽然每个blog都有一个作者(author,user类的一个实例),但是blog存在的意义是否总是为了user而存在,我们创建blog的目的是为了被内聚到user吗?显然不是,blog创建的目的与user无关,我们是为了写随笔而创建blog,我们创建一个blog不是为了在user里增加一个blog。所以,应该理解为blog.Author,即author只是一个blog的关联属性,表示谁创建了该blog。CommentOfBlog:一个blog可以有多个comment,从blog角度分析,blog是否必须聚合comment?不见得,因为blog可以没有评论;但是从comment的角度来看,comment是否总为了blog而存在?实际上是的,
1)comment可以属于其他的Blog吗?不可以;
2)comment被创建的目的是为了评论blog,可以说离开blog谈comment总是完全没有任何意义。
所以我们会让blog聚合comment。之前我们分析聚合,可能更多的仅仅总是从一方面去分析,就是会分析当前聚合根应该聚合哪些对象,而不会从哪些被聚合的实体上去分析它是否真的应该被聚合还是应该独立。所以有时的出来的聚合会不准确。

这个例子其实不对,blog不应该聚合comment,这个和帖子的关系应该是一致的,comment离开了blog就没有意义了吗

总结一下

1.找最小的业务场景
2.找出聚合根(先确定可能是聚合根的实体,可能包含的实体),确定的原则是:具有独立的生命周期(生命开始和结束)
3.确定聚合根的边界,如包括 Entity1,E2,E3.。。。这些实体要依赖于聚合根的存在而存在。
聚合边界确定法则:根据不变性约束规则(Invariant)。

不变性规则有两类:
1)聚合边界内必须具有哪些信息,如果没有这些信息就不能称为一个有效的聚合;
2)聚合内的某些对象的状态必须满足某个业务规则

反向验证:
1.验证一个实体是否属于这个聚合:他还可以属于其他聚合吗?离开了这个聚合根是不是就失去了意义。
2.实体变化了,影响聚合根的内容了吗?

再次验证:

1.是否“同生死,共存亡”;
聚合除了封装我们关心的信息外最主要的目的就是为了封装业务规则,保证数据的一致性,
业务规则比如一个银行账号的余额不能小于0,订单中的订单明细的个数不能为0,订单中不能出现两个明细对应的商品ID相同,订单明细中的商品信息必须合法,商品的名称不能为空,回复被创建时必须要传入被回复的帖子(因为没有帖子的回复不是一个合法的回复)等

聚合应当设计的尽可能小
聚合包含的东西过多,导致多人操作时并发冲突严重,导致系统可用性变差;所以实现了既能解决并发冲突的问题,也能保证让聚合来封装业务规则,实现模型级别的数据一致性;另外,聚合设计的小还有一个好处,就是:业务决定聚合,业务改变聚合。聚合设计的小除了可以降低并发冲突的可能性之外,同样减少了业务改变的时候,聚合的拆分个数,降低了聚合大幅重构(拆分)的可能性,从而能让我们的领域模型更能适应业务的变化。

聚合之间通过ID关联
其实聚合之间无需通过对象引用的方式来关联;

需要关联时,必须要
1)尽量避免多对多的关系;
2)如果必须多对多,应该转换为两个一对多,然后都只要在双方对象中保存对方的ID即可;

我觉得聚合的设计主要把握以下几点吧:
1)强调Invariants,即不变性;
2)并不是A因为B才有存在的意义就一定表示A被B聚合,此时A也可能是聚合根,帖子和回复的例子就说明了这一点;
3)如何识别聚合确实还是应该从业务上分析,而不是从技术角度去左右;应该从业务上分析实体之间的关系,找出真正的不变性,从而确定聚合的边界;
4)聚合之间通过ID关联有太多好处,特别是在分布式环境时更加凸显,具体好处需要你自己开发时才会感受到,想想什么是数据,什么是行为类吧,为什么数据与数据之间几乎没什么多态,而框架类库中的那些类能很好的通过对象引用实现多态?其实ID关联表示A明确知道它关联了哪个B,而对象引用表示A拥有某个类型的B,但是从语义上来说它不知道引用的是哪个B。从这个角度去想想为什么领域模型中的对象类之间为什么要用ID关联,而框架类库中强调行为的类为什么是对象引用吧。至于其他技术方面的好处,太多了,自己慢慢体会吧,呵呵;想想CQRS架构为什么也采用ID关联吧,想想为什么Evans也在其官网上推荐ID关联?
5)至于上面一个朋友提到的关于领域模型中对象与对象之间的交互上,即一个对象做了什么改变后另一个对象会做出相应的修改,这个有两种方法:1)Domain Service;2)Messaging;从抽象的角度来说,Domain Service是一种过程化思维,先做什么后做什么都在Domain Service中完成;这也是经典的DDD的做法,用于处理一个领域逻辑需要跨聚合根的情况;2)如果用Messaging,典型的架构是CQRS,Messaging就是通过发送消息和接受消息实现两个聚合之间的通信,一般通过bus实现publish-subscribe的消息模式;通过这种方式的好处是,把原来的get and call模式转换为publish-subscribe模式,这样的转换带来的好处就是可以灵活配置需要同步订阅消息还是异步订阅消息,以及对象之间不再依赖;我们可以根据业务的数据一致性要求来决定采用同步还是异步订阅消息;异步分布式消息通信的知识可以去学习下NServiceBus。

关于如何区分对象和服务

对象对应领域中的一些名词性的概念;服务对应领域中的一些动词性的概念。如货物,书本是对象,我们在领域中需要唯一区分这些对象;领域服务用来表示一个涉及到多个领域对象的一个动作或转换,如资金转帐就是一个领域服务,因为它涉及到两个银行帐号领域对象;之所以没有把领域服务看成是对象是因为
1)它和一般的对象不同,一般的对象既有状态也有行为,而领域服务只有行为没有状态,如果一个概念没有状态只有行为,那么我们是没有必要把它看成是对象的,因为永远不需要持久化它或重建它。对于领域服务,我们只要关心它能做什么,不用关心它的生命周期,所以它在整个领域模型中的任何时候都只要一个实例即可。
2)对于图书借还系统而言,借书者是一个对象,因为我们需要区分是哪个借书者;但是图书馆在我的场景中只需要一个,所有的借书者都面对同一个图书馆进行交互;可以认为图书馆提供了让用户借书和还书的服务。所以,我把图书馆定义为一个服务;
3)至于你说的如果有很多大学,它们分别都有自己的图书馆,那就是不同的场景了。此时我觉得图书馆就是一个对象,因为一个借书者可能会区分是从这个图书馆借的书还是那个图书馆借的书。

一致性

谈谈一致性这个名词
关于Paxos说的一致性,个人理解是指冗余副本(或状态等,但都是因为存在冗余)的一致性。这与关系型数据库中ACID的一致性说的不是一个东西。在关系数据库里,可以连副本都没有,何谈副本的一致性?按照经典定义,ACID中的C指的是在一个事务中,事务执行的结果必须是使数据库从一个一致性状态变到另一个一致性状态。那么,什么又是一致性状态呢,这跟业务约束有关系,比如经典的转账事务,事务处理完毕后,不能出现一个账户钱被扣了,另一个账户的钱没有增加的情况,如果两者加起来的钱还是等于转账前的钱,那么就是一致性状态。

从很多博文来看,对这两种一致性往往混淆起来。另外,CAP原则里面所说的一致性,个人认为是指副本一致性,与Paxos里面的一致性接近。都是处理“因为冗余数据的存在而需要保证多个副本保持一致”的问题,NoSQL放弃的强一致性也是指副本一致性,最终一致性也是指副本达到完全相同存在一定延时。

当然,如果数据库本身是分布式的,且存在冗余副本,则除了解决事务在业务逻辑上的一致性问题外,同时需要解决副本一致性问题,此时可以利用Paxos协议。但解决了副本一致性问题,还不能完全解决业务逻辑一致性;如果是分布式数据库,但并不存在副本的情况,事务的一致性需要根据业务约束进行设计。

另外,谈到Paxos时,还会涉及到拜占庭将军问题,它指的是在存在消息丢失的不可靠信道上试图通过消息传递的方式达到一致性是不可能的。Paxos本身就是利用消息传递方式解决一致性问题的,所以它的假定是信道必须可靠,这里的可靠,主要指消息不会被篡改。消息丢失是允许的。

问题

maven 3.6.3 最新版本在解决嵌套属性访问时存在空指针异常,很让人困惑

排查流程

maven源码debug思路

执行mvnDebug命令,执行后会看到监听一个端口,然后下载maven源代码配置到idea remote debug 端口,就可以断点调试了

issues

官方提交的链接
https://issues.apache.org/jira/browse/MNG-6921

前言:为什么需要重试

重试的场景

什么场景需要重试?就是我们认为的暂时的失败,比如说,系统繁忙,网络失败,请求被限流,插入数据时版本失败或者是被调用端返回的可重试的错误码,这个时候可以进行重试。而有些失败是不需要重试的,例如用户权限错误、内容被反垃圾系统拦截、非法数据等等,这些失败,一般是重试多少次都是不可能成功的。

重试的策略

重试需要注意哪些事情呢?是否只需要简单的重复调用几次就行?最近我就遇到这样一个场景。有一个业务,并发量非常的大,特别在每天早上8,9点的时候,很多用户都会使用这个功能,这个公司会盘路去写一些数据,有一次底层的数据库系统压力非常大,所以有些请求就会写失败,然后立马重试,最后造成雪崩!这是重试设计中一个比较大的误区,如果是因为资源不够,那么立马不停的重试非常容易压垮系统。所以,在重试设计中,我们有限地进行重试,重试不应该是无限次数的,其次,便是要间歇地进行重试,即每次失败都需要休息一会,并且每次休息的时间逐渐加长。例如有个系统开始限流保护到熔断了,那么连续重试多少次也没用,还不如休息一段时间,这种做法在TCP的拥塞控制中也被应用到。

重试设计的要点

我觉得写代码,只要每次远程调用的时候,多思考以下几个点即可:

  • 是否需要重试,哪些错误需要重试,哪些错误可以直接返回。这个也跟业务相关,有时候虽然可以后台重试,但是还是会返回失败让用户在操作一次,这样可以保证代码的简洁。
  • 重试的次数与频率,哪些失败可以立马重试的(更新数据库版本失败),哪些失败需要异步去重试。
  • 重试的时候,任务是否是幂等的。这是非常关键的一点,比如说退款任务,有可能只是网络繁忙,实际上结算系统已经把钱退还给用户。这也是我们设计接口中重要的一点,尽量去保证消息的幂等性。

重试的实现

实时重试

  • spring-retry
    自定义超时异常,重试策略,50ms重试,依次递增
    1
    @Retryable(value = {RetryException.class}, backoff = @Backoff(delay = 50, multiplier = 1))
    spring相关组件完整提供了这种能力,资料很多,就不赘述了

异步重试

异步重试的实现方法也多种多样,这里简单介绍一种思路,基于队列的一种重试机制

  1. 开始流程处理,当捕获到可重试异常时将参数信息发送到一个队列
  2. 队列消费者异步消费消息进行重试补偿,当重试失败次数超过某个阈值时,将消息转移到队列对应的死信队列中
  3. 死信队列在消费逻辑中同时可以增加一些额外的操作,限流,白名单功能

基于对队列消费情况的监控,我们同时可可以对异常数据进行异构分析

额外引申

基于最近疫情,对人员各数据进行报表统计的需求层出不穷(紧急),从零到一构建一套系统投入使用成本就有点高了,但公司使用了一种策略,快速实现了功能,基于现有问卷系统能力,对统计的数据进行解析,异构出一套统计数据库。思路还是很清新的~~~

最近在发布组件到中央仓库的过程中,总结了以下最佳实践

打包过程

最佳实践

一个项目,为了让配置更通用些。我觉得要有两个要素

  • parent-pom
  • dependencies-bom

enode-parent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.enodeframework</groupId>
<artifactId>enode-parent</artifactId>
<version>${revision}</version>
<modules>
<module>enode</module>
<module>bom</module>
<module>mysql</module>
<module>kafka</module>
<module>rocketmq</module>
<module>tests</module>
<module>samples</module>
</modules>
<packaging>pom</packaging>

<properties>
<revision>1.0.2-SNAPSHOT</revision>
<maven.skip.deploy>false</maven.skip.deploy>
<maven.jar.version>3.0.2</maven.jar.version>
<maven.surefire.version>3.0.0-M4</maven.surefire.version>
<maven.deploy.version>3.0.0-M1</maven.deploy.version>
<maven.compiler.version>3.8.1</maven.compiler.version>
<maven.source.version>3.2.0</maven.source.version>
<maven.war.version>3.2.3</maven.war.version>
<maven.javadoc.version>3.1.1</maven.javadoc.version>
<maven.jetty.version>9.4.11.v20180605</maven.jetty.version>
<nexus.staging.version>1.6.8</nexus.staging.version>
<maven.gpg.version>1.6</maven.gpg.version>
<maven.flatten.version>1.1.0</maven.flatten.version>
<maven.enforce.version>3.0.0-M2</maven.enforce.version>
</properties>

<name>${project.artifactId}</name>
<description>The enodeframework is devoted to helping engineers develop scalable applications.</description>
<url>http://www.enodeframework.org</url>

<licenses>
<license>
<name>MIT License</name>
<url>http://www.opensource.org/licenses/mit-license.php</url>
</license>
</licenses>
<developers>
<developer>
<name>anruence</name>
<email>anruence@gmail.com</email>
<organizationUrl>http://www.enodeframework.org</organizationUrl>
</developer>
</developers>
<scm>
<tag>master</tag>
<url>https://github.com/anruence/enode</url>
<connection>scm:git:git@github.com:anruence/enode.git</connection>
</scm>

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.enodeframework</groupId>
<artifactId>enode-dependencies-bom</artifactId>
<version>${revision}</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.version}</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>${maven.war.version}</version>
<configuration>
<warName>${project.artifactId}-${project.version}</warName>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>${maven.deploy.version}</version>
<configuration>
<skip>${maven.skip.deploy}</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven.surefire.version}</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
<version>${maven.flatten.version}</version>
<configuration>
<updatePomFile>true</updatePomFile>
<flattenMode>resolveCiFriendliesOnly</flattenMode>
</configuration>
<executions>
<execution>
<id>flatten</id>
<phase>process-resources</phase>
<goals>
<goal>flatten</goal>
</goals>
</execution>
<execution>
<id>flatten.clean</id>
<phase>clean</phase>
<goals>
<goal>clean</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>${maven.source.version}</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven.javadoc.version}</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<doclint>none</doclint>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>sonatype-oss-release</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>${maven.gpg.version}</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

enode-dependencies-bom

需要对项目依赖的项目进行整体管理,参见spring系统的bom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.enodeframework</groupId>
<artifactId>enode-dependencies-bom</artifactId>
<version>${revision}</version>
<packaging>pom</packaging>
<properties>
<revision>1.0.2-SNAPSHOT</revision>
<springboot.version>2.2.5.RELEASE</springboot.version>
<vertx.version>3.8.5</vertx.version>
<slfj4.version>1.7.30</slfj4.version>
<guava.version>28.2-jre</guava.version>
<gson.version>2.8.6</gson.version>
<junit.version>4.13</junit.version>
<rocketmq.version>4.6.1</rocketmq.version>
<mysql.version>8.0.19</mysql.version>
<reflections.version>0.9.12</reflections.version>
<maven.flatten.version>1.1.0</maven.flatten.version>
<maven.gpg.version>1.6</maven.gpg.version>
</properties>

<name>${project.artifactId}</name>
<description>The enodeframework is devoted to helping engineers develop scalable applications.</description>
<url>http://www.enodeframework.org</url>

<licenses>
<license>
<name>MIT License</name>
<url>http://www.opensource.org/licenses/mit-license.php</url>
</license>
</licenses>
<developers>
<developer>
<name>anruence</name>
<email>anruence@gmail.com</email>
<organizationUrl>http://www.enodeframework.org</organizationUrl>
</developer>
</developers>
<scm>
<tag>master</tag>
<url>https://github.com/anruence/enode</url>
<connection>scm:git:git@github.com:anruence/enode.git</connection>
</scm>

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${springboot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-stack-depchain</artifactId>
<version>${vertx.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.enodeframework</groupId>
<artifactId>enode</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.enodeframework</groupId>
<artifactId>enode-mysql</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.enodeframework</groupId>
<artifactId>enode-rocketmq</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.enodeframework</groupId>
<artifactId>enode-kafka</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slfj4.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
<version>${maven.flatten.version}</version>
<configuration>
<updatePomFile>true</updatePomFile>
<flattenMode>resolveCiFriendliesOnly</flattenMode>
</configuration>
<executions>
<execution>
<id>flatten</id>
<phase>process-resources</phase>
<goals>
<goal>flatten</goal>
</goals>
</execution>
<execution>
<id>flatten.clean</id>
<phase>clean</phase>
<goals>
<goal>clean</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>sonatype-oss-release</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>${maven.gpg.version}</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

需求

如何自定义log4j的配置文件路径?

加载顺序

1
2
3
4
5
6
7
8
9
10
11
Automatic Configuration
Log4j has the ability to automatically configure itself during initialization. When Log4j starts it will locate all the ConfigurationFactory plugins and arrange then in weighted order from highest to lowest. As delivered, Log4j contains three ConfigurationFactory implementations: one for JSON, one for YAML, and one for XML.

Log4j will inspect the "log4j.configurationFile" system property and, if set, will attempt to load the configuration using the ConfigurationFactory that matches the file extension.
If no system property is set the YAML ConfigurationFactory will look for log4j2-test.yaml or log4j2-test.yml in the classpath.
If no such file is found the JSON ConfigurationFactory will look for log4j2-test.json or log4j2-test.jsn in the classpath.
If no such file is found the XML ConfigurationFactory will look for log4j2-test.xml in the classpath.
If a test file cannot be located the YAML ConfigurationFactory will look for log4j2.yaml or log4j2.yml on the classpath.
If a YAML file cannot be located the JSON ConfigurationFactory will look for log4j2.json or log4j2.jsn on the classpath.
If a JSON file cannot be located the XML ConfigurationFactory will try to locate log4j2.xml on the classpath.
If no configuration file could be located the DefaultConfiguration will be used. This will cause logging output to go to the console.

Native Java Application

原生Java应用,通过启动时添加系统变量即可

1
-Dlog4j.configurationFile=log4j2.xml

SpringBoot应用

SpringBoot应用也很简单
直接在application.properties中配置即可

1
logging.config=classpath:log4j2-hui-prod.xml

Spring应用

网络上一搜,大部分都是这样的配置

1
2
3
4
5
6
7
8
9
10
11
<context-param>
<param-name>log4jConfigLocation</param-name>
<param-value>classpath:log4j2-hui-prod.xml</param-value>
</context-param>
<context-param>
<param-name>log4jRefreshInterval</param-name>
<param-value>60000</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
</listener>

这样能行?log4j1 时代用的都是这套配置,但同样适用log4j2吗?追踪实现时发现Log4jConfigListener 在Spring 4.2.1版本后已经废弃了

先直接贴出答案,各位看官急用的话看到这里就可以解决问题。

配置变更

log4jConfigLocation -> log4jConfiguration

1
2
3
4
<context-param>
<param-name>log4jConfiguration</param-name>
<param-value>classpath:log4j2-hui-prod.xml</param-value>
</context-param>

PS:大家是不是从来没有注意过web.xml下的web-app的配置,类似下面这样的:

1
2
3
4
5
6
7
<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">

</web-app>

修改为 3.1 or higher
可参考:http://www.oracle.com/webfolder/technetwork/jsc/xml/ns/javaee/index.html

1
2
3
4
5
6
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
</web-app>

详细分析

web-app 这些配置是干嘛的

首先这是个 XML Schemas for Java EE Deployment Descriptors,主要是配置描述符

根因分析

上面说到Spring实现的Listener已经废弃了,我们详细跟踪到了 Log4jServletContextListener ,这个是log4j2-web这个jar中实现的。按理说直接引入就应该可以达到修改配置路径的目的了,但发现实际上还是做了些限制,贴出下面的代码应该就能说明问题了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

package org.apache.logging.log4j.web;

import java.util.EnumSet;
import java.util.Set;
import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
import javax.servlet.ServletContainerInitializer;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.status.StatusLogger;

/**
* In a Servlet 3.0 or newer environment, this initializer is responsible for starting up Log4j logging before anything
* else happens in application initialization. For consistency across all containers, if the effective Servlet major
* version of the application is less than 3.0, this initializer does nothing.
*/
public class Log4jServletContainerInitializer implements ServletContainerInitializer {

private static final Logger LOGGER = StatusLogger.getLogger();

@Override
public void onStartup(final Set<Class<?>> classes, final ServletContext servletContext) throws ServletException {
if (servletContext.getMajorVersion() > 2 && servletContext.getEffectiveMajorVersion() > 2 &&
!"true".equalsIgnoreCase(servletContext.getInitParameter(
Log4jWebSupport.IS_LOG4J_AUTO_INITIALIZATION_DISABLED
))) {
LOGGER.debug("Log4jServletContainerInitializer starting up Log4j in Servlet 3.0+ environment.");

final FilterRegistration.Dynamic filter =
servletContext.addFilter("log4jServletFilter", Log4jServletFilter.class);
if (filter == null) {
LOGGER.warn("WARNING: In a Servlet 3.0+ application, you should not define a " +
"log4jServletFilter in web.xml. Log4j 2 normally does this for you automatically. Log4j 2 " +
"web auto-initialization has been canceled.");
return;
}

final Log4jWebLifeCycle initializer = WebLoggerContextUtils.getWebLifeCycle(servletContext);
initializer.start();
initializer.setLoggerContext(); // the application is just now starting to start up

servletContext.addListener(new Log4jServletContextListener());

filter.setAsyncSupported(true); // supporting async when the user isn't using async has no downsides
filter.addMappingForUrlPatterns(EnumSet.allOf(DispatcherType.class), false, "/*");
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Returns the major version of the Servlet API that this
* servlet container supports. All implementations that comply
* with Version 3.0 must have this method return the integer 3.
*
* @return 3
*/
public int getMajorVersion();


/**
* Returns the minor version of the Servlet API that this
* servlet container supports. All implementations that comply
* with Version 3.0 must have this method return the integer 0.
*
* @return 0
*/
public int getMinorVersion();


/**
* Gets the major version of the Servlet specification that the
* application represented by this ServletContext is based on.
*
* <p>The value returned may be different from {@link #getMajorVersion},
* which returns the major version of the Servlet specification
* supported by the Servlet container.
*
* @return the major version of the Servlet specification that the
* application represented by this ServletContext is based on
*
* @throws UnsupportedOperationException if this ServletContext was
* passed to the {@link ServletContextListener#contextInitialized} method
* of a {@link ServletContextListener} that was neither declared in
* <code>web.xml</code> or <code>web-fragment.xml</code>, nor annotated
* with {@link javax.servlet.annotation.WebListener}
*
* @since Servlet 3.0
*/
public int getEffectiveMajorVersion();


/**
* Gets the minor version of the Servlet specification that the
* application represented by this ServletContext is based on.
*
* <p>The value returned may be different from {@link #getMinorVersion},
* which returns the minor version of the Servlet specification
* supported by the Servlet container.
*
* @return the minor version of the Servlet specification that the
* application xrepresented by this ServletContext is based on
*
* @throws UnsupportedOperationException if this ServletContext was
* passed to the {@link ServletContextListener#contextInitialized} method
* of a {@link ServletContextListener} that was neither declared in
* <code>web.xml</code> or <code>web-fragment.xml</code>, nor annotated
* with {@link javax.servlet.annotation.WebListener}
*
* @since Servlet 3.0
*/
public int getEffectiveMinorVersion();

简而言之就是:
2.5 => (major = 2 minor = 5)
3.1 => (major = 3 minor = 1)
MajorVersion => 实际引入jar的版本
EffectiveMajorVersion => web-app中声明的版本

参考

前言

ENode是一个基于消息的架构,使用ENode开发的系统,每个环节都是处理消息,处理完后产生新的消息。本篇文章我想详细分析一下ENode框架内部是如何实现整个消息处理流程的。为了更好的理解我后面的流程的描述,我觉得还是应该先把ENode的架构图贴出来,好让大家在看后面的分析时,可以对照这个架构图进行思考和理解。

ENode架构图

ENode框架内部实现流程分析

  1. Controller发送ICommand到分布式消息队列EQueue(可以是各类分布式MQ);

  2. 【从这一步开始处理Command】MQ中的CommandConsumer接收到该ICommand,先创建一个ICommandContext实例,然后调用ENode中的ICommandExecutor执行当前ICommand并将ICommandContext传递给ICommandExecutor;

  3. ICommandExecutor根据当前ICommand的类型,获取到一个唯一的ICommandHandler,然后调用ICommandHandler的Handle方法处理当前ICommand,调用时传递当前的ICommandContext给ICommandHandler;

  4. ICommandHandler处理完Command后,ICommandExecutor获取当前ICommandContext中新增或修改的聚合根;

  5. 检查当前ICommandContext中是否只有一个新增或修改的聚合根;如果超过1个,则报错,通过这样的检查来从框架级别保证一个Command一次只能修改一个聚合根;

  6. 如果发现当前新增或修改的聚合根为0个,则直接认为当前的ICommand已处理完成,就调用ICommandContext的OnCommandExecuted方法,该方法内部会通知EQueue发送CommandResult消息给Controller;然后Controller那边的进程,会有一个CommandResultProcessor接收到这个CommandResult的消息,然后就知道该ICommand的处理结果了;(update: 消息通知的方式已经修改为使用 Socket 发送结果)

  7. ICommandExecutor从ICommandContext拿到当前唯一修改的聚合根后,取出该聚合根里产生的IDomainEvent。由于一个聚合根一次可能会长生多个IDomainEvent,所以我们会构建一个EventStream对象。这个对象包含了所有当前聚合根所产生的IDomainEvent。一个EventStream会包含很多重要的信息,包括当前ICommand的ID、聚合根的ID、聚合根的Version(版本号),以及所有的IDomainEvent,等等;【update:命令在确认事件持久化完成后则执行完成】

  8. ICommandExecutor将该Command添加到ICommandStore。因为ICommandStore是以CommandId为主键(即Key),所以如果CommandId重复,框架就会知道,然后就会做重复时的逻辑处理,这点后面再详细分析;【update:基于性能考虑,废弃了这种处理方式,Command的幂等判断在持久化事件的时候来做了】

  9. 如果Command成功添加到ICommandStore,则接下来调用IEventService的Commit方法将当前EventStream持久化到IEventStore;

  10. IEventService内部主要做3件事情:

    • 1)将EventStream持久化到IEventStore;
    • 2)持久化成功后调用IMemoryCache更新缓存(缓存可以配置为本地缓存也可以配置为分布式缓存Redis,如果Command的处理是集群处理的,那我们应该用共享缓存,也就是用Redis这种分布式缓存);
    • 3)缓存更新好之后,调用IEventPublisher接口的Publish方法将EventStream发布出去,IEventPublisher的具体实现者会把当前的EventStream发送到EQueue。

    这3步是正常情况的流程。如果遇到持久化到IEventStore时遇到版本号重复(同一个聚合根ID+聚合根的Version相同,则认为有并发冲突),此时框架需要做不同的逻辑处理;这点也在后面详细分析。

  11. 【从这一步开始处理Domain Event】EventStream被ENode.EQueue中的EventConsumer接收到,然后EventConsumer调用IEventProcessor处理当前的EventStream;

  12. IEventProcessor首先判断当前的EventStream是否可以被处理,这里我们需要保证的很关键的一点是,必须确保事件的持久化顺序和被事件的订阅者处理的顺序要严格一样,否则就会出现Command端的数据和Query端的Read DB中的数据不一致的情况。关于如何保证这个顺序的一致,后面我们在详细分析。这里先举个简单的例子来说明为什么要顺序一致。比如假如现在有一个聚合根的一个属性,该属性的默认值是0,然后该属性先后发生了三个Domain Event(代表的意思分别是对这个属性做+1,*2,-1)。这三个事件如果按照这样的顺序发生后,那这个属性最后的值是1;但是如果这3个事件被消费者消费的顺序是+1,-1,*2那最后的结果就不是1了,而是0了;所以通过这个例子,我想大家应该都知道了为什么要严格保证聚合根持久化事件的顺序必须和被消费的顺序要完全一致了;

  13. 假如当前的EventStream允许被处理,则IEventProcessor对当前的EventStream中的每个IDomainEvent做如下处理:

    • 根据IDomainEvent的类型得到所有当前IEventProcessor节点上所有注册的IEventHandler,然后调用它们的Handle方法,完成比如Query端的Read DB的更新。但是事情还没那么简单,因为我们还需要保证当前的IDomainEvent只会被当前的IEventHandler处理一次,否则IEventHandler就会因为重复处理了IDomainEvent而导致最后的数据不对;这里的幂等也在后面详细讨论。
  14. 有些IEventHandler处理完IDomainEvent后会产生新的ICommand(就是Saga Process Manager)的情况。这种情况下,我们还需要把这些产生的ICommand由框架自动发送到消息队列(EQueue);但是事情也没那么简单,要是发送这些ICommand失败了呢?那就需要重发,那重发如何设计才能保证不管重发多少次,也不会导致ICommand的重复执行呢?这里其实最关键的一点是要保证你每次重发的ICommand的Id总是和第一次发送时要相同的,否则框架就无法知道是否是同一个Command了。这里的具体设计后面再分析。

    Command的幂等处理

首先Command的幂等判断在持久化事件的时候来做了,怎么做的呢?

将事件流提交到EventStore时,EventStream存储表中定义了聚合根ID和CommadnId的唯一索引,根据insert语句抛出的异常类型来判断是不是DuplicateCommand。

这里涉及到两种持久化方式的处理(批量处理,顺序处理)

  1. 未开启批量处理,顺序写入db,通过SQLException来判断唯一索引的冲突情况(Command重复 or Event重复)。
  2. 当开启批量处理时,写入失败时,重试每个写SQL,然后执行1。

上面流程中的第8步,Command会被添加到ICommandStore。这里,实际上我添加到ICommandStore的是一个HandleCommand对象,该对象包含当前的Command之外,还有当前被修改的聚合根ID。这样做的理由请看我后面的解释。我们知道ICommandStore会对CommandId作为主键,这样我们就能绝对保证一个Command不会被重复添加。如果Command添加到ICommandStore成功,那自然最好了,直接进入后续的步骤即可;但是如果出现CommandId重复的时候,我们需要做怎么样的处理呢?

如果出现重复,则需要根据CommandId(主键),把之前已经持久化过的HandledCommand取出来;然后然后我们从HandledCommand拿到被修改的聚合根ID,然后最关键的一步:我们将该聚合根ID以及CommandId作为条件从IEventStore中查询出一个可能存在的EventStream。如果存在,就说明这个Command所产生的Domain Event已经被持久化了,所以我们只要再做一遍发布事件的操作即可。即调用IEventPublisher.Publish方法来发布事件到Query Side。那么为什么要发布呢?因为虽然事件被持久化了,但并不代表已经成功被发布出去了。因为理论上有可能Domain Event被持久化成功了,但是在要发布事件的时候,断电了!所以这种情况下,重启服务器就会出现这里讨论的情况了。所以我们需要再次Publish事件。

然后,如果没有根据CommandId和聚合根ID查找到EventStream呢?那也好办,因为这种情况就说明这个Command虽然被持久化了,但是它所产生的EventStream却没有被持久化到EventStore,所以我们需要将当前的EventStream调用IEventService.Commit方法进行持久化事件。

另外,这里其实有一个疑问,为什么查找EventStream不能仅仅根据CommandId呢?原因是:从技术上来说,我们可以只根据CommandId来查找到一个唯一的EventStream,但这样设计的话,就要求EventStore必须要支持通过一个CommandId来全局唯一定位到一个EventStream了。但是因为考虑到EventStore的数据量是非常大的,我们以后可能会根据聚合根ID做水平拆分(sharding)。这样的话,我们仅仅靠CommandId就无法知道到哪个分片下去查找对应的EventStream了。所以,如果查询时,能同时指定聚合根ID,那我们就能轻松知道首先到哪个分片下去找EventStream,然后再根据CommandId就能轻松定位到一个唯一的EventStream了。

既然说到这里,我再说一下CommandStore的水平分割的设计吧,CommandStore的数据量也是非常大的,因为它会存储所有的Command。不过幸好,我们对于CommandStore只需要根据CommandId去查找即可,所以我们可以根据CommandId来做Hash取模的方式来水平拆分。这样即便是分片了,我们只要知道了一个给定的CommandId,也能知道它当前是在哪个分片下的,就很容易找到该Command了。

所以,通过上面的分析,我们知道了CommandStore和EventStore在设计上不仅仅考虑了如何存储数据,还考虑了未来大数据量时如何分片,以及如何在分片的情况下仍然能方便的查找到我们的数据。

最后,上面还有一种情况没有说明,就是当出现Command添加到CommandStore时发现重复,但是尝试从CommandStore中根据CommandId查询该Command时,发现查不到,天哪!这种情况实际上不应该出现,如果出现,那说明CommandStore内部有问题了。因为为什么添加时说有重复,而查询却差不多来呢?呵呵。这种情况就无法处理了,我们只能记录错误日志,然后进行后续的排查。

Domain Event持久化时的并发冲突检测和处理

上面流程中的第10步,我们提到:如果遇到EventStream持久化到IEventStore时遇到版本号重复(同一个聚合根ID+聚合根的Version相同,则认为有并发冲突),此时框架需要做不同的逻辑处理。具体是:

首先,我们可以先想想为什么会出现同一个聚合根会在几乎同一时刻产生两个版本号一样的领域事件,并持久化到EventStore。首先,我先说一下这种情况几乎不会出现的理由:ENode中,在ICommandExecutor在处理一个Command时,会检查当前该Command所要修改的聚合根是否已经有至少一个聚合根正在被处理,如果有,则会将当前Command排入到这个聚合根所对应的等候队列。也就是说,它暂时不会被执行。然后当当前聚合根的前面的Command被执行完了后才会从这个等候队列取出下一个等待的Command进行处理。通过这样的设计,我们保证了,对一个聚合根的所有Command,不会并行被执行,只会按照顺序被执行。因为每个ICommandExecutor会在需要的时候,为某个聚合根自动创建这种等候队列,只要对该聚合根的Command同一时刻进来2个或以上。

那么,要是集群的时候呢?你一台机器的话,通过上面的方式可以确保一个聚合根实例的所有的Command会被顺序处理。但是集群的时候,可能一个聚合根会在多台机器被同时处理了。要解决这个问题的思路就是对Command按照聚合根ID进行路由了,因为一般只要是修改聚合根的Command总是会带有一个聚合根ID,所以我们可以按照这个特性,对被发送的Command按照聚合根ID进行路由。只要CommandId相同,则总是会被路由到同一个队列,然后因为一个队列总是只会被一台机器消费,从而我们能保证对同一个聚合根的Command总是会落到一台机器上被处理。那么你可能会说,要是热点数据呢?比如有些聚合根突然对他修改的Command可能非常多(增加了一倍),而有些则很少,那怎么办呢?没关系,我们还有消息队列的监控平台。当出现某个聚合根的Command突然非常多的时候,我们可以借助于EQueue的Topic的Queue可以随时进行增加的特性来应付这个问题。比如原来这个Topic下只有4个Queue,那现在增加到8个,然后消费者机器也从4台增加到8台。这样相当于Command的处理能力也增加了一倍。从而可以方便的解决热点数据问题。因此,这也是我想要自己实现分布式消息队列EQueue的原因啊!有些场景,要是自己没有办法完全掌控,会很被动,直接导致整个架构的严重缺陷,最后导致系统瘫痪,而自己却无能为了。当然你可以说我们可以使用Kafka, Rocketmq这样的高性能分布式队列,确实。但是毕竟这种高大上的队列非常复杂,且都是非.NET平台。除了问题,维护起来肯定比自己开发的要难维护。当然除非你对它们非常精通且有自信的运维能力。

通过上面的思路实现的,确保聚合根的Command总是被顺序线性处理的设计,对EventStore有非常大的意义。因为这样可以让EventStore不会出现并发冲突,从而不会造成无谓的对EventStore的访问,也可以极大的降低EventStore的压力。

但是什么时候还是可能会出现并发冲突呢?因为:

1)当处理Command的某台机器挂了,然后这台机器所消费的Queue里的消息就会被其他机器接着消费。其他机器可能会从这个Queue里批量拉取一些Command消息来消费。然后此时假如我们重启了这台有问题的服务器,重启完之后,因为又会开始消费这个Queue。然后一个关键的点是,每次一台机器启动时,会从EQueue的Broker拉取这个Queue最后一个被消费的消息的位置,也就是Offset,而由于这个Offset的更新是异步的,比如5s才会更新到EQueue的Broker,所以导致这台重启后的服务器从Broker上拉取到的消费位置其实是有延迟的,从而就可能会去消费在那台之前接替你的服务器已经消费过的或者正在消费的Command消息了。当然这种情况因为条件太苛刻,所以基本不会发生,即便会发生,一般也不会导致Command的并发执行。但是这毕竟也是一种可能性。实际上这里不仅仅是某个服务器挂掉后再重启的情况会导致并发冲突,只要是处理Comand的机器的集群中有任何的机器的增加或减少,由于都会导致Command消息的消费者集群重新负载均衡。在这个负载均衡的过程中,就会导致同一个Topic下的同一个Queue里的部分消息可能会在两台服务器上被消费。原因是Queue的消费位置(offset)的更新不是实时的,而是定时的。所以,我们一般建议,尽量不要在消息很多的时候做消费者集群内机器的变动,而是尽量在没什么消息的时候,比如凌晨4点时,做集群的扩容操作。这样可以尽量避免所有可能带来的消息重复消费或者并发冲突的可能性。呵呵,这段话也许很多人看的云里雾里,我只能说到这个程度了,也许要完全理解,大家还需要对EQueue的设计很清楚才行!

2)就算同一个机器内,其实也是有可能出现对同一个聚合根的并发修改,也就是针对同一个聚合根的两个Command被同时执行。原因是:当一个Command所对应的EventStream在被持久化时出现重复,然后我就会放在一个本地的内存队列进行重试,然后重试由于是在另一个专门的重试线程里,该线程不是正常处理Command的线程。所以假如对该聚合根后续还有Command要被处理,那就有可能会出现同一时刻,一个聚合根被两个Command修改的情况了。

现在,我们在回来讨论,假如遇到冲突时,要怎么做?这个上面我简单提到过,就是需要重试Command。但也不是这么简单的逻辑。我们需要:

a. 先检查当前的EventStream的Version是否为1,假如为1,说明有一个创建聚合根的Command被并发执行了。此时我们无须在重试了,因为即便再重试,那最后产生的EventStream的版本号也总是1,因为只要是第一次创建聚合根,那这个聚合根所产生的DomainEvent的版本总是1。所以这种情况下,我们只需要直接从EventStore拿出这个已经存在的EventStream,然后通过IEventPublisher.Publish方法发布该EventStream即可。为什么要再次发布,上面解释Command的幂等时,也解释了原因,这里是一样的原因。这里也有一个小的点需要注意,就是假如尝试从EventStore拿出这个EventStream时,假如没获取到呢?这个问题实际上不应该出现,原因就像上面分析Command幂等时一样,为什么会出现添加时提示存在,但查询时却查不到的情况呢?这种情况就是EventStore的设计有问题了,读写存在非强一致性的情况了。

b. 如果当前的EventStream的Version大于1,则我们需要先更新内存缓存(Redis),然后做Command的重试处理。为什么要先更新缓存呢?因为如果不更新,有可能重试时,拿到的聚合根的状态还是旧的,所以重试后还是导致版本号冲突。那为什么从缓存中拿到的聚合根的状态可能还是旧的呢?因为EventStream已经存在于EventStore并不代表这个EventStream的修改已经更新到缓存了。因为我们是先持久化到EventStore,在更新缓存的。完全有可能你还没来得及更新缓存的时候,另一个Command正好需要重试呢!所以,最保险的做法,就是再重试的时候将缓存中的聚合根状态更新到最新值。那怎么更新呢?呵呵,很简单,就是通过事件溯源(即Event Sourcing技术)了。我们只要从Event Store获取当前聚合根的所有的Event Stream,然后溯源这些事件,最后就能得到聚合根的最新版本的状态了,然后更新到缓存即可。

最后,如果需要重试的话,要怎么重试呢?很简单,只要扔到一个本地的基于内存的重试队列即可。我现在是用BlockingCollection的。

如何保证事件产生的顺序和被消费的顺序相同

为什么要保证这个相同的顺序,在上面的流程步骤介绍里已经说明了。这里我们分析一下如何实现这个顺序的一致。基本的思路是用一个表,存放所有聚合根当前已经处理过的最大版本号,假如当前已经处理过的最大版本号是10,那接下来只能处理这个聚合根版本号为11的EventStream。即便Version=12或者更后面的先过来,也只能等着。那怎么等呢?也是类似Command的重试队列一样,在一个本地的内存队列等就行了。比如现在最大已处理的版本号是10,然后现在12,13这两个版本号的EventStream先过来,那就先到队列等着,然后版本号是11的这个事件过来了,就可以处理。处理好之后,当前最大已处理的版本号就编程11了,所以等候队列中的版本号为12的EventStream就可以允许被处理了。整个控制逻辑就是这样。那么这是单机的算法,要是集群呢?实际上这不必考虑集群的情况,因为我们每台机器上都是这个顺序控制逻辑,所以如果是集群,那最多可能出现的情况(实际上这种情况存在的可能性也是非常的低)是,版本号为11的EventStream被并发的处理。这种情况就是我下面要分析的。

这里实际上还有一个细节我还没说到,这个细节和EQueue的Consumer的ConsumerGroup相关,就是假如一种消息,有很多Consumer消费,然后这些Consumer假如分为两个ConsumerGroup,那这两个ConsumerGroup的消费是相互隔离的。也就是说,所有这些消息,两个ConsumerGroup内的Consumer都会消费到。这里如果不做一些其他的设计,可能会在用户使用时遇到潜在的问题。这里我没办法说的很清楚,说的太清楚估计会让大家思维更混乱,且因为这个点不是重点。所以就不展开了。有兴趣的朋友可以看一下ENode中的EventPublishInfo表中的EventProcessorName字段的用意。

如何保证一个IDomainEvent只会被一个IEventHandler处理一次

这个只是提供一个思路,框架未实现。
这一条的原因,我想大家都能理解。比如一个Event Handler是更新读库的,可能我们会执行一条有副作用的SQL,比如update product set price = price + 1 where id = 1000。这条SQL如果被重复执行一次,那price字段的值就多了1了,这不是我们所期望的结果。所以框架需要有基本的责任可以基本避免这种情况的发生。那怎么实现呢?思路也是用一张表,记录被执行的DomainEvent的ID以及当前处理这个DomainEvent的Event Handler的类型的一个Code,对这两个联合字段做联合唯一索引。每次当一个Event Handler要处理一个Domain Event时,先判断是否已经处理过,如果没处理过,则处理,处理过之后把被处理的Domain Event ID和EventHandler Type Code添加到这个表里即可。那假如添加的时候失败了呢?因为有可能也会有并发的情况,导致Event Handler重复处理同一个Domain Event。这种情况框架就不做严谨的处理了,因为框架本身也无法做到。因为框架式无法知道Event Handler里面到底在做什么的。有可能是发送邮件,也有可能是记录日志,也可能是更新读取(Read DB)。所以,最根本的,还是要求Event Handler内部,也就是开发这自己需要考虑幂等的实现。当然框架会提供给开发者必要的信息来帮助他们完成严谨幂等控制。比如框架会提供当前Domain Event 的版本号给Event Handler,这样Event Handler里就能在Update SQL的Where部分把这个Version带上,从而实现乐观并发控制。比如下面的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void Handle(IEventContext context, SectionNameChangedEvent evnt)
{
TryUpdateRecord(connection =>
{
return connection.Update(
new
{
Name = evnt.Name,
UpdatedOn = evnt.Timestamp,
Version = evnt.Version
},
new
{
Id = evnt.AggregateRootId,
Version = evnt.Version - 1
}, Constants.SectionTable);
});
}

上面的代码中,当我们更新一个论坛的版块时,我们可以在sql的where条件中,用version = evnt.Verion - 1这样的条件。从而确保当前你要处理的事件一定是上一次已处理的事件的版本号的下一个版本号,也就是保证了Query Side的更新的顺序严格和事件产生的顺序一致。这样即便框架在有漏网之鱼的时候,Event Handler内部也能做严谨的顺序控制。当然如果你的Event Handler是发送邮件,那我还真不知道该如何进一步保证这个严谨的顺序或者并发冲突了,呵呵。有兴趣的朋友可以和我交流。

在Saga Process Manager中产生的ICommand如何能够支持重试发送而不导致操作的重复

终于到最后一点了,好累。坚持就是胜利!假如现在的Saga Event Handler里是会产生Command,那框架在发送这些Command时,要确保不能重复执行。怎么办呢?假如在Saga Event Handler里产生的Command的Id每次都是新new出来的一个唯一的值,那框架就无法知道这个Command是否和之前的重复了,框架会认为这是两个不同的Command。这里其实有两种做法:

  1. 框架先把Saga Event Handler中产生的Command保存起来,然后慢慢发送到EQueue。发送成功一个,就删除一个。直到全部发送完为止。这种做法是可行的,因为这样一来,我们要发送的Command就总是从存储这些Command的地方去拿了,所以不会出现每次要发送的同一个Command的ID都是不同的情况。但是这种设计性能不是太好,因为要发送的Command必须要先被保存起来,然后再发送,发送完之后还要删除。性能上肯定不会太高。

  2. 第二种做法是,Command不存储起来,而是直接把Saga Event Handler中产生的Command拿去发送。但这种设计要求:框架对这种要发送的Command的ID总是按照某个特定的规则来产生的。这种规则要保证产生的CommandId首先是要唯一的,其次是确定的。下面我们看一下下面的代码:

1
2
3
4
5
6
7
private string BuildCommandId(ICommand command, IDomainEvent evnt, int eventHandlerTypeCode)
{
var key = command.GetKey();
var commandKey = key == null ? string.Empty : key.ToString();
var commandTypeCode = _commandTypeCodeProvider.GetTypeCode(command.GetType());
return string.Format("{0}{1}{2}{3}", evnt.Id, commandKey, eventHandlerTypeCode, commandTypeCode);
}

上面这个代码是一个函数,用来构建要被发送的Command的Id的,我们可以看到ID是由Command的一个key+要被发送的Command的类型的code+当前被处理的Domain Event的ID,以及当前的Saga Event Handler的类型的code这四个信息组成。对于同一个Domain Event被同一个Event Handler处理,然后如果产生的Command的类型也是一样的,那我们基本可以通过这三个信息构建一个唯一的CommandId了,但是有时这样还不够,因为我们可能在一个Event Handler里构建两个类型完全一样的Command,但是他们修改的聚合根的ID不同。所以,我上面才有一个commandKey的组成部分。这个key默认就是这个Command要修改的聚合根的ID。这样,通过这样4个信息的组合,我们可以确保不管某个Domain Event被某个Saga Event Handler处理多少次,最后产生的Command的ID总是确定的,不变的。当然上面的commandKey有时仅仅考虑聚合根ID可能还不够,虽然我还没遇到过这种情况,呵呵。所以我框架设计上,就允许开发者能重新GetKey方法,开发者需要理解何时需要重写这个方法。看了这里的说明应该就知道了!