分布式事务最终一致性解决方案-DTCS

简介

DTCS(Data Transfer Consistency Service):数据一致性传输服务,通过MQ+一致性保障所实现的数据一致性传输服务。为全方面保障数据传输的一致性,增加运维人员人工介入纠正未及时实现数据一致性的处理方式。

业务调用时序

业务调用时序

后续

表决心: 此方案后续坚持实现后,必将开源于Github,届时请同行不吝指正。


观点仅代表自己,期待你的留言。

ubuntu下thinkpad电池阀设置(待验证)

设置电池阀

sudo add-apt-repository ppa:linrunner/tlp
sudo apt-get update

sudo apt-get install tlp tlp-rdw
sudo apt-get install tp-smapi-dkms acpi-call-tools

sudo gedit /etc/default/tlp

Main battery (values in %)

START_CHARGE_THRESH_BAT0=10
STOP_CHARGE_THRESH_BAT0=96

之后执行

sudo tlp setcharge

重启 OK 电池阀设置完成了

查看电池阀设置

tone@ubuntu:~$ sudo tlp-stat –battery
— TLP 0.4 ——————————————–

+++ ThinkPad Extended Battery Functions
tp-smapi = active
tpacpi-bat = active

+++ ThinkPad Battery Status (Main)
/sys/devices/platform/smapi/BAT0/manufacturer = LGC
/sys/devices/platform/smapi/BAT0/model = 42T4865
/sys/devices/platform/smapi/BAT0/manufacture_date = 2011-11-24
/sys/devices/platform/smapi/BAT0/first_use_date = 2012-04-13
/sys/devices/platform/smapi/BAT0/cycle_count = 63
/sys/devices/platform/smapi/BAT0/design_capacity = 62160 [mWh]
/sys/devices/platform/smapi/BAT0/last_full_capacity = 59470 [mWh]
/sys/devices/platform/smapi/BAT0/remaining_capacity = 42410 [mWh]
/sys/devices/platform/smapi/BAT0/remaining_percent = 71 [%]
/sys/devices/platform/smapi/BAT0/remaining_running_time_now = 189 [min]
/sys/devices/platform/smapi/BAT0/remaining_charging_time = not_charging [min]
/sys/devices/platform/smapi/BAT0/power_now = -13432 [mW]
/sys/devices/platform/smapi/BAT0/power_avg = -12742 [mW]

tpacpi-bat.BAT0.startThreshold = 10 [%]
tpacpi-bat.BAT0.stopThreshold = 96 [%]
tpacpi-bat.BAT0.forceDischarge = 0


观点仅代表自己,期待你的留言。

Linux下通过nginx实现直播间功能的实验

实验环境

  • 系统环境

    1
    2
    wujianjun@wujianjun-work ~ $ uname -a
    Linux wujianjun-work 4.10.0-37-generic #41~16.04.1-Ubuntu SMP Fri Oct 6 22:42:59 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
  • 软件环境
    OBS(Open Broadcaster Software) v20.0.1 (Linux)

    nginx version: nginx/1.13.6
    built by gcc 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.5)
    built with OpenSSL 1.0.2g 1 Mar 2016
    TLS SNI support enabled
    configure arguments: –with-pcre=pcre-8.38 –add-module=nginx-rtmp-module-1.1.11

Nginx+obs安装及配置

安装obs

1
2
3
4
5
wujianjun@wujianjun-work ~ $ sudo add-apt-repository ppa:kirillshkrogalev/ffmpeg-next
wujianjun@wujianjun-work ~ $ sudo apt-get update && sudo apt-get install ffmpeg
wujianjun@wujianjun-work ~ $ sudo apt-get install obs-studio
wujianjun@wujianjun-work ~ $ sudo add-apt-repository ppa:obsproject/obs-studio
wujianjun@wujianjun-work ~ $ sudo apt-get update && sudo apt-get install obs-studio

nginx加装rtmp模块

nginx-rtmp-module (https://github.com/arut/nginx-rtmp-module)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
wujianjun@wujianjun-work ~ $ sudo apt-get install build-essential
wujianjun@wujianjun-work ~ $ wget wget http://nginx.org/download/nginx-1.13.6.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ wget https://github.com/arut/nginx-rtmp-module/archive/v1.1.11.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ tar -xvf v1.1.11.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ wget http://jaist.dl.sourceforge.net/project/pcre/pcre/8.38/pcre-8.38.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ tar -xvf pcre-8.38.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ ls -all
总用量 748
drwxr-xr-x 9 wujianjun wujianjun 4096 10月 15 11:39 .
drwxr-xr-x 63 wujianjun wujianjun 4096 10月 15 11:33 ..
drwxr-xr-x 6 wujianjun wujianjun 4096 10月 15 11:33 auto
-rw-r--r-- 1 wujianjun wujianjun 282456 10月 10 23:22 CHANGES
-rw-r--r-- 1 wujianjun wujianjun 430416 10月 10 23:22 CHANGES.ru
drwxr-xr-x 2 wujianjun wujianjun 4096 10月 15 11:33 conf
-rwxr-xr-x 1 wujianjun wujianjun 2502 10月 10 23:22 configure
drwxr-xr-x 4 wujianjun wujianjun 4096 10月 15 11:33 contrib
drwxr-xr-x 2 wujianjun wujianjun 4096 10月 15 11:33 html
-rw-r--r-- 1 wujianjun wujianjun 1397 10月 10 23:22 LICENSE
drwxr-xr-x 2 wujianjun wujianjun 4096 10月 15 11:33 man
drwxrwxr-x 6 wujianjun wujianjun 4096 2月 13 2017 nginx-rtmp-module-1.1.11
drwxr-xr-x 7 wujianjun wujianjun 4096 11月 23 2015 pcre-8.38
-rw-r--r-- 1 wujianjun wujianjun 49 10月 10 23:22 README
drwxr-xr-x 9 wujianjun wujianjun 4096 10月 15 11:33 src
wujianjun@wujianjun-work ~/nginx-1.13.6 $ ./configure --with-pcre=pcre-8.38 --add-module=nginx-rtmp-module-1.1.11
wujianjun@wujianjun-work ~/nginx-1.13.6 $ make && sudo make install
wujianjun@wujianjun-work ~/nginx-1.13.6 $ ls -all /usr/local/nginx/
总用量 24
drwxr-xr-x 6 root root 4096 10月 15 16:11 .
drwxr-xr-x 11 root root 4096 10月 15 16:11 ..
drwxr-xr-x 2 root root 4096 10月 15 16:11 conf
drwxr-xr-x 2 root root 4096 10月 15 16:11 html
drwxr-xr-x 2 root root 4096 10月 15 16:11 logs
drwxr-xr-x 2 root root 4096 10月 15 16:11 sbin

增加rtmp协议配置

1
wujianjun@wujianjun-work ~/nginx-1.13.6 $ sudo vi /usr/local/nginx/conf/nginx.conf

在nginx.conf文件末尾增加以下rtmp协议的配置

1
2
3
4
5
6
7
8
9
10
11
rtmp {
server {
listen 1935;
chunk_size 4096;

application live {
live on;
record off;
}
}
}

启动&测试

  • 启动nginx

    1
    wujianjun@wujianjun-work ~/nginx-1.13.6 $ sudo /usr/local/nginx/sbin/nginx
  • 启动OBS
    打开刚安装的OBS软件,在来源处配置图像的推送来源(我这里选择窗口捕获),点击右下角”设置”,进行如下图配置流推送地址

配置完成后,点击”开始推流”

  • 启动支持网络流播放的视频播放器(演示使用vlc播放器)
    配置网络流播放的地址,如下图:

    当点击”播放”后,稍等几秒,即可看到播放器显示了obs捕获的图像了。

由于视频流需要通过网络进行传输,所以直播图像会有几秒的延迟。

http访问直播视频

1、更改nginx.conf中配置,增加hls配置(hls是在流媒体服务器中用来存放流媒体的文件夹),再次hls所在目录设置为http协议访问目录即可,更改后的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
rtmp {
server {
listen 1935;
chunk_size 4096;

application live {
live on;
hls on;
hls_path /usr/share/nginx/html/hls;
hls_fragment 5s;
}
}
}

http {
server {
listen 80;
.....
location / {
#root html;
root /usr/share/nginx/html;
index index.html index.htm;
}
.....
}
}

注意: hls所在目录nginx的用户必须有写入权限。

2、obs软件配置录制流名称
在配置obs推送流URL的下方有一个设置”流名称”的地方,这里可以随意填写一个名称(我这里示例填入”test”)

3、重启一下nginx与obs软件,我们即可在手机浏览器中输入 http://ip/hls/test.m3u8 即可通过手机播放直播视频。(直播延迟有点大,后续出文章优化)


观点仅代表自己,期待你的留言。

分布式下数据事务

一、概念

1、CAP定律

Consistency(一致性): 数据一致更新,针对集群内所有节点数据变动都是同步完成。
Availability(可用性): 在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。
Partition tolerance(分区容错性): 以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

It states, that though its desirable to have Consistency, High-Availability and Partition-tolerance in every system, unfortunately no system can achieve all three at the same time.
在分布式系统的设计中,没有一种设计可以同时满足一致性,可用性,分区容错性 3个特性

CAP定制

2、ACID模型

Atomicity(原子性): 事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。
Consistency(一致性): 事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。
Isolation(隔离性): 多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
Durability(持久性): 一个事务一旦提交,他对数据库的修改应该永久保存在数据库中。

3、BASE模型(反ACID模型)

Basically Available(基本可用): 基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性。
Soft state(软状态): 软状态和硬状态相对,是指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,数据更改时允许集群的不同节点有一段时间进行数据状态同步。
Eventually consistent(最终一致性): 事务处理过程中,会有短暂不一致的情况,但通过恢复系统,可以让事务的数据达到最终一致的目标。

二、强一致性解决方案

1、2PC (二阶段提交)

算法思路: 参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。
XA协议: XA是一个分布式事务协议,由Tuxedo提出。XA中大致分为两部分:事务管理器(Transaction Manager)和本地资源管理器(Local Resource Manager)。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库都实现了XA接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。

X/Open DTP模型

二阶段:

  1. 准备阶段: 事务协调者(事务管理器)给每个参与者(资源管理器)发送Prepare消息,每个参与者要么直接返回失败(如权限验证失败),要么在本地执行事务,写本地的redo和undo日志,但不提交本地事务。
  2. 提交阶段: 如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中使用的锁资源。(注意:必须在最后阶段释放锁资源)

缺点:

  1. 同步阻塞问题。执行过程中,所有参与节点都是事务阻塞型的。当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。
  2. 单点故障。由于协调者的重要性,一旦协调者发生故障。参与者会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)
  3. 数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据部一致性的现象。
  4. 二阶段无法解决的问题:协调者再发出commit消息之后宕机,而唯一接收到这条消息的参与者同时也宕机了。那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否被已经提交。

2、3PC (三阶段提交)

与两阶段提交不同的是,三阶段(CanCommit、PreCommit、DoCommit)提交有两个改动点:

  1. 引入超时机制。同时在协调者和参与者中都引入超时机制。
  2. 在第一阶段和第二阶段中插入一个准备阶段。保证了在最后提交阶段之前各参与节点的状态是一致的。

CanCommit阶段: 3PC的CanCommit阶段其实和2PC的准备阶段很像。协调者向参与者发送commit请求,参与者如果可以提交就返回Yes响应,否则返回No响应。
PreCommit阶段: 协调者根据参与者的反应情况来决定是否可以继续事务的PreCommit操作。如果参与者执行完本地事务操作则返回YES,假如有任何一个参与者向协调者发送了No响应,或者等待超时之后,协调者都没有接到参与者的响应,那么就执行事务的中断。
doCommit阶段: 该阶段由协调者通知参与者的PreCommit阶段反馈进行判断,最终决定是真正的事务提交,还是执行事务回滚。

缺点:
在doCommit阶段,如果参与者无法及时接收到来自协调者的doCommit或者rebort请求时,会在等待超时之后,会继续进行事务的提交。(其实这个应该是基于概率来决定的,当进入第三阶段时,说明参与者在第二阶段已经收到了PreCommit请求,那么协调者产生PreCommit请求的前提条件是他在第二阶段开始之前,收到所有参与者的CanCommit响应都是Yes。(一旦参与者收到了PreCommit,意味他知道大家其实都同意修改了)所以,一句话概括就是,当进入第三阶段时,由于网络超时等原因,虽然参与者没有收到commit或者abort响应,但是他有理由相信:成功提交的几率很大。 )

3、Paxos算法

待补充

三、最终一致性解决方案

1、本地消息表

设计思想: 是将远程分布式事务拆分成一系列的本地事务。借助关系型数据库中的表即可实现。
集群节点同步: 定时扫描本地消息表,将需要同步状态的消息产生MQ消息通过向时效性高的MQ放入消息,再由其它集群节点消费该消息完成通知,通过消息消费状态表来避免MQ消息的重复消费。

2、RocketMQ(事务消息)

以网传处理流程为例进行说明

说明:
1、当第3步confirmB失败时,则交由RMQ定时调用checkTransaction进行处理结果的检测,如果为失败,则rollback,否则发送消息到B账户。
2、当第5步consumeB失败时,则交由RMQ定时进行重试,但需要设置最大重试次数,如果达到最大次数依然失败,则需要人工介入进行修正。

所以RMQ需要一个人工修正的控制台,当系统通过重试无法进行修正时以人工做为做终的修正手段来做保障最终事务一致性。


观点仅代表自己,期待你的留言。

Cron表达式说明

CronTrigger

CronTriggers往往比SimpleTrigger更有用,如果您需要基于日历的概念,而非SimpleTrigger完全指定的时间间隔,复发的发射工作的时间表。
CronTrigger,你可以指定触发的时间表如“每星期五中午”,或“每个工作日9:30时”,甚至“每5分钟一班9:00和10:00逢星期一上午,星期三星期五“。
即便如此,SimpleTrigger一样,CronTrigger拥有的startTime指定的时间表时生效,指定的时间表时,应停止(可选)结束时间。

Cron Expressions

cron的表达式被用来配置CronTrigger实例。 cron的表达式是字符串,实际上是由七子表达式,描述个别细节的时间表。
这些子表达式是分开的空白,代表:

  1. Seconds
  2. Minutes
  3. Hours
  4. Day-of-Month
  5. Month
  6. Day-of-Week
  7. Year (可选字段)
    例 “0 0 12 ? * WED” 在每星期三下午12:00 执行,

个别子表达式可以包含范围, 例如,在前面的例子里(“WED”)可以替换成 “MON-FRI”, “MON, WED, FRI”甚至”MON-WED,SAT”.

“* ” 代表整个时间段.

每一个字段都有一套可以指定有效值,如

Seconds (秒) :可以用数字0-59 表示,

Minutes(分) :可以用数字0-59 表示,

Hours(时) :可以用数字0-23表示,

Day-of-Month(天) :可以用数字1-31 中的任一一个值,但要注意一些特别的月份

Month(月) :可以用0-11 或用字符串 “JAN, FEB, MAR, APR, MAY, JUN, JUL, AUG, SEP, OCT, NOV and DEC” 表示

Day-of-Week(每周):可以用数字1-7表示(1 = 星期日)或用字符口串“SUN, MON, TUE, WED, THU, FRI and SAT”表示

“/”:为特别单位,表示为“每”如“0/15”表示每隔15分钟执行一次,“0”表示为从“0”分开始, “3/20”表示表示每隔20分钟执行一次,“3”表示从第3分钟开始执行

“?”:表示每月的某一天,或第周的某一天

“L”:用于每月,或每周,表示为每月的最后一天,或每个月的最后星期几如“6L”表示“每月的最后一个星期五”

“W”:表示为最近工作日,如“15W”放在每月(day-of-month)字段上表示为“到本月15日最近的工作日”

““#”:是用来指定“的”每月第n个工作日,例 在每周(day-of-week)这个字段中内容为”6#3” or “FRI#3” 则表示“每月第三个星期五”

1)Cron表达式的格式:秒 分 时 日 月 周 年(可选)。

字段名 允许的值 允许的特殊字符
0-59 , - * /
0-59 , - * /
小时 0-23 , - * /
1-31 , - * ? / L W C
1-12 or JAN-DEC , - * /
周几 1-7 or SUN-SAT , - * ? / L C #
年 (可选字段) empty, 1970-2099 , - * /
“?”字符:表示不确定的值

“,”字符:指定数个值

“-”字符:指定一个值的范围

“/”字符:指定一个值的增加幅度。n/m表示从n开始,每次增加m

“L”字符:用在日表示一个月中的最后一天,用在周表示该月最后一个星期X

“W”字符:指定离给定日期最近的工作日(周一到周五)

“#”字符:表示该月第几个周X。6#3表示该月第3个周五

2)Cron表达式范例:

每隔5秒执行一次:*/5 * * * * ?

每隔1分钟执行一次:0 */1 * * * ?

每天23点执行一次:0 0 23 * * ?

每天凌晨1点执行一次:0 0 1 * * ?

每月1号凌晨1点执行一次:0 0 1 1 * ?

每月最后一天23点执行一次:0 0 23 L * ?

每周星期天凌晨1点实行一次:0 0 1 ? * L

在26分、29分、33分执行一次:0 26,29,33 * * * ?

每天的0点、13点、18点、21点都执行一次:0 0 0,13,18,21 * * ?

转载自:http://www.cnblogs.com/maybo/p/5189617.html
观点仅代表自己,期待你的留言。

多标识位存储优化方案

背景

在做程序开发的过程中常常会遇到数据的标识位(取值为”是”与”否”)需要进行存储,
如神马专车系统消息中心中的一个场景:新建一个消息需要勾选发送对象,可选的对象有:司机,下单人,乘车人。
最常见的Mysql存储方式为一个消息表中包含三个字段sendDriver,sendUser,sendPassenger
按用户新建消息所勾选的情况,依次存储到数据表中并在后续的业务中直接获取值进行对比。

优化方案

利用”与或非”的运算可将这多个标识符存储到一个字段中。
实例如下:
按业务这里有三个标识符需要存储,因此可以定义一个长度为3的二进行序列:000,从左至右第一位表示司机,第二位表示下单人,第三位表示乘车人。
那么,如果只需要发送给司机则可标识为100的二进行序列,如果下单人也需要通知,则可标识为110的二进行序列,依次,如果都需要通知则为111的二进制序列。
存储的值则为二进行制序列对应的十进制即可,当需要判断时,则可采用”与”运算符进行判定。

示例Java代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static final byte NOTIFY_PASSENGER = 0B001; //二进制为001
public static final byte NOTIFY_SUBSCRIBER = 0B010; //二进制为010
public static final byte NOTIFY_DRIVER = 0B100; //二进制为100

/**
* 获取需要存储的值则为notifyFlag
*/

public byte getStoreValue() {
byte notifyFlag = 0B000; //二进制为000
if (request.isNotifyDriver()) {
notifyFlag |= NOTIFY_DRIVER;
}
if (request.isNotifyPassenger()) {
notifyFlag |= NOTIFY_PASSENGER;
}
if (request.isNotifySubscriber()) {
notifyFlag |= NOTIFY_SUBSCRIBER;
}
return notifyFlag;
}

/**
* 判断是否需要通知
* @param notifyTarget 预计需要通知的目标
* @return true-需要通知,false-不需要通知
*/

public boolean isNotify(byte notifyTarget) {
byte notifyFlag = 0B111;//二进制为111 , 从数据库取出
return notifyFlag & notifyTarget == notifyTarget;
}

在Java中byte的最大值为:127,二进行为0B1111111, 足够标识七个标识位!


观点仅代表自己,期待你的留言。

问题分析 之 no transaction is in progress

问题现象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
javax.persistence.TransactionRequiredException: no transaction is in progress
at org.hibernate.ejb.AbstractEntityManagerImpl.flush(AbstractEntityManagerImpl.java:301)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerInvocationHandler.invoke(ExtendedEntityManagerCreator.java:365)
at $Proxy34.flush(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:240)
at $Proxy34.flush(Unknown Source)
... 53 more

示例代码 - Proxy

1
2
3
4
5
6
7
8
9
10
11
public class classA {

public void doA {
this.doB();
}

@Transaction
public void doB{
// do save or update
}
}

问题分析:
如上代码段,由于doB为对象内方法,而Spring事务的开启依赖到AOP(Proxy),在doA方法调用doB方法时,
由于是对象内的方法调用,造成doB方法的@Transaction不会被Proxy对象代理,进而造成Transaction失效。

解决方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ClassA {
private ClassB classB;

public void doA() {
classB.doB();
}
}

public class ClassB {

@Transaction
public void doB() {
// do save or update
}

}

将需要事务的方法doB通过Proxy进行代理,doA在使用时则是通过Spring开启事务的代理进行的调用。


观点仅代表自己,期待你的留言。

问题分析 之 Transaction marked as rollbackOnly

问题现象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
org.springframework.transaction.TransactionSystemException: Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Transaction marked as rollbackOnly
at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:526)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:757)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726)
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:521)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:291)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy49.lazyAssignOrder(Unknown Source)
at com.smzc.provider.order.facade.OrderFacade.lazyAssignOrder(OrderFacade.java:1015)
at com.smzc.provider.order.schedule.LazyAssignOrderTask$2.run(LazyAssignOrderTask.java:51)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: javax.persistence.RollbackException: Transaction marked as rollbackOnly
at org.hibernate.jpa.internal.TransactionImpl.commit(TransactionImpl.java:74)
at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:517)
... 13 more

代码示例-事务嵌套

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Transaction
public void doA() {
// do something
try {
doB();
} catch(Exception e) {
doC();
}
// do something
}

@Transaction
public void doA2() {
// do something
try {
doB();
} catch(Exception e) {

}
// do save or update
}

@Transaction
public void doB() {
// do save or update
if(true) {
throw new Exception("模拟异常");
}
}

@Transaction
public void doC() {
// do save or update
}

问题分析:
如上代码段,由于事务的传播性,doA,doB,doC方法其实共用的是由doA开启的同一个事务对象。
当doB方法抛出异常后事务被标记为回滚状态,再尝试执行doC方法或者执行任何的更改方法,在进行数据更新后进行事务commit时,此时则为抛出以上的异常。

解决方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ClassA {
private ClassB classB;
//@Transaction
public void doA() {
try {
classB.doB();
} catch(Exception e) {
classB.doC();
}
}
}

public class ClassB {
@Transaction
public void doB() {
if(1==1) {
throw new Exception("模拟异常");
}
}

@Transaction
public void doC() {
//有数据更改动作
}
}

以上的解决方法去掉了doA的事务,交由doB与doC分别开启两个事务解决,当doB失败时只标记doB的事务回滚,doC的事务依然能进行提交。


观点仅代表自己,期待你的留言。

Spring InitializingBean趟坑笔记

问题现象

在Spring完成上下文初始化完成后, InitializingBean的实现类中重写的afterPropertiesSet方法并未执行.

一、类对象延迟初始

1
2
3
4
5
6
7
8
9
10
11
12
public interface InitializingBean {
/**
* Invoked by a BeanFactory after it has set all bean properties supplied
* (and satisfied BeanFactoryAware and ApplicationContextAware).
* <p>This method allows the bean instance to perform initialization only
* possible when all bean properties have been set and to throw an
* exception in the event of misconfiguration.
* @throws Exception in the event of misconfiguration (such
* as failure to set an essential property) or if initialization fails.
*/

void afterPropertiesSet() throws Exception;
}

由于此接口的方法afterPropertiesSet是在对象的所有属性被初始化后才会调用。当Spring的配置文件中设置类初始默认为”延迟初始”(default-lazy-init="true",此值默认为false)时,
类对象如果不被使用,则不会实例化该类对象。所以 InitializingBean子类不能用于在容器启动时进行初始化的工作,则应使用Spring提供的ApplicationListener接口来进行程序的初始化工作。

另外,如果需要InitializingBean子类对象在Spring容器启动时就初始化并则容器调用afterPropertiesSet方法则需要在类上增加org.springframework.context.annotation.Lazy注解并设置为false即可(也可通过spring配置bean时添加lazy-init="false")。


观点仅代表自己,期待你的留言。

分布式系统下并发读取不重复数据

问题背景

分布式系统模块存在定时任务,当模块被部署多个实例时,不同实例的定时任务在运行时就存在从数据源中获取相同数据的情况,
虽然通过乐观锁(数据记录增加version,每次update则set verion+=1 where version=?)的处理方式可以避免多实例更新数据造成脏数据入库的问题,
但这无疑是降低了分布式系统部署的初衷(通过横向部署多实例增加系统的处理能力)。

经过分析,总结如下几种处理方式。

处理方式

1、数据置标识位

在数据库记录中增加flag,当数据被读取后则更改flag的值,避免记录被再次读取。

具体步骤如下:

  • 锁定分布式锁
  • 查询指定数量的数据
  • 更新已查询出数据的标识位
  • 释放分布式锁

当处理完成时: 增加数据version,重置标识位

2、单实例读取,多实例处理

读取数据Job:利用Redis的分布式锁保障一个实例拥有读取数据的步骤,读取出来的数据再存放到Redis中的需处理数据队列。

处理业务Job:所有部署实例通过监控Redis的需处理数据队列如果有数据则take一个数据出来进行业务处理。

具体步骤如下:

  • 锁定分布式锁
  • 检查需要处理的数据队列是否为空,如为空,则跳过当前读取任务。
  • 获取需要处理的数据并存入Redis的处理队列中
  • 释放分布式锁
    由于读取数据的步骤由单进程来获取,所以就避免了并发读取数据的问题

add by 2017.8.10 : 增加处理中的数据队列,在读取数据job时需要排除到正在处理的数据。用于当有数据在处理且数据未更新时,加载数据进行加载时将数据加载出的Bug。
add by 2017.9.13 : 将锁定分布式锁扩展到第一步骤,防止并发情况下通过队列容量检查,成功获取到锁后加载到重复记录。

3、利用自增ID

利用分布式锁+Redis存储已处理ID+数据的自增ID来获取数据数据。

具体步骤如下:

  • 锁定分布式锁
  • 从Redis中取出已处理数据的最大ID值(此处用x代替)
  • 从数据库中取出大于x获取出y(固定数据量)的数据
  • 更新Redis已处理的数据最大ID值
  • 释放分布式锁

存在的问题

问题描述: 数据任务在进行业务处理的过程中,如处理失败则再也不会被以上步骤再次获取。

解决方法1: 当业务处理失败时,在Redis中记录下失败的数据ID。在获取数据步骤中增加获取失败数据的步骤。

解决方法2: 当业务处理失败时,在Redis中记录下失败的数据信息。在业务处理环节增加处理已失败的数据重试步骤。

4、数据取余

利用zk获取到总实例数,计算模块实例需要处理的余值。

具体步骤如下:

  • 模块启动注册到zk,并计算当前实例需要处理的余值为x (当前实例index % 总实例数)
  • 从数据库取数据时,将数据主键%总实例数,如果余值为x,则取出数据,否则忽略此数据。
  • 当模块总实例数变化时,重新计算当前实例需要处理的余值。

风险: 由于模块实例所需要处理的数据固定,当数据取余后的值可能会命中同一个实例,造成模块各实例处理压力不平均。


观点仅代表自己,期待你的留言。