|
|
@@ -1,121 +1,180 @@
|
|
|
-<!--
|
|
|
- * @Description: mqtt连接和消息发送组件 页面
|
|
|
- * @Author: mhf
|
|
|
- * @Date: 2024-05-25 00:49:23
|
|
|
- * @params:
|
|
|
--->
|
|
|
+<!-- MqttComp.vue -->
|
|
|
<template>
|
|
|
- <div></div>
|
|
|
+ <div class="mqtt-comp" style="display: none"></div>
|
|
|
</template>
|
|
|
|
|
|
<script>
|
|
|
-import mqtt from "mqtt";
|
|
|
+import mqtt from "mqtt/dist/mqtt.min.js";
|
|
|
|
|
|
export default {
|
|
|
- name: "mqttComp",
|
|
|
+ name: "MqttComp",
|
|
|
props: {
|
|
|
- topic: String, // 订阅主题
|
|
|
mqttUrl: {
|
|
|
type: Object,
|
|
|
- default: () => {
|
|
|
- return {
|
|
|
- head: "ws", // 必须是 ws 或 wss (mqtt:// 或 mqtts:// 必须让后端开放websocket服务)
|
|
|
- host: "8.148.78.124", // ip地址
|
|
|
- port: 8083, // 服务端口
|
|
|
- tailPath: "mqtt", // 服务路径
|
|
|
- };
|
|
|
- },
|
|
|
- }, // 服务地址
|
|
|
+ default: () => ({
|
|
|
+ head: "ws", // ws 或 wss
|
|
|
+ host: "8.148.78.124",
|
|
|
+ port: 8083,
|
|
|
+ path: "/mqtt",
|
|
|
+ }),
|
|
|
+ },
|
|
|
mqttOpts: {
|
|
|
type: Object,
|
|
|
- default: () => {
|
|
|
- return {
|
|
|
- keepalive: 60,
|
|
|
- clientId: "clientId-" + Math.random().toString(16).substr(2, 8),
|
|
|
- username: "username",
|
|
|
- password: "password",
|
|
|
- connectTimeout: 10 * 1000,
|
|
|
- path: "/mqtt"
|
|
|
- };
|
|
|
- },
|
|
|
- }, // 连接配置
|
|
|
+ default: () => ({
|
|
|
+ keepalive: 60,
|
|
|
+ clientId: "clientId-" + Math.random().toString(16).substr(2, 8),
|
|
|
+ username: "",
|
|
|
+ password: "",
|
|
|
+ clean: true,
|
|
|
+ connectTimeout: 10 * 1000,
|
|
|
+ reconnectPeriod: 2000,
|
|
|
+ }),
|
|
|
+ },
|
|
|
+ topics: {
|
|
|
+ type: Array,
|
|
|
+ default: () => [], // 可一次性传入多个 topic
|
|
|
+ },
|
|
|
},
|
|
|
data() {
|
|
|
return {
|
|
|
- client: "",
|
|
|
- clientCount: 0,
|
|
|
- receivedMessage: null, // 用于存储接收到的消息
|
|
|
+ client: null,
|
|
|
+ isConnected: false,
|
|
|
+ subscribedTopics: [], // 内部管理的已订阅 topic
|
|
|
};
|
|
|
},
|
|
|
watch: {
|
|
|
- topic(newTopic) {
|
|
|
- if (newTopic && this.client) {
|
|
|
- this.client.unsubscribe(this.topic);
|
|
|
- this.client.subscribe(newTopic);
|
|
|
- }
|
|
|
+ topics: {
|
|
|
+ handler(newTopics) {
|
|
|
+ if (!this.client || !this.isConnected) return;
|
|
|
+
|
|
|
+ // 取消订阅已经移除的 topic
|
|
|
+ this.subscribedTopics.forEach((t) => {
|
|
|
+ if (!newTopics.includes(t)) {
|
|
|
+ this.unsubscribeTopic(t);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // 订阅新增 topic
|
|
|
+ newTopics.forEach((t) => {
|
|
|
+ if (!this.subscribedTopics.includes(t)) {
|
|
|
+ this.subscribeTopic(t);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ },
|
|
|
+ deep: true,
|
|
|
+ immediate: true,
|
|
|
},
|
|
|
},
|
|
|
methods: {
|
|
|
- async initMqtt() {
|
|
|
- let opts = JSON.parse(JSON.stringify(this.mqttOpts));
|
|
|
- this.client = mqtt.connect(
|
|
|
- `${this.mqttUrl.head}://${this.mqttUrl.host}:${this.mqttUrl.port}/${this.mqttUrl.tailPath}`,
|
|
|
- opts
|
|
|
- );
|
|
|
- this.client.on("connect", this.handleConnect);
|
|
|
- this.client.on("message", this.handleMessage);
|
|
|
- this.client.on("reconnect", this.handleReconnect);
|
|
|
- this.client.on("error", this.handleError);
|
|
|
- },
|
|
|
+ // 初始化 MQTT 连接
|
|
|
+ initMqtt() {
|
|
|
+ const { head, host, port, path } = this.mqttUrl;
|
|
|
+ const connectUrl = `${head}://${host}:${port}${path || ""}`;
|
|
|
+ console.log("MQTT连接地址:", connectUrl);
|
|
|
|
|
|
- handleConnect() {
|
|
|
- console.log("mqtt_连接成功");
|
|
|
- this.client.subscribe(this.topic);
|
|
|
- },
|
|
|
+ this.client = mqtt.connect(connectUrl, this.mqttOpts);
|
|
|
+
|
|
|
+ this.client.on("connect", () => {
|
|
|
+ this.isConnected = true;
|
|
|
+ console.log("✅ MQTT 已连接");
|
|
|
+ this.$emit("mqtt-connected");
|
|
|
+
|
|
|
+ // 自动订阅传入的 topic
|
|
|
+ this.topics.forEach((topic) => this.subscribeTopic(topic));
|
|
|
+ });
|
|
|
+
|
|
|
+ this.client.on("message", (topic, message) => {
|
|
|
+ let msg;
|
|
|
+ try {
|
|
|
+ msg = JSON.parse(message.toString());
|
|
|
+ } catch {
|
|
|
+ msg = message.toString();
|
|
|
+ }
|
|
|
+ this.$emit("message-received", { topic, message: msg });
|
|
|
+ });
|
|
|
|
|
|
- handleMessage(topic, message) {
|
|
|
- this.receivedMessage = JSON.parse(message?.toString() || {});
|
|
|
- this.$emit("message-received", this.receivedMessage);
|
|
|
+ this.client.on("reconnect", () => {
|
|
|
+ console.log("♻️ MQTT 正在重连...");
|
|
|
+ this.$emit("mqtt-reconnect");
|
|
|
+ });
|
|
|
+
|
|
|
+ this.client.on("error", (err) => {
|
|
|
+ console.error("❌ MQTT连接失败:", err);
|
|
|
+ this.$emit("mqtt-error", err);
|
|
|
+ });
|
|
|
+
|
|
|
+ this.client.on("close", () => {
|
|
|
+ this.isConnected = false;
|
|
|
+ console.log("🔌 MQTT连接已关闭");
|
|
|
+ this.$emit("mqtt-close");
|
|
|
+ });
|
|
|
},
|
|
|
|
|
|
- handleReconnect(error) {
|
|
|
- console.log(`正在第${this.clientCount}重连`, error);
|
|
|
- this.clientCount++;
|
|
|
- if (this.clientCount >= 10) {
|
|
|
- this.client.end();
|
|
|
+ // 订阅单个 topic
|
|
|
+ subscribeTopic(topic) {
|
|
|
+ if (!this.client || !this.isConnected) {
|
|
|
+ console.warn("MQTT 未连接,无法订阅:", topic);
|
|
|
+ return;
|
|
|
}
|
|
|
+ if (!topic || this.subscribedTopics.includes(topic)) return;
|
|
|
+
|
|
|
+ this.client.subscribe(topic, (err) => {
|
|
|
+ if (!err) {
|
|
|
+ console.log("订阅成功:", topic);
|
|
|
+ this.subscribedTopics.push(topic);
|
|
|
+ this.$emit("topic-subscribed", topic);
|
|
|
+ }
|
|
|
+ });
|
|
|
},
|
|
|
|
|
|
- handleError(error) {
|
|
|
- console.log("连接失败", error);
|
|
|
+ // 取消订阅单个 topic
|
|
|
+ unsubscribeTopic(topic) {
|
|
|
+ if (!this.client || !this.isConnected) return;
|
|
|
+ if (!topic || !this.subscribedTopics.includes(topic)) return;
|
|
|
+
|
|
|
+ this.client.unsubscribe(topic, (err) => {
|
|
|
+ if (!err) {
|
|
|
+ console.log("取消订阅:", topic);
|
|
|
+ this.subscribedTopics = this.subscribedTopics.filter((t) => t !== topic);
|
|
|
+ this.$emit("topic-unsubscribed", topic);
|
|
|
+ }
|
|
|
+ });
|
|
|
},
|
|
|
|
|
|
- /**
|
|
|
- * MQTT实现发送消息
|
|
|
- * @param: topic: 主题
|
|
|
- * @param: message: 消息体
|
|
|
- * @author: mhf
|
|
|
- * @time: 2024-05-24 14:26:51
|
|
|
- **/
|
|
|
- mqttPublish(topic, message) {
|
|
|
- this.client.publish(topic, JSON.stringify(message));
|
|
|
+ // 发布消息
|
|
|
+ publish(topic, message) {
|
|
|
+ if (!this.client || !this.isConnected) {
|
|
|
+ console.warn("MQTT 未连接,消息未发送:", topic);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ console.log("sdfdsf",typeof message === "object");
|
|
|
+
|
|
|
+ const payload = typeof message === "object" ? JSON.stringify(message) : String(message);
|
|
|
+ this.client.publish(topic, payload);
|
|
|
+ },
|
|
|
+
|
|
|
+ // 断开连接,取消所有订阅
|
|
|
+ disconnect() {
|
|
|
+ if (this.client) {
|
|
|
+ // 先取消所有订阅
|
|
|
+ this.subscribedTopics.forEach((t) => this.unsubscribeTopic(t));
|
|
|
+ this.client.end(true, () => {
|
|
|
+ console.log("🔌 MQTT手动断开成功");
|
|
|
+ });
|
|
|
+ }
|
|
|
},
|
|
|
},
|
|
|
- async mounted() {
|
|
|
- await this.initMqtt();
|
|
|
+ mounted() {
|
|
|
+ this.initMqtt();
|
|
|
},
|
|
|
-
|
|
|
- beforeDestroy() {
|
|
|
- this.$emit("mqtt-close") // 关闭mqtt链接需要的前置操作
|
|
|
- // 使用延迟确保消息发送完成后再关闭连接
|
|
|
- setTimeout(() => {
|
|
|
- this.client.end(true, {}, () => {
|
|
|
- console.log("MQTT连接已成功关闭");
|
|
|
- });
|
|
|
- }, 100); // 延迟时间根据实际情况调整,确保消息发送完成
|
|
|
- // this.client.end();
|
|
|
+ beforeDestroy() {
|
|
|
+ this.disconnect();
|
|
|
},
|
|
|
};
|
|
|
</script>
|
|
|
|
|
|
-<style lang="scss" scoped></style>
|
|
|
+<style scoped>
|
|
|
+.mqtt-comp {
|
|
|
+ display: none;
|
|
|
+}
|
|
|
+</style>
|