# Troubleshooting the MQTT Data Receiver Process (Part 2)

## The Problem Is Back
Yesterday we scaled the recv process horizontally using EMQX **shared subscriptions**. Since the previous post concluded that the root cause was insufficient processing throughput, we happily rolled out the upgrade.

After less than a day of running, 2 of the 3 instances had restarted. The original problem was not actually solved.

Checking the KubeSphere monitoring:

![image-20210525091201891](imgs/MQTT数据接收进程问题排查(之二)/image-20210525091201891.png)
Before each crash and restart, the traffic monitoring shows a large inbound burst (6.15 MB within a 10-minute window), while outbound traffic is roughly 100x the inbound. Memory then grows linearly for a while (messages piling up) until the process crashes.

Combined with what we saw earlier in the logs, the likely cause is a **large message**: the Kafka send fails, gets retried, and eventually exhausts resources.

## Add Protection First
In the MQTT client message callback, filter out messages that are too large for Kafka to handle:

```java
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
    ...
    byte[] bts = mqttMessage.getPayload();
    if (bts.length > maxBytes) {
        logger.info("large message. " + bts.length + ". Time:" + time + " Topic:" + topic);
        return;
    }
    ...
}
```
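To keep this filter consistent with the Kafka limit, the threshold could be derived from the producer's `max.request.size` instead of being hard-coded. A minimal sketch, assuming a hypothetical `kafka.producer.max.request.size` property (not taken from the actual config):

```java
// Sketch only (property name assumed): derive the MQTT-side threshold from the Kafka
// producer's max.request.size, leaving headroom for the record key and protocol overhead.
static int computeMaxBytes(java.util.Properties props) {
    int maxRequestSize = Integer.parseInt(
            props.getProperty("kafka.producer.max.request.size", "12695150"));
    return maxRequestSize / 2; // the record key duplicates the payload, so allow at most half
}
```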
## Why Is Outbound Traffic ~100x Inbound?

1. The known 2 × 3 factor

The receiver process configures the Kafka producer with **3** retries, as shown below:

`dac.mqtt.recv.PahoMqttApp.java`
```java
props.put("bootstrap.servers", this.props.getProperty("kafka.brokers"));
props.put("acks", this.props.getProperty("kafka.producer.acks", "1"));                    // ack from the leader partition only
props.put("retries", this.props.getProperty("kafka.producer.retries", "3"));              // retry count: 3
props.put("batch.size", this.props.getProperty("kafka.producer.batch.size", "1638400"));  // batch size
props.put("linger.ms", this.props.getProperty("kafka.producer.linger.ms", "5"));          // wait up to 5 ms before sending
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("max.request.size", 12695150);                                                  // maximum request size
```
In addition, the ProducerRecord is built with a key, and the whole message is used as the key, so each record is roughly **2x** the original payload size:

```java
ProducerRecord<String, String> record = new ProducerRecord<String, String>(kf, message, message);
```
2. The remaining ~16x

**Hypothesis**: the `anxinyun_data` topic has 4 partitions spread across 4 brokers. When the Kafka producer retries, does it try the partitions on each broker in turn?

Let's check how the client source code handles this.
### Message Delivery Semantics

- *At most once*: messages may be lost but are never redelivered.
- *At least once*: messages are never lost but may be redelivered.
- *Exactly once*: what people actually want; each message is delivered once and only once.

The producer `acks` parameter (a configuration sketch follows the list):

- `0`: the producer sends the message to the broker and does not wait for a response.
- `1`: the producer waits for the broker's response; if the message is not acknowledged, it retries according to the `retries` setting (default 0). In this mode the acknowledgment is issued by the leader partition only, so data can still be lost while replicas are copying it.
- `all`: the broker sends the acknowledgment only after replication has satisfied the `min.insync.replicas` property.
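For reference, a minimal sketch of a producer configured for stronger delivery guarantees (`acks=all` plus idempotence). The broker address is a placeholder and this is not the configuration the recv process currently uses:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksAllExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
        props.put("acks", "all");                          // wait until min.insync.replicas have the record
        props.put("retries", "3");
        props.put("enable.idempotence", "true");           // retries do not create duplicates
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("anxinyun_data", "key", "value"));
        }
    }
}
```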
### Handling Large Messages

Reference: [Tuning Kafka message sizes](https://www.cnblogs.com/xingfengzuolang/p/10762464.html)

Throughput is best with messages around 10 KB. When message bodies get much larger, the usual recommendations are:

- Store the payload in external file storage and send only a link in the message.
- Split the message into chunks and reassemble them on the consumer side.
- Enable producer-side compression via `compression.codec` and `compressed.topics`, using GZIP or Snappy (see the sketch below).
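On the newer Java producer the equivalent setting is `compression.type` (the `compression.codec` / `compressed.topics` names belong to the legacy Scala client). A hedged fragment in the same style as the config block above, with illustrative values:

```java
import java.util.Properties;

// Sketch: enable producer-side compression on the modern Java client (values are illustrative).
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");   // placeholder
props.put("compression.type", "gzip");            // or "snappy", "lz4", "zstd"
props.put("batch.size", "163840");                // compression is applied per batch, so larger batches compress better
props.put("linger.ms", "20");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
```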
Broker configuration:

- **message.max.bytes** (default: 1000000, ~1 MB): the largest message the broker will accept. This should be smaller than the consumer's `fetch.message.max.bytes`, otherwise the broker ends up stuck holding messages the consumer cannot fetch.
- **log.segment.bytes** (default: 1 GB): the size of a Kafka log segment file; make sure it is larger than a single message. The default is usually fine (a single message rarely exceeds 1 GB, since this is a messaging system, not a file system).
- **replica.fetch.max.bytes** (default: 1 MB): the largest message a broker can replicate. This should be larger than `message.max.bytes`, otherwise the broker accepts messages it cannot replicate, which can lead to data loss.

Consumer configuration:

- **fetch.message.max.bytes** (default: 1 MB): the largest message a consumer can read. This should be greater than or equal to `message.max.bytes`.

Producer configuration (a combined sketch follows the list):

- **max.request.size**: the maximum size of a request the producer will send, in bytes; the default is 1 MB.
- **buffer.memory**: the size of the producer's buffer, in bytes; the default is 32 MB.
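Putting the limits together, a hedged sketch of how they should line up end to end; the values are illustrative, and on newer consumers the per-partition limit is `max.partition.fetch.bytes` rather than the legacy `fetch.message.max.bytes`:

```java
import java.util.Properties;

// Broker (server.properties, not set from Java):
//   message.max.bytes=10485760          // <= replica.fetch.max.bytes and <= consumer fetch limit
//   replica.fetch.max.bytes=12582912    // must cover the largest accepted message

Properties producerProps = new Properties();
producerProps.put("max.request.size", "10485760");          // <= broker message.max.bytes
producerProps.put("buffer.memory", "33554432");             // 32 MB accumulator

Properties consumerProps = new Properties();
consumerProps.put("max.partition.fetch.bytes", "10485760"); // >= broker message.max.bytes
```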
### Producer Source Code

A few concepts that appear in the code:

```json
Node -- a Kafka broker node
```

Excerpts from the send path: <I did not find code that tries every node on retry; TODO>
```java
// producer.internals.Sender
public class Sender {
    /**
     * The main run loop for the sender thread
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                long pollTimeout = sendProducerData(now);
                client.poll(pollTimeout, now);
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        ...
    }

    private long sendProducerData(long now) {
        // get the set of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        ... // request a metadata update for partitions without leaders
        ... // remove nodes that are not ready

        // create produce requests
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        ...
        sendProduceRequests(batches, now); // client.send(clientRequest, now);
        return pollTimeout;
    }
    ...
}


// buffers records in per-partition queues
class RecordAccumulator {
    ...
    // append a record to the queue
    public RecordAppendResult append(TopicPartition tp, ...) {}

    /**
     * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
     * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
     * partition batches.
     */
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {

    }

    // Drain all the data for the given nodes and collate them into a list of batches
    public Map<Integer, List<ProducerBatch>> drain(...) {}
}

class KafkaProducer {
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        ... RecordAccumulator.append(...) // when the queue is full: this.sender.wakeup(); wakes up the client
    }
}

// the low-level network client that performs the send
class NetworkClient {
    ...
    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        ...
        RequestHeader header = clientRequest.makeHeader(request.version());
        ...
        Send send = request.toSend(nodeId, header); // make send (NetworkSend extends ByteBufferSend)
        InFlightRequest inFlightRequest = new InFlightRequest(...);
        this.inFlightRequests.add(inFlightRequest); // add to inflight
        selector.send(inFlightRequest.send);
    }
    ...
}

// A nioSelector interface for doing non-blocking multi-connection network I/O.
public class Selector {
    public void send(Send send) {
        channel.setSend(send);
    }

    public void poll(long timeout) {
        ...
        // poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
    }

    // handle any ready I/O on a set of selection keys
    void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos) {
        /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
        if (channel.ready() && key.isWritable()) {
            Send send = channel.write();
            if (send != null) {...}
        }
    }
}

public class KafkaChannel {
    public void setSend(Send send) {
        this.send = send; // Send: model of an in-progress outbound transfer (destination, completed, writeTo, size)
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

    public Send write() throws IOException {
        if (send != null && send(send)) { ... }
    }

    private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        return send.completed();
    }
}
```
## Follow-up Issues

### Upgrade Issue (0525)

While using shared subscriptions, **upgrading** the recv deployment left the original subscribers in a DISCONNECTED state, and some data was lost. See below:

![image-20210525173902767](imgs/MQTT数据接收进程问题排查(之二)/image-20210525173902767.png)

The cause is that the code sets Clean Session to false by default, i.e. the session is persisted. After a client id goes offline, its shared-subscription session is still kept, so part of the traffic keeps being routed to it and is lost.

**Fix:** set Clean Session = true (see the sketch below).

Separately, because we use the container instance name as the client id, the id does not change when the process restarts abnormally; on reconnect the persisted session is restored, so no data was lost in that case.
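A minimal sketch of connecting with Clean Session enabled, using the Paho client as the recv process does; the broker address and topic name are placeholders, not the actual production values:

```java
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class CleanSessionExample {
    public static void main(String[] args) throws Exception {
        String clientId = System.getenv("HOSTNAME");            // pod name, as in the recv process
        MqttClient client = new MqttClient("tcp://emqx:1883",   // placeholder broker address
                clientId, new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);        // do not keep the shared-subscription session after disconnect
        options.setAutomaticReconnect(true);
        options.setKeepAliveInterval(60);

        client.connect(options);
        client.subscribe("$queue/anxinyun_data", 1);            // shared subscription, QoS 1 (topic assumed)
    }
}
```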
### EMQX Broker Issue (0531)

The emqx process's memory keeps growing until it restarts.

![image-20210531135639456](imgs/MQTT数据接收进程问题排查(之二)/image-20210531135639456.png)

#### EMQX Inflight Window and Message Queue

https://docs.emqx.cn/broker/v4.3/advanced/inflight-window-and-message-queue.html#%E7%AE%80%E4%BB%8B

EMQX keeps unacknowledged packets in the Inflight Window until they are acknowledged.

Once the window exceeds its limit (`max_inflight`), subsequent packets are not sent but stored in the Message Queue instead.

When a client is offline, the Message Queue also stores QoS 0 messages, which are delivered the next time the client comes online. This is enabled by default and can be turned off via `mqueue_store_qos0`.

Note that if the Message Queue also reaches its length limit, subsequent packets are still enqueued, but the oldest message in the queue is dropped; if the queue contains QoS 0 messages, those are dropped first. It is therefore important to configure a Message Queue length limit (`max_mqueue_len`) that suits your workload.

| Config | Type | Values | Default | Description |
| ----------------- | ------- | --------------- | ------------------------------------- | ------------------------------------------------------ |
| max_inflight | integer | >= 0 | 32 *(external)*, 128 *(internal)* | Inflight Window length limit; 0 means unlimited |
| max_mqueue_len | integer | >= 0 | 1000 *(external)*, 10000 *(internal)* | Message Queue length limit; 0 means unlimited |
| mqueue_store_qos0 | enum | `true`, `false` | true | Whether EMQ X stores QoS 0 messages in the Message Queue while the client is offline |
The production EMQ X configuration is as follows:

![image-20210531145227342](imgs/MQTT数据接收进程问题排查(之二)/image-20210531145227342.png)

```shell
## Maximum queue length. Enqueued messages when persistent client disconnected,
## or inflight window is full. 0 means no limit.
##
## Value: Number >= 0
zone.external.max_mqueue_len = 0
# change this to 10000
```
# Scaling MQTT Reception Horizontally with Shared Subscriptions

> Background: Anxinyun's Receiver process (the MQTT-to-Kafka message pipeline) exits abnormally after running for a while. Although a reconnect/restart protection mechanism is in place, some data is still lost.

## Troubleshooting

After the MQTT client reconnects, subscribing fails with an error; the following message is also logged:
```shell
SEVERE: receiver.production20429120721: Timed out as no activity, keepAlive=60,000,000,000 lastOutboundActivity=8,545,217,349,987,626 lastInboundActivity=8,545,157,349,155,183 time=8,545,277,349,973,460 lastPing=8,545,217,349,991,742
```
A similar problem is described in the post below: by capturing packets, the author found that the Java program was sending TCP ZeroWindow to the emqx server, telling it that its receive buffer was full and to stop sending data. As a result the keep-alive packets could not be exchanged either, so emqx, seeing no heartbeat, considered the client dead and actively closed the connection.

https://blog.csdn.net/u012134942/article/details/103965155

Clearly emqx was delivering data faster than the program could process it. A code review showed that business data was already handled by multiple threads, but the emqx client's online/offline notifications were not; they were processed too slowly, and the TCP receive buffer filled up.

Conclusion:

**Insufficient processing throughput caused the receive buffer to fill up.**
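One general way to keep the receive buffer from filling up is to return from the Paho callback immediately and do the heavy work on a separate thread pool. This is a minimal sketch of that pattern (pool and queue sizes are illustrative), not the exact change made in the recv code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.MqttMessage;

public class AsyncHandler {
    // Bounded queue so a burst cannot exhaust memory; CallerRunsPolicy applies back-pressure instead of dropping.
    private final ExecutorService pool = new ThreadPoolExecutor(
            4, 4, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10_000),
            new ThreadPoolExecutor.CallerRunsPolicy());

    public void messageArrived(String topic, MqttMessage mqttMessage) {
        byte[] payload = mqttMessage.getPayload();
        // Hand the work off and return right away so the MQTT I/O thread keeps draining the socket.
        pool.submit(() -> process(topic, payload));
    }

    private void process(String topic, byte[] payload) {
        // parse the payload and forward it to Kafka here
    }
}
```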
## [EMQX Shared Subscriptions](https://docs.emqx.cn/broker/v4.3/advanced/shared-subscriptions.html#%E5%B8%A6%E7%BE%A4%E7%BB%84%E7%9A%84%E5%85%B1%E4%BA%AB%E8%AE%A2%E9%98%85)

A shared subscription distributes messages across multiple subscribers, load-balancing between them:

### Shared Subscription with a Group
```bash
                                                 [subscriber1] got msg1
             msg1, msg2, msg3                   /
[publisher] -----------------> "$share/g/topic" -- [subscriber2] got msg2
                                                \
                                                 [subscriber3] got msg3
```
In the diagram above, three subscribers subscribe to the same topic, `$share/g/topic`, as a shared subscription, where `topic` is the real topic name and `$share/g/` is the shared-subscription prefix. `g` is the group name, similar to Kafka's `group.id`.

### Shared Subscription Without a Group

A shared subscription prefixed with `$queue/` has no group. It is a special case of `$share` in which all subscribers belong to a single group:
```bash
                                         [s1] got msg1
         msg1,msg2,msg3                 /
[emqx] ---------------> "$queue/topic" -- [s2] got msg2
                                        \
                                         [s3] got msg3
```
### Balancing Strategy and Dispatch Ack Configuration

EMQ X shared subscriptions support a balancing strategy and a dispatch-ack setting:

```bash
# etc/emqx.conf

# balancing strategy
broker.shared_subscription_strategy = random

# applies to QoS 1 / QoS 2 messages; when enabled, if the chosen subscriber is offline,
# the message is dispatched to another one
broker.shared_dispatch_ack_enabled = false
```
| Strategy | Description |
| :---------- | :--------------------------- |
| random | pick a random subscriber |
| round_robin | pick subscribers in subscription order |
| sticky | keep dispatching to the previously chosen subscriber |
| hash | pick by hash of the publisher's ClientID |
## Application Changes

### Change the Subscribed topic

Change the topic directly in the configuration file:

```properties
# before
topics=${topic.perfix}_data
# after
topics=$queue/${topic.perfix}_data
```
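Note that only the subscriber side uses the `$queue/` prefix; publishers keep publishing to the real topic. A small fragment as a sketch (the topic name is assumed from the earlier sections):

```java
// Subscriber side: the "$queue/" prefix turns this into a group-less shared subscription.
client.subscribe("$queue/anxinyun_data", 1);

// Publisher side stays unchanged and publishes to the real topic name:
// publisherClient.publish("anxinyun_data", new MqttMessage(payload));
```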
### Change client.id

When running multiple instances, each MQTT client instance must have a unique ID:

```java
String clientId = System.getenv("HOSTNAME"); // use the k8s pod (container instance) name as the MQTT client id
if (clientId == null) {
    clientId = this.props.getProperty("client.id") + UUID.randomUUID();
}
logger.info("mqtt client id:" + clientId);
```
### k8s Deployment

Simply increase the replica count. Later, KubeSphere's autoscaling can be configured to grow or shrink the number of instances on demand.

![image-20210524161628975](imgs/通过共享订阅实现MQTT接收横向扩展/image-20210524161628975.png)