需求概述
主要實現一個流程:
1.采集Modbus電表數據上傳到MQTT指定topic
2.如果傳感器未響應需要發(fā)送報警信息到指定topic
3.MQTT連接成功時發(fā)送注冊信息
4.周期推送心跳到指定topic
5.云端下發(fā)控制EG8200繼電器并回復
對接資料
1.南向接口 RS485對接協(xié)議(標準modbus)
點位 | 字段 | 寄存器地址 | 解析方式 |
A相電壓 | voltageA | 40001~40002 | float32 ABCD |
B相電壓 | voltageB | 40003~40004 | float32 ABCD |
C相電壓 | voltageC | 40005~40006 | float32 ABCD |
A相電壓 | currentA | 40007~40008 | float32 ABCD |
B相電壓 | currentB | 40009~40010 | float32 ABCD |
C相電壓 | currentC | 40011~40012 | float32 ABCD |
2.北向接口協(xié)議說明:
MQTT連接信息 | |||
Broker | 139.129.229.113 | ||
Clientid | TestClient | ||
Username | 82000000305E144F | ||
Password | EG12345678 | ||
Topic報文格式 | |||
功能 | Topic | 數據流向 | 報文示例 |
信息注冊 | data/sg/${sn}/info | 網關->平臺 |
{ "sn":"82000000305E144F", "time":"2023-01-01 12:00:00" } |
數據上報 | data/sg/${sn}/report | 網關->平臺 |
{ "sn":"82000000305E144F", "data":{ "voltageA":0, "voltageB":0, "voltageC":0, "currentA":0, "currentB":0, "currentC":0 }, time:"2023-01-01 12:00:00" } |
心跳 | data/sg/${sn}/heart | 網關->平臺 |
{ "sn":"82000000305E144F", "data":{}, "message":"heart", "time":"2023-01-01 12:00:00" } |
報警 | data/sg/${sn}/warn | 網關->平臺 |
{ "sn":"82000000305E144F", "data":{}, "message":"offline", "time":"2023-01-01 12:00:00" } |
平臺控制 | /data/sg/${sn}/request | 平臺->網關 |
{ "event_id":"HsUCigC4Jk", "data": { "parameter": "DO1", "value": 0 } } |
控制回復 | data/sg/${sn}/response | 網關->平臺 |
{ "event_id":"HsUCigC4Jk", "message":"OK", "data":{}, "time":"2023-01-01 12:00:00" } |
需求分析
注冊與連接工作
a.建立MQTT連接,發(fā)布到注冊主題(MQTT發(fā)布節(jié)點)注:MQTT訂閱發(fā)布節(jié)點連接信息共享,其他MQTT無需在配置連接參數,選擇第一次配置的連接參數即可
b.監(jiān)聽MQTT連接狀態(tài)(狀態(tài)變化節(jié)點)
c.封裝注冊信息(函數節(jié)點)
數據上報
a.modbus讀取電表數據(modbus讀節(jié)點)
b.判斷傳感器是否有回復并封裝上報報文(函數節(jié)點)
c.配置對應的發(fā)布主題(MQTT發(fā)布節(jié)點)
心跳推送
a.注入節(jié)點周期觸發(fā)(注入節(jié)點)
b.封裝心跳報文(函數節(jié)點)
c.MQTT發(fā)布數據(MQTT發(fā)布節(jié)點)
下發(fā)控制
a.MQTT訂閱云端下發(fā)主題(MQTT訂閱節(jié)點)
b.解析云端數據,并控制對應DO(函數節(jié)點)
c.響應MQTT控制情況(函數節(jié)點)
d.上報響應報文(MQTT發(fā)布)
實現流程框架
需求實現
1.監(jiān)聽MQTT狀態(tài)并發(fā)布注冊信息
a.從節(jié)點庫拖出一個MQTT發(fā)布節(jié)點,函數節(jié)點,調試節(jié)點,狀態(tài)監(jiān)測節(jié)點和條件判斷節(jié)點,MQTT發(fā)布用于消息發(fā)布,函數節(jié)點用于注冊信息封裝,狀態(tài)檢測節(jié)點用于檢測MQTT的連接狀態(tài),調試節(jié)點用于打印發(fā)布的報文
b.從上可以看見日志打印了注冊報文,以及MQTTx端以及接收到了注冊信息
c.注冊流程使用的代碼塊:
const fmt = dateFormat("YYYY-mm-dd HH:MM:SS", new Date()) let obj = { "sn": "82000000305E144F", "time": fmt } function dateFormat(fmt, timestamp) { let ret; const opt = { "Y+": timestamp.getFullYear().toString(), // 年 "m+": (timestamp.getMonth() + 1).toString(), // 月 "d+": timestamp.getDate().toString(), // 日 "H+": timestamp.getHours().toString(), // 時 "M+": timestamp.getMinutes().toString(), // 分 "S+": timestamp.getSeconds().toString() // 秒 // 有其他格式化字符需求可以繼續(xù)添加,必須轉化成字符串 }; for (let k in opt) { ret = new RegExp("(" + k + ")").exec(fmt); if (ret) { fmt = fmt.replace(ret[1], (ret[1].length == 1) ? (opt[k]) : (opt[k].padStart(ret[1].length, "0"))) }; }; return fmt; } msg.payload = JSON.stringify(obj) return msg
2.數據上報
a.從節(jié)點庫拖出一個注入節(jié)點,函數節(jié)點和一個MQTT發(fā)布節(jié)點,注入節(jié)點用于周期觸發(fā),函數節(jié)點用于封裝心跳報文,MQTT發(fā)布節(jié)點用于心跳發(fā)布
b.從上面可以看到日志窗口打印了數據報文,MQTTx也接收到了上報數據
c.數據上報流程使用的代碼塊:
function dateFormat(fmt, timestamp) { let ret; const opt = { "Y+": timestamp.getFullYear().toString(), // 年 "m+": (timestamp.getMonth() + 1).toString(), // 月 "d+": timestamp.getDate().toString(), // 日 "H+": timestamp.getHours().toString(), // 時 "M+": timestamp.getMinutes().toString(), // 分 "S+": timestamp.getSeconds().toString() // 秒 // 有其他格式化字符需求可以繼續(xù)添加,必須轉化成字符串 }; for (let k in opt) { ret = new RegExp("(" + k + ")").exec(fmt); if (ret) { fmt = fmt.replace(ret[1], (ret[1].length == 1) ? (opt[k]) : (opt[k].padStart(ret[1].length, "0"))) }; }; return fmt; } var flag = msg.status var data = msg.payload var payload const fmt = dateFormat("YYYY-mm-dd HH:MM:SS", new Date()) if (flag == "TIMEOUT") { payload = { "sn": "82000000305E144F", "data": {}, "message": "offline", "time": fmt } msg.payload = JSON.stringify(payload) return [msg, null] } else if (flag == "OK") { payload = { sn: "02C00081275A574E", data, time: fmt } msg.payload = JSON.stringify(payload) return [null, msg] }
3.心跳推送
a.從節(jié)點庫拿出一個注入節(jié)點,函數節(jié)點,MQTT發(fā)布節(jié)點。注入節(jié)點用于周期發(fā)發(fā)送心跳,函數節(jié)點用于封裝心跳報文,MQTT發(fā)布節(jié)點用于發(fā)布到MQTT
b.從上面可以看到日志窗口打印了心跳報文,MQTTx也接收到了心跳報
心跳上報流程使用的代碼塊:
const fmt = dateFormat("YYYY-mm-dd HH:MM:SS", new Date()) function dateFormat(fmt, timestamp) { let ret; const opt = { "Y+": timestamp.getFullYear().toString(), // 年 "m+": (timestamp.getMonth() + 1).toString(), // 月 "d+": timestamp.getDate().toString(), // 日 "H+": timestamp.getHours().toString(), // 時 "M+": timestamp.getMinutes().toString(), // 分 "S+": timestamp.getSeconds().toString() // 秒 // 有其他格式化字符需求可以繼續(xù)添加,必須轉化成字符串 }; for (let k in opt) { ret = new RegExp("(" + k + ")").exec(fmt); if (ret) { fmt = fmt.replace(ret[1], (ret[1].length == 1) ? (opt[k]) : (opt[k].padStart(ret[1].length, "0"))) }; }; return fmt; } let obj = { "sn": "82000000305E144F", "data": {}, "message": "heart", "time": fmt } msg.payload = JSON.stringify(obj) return msg;
4.下發(fā)控制及響應
a.從節(jié)點庫拿出一個MQTT訂閱節(jié)點,MQTT發(fā)布節(jié)點,函數節(jié)點和DO節(jié)點,MQTT訂閱節(jié)點和MQTT發(fā)布節(jié)點用于接收云端發(fā)來的數據和控制響應,函數節(jié)點用于解析下發(fā)的報文和控制的響應報文封裝
b.從上面可以看到日志窗口打印了報文,MQTTx也接收到了控制響應
c.控制流程使用的代碼塊
//云端下發(fā)解析 //############################ if (msg.payload.length < 10) { return } if (typeof (msg.payload) == "object") { var obj = msg.payload //信息獲取 let event_id = obj.event_id; global.set("event_id", event_id) let parameter = obj.data.parameter let value = obj.data.value let arr = [] switch (parameter) { case "DO1": arr = value break case "DO2": arr = value break } msg.payload = arr return msg; } //############################ //反饋響應封裝 //############################ let event_id = global.get("event_id") const fmt = dateFormat("YYYY-mm-dd HH:MM:SS", new Date()) function dateFormat(fmt, timestamp) { let ret; const opt = { "Y+": timestamp.getFullYear().toString(), // 年 "m+": (timestamp.getMonth() + 1).toString(), // 月 "d+": timestamp.getDate().toString(), // 日 "H+": timestamp.getHours().toString(), // 時 "M+": timestamp.getMinutes().toString(), // 分 "S+": timestamp.getSeconds().toString() // 秒 // 有其他格式化字符需求可以繼續(xù)添加,必須轉化成字符串 }; for (let k in opt) { ret = new RegExp("(" + k + ")").exec(fmt); if (ret) { fmt = fmt.replace(ret[1], (ret[1].length == 1) ? (opt[k]) : (opt[k].padStart(ret[1].length, "0"))) }; }; return fmt; } let obj = { "event_id": event_id, "message": "OK", "data": {}, "time": fmt } msg.payload = JSON.stringify(obj); return msg; //############################
通過以上步驟,你可以在邊緣網關上成功實現 Modbus 協(xié)議到 MQTT 協(xié)議的轉換。這種集成方式不僅簡化了工業(yè)設備與物聯(lián)網平臺的通信,還增強了數據的實時性和可靠性。希望本文能為你提供有用的指導,助你更好地利用 Modbus 和 MQTT 技術
審核編輯 黃宇
-
MODBUS
+關注
關注
28文章
1762瀏覽量
76843 -
網關
+關注
關注
9文章
4308瀏覽量
50957 -
MQTT
+關注
關注
5文章
649瀏覽量
22435
發(fā)布評論請先 登錄
相關推薦
評論