Lebear
If you don't like the world, create one instead of complaining.

由遥控车引起的不可靠的消息服务(监听断连、顺序颠倒、消息丢失)的问题及解决

2022-01-12 java TCP WebRTC 消息队列
Word count: 5.6k | Reading time: 23min

背景:最近的项目中,有一个端到端控制的场景(即A发送命令,通过RTC服务,使B接收到消息并执行命令),其中为了开发方便使用了RTC服务作为两端通信的“桥梁“。每一组端,都连接到同一个RTC Room,这样一来,多组端都会互不影响。

上述业务可以简单抽象成下图,手柄 发送消息 “前进5米“ 到RTC服务器,然后 遥控车 接受到该rtc消息后,执行 命令(前进5米)。抽象模型和执行流程如下:

业务抽象表示

执行流程


All right!这一切看起来都是那么美好~~~

但是But

这个RTC服务是某不知名厂商提供的极其不稳定的服务,具备以下特点:

  1. 会掉线。
  2. 时序不能保证。
  3. 可能会丢失消息

可谓是条条致命啊!在刚开始的业务中,由于使用并不频繁,所以rtc服务也就相对较”稳定”,上述问题并没有浮现出来。但是在业务扩张后,真的是招招致命啊!

既然使用了rtc,那么就要对她负责!(F**K (╯‵□′)╯︵┻━┻),So, 我们来针对这几个问题分析一下,是否可解?(当然是可解,不然就不会有下文了)

首先,为什么会出现上述问题呢?

通过查阅资料,了解到:WebRTC使用了流控传输协议(SCTP),这个协议啊,其实是很棒的一个协议,同时兼备了TCP和UDP的功能,当然也比较复杂。具有多路复用等优点。通信是先建立四次握手建立SCTP连接,通过channel以流的形式发送消息(如下图),该通道有多路,数据互不干涉,但是当一条通道堵塞后,会导致该连接整体中断。

但是其服务提供者大多场景是在音视频通话,所以并没有启用可靠传输。没错,正是因此,导致时序错误、丢失消息。上面说到的通道阻塞又会导致SCTP连接中断则是掉线的罪魁祸首。

SCTP通道,其中msg可以以有序(绿色)发送,也可以以无序(黄色)发送


一、掉线问题

首先解决掉线问题,掉线是个相对不可预测的事件。所以我们可以考虑通过心跳探活和重连机制来处理。

举个例子:小明和小O在通过某信打语音电话,但是小明的网络并不好,所以小明会每隔两分钟向小O问:能听到我讲话嘛?小O回复:可以的。来保证网络正常,通话仍在继续。

以下是实现心跳的简单实现代码,其他细节则可忽略(例如小车移动)。

首先,我们模拟出RTC的实现,主要有RtcEvent接口用来实现事件监听、RtcMessage类统一消息体、RtcClient类则是Rtc的主要操作。

1
2
3
4
5
package rtc;
// 实现消息监听
public interface RtcEvent {
void handleMessage(RtcMessage msg);
}
1
2
3
4
5
6
7
8
9
10
11
package rtc;
// 统一化消息体
public class RtcMessage {
public String key; // 消息的Key
public int value; // 消息的内容
public long time; // 消息创建的时间戳

public RtcMessage() {
this.time = System.currentTimeMillis();
}
}

RtcClient也是模拟出来的,这部分主要模拟接收消息、掉线的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package rtc;
// 模拟Rtc功能SDK的实现
public class RtcClient {

private boolean shutdown = true;

public RtcClient(String uuid) {
shutdown = false;
}

// 监听发送来的心跳和消息,用线程来模拟
public void onListening(RtcEvent e) {

// 模拟定时接收心跳的线程
Thread t = new Thread(() -> {
for (int i = 0; !shutdown ; i++) {
RtcMessage heartbeat = new RtcMessage();
heartbeat.key = null;
e.handleMessage(heartbeat);
System.out.println("心跳:"+ i);

// 1、模拟中间接收到消息
if (i == 10) {
// 执行命令
RtcMessage cmdMsg = new RtcMessage();
cmdMsg.key = "MOVE";
cmdMsg.value = 1000;
e.handleMessage(cmdMsg);
}

// 2、模拟掉线
if (i == 15){
try {
Thread.sleep(2000);
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
});
t.start();

}

// 模拟发送出去心跳
public void send(RtcMessage msg) {
System.out.println("send heartbeat: " + msg.time);
// e.handleMessage();
}

// 停止
public void close() {
// 让线程停止
this.shutdown = true;
}
}

然后我们可以去开发小车端的功能,利用心跳机制,避免断连。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import rtc.RtcClient;
import rtc.RtcMessage;

public class Car {

// SDK提供的 rtc 客户端
private static RtcClient rtcClient;
// 判断小车是否已经开启
public static volatile boolean isOn;
// 心跳线程
private static Thread heartbeatThread;
// 连接 的 rtc room 的 id
private String uuid;
// 上一次心跳时间
private long lastHeartbeatTime = -1;
// 死亡沟壑时间,如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
public static final int DEAD_GAP_TIME = 2000;

public Car(String uuid){
this.uuid = uuid;
rtcClient = new RtcClient(uuid);

// 定时发送心跳的线程
heartbeatThread = new Thread(() -> {
for(;;){
// 探活,是否可以按时接收到心跳,如果不可以,则说明client可能断开连接了,需要重新连接
checkAndReset();
System.out.println("last heartbeat: " + this.lastHeartbeatTime);
rtcClient.send(new RtcMessage());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 开启心跳探活线程
heartbeatThread.start();
}

/**
* 重置 rtc 连接
*/
private synchronized void checkAndReset() {
if (lastHeartbeatTime == -1){
// 刚初始化,跳过
return;
}
// 如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
if (System.currentTimeMillis() - lastHeartbeatTime > DEAD_GAP_TIME){
if(rtcClient != null){
rtcClient.close();
}
// 重新建立连接
rtcClient = new RtcClient(this.uuid);
startUp();// 重新监听
System.out.println("reset: "+ rtcClient);
}
}

/**
* 开启小车
*/
public synchronized void startUp(){

// 开始监听 uuid room 的 rtc的消息
rtcClient.onListening(receiveMsg -> {

// 判断如果是心跳,则更新上一次探活时间
if (receiveMsg.key == null){
lastHeartbeatTime = receiveMsg.time;
}else {
// 执行rtc消息传来的命令,让小车进行相应的动作
executeCommand(receiveMsg.key, receiveMsg.value);
System.out.println("[ Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");

}
});
}

/**
* 执行命令
* @param cmd 对应的指令
* @param v 值
*/
private void executeCommand(String cmd, int v){
switch (cmd) {
case "MOVE":
move(v);
break;
case "SPIN":
spin(v);
break;
}
}


/**
* 移动距离
* @param distance 距离
*/
private void move(int distance){
System.out.println("move " + distance);
}

/**
* 旋转角度
* @param angle 角度
*/
private void spin(int angle){
System.out.println("spin " + angle);
}

}

通过上面的代码,我们可以发现,对于连接断开的问题,我们通过心跳机制顺利解决~即在心跳间隔大于一定时间时候则判定断开连接,这个时候就要重新连接。至此,我们已经解决了第一个问题。

我们通过一个Demo程序,看一下心跳效果。

1
2
3
4
5
6
7
8
public class DemoMain {

public static void main(String[] args) throws InterruptedException {
Car c = new Car("10086");
c.startUp();
Thread.sleep(10000);
}
}


二、消息时序问题

我们上面的代码是在Client中开启线程,使用for循环,来模拟心跳(heartbeat)以及命令(Command)。在实际生产环境中,心跳可以使用for循环来实现;但是命令是随时可能发送的,然而我们使用的SCTP协议的时候没有保证消息接受的顺序,所以会产生什么问题呢?看下面的gif,蓝色表示木板,黄色箭头表示小车和车头方向。

正常情况下,控制器发送三条命令,依次是 移动1、旋转90°、移动1。小车按顺序接收命令,则可以顺利到达指定位置(图2-1)。

图2-1 正常情况

但是,如果消息并不是按顺序到达,而是乱序,就会出现下面的情况(图2-2),先接收到 SPIN,然后接收到 MOVE,就会导致小车从木板上掉落下去。

图2-2 乱序情况

显然,我们应该避免(图2-2)这样的情况,所以如何解决呢?我们可以借鉴TCP的机制。

既然大家看到了这篇文章,想必大家都或多或少了解TCP协议吧,在TCP协议中,通过SEQ和ACK的机制来确保报文顺序。具体是指发送方将要发送的数据分割成合适大小的报文段,然后在每个报文段标上序号,这个序号就是SEQ,接收端在接收到报文后,发送ACK来反馈自己已经接受到的数据报文,以及通知发送方下次发送的报文序列号。这个期间,接收方可能以乱序的形式接受到这些报文,那么在接收后,对报文进行排序,即可达到数据顺序正确的目的。

图源自网络(https://www.cnblogs.com/silyvin/p/11927398.html)

分析到这里,我们可以试一试走Seq这条路,给每个消息打上Seq,然后通过Seq来保证时序,这里先以简单的实现方式来出发,不考虑消息丢失,只考虑消息乱序。

首先,改造RtcMessage,使其支持消息序号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package rtc;

public class RtcMessage implements Comparable<RtcMessage> {
public String key;
public int value;
public long time;
public long seq;

public RtcMessage() {
this.time = System.currentTimeMillis();
}


@Override // 实现比较,为了让消息自排序,后面会讲为什么要实现这个方法
public int compareTo(RtcMessage o) {
if (this.seq > o.seq) {
return -1;
}
return 1;
}
}

然后改造RtcClient,模拟发送多条指令,以多线程的方式发送,并携带序号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package rtc;

import java.util.ArrayList;
import java.util.List;

public class RtcClient {

// 使用标志位来控制线程的停止
private boolean shutdown = true;

public RtcClient(String uuid) {
shutdown = false;
}

// 监听发送来的心跳和消息,用线程来模拟
public void onListening(RtcEvent e) {

// 定时发送心跳的线程
Thread t = new Thread(() -> {
for (int i = 0; !shutdown ; i++) {
RtcMessage heartbeat = new RtcMessage();
heartbeat.key = null;
e.handleMessage(heartbeat);
System.out.println("心跳:"+ i);

// 模拟中间接收到消息
if (i == 10) {

// 正确顺序,0-MOVE、1-SPIN、2-MOVE、3-SPIN、4-MOVE
// 先生成顺序消息
List<RtcMessage> msgList = new ArrayList<>(); // 使用list装
for(int j = 0; j < 5; j++){
RtcMessage cmdMsg = new RtcMessage();
if ((j & 1) == 0){
// 执行命令
cmdMsg.key = "MOVE";
cmdMsg.value = 1000;
}else {
// 执行命令
cmdMsg.key = "SPIN";
cmdMsg.value = 90;
}
cmdMsg.seq = j;
msgList.add(cmdMsg);
}
// 然后开启多线程发送指令
msgList.forEach(msg -> {
new Thread(() -> {
e.handleMessage(msg);
}).start();
});


}

// 模拟掉线
if (i == 15){
try {
Thread.sleep(2000);
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
});
t.start();

}

// 模拟发送出去心跳
public void send(RtcMessage msg) {
System.out.println("send heartbeat: " + msg.time);
// e.handleMessage();
}

// 停止
public void close() {
// 让线程停止
this.shutdown = true;
}
}

然后,我们在Car中打印出执行的指令和序列号,出现了下面的情况,小车接收到的指令顺序并不正确,但是如果我们直接按照这个顺序执行(转180°,再走3000米),就会出大问题;而我们期望的是(走1000米,转90°,走1000米,转90°,走1000米)。

我们上文中引入了Seq的概念,所以接下来,我们在小车接收端(Car)也使用以下吧~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import rtc.RtcClient;
import rtc.RtcMessage;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

public class Car {

// SDK提供的 rtc 客户端
private static RtcClient rtcClient;
// 判断小车是否已经开启
public static volatile boolean isOn;
// 心跳线程
private static Thread heartbeatThread;
// 连接 的 rtc room 的 id
private String uuid;
// 上一次心跳时间
private long lastHeartbeatTime = -1;
// 死亡沟壑时间,如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
private static final int DEAD_GAP_TIME = 2000;
// 记录上一次消费的命令Seq
private static final AtomicLong lastSeq = new AtomicLong();
// 顺序队列
private static final PriorityBlockingQueue<RtcMessage> waitingQueue = new PriorityBlockingQueue<>();
// 锁
private static final ReentrantLock lock = new ReentrantLock();

public Car(String uuid) {
this.uuid = uuid;
rtcClient = new RtcClient(uuid);

// 定时发送心跳的线程
heartbeatThread = new Thread(() -> {
for (; ; ) {
// 探活,是否可以按时接收到心跳,如果不可以,则说明client可能断开连接了,需要重新连接
checkAndReset();
System.out.println("last heartbeat: " + this.lastHeartbeatTime);
rtcClient.send(new RtcMessage());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 开启心跳探活线程
heartbeatThread.start();
initMsgWaitingQueue();
}

/**
* 初始化命令消费等待队列,开启线程,轮询等待队列,做到有顺序消费(执行命令)
*/
private void initMsgWaitingQueue() {
// 初始化首次消费命令顺序为0,以后每消费一次,序列号都+1,这个在tcp中可以作为ACK
new Thread(() -> {
for (; ; ) {
lock.lock();
System.out.println("wait for command...");
if (waitingQueue.size() == 0) {
// 还没有消息
// System.out.println("null message.");
} else {
RtcMessage receiveMsg = waitingQueue.peek();

// 如果本次有消息
// 如果这次的消息序列号不等于上次+1,即 seq != lastSeq + 1,则还需要再把它入队列,再等等
if (receiveMsg.seq != lastSeq.get()) {
// 既然不相等,那就需要再减回去
System.out.printf("receiveMsg seq: %d, last seq: %d\n", receiveMsg.seq, lastSeq.get());
System.out.println("not sequence message, continue wait...");
} else {
// 若 seq = lastSeq + 1,则直接执行命令,进行消费
executeCommand(receiveMsg.key, receiveMsg.value);
lastSeq.addAndGet(1); // 标记上次消费的位置
System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
waitingQueue.poll();
continue;
}
}
lock.unlock();

// 等一下消息吧
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}).start();
}

/**
* 重置 rtc 连接
*/
private synchronized void checkAndReset() {
if (lastHeartbeatTime == -1){
// 刚初始化,跳过
return;
}
// 如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
if (System.currentTimeMillis() - lastHeartbeatTime > DEAD_GAP_TIME) {
if (rtcClient != null) {
rtcClient.close();
}
// 重新建立连接
rtcClient = new RtcClient(this.uuid);
startUp();// 重新监听
System.out.println("reset: " + rtcClient);
}
}

/**
* 开启小车
*/
public synchronized void startUp() {

// 开始监听 uuid room 的 rtc的消息
rtcClient.onListening(receiveMsg -> {

// 判断如果是心跳,则更新上一次探活时间
if (receiveMsg.key == null) {
lastHeartbeatTime = receiveMsg.time;
} else {
// 执行rtc消息传来的命令,让小车进行相应的动作
// executeCommand(receiveMsg.key, receiveMsg.value);
// System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
addWaitingQueue(receiveMsg); // 添加等待队列,来代替直接消费,这里使用优先队列保证顺序
}
});
}

/**
* @param receiveMsg 接收到的消息
*/
private void addWaitingQueue(RtcMessage receiveMsg) {
// 加个锁吧~
lock.lock();
waitingQueue.offer(receiveMsg);
lock.unlock();
}


/**
* 执行命令
*
* @param cmd 对应的指令
* @param v 值
*/
private void executeCommand(String cmd, int v) {
switch (cmd) {
case "MOVE":
move(v);
break;
case "SPIN":
spin(v);
break;
}
}


/**
* 移动距离
*
* @param distance 距离
*/
private void move(int distance) {
System.out.println("move " + distance);
}

/**
* 旋转角度
*
* @param angle 角度
*/
private void spin(int angle) {
System.out.println("spin " + angle);
}

public void close() {
synchronized (waitingQueue) {
waitingQueue.forEach(c -> System.out.print("*****************" + c.seq + ", "));
System.out.println();
}

}
}

嘻嘻,解决啦~

1
上面通过对 小车(Car) 的改造,将【收到命令立即执行命令】改为【收到命令后先放入优先队列】,然后【对命令进行有序执行】,顺利的解决了命令乱序的问题,真的是 Awesome ~

三、消息丢失问题

按顺序看下来的小伙伴们可以从上面了解到,在处理命令乱序问题时候,我们默认消息是不丢失的;但是呢,这个消息可并不是那么乐观,可能存在丢失的情况,这可就不好了,既然消息丢失,那我们的命令很可能无法接收到,也会出现很严重的后果!

当然,这个我们也是有解滴有小伙伴可能会讲了:这不就用上面说到的TCP的可靠传输的方案来解决不就可以了嘛?没错,可以!但是笔者在这里小小偷了个懒,使用了一种不是很好但能快速解决问题的方案:多倍发包。至于TCP的可靠传输,可以下来自己尝试实现一下

我们看一下,什么是多倍发包。

举个栗子:快要过年了,小明想要抢票回家,但是呢,一票难求,大家都懂。所以小明在放票前,打开了10个相同的买票窗口,在到点时,快速的把每个窗口都点了一遍买票。由于小明的“多倍发包”,成功抢到了回家的票~

对于这个消息丢失,我们可以通过每次执行命令,发多个相同的指令,在小车接收端,对于相同序号的指令只执行一遍,这样就可以在即使部分指令丢了的情况下,也最大程度保证消息的完整。下面我们来实现一下吧~

首先,模拟消息丢失,在 Car 中的 startUp() 开始监听消息的方法中,加上随机丢失的代码,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Car.class
/**
* 开启小车
*/
public synchronized void startUp() {

// 开始监听 uuid room 的 rtc的消息
rtcClient.onListening(receiveMsg -> {
// 判断如果是心跳,则更新上一次探活时间
if (receiveMsg.key == null) {
lastHeartbeatTime = receiveMsg.time;
} else {
// 执行rtc消息传来的命令,让小车进行相应的动作

if(new Random().nextBoolean()){
// 模拟随机丢包
System.out.println("消息丢失啦【" + receiveMsg.seq + ", "+receiveMsg.key + ": " + receiveMsg.value + "】");
return;
}
// executeCommand(receiveMsg.key, receiveMsg.value);
// System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
addWaitingQueue(receiveMsg); // 添加等待队列,来代替直接消费,这里使用优先队列保证顺序
}
});
}

我们执行一下,下图就是模拟在接收数据时,1、4两个命令都丢失了,导致程序无法正常执行命令。

丢包情况

接下来,添加多倍发包,在RtcClient中模拟,在OnListening()方法中,同样的指令发送多次,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// RtcClient.class
// 这里可以设置多倍发包的倍数
private static final int MULTIPLE_MESSAGE_SEND_TIMES = 3;
// 监听发送来的心跳和消息,用线程来模拟
public void onListening(RtcEvent e) {

// 定时发送心跳的线程
Thread t = new Thread(() -> {
for (int i = 0; !shutdown ; i++) {
RtcMessage heartbeat = new RtcMessage();
heartbeat.key = null;
e.handleMessage(heartbeat);
System.out.println("心跳:"+ i);

// 模拟中间接收到消息
if (i == 10) {

// 正确顺序,0-MOVE、1-SPIN、2-MOVE、3-SPIN、4-MOVE
// 先生成顺序消息
List<RtcMessage> msgList = new ArrayList<>(); // 使用list装
for(int j = 0; j < 5; j++){
RtcMessage cmdMsg = new RtcMessage();
if ((j & 1) == 0){
// 执行命令
cmdMsg.key = "MOVE";
cmdMsg.value = 1000;
}else {
// 执行命令
cmdMsg.key = "SPIN";
cmdMsg.value = 90;
}
cmdMsg.seq = j;
msgList.add(cmdMsg);
}
// 然后开启多线程发送指令
msgList.forEach(msg -> {
new Thread(() -> {
// 多倍发包
for (int j = 0; j < MULTIPLE_MESSAGE_SEND_TIMES; j++) {
e.handleMessage(msg);
}
}).start();
});


}

// 模拟掉线
if (i == 15){
try {
Thread.sleep(2000);
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
});
t.start();

}

执行,虽然这次能保证客户端能接收到每个Seq至少一条消息,但是,有些Seq的消息接收了多遍,如果都执行,那么也不符合预期。如下图,丢失了 【Seq=0、0、1、1、2、3、4、4】 这八条消息,接受到了【Seq=0、1、2、2、3、3、4】这七条消息,执行结果会如下,我们只能正确执行【0、1、2】三条消息的指令,不能继续往下执行了,因为当前队列头部为2,但是我们的lastSeq已经指向3,所以我们不能继续执行2.

多倍发包

所以我们继续修改Car小车接收端,如果命令被执行过,则直接丢掉,不进入阻塞队列,这样我们就可以继续有序的执行命令。只需要对监听 waitingQueue 的方法进行处理即可,让重复的命令包丢掉。下面是对Car类下的 initMsgWaitingQueue() 方法进行改造,在判断队列内消息的开始,先判断是否已经接收过这个Seq,接受过的话直接丢掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
Car.class
/**
* 初始化命令消费等待队列,开启线程,轮询等待队列,做到有顺序消费(执行命令)
*/
private void initMsgWaitingQueue() {
// 初始化首次消费命令顺序为0,以后每消费一次,序列号都+1,这个在tcp中可以作为ACK
new Thread(() -> {
for (; ; ) {
lock.lock();
System.out.println("wait for command...");
if (waitingQueue.size() == 0) {
// 还没有消息
// System.out.println("null message.");
} else {
RtcMessage receiveMsg = waitingQueue.peek();
// 如果队首的Command的Seq小于当前执行的lastSeq,那么说明这条命令已经被执行了,所以丢掉
if (receiveMsg.seq < lastSeq.get()){
waitingQueue.poll(); // 丢掉
continue; // 直接进行下一轮
}

// 如果本次有消息
// 如果这次的消息序列号不等于上次+1,即 seq != lastSeq + 1,则还需要再把它入队列,再等等
if (receiveMsg.seq != lastSeq.get()) {
// 既然不相等,那就需要再减回去
System.out.printf("receiveMsg seq: %d, last seq: %d\n", receiveMsg.seq, lastSeq.get());
System.out.println("not sequence message, continue wait...");
} else {
// 若 seq = lastSeq + 1,则直接执行命令,进行消费
executeCommand(receiveMsg.key, receiveMsg.value);
lastSeq.addAndGet(1); // 标记上次消费的位置
System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
waitingQueue.poll();
continue;
}
}
lock.unlock();

// 等一下消息吧
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}


// try {
// RtcMessage receiveMsg = waitingQueue.poll(1, TimeUnit.SECONDS);
// if (receiveMsg == null) {
// System.out.println("null message.");
// continue;
// }
// waitingQueue.forEach(c -> System.out.println(c.seq));
// // 如果这次的消息序列号不等于上次+1,即 seq != lastSeq + 1,则还需要再把它入队列,再等等
// if (receiveMsg.seq != lastSeq.get() + 1) {
// addWaitingQueue(receiveMsg); // 再次入队
// Thread.sleep(1000); // 再等待一秒吧
// continue;
// }
// // 若 seq = lastSeq + 1,则直接执行命令
// executeCommand(receiveMsg.key, receiveMsg.value);
// lastSeq.addAndGet(1); // 标记上次消费的位置
// System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}

}).start();
}

下面就是暴力发包的效果,即使丢了【1、2、2、3、3】消息,也可以看到后面按顺序执行了每一条指令,而且只执行了一次。

最终结果

以上,就是对不稳定消息服务的改造全部内容~其中许多地方写的不够严谨,例如客户端重连后,Seq会重置,比较重点不是在这里嘛,而且解决这个问题应该也不是太复杂,大家可以搞一下嘛。

Author: Leisurelybear

Link: https://blog.lebear.top/2022/01/12/452/

Copyright: Copyright © 2019-2022 LeisurelyBear All rights reserved.

NextPost >
Red Dead Redemption 2: 永远的亚瑟 · 摩根
CATALOG
  1. 1. 一、掉线问题
  2. 2. 二、消息时序问题
  3. 3. 三、消息丢失问题