feat(mqtt): 更新 MQTT 主题并优化消息处理
- 将 MQTT 主题从 "/v1/devices/+/datas" 更改为 "/r1/devices/+/datas" -启用按字节接收消息并进行相应处理 - 更新 RecordData 实体,移除未使用的 attachmentId 字段
This commit is contained in:
parent
153bd8cd40
commit
6e7b557c3d
@ -26,8 +26,6 @@ public class RecordData {
|
|||||||
|
|
||||||
private String clientId;
|
private String clientId;
|
||||||
|
|
||||||
private String attachmentId;
|
|
||||||
|
|
||||||
@TableField(typeHandler = JsonbTypeHandler.class)
|
@TableField(typeHandler = JsonbTypeHandler.class)
|
||||||
private Map<String, Object> content;
|
private Map<String, Object> content;
|
||||||
|
|
||||||
|
@ -25,11 +25,6 @@ import org.springframework.messaging.MessageChannel;
|
|||||||
import org.springframework.messaging.MessageHandler;
|
import org.springframework.messaging.MessageHandler;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.text.ParseException;
|
|
||||||
import java.text.SimpleDateFormat;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.TimeZone;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -108,7 +103,7 @@ public class MqttConfig {
|
|||||||
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
|
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
|
||||||
"clientId-" + UUID.randomUUID(),
|
"clientId-" + UUID.randomUUID(),
|
||||||
mqttPahoClientFactory(),
|
mqttPahoClientFactory(),
|
||||||
"/v1/devices/+/datas"
|
"/r1/devices/+/datas"
|
||||||
);
|
);
|
||||||
adapter.setCompletionTimeout(5000);
|
adapter.setCompletionTimeout(5000);
|
||||||
|
|
||||||
@ -116,7 +111,7 @@ public class MqttConfig {
|
|||||||
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
|
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
|
||||||
|
|
||||||
// 按字节接收消息
|
// 按字节接收消息
|
||||||
// defaultPahoMessageConverter.setPayloadAsBytes(true);
|
defaultPahoMessageConverter.setPayloadAsBytes(true);
|
||||||
adapter.setConverter(defaultPahoMessageConverter);
|
adapter.setConverter(defaultPahoMessageConverter);
|
||||||
adapter.setQos(qos);
|
adapter.setQos(qos);
|
||||||
adapter.setOutputChannel(mqttInputChannel());
|
adapter.setOutputChannel(mqttInputChannel());
|
||||||
@ -136,8 +131,9 @@ public class MqttConfig {
|
|||||||
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
|
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
|
||||||
String clientId = topic.split("/")[3];
|
String clientId = topic.split("/")[3];
|
||||||
String deviceGatherer = clientId.substring(0, 3);
|
String deviceGatherer = clientId.substring(0, 3);
|
||||||
if (topic.matches("/v1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) {
|
if (topic.matches("/r1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) {
|
||||||
JSONObject payload = JSONObject.parseObject(message.getPayload().toString());
|
String payloadString = new String((byte[]) message.getPayload());
|
||||||
|
JSONObject payload = JSONObject.parseObject(payloadString);
|
||||||
log.info("获取设备编码:" + clientId);
|
log.info("获取设备编码:" + clientId);
|
||||||
log.info("负载:" + payload);
|
log.info("负载:" + payload);
|
||||||
JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data");
|
JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data");
|
||||||
|
Loading…
Reference in New Issue
Block a user