我們是通訊行業(yè)動(dòng)力維護(hù)專業(yè),因?yàn)橥馐须姡ㄈ嘟涣麟姡┑牟▌?dòng)會(huì)對(duì)我們的設(shè)備造成一定的影響。于是想做一個(gè)交流電監(jiān)測(cè)的應(yīng)用,監(jiān)測(cè)交流電的過壓或欠壓,所以我做了這么一個(gè)實(shí)時(shí)監(jiān)測(cè)的工具。它可以在過壓和欠壓一定程度的時(shí)候告警。或者進(jìn)行長(zhǎng)期監(jiān)測(cè)并記錄電壓數(shù)據(jù),分析可能發(fā)生過壓或欠壓的原因。
樹莓派作為一個(gè)簡(jiǎn)單易用功能強(qiáng)大的計(jì)算平臺(tái),很好地支持 Python。而且剛好有 MCC 118 數(shù)據(jù)采集模塊(HAT)可以直接擴(kuò)展出數(shù)據(jù)采集功能,MCC 118 提供 8 通道模擬電壓輸入,基于樹莓派的數(shù)據(jù)采集/數(shù)據(jù)記錄系統(tǒng)。每張 MCC 118 最大采樣率為 100kS/s,可以進(jìn)行單電壓點(diǎn)和電壓波形采集。
項(xiàng)目的代碼就是在 MCC DAQ HATs 提供的 SDK 示例代碼中,continuous_scan.py 的基礎(chǔ)上修改來的。實(shí)現(xiàn)一個(gè)高速的采集及告警設(shè)備,來監(jiān)測(cè)外市電的波動(dòng)情況。
長(zhǎng)期監(jiān)測(cè)主要通過長(zhǎng)期數(shù)據(jù)記錄,分析是否有未超出告警范圍的波動(dòng),以分析供電質(zhì)量。
這只是一個(gè)工程樣機(jī),可以進(jìn)行實(shí)時(shí)(延時(shí)3秒+)告警和儲(chǔ)存發(fā)生告警時(shí)的電壓波形;可以進(jìn)行長(zhǎng)期監(jiān)測(cè)(不實(shí)時(shí)告警)、也可以長(zhǎng)期監(jiān)測(cè)并實(shí)時(shí)告警。
內(nèi)部構(gòu)造如下圖,其中 MCC 118 在圖中正上方。
警告分為4種:本機(jī)的光警告、本機(jī)的聲警告、繼電器輸出的外部警告、物聯(lián)網(wǎng)平臺(tái)(OneNet)警告。
目前支持兩路三相交流電監(jiān)測(cè),每路三相電的 A、B、C 相分別用:黃、綠、紅,發(fā)光二極管警告。
物聯(lián)網(wǎng)平臺(tái)的警告頁(yè)面。分別是:一路三相電的一相警告,以及一路三相電的三相警告。
采集到的正常交流電波形如下圖。
數(shù)據(jù)分析部分是用 NumPy 進(jìn)行交流電每個(gè)周期的最大值、最小值判斷,然后確定是否報(bào)警,是否寫入硬盤。
還有一個(gè)程序每一做任何判斷,把采集到的所有數(shù)據(jù)都直接記錄到硬盤,大概一小時(shí)會(huì)生成 1.2G 的數(shù)據(jù)。
也是用 Python 寫的腳本,通過 NumPy 把數(shù)據(jù)做一個(gè)轉(zhuǎn)換 ,然后用 NumPy 的 amax 和 amin 對(duì)數(shù)值進(jìn)行分析判斷。如果數(shù)據(jù)超過閾值,就寫硬盤文件,同時(shí)用 GPIO 輸出控制 LED 來發(fā)出預(yù)警信號(hào)。
以下是完整代碼,其中關(guān)鍵代碼已經(jīng)添加了注釋。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MCC 118 Functions Demonstrated:
mcc118.a_in_scan_start
mcc118.a_in_scan_read
mcc118.a_in_scan_stop
mcc118.a_in_scan_cleanup
Purpose:
Perform a continuous acquisition on 1 or more channels.
Description:
Continuously acquires blocks of analog input data for a
user-specified group of channels until the acquisition is
stopped by the user. The last sample of data for each channel
is displayed for each block of data received from the device.
MCC118共8個(gè)通道,可以監(jiān)測(cè)兩個(gè)三相變壓器的6路市電,分別為變壓器1的A、B、C相和變壓器2的A、B、C相
"""
from __future__ import print_function
from daqhats import mcc118, OptionFlags, HatIDs, HatError
from daqhats_utils import select_hat_device, enum_mask_to_string, \
chan_list_to_mask
import os
import threading
import numpy as np
import datetime
import time
from socket import *
import RPi.GPIO as GPIO
# ===GPIO setup=====================================================================
GPIO.setmode(GPIO.BCM)
led_list=[5,6,19,16,20,21] #LED使用的GPIO
GPIO.setup(led_list, GPIO.OUT, initial=GPIO.LOW)
delta = datetime.timedelta(minutes=5)
# ===================================================================================
READ_ALL_AVAILABLE = -1
CURSOR_BACK_2 = '\x1b[2D's
ERASE_TO_END_OF_LINE = '\x1b[0K'
# ===================================================================================
arraydata=[[0 for i in range(72000)] for h in range(10)] #定義一個(gè)采樣緩存數(shù)組,72000為read_request_size*信道數(shù),10為#連續(xù)采樣的次數(shù)c_time
# =======OneNet param===================================================================
device_id = "Your 設(shè)備ID" # Your 設(shè)備ID
api_key = "Your API_KEY" # Your API_KEY
HOST = 'api.heclouds.com'
PORT = 80
BUFSIZ = 1024
ADDR = (HOST, PORT)
# =======Channel set==================================================================
# Store the channels in a list and convert the list to a channel mask that
# can be passed as a parameter to the MCC 118 functions.
channels = [0, 1, 2, 3, 4, 5]
channel_mask = chan_list_to_mask(channels)
num_channels = len(channels)
samples_per_channel = 0
options = OptionFlags.CONTINUOUS
scan_rate = 10000.0 #采樣速率 最大值為100k/信道數(shù)
read_request_size = 12000 #每通道此次采樣的點(diǎn)數(shù)
c_time = 10 #連續(xù)采樣的次數(shù)
timeout = 5.0
# =======上下限=================================================================
volt=220 #根據(jù)需要設(shè)定標(biāo)準(zhǔn)電壓
thread=0.2 #根據(jù)需要設(shè)定
upline=volt*(1+thread)*1.414*0.013558 #0.013558是根據(jù)電阻分壓確定的
downline=volt*(1-thread)*1.414*0.013558 #0.013558是根據(jù)電阻分壓確定的
# =======LED監(jiān)測(cè)=================================================================
base_time=datetime.datetime.now()
#定義時(shí)間臨時(shí)數(shù)組
led_stime=[base_time,base_time,base_time,base_time,base_time,base_time]
#定義狀態(tài)臨時(shí)數(shù)組
led_status=[-1,-1,-1,-1,-1,-1]
#=====================================================================================
def led_detect():
global led_stime,led_status
#變壓器的A、B、C相
transno=["Trans1A","Trans1B","Trans1C","Trans2A","Trans2B","Trans2C"]
try:
#判斷每個(gè)LED的狀態(tài)
while PORT > 0 :
time.sleep(1)
if GPIO.input(5) == GPIO.HIGH and led_status[0]<0 : ??????
led_stime[0]=datetime.datetime.now()
led_status[0]=1
if GPIO.input(6) == GPIO.HIGH and led_status[1]<0 :
led_stime[1]=datetime.datetime.now()
led_status[1]=1
if GPIO.input(19) == GPIO.HIGH and led_status[2]<0 :
led_stime[2]=datetime.datetime.now()
led_status[2]=1
if GPIO.input(16) == GPIO.HIGH and led_status[3]<0 :
led_stime[3]=datetime.datetime.now()
led_status[3]=1
if GPIO.input(20) == GPIO.HIGH and led_status[4]<0 :
led_stime[4]=datetime.datetime.now()
led_status[4]=1
if GPIO.input(21) == GPIO.HIGH and led_status[5]<0 :
led_stime[5]=datetime.datetime.now()
led_status[5]=1
#相應(yīng)引腳延時(shí)5分鐘熄滅,并向平臺(tái)發(fā)送數(shù)據(jù)清除告警
for i in range(6):
now=datetime.datetime.now()
if now > ( led_stime[i] + delta ) and led_status[i]>=0 :
GPIO.output(led_list[i], GPIO.LOW)
led_status[i]=-1
t5 = threading.Thread(target=senddata, args=(transno[i],"0"))
t5.start()
except KeyboardInterrupt:
# Clear the '^C' from the display.
print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
print('Stopping')
# ========senddata to OneNet==============================================================
def senddata(T_dev,alm):
Content = ""
Content += "{"datastreams":[{"id": ""+T_dev+"","datapoints": [{"value": " + alm + "}]}"
Content += "]}\r\n"
value = len(Content)
data = ""
data += "POST /devices/" + device_id + "/datapoints HTTP/1.1\r\n"
data += "Host:api.heclouds.com\r\n"
data += "Connection: close\r\n"
data += "api-key:" + api_key + "\r\n"
data += "Content-Length:" + str(value) + "\r\n"
data += "\r\n"
data += Content
tcpCliSock = socket(AF_INET, SOCK_STREAM)
tcpCliSock.connect(ADDR)
tcpCliSock.send(data.encode())
#data1 = tcpCliSock.recv(BUFSIZ).decode()
#print(data1)
# =====writefile==========================================================================
def writefile(arrayatt, fnat):
print("write file ", datetime.datetime.now())
np.savetxt(fnat, arrayatt.T, fmt="%1.2f", delimiter=" ")
print("\033[0;31m","finish write ", datetime.datetime.now(),"\033[0m")
#====Analyse and writefile and Alarm=======================================================
def analyse(array,fnat):
break_1_1 = -1
break_2_1 = -1
break_1_2 = -1
break_2_2 = -1
break_1_3 = -1
break_2_3 = -1
break_1 = -1
break_2 = -1
print("analyse file ", datetime.datetime.now())
#等于read_request_size = 12000
lll = read_request_size
file1 = "1-"+fnat
file2 = "2-"+fnat
a=np.array(array)
b = np.empty([3, c_time*read_request_size], dtype=float) # 變壓器1定義一個(gè)數(shù)組
c = np.empty([3, c_time*read_request_size], dtype=float) # 變壓器2定義一個(gè)數(shù)組
#板子的數(shù)據(jù)記錄格式是:CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、
#通過下面拆分成6個(gè)CH每個(gè)CH一個(gè)list以便進(jìn)行分CH判斷
b[0] = np.concatenate((a[0, 0::6], a[1, 0::6], a[2, 0::6], a[3, 0::6], a[4, 0::6], a[5, 0::6], a[6, 0::6],a[7, 0::6], a[8, 0::6], a[9, 0::6])) #提取變壓器1,A相數(shù)據(jù)
b[1] = np.concatenate((a[0, 1::6], a[1, 1::6], a[2, 1::6], a[3, 1::6], a[4, 1::6], a[5, 1::6], a[6, 1::6],a[7, 1::6], a[8, 1::6], a[9, 1::6])) #提取變壓器1,B相數(shù)據(jù)
b[2] = np.concatenate((a[0, 2::6], a[1, 2::6], a[2, 2::6], a[3, 2::6], a[4, 2::6], a[5, 2::6], a[6, 2::6],a[7, 2::6], a[8, 2::6], a[9, 2::6])) #提取變壓器1,C相數(shù)據(jù)
c[0] = np.concatenate((a[0, 3::6], a[1, 3::6], a[2, 3::6], a[3, 3::6], a[4, 3::6], a[5, 3::6], a[6, 3::6],a[7, 3::6], a[8, 3::6], a[9, 3::6])) #提取變壓器2,A相數(shù)據(jù)
c[1] = np.concatenate((a[0, 4::6], a[1, 4::6], a[2, 4::6], a[3, 4::6], a[4, 4::6], a[5, 4::6], a[6, 4::6],a[7, 4::6], a[8, 4::6], a[9, 4::6])) #提取變壓器2,B相數(shù)據(jù)
c[2] = np.concatenate((a[0, 5::6], a[1, 5::6], a[2, 5::6], a[3, 5::6], a[4, 5::6], a[5, 5::6], a[6, 5::6],a[7, 5::6], a[8, 5::6], a[9, 5::6])) #提取變壓器2,C相數(shù)據(jù)
#=====判斷越線=================================================================================
# 200 是 scan_rate = 10000.0 除以 50(交流電的頻率)得出 每個(gè)周期 采樣200個(gè)點(diǎn)
# 600 是 120000/200=600 read_request_size*c_time本次總的采樣點(diǎn)數(shù),除以每周期200點(diǎn),總共600個(gè)周期
# 如果 scan_rate = 5000.0 就是 100, alens 是 120000/100=1200
for i in range(600):
#每個(gè)CH拆分成每個(gè)正弦波周期進(jìn)行峰值判斷
a1 = b[0][(i * 200):((i + 1) * 200 - 1)]
b1 = b[1][(i * 200):((i + 1) * 200 - 1)]
c1 = b[2][(i * 200):((i + 1) * 200 - 1)]
a2 = c[0][(i * 200):((i + 1) * 200 - 1)]
b2 = c[1][(i * 200):((i + 1) * 200 - 1)]
c2 = c[2][(i * 200):((i + 1) * 200 - 1)]
#每一項(xiàng)分別判斷上下限并分別告警
if np.amax(a1) > upline or np.amax(a1) < downline :
if break_1_1 <0:
GPIO.output(5, GPIO.HIGH) #相應(yīng)引腳置高電平,點(diǎn)亮LED
break_1_1 = 1
t3 = threading.Thread(target=senddata, args=("Trans1A","100")) #調(diào)用上傳進(jìn)程
t3.start()
if np.amax(b1) > upline or np.amax(b1) < downline ?:
if break_1_2< 0:
GPIO.output(6, GPIO.HIGH) #相應(yīng)引腳置高電平,點(diǎn)亮LED
break_1_2 = 1
t3 = threading.Thread(target=senddata, args=("Trans1B","100")) #調(diào)用上傳進(jìn)程
t3.start()
if np.amax(c1) > upline or np.amax(c1) < downline:
if break_1_3 < 0:
GPIO.output(19, GPIO.HIGH) #相應(yīng)引腳置高電平,點(diǎn)亮LED
break_1_3 = 1
t3 = threading.Thread(target=senddata, args=("Trans1C","100")) #調(diào)用上傳進(jìn)程
t3.start()
if np.amax(a2) > upline or np.amax(a2) < downline:
if break_2_1 < 0:
GPIO.output(16, GPIO.HIGH) #相應(yīng)引腳置高電平,點(diǎn)亮LED
break_2_1=1
t3 = threading.Thread(target=senddata, args=("Trans2A","100")) #調(diào)用上傳進(jìn)程
t3.start()
if np.amax(b2) > upline or np.amax(b2) < downline:
if break_2_2 < 0:
GPIO.output(20, GPIO.HIGH) #相應(yīng)引腳置高電平,點(diǎn)亮LED
break_2_1 =1
t3 = threading.Thread(target=senddata, args=("Trans2B","100")) #調(diào)用上傳進(jìn)程
t3.start()
if np.amax(c2) > upline or np.amax(c2) < downline:
if break_2_3 < 0:
GPIO.output(21, GPIO.HIGH) #相應(yīng)引腳置高電平,點(diǎn)亮LED
break_2_2 = 1
t3 = threading.Thread(target=senddata, args=("Trans2C","100")) #調(diào)用上傳進(jìn)程
t3.start()
#每個(gè)變壓器任何一項(xiàng)有告警就調(diào)用寫文件進(jìn)程寫文件
if np.amax(a1) > upline or np.amax(a1) < downline or np.amax(b1) > upline or np.amax(b1) < downline or np.amax(c1) > upline or np.amax(c1) < downline:
if break_1 < 0:
t1 = threading.Thread(target=writefile, args=(b, file1,)) #調(diào)用寫文件進(jìn)程
t1.start()
break_1 = 2
if np.amax(a2) > upline or np.amax(a2) < downline or np.amax(b2) > upline or np.amax(b2) < downline or np.amax(c2) > upline or np.amax(c2) < downline:
if break_2 < 0:
t1 = threading.Thread(target=writefile, args=(c, file2,)) #調(diào)用寫文件進(jìn)程
t1.start()
break_2 = 2
print("\033[0;32m","analyse finish ", datetime.datetime.now(),"\033[0m") #font color
# ============================================================================================
def main():
"""
This function is executed automatically when the module is run directly.
"""
# ============================================================================================
try:
# Select an MCC 118 HAT device to use.
address = select_hat_device(HatIDs.MCC_118)
hat = mcc118(address)
'''
print('\nSelected MCC 118 HAT device at address', address)
actual_scan_rate = hat.a_in_scan_actual_rate(num_channels, scan_rate)
print('\nMCC 118 continuous scan example')
print(' Functions demonstrated:')
print(' mcc118.a_in_scan_start')
print(' mcc118.a_in_scan_read')
print(' mcc118.a_in_scan_stop')
print(' Channels: ', end='')
print(', '.join([str(chan) for chan in channels]))
print(' Requested scan rate: ', scan_rate)
print(' Actual scan rate: ', actual_scan_rate)
print(' Options: ', enum_mask_to_string(OptionFlags, options))
# ============================================================================================
try:
input('\nPress ENTER to continue ...')
except (NameError, SyntaxError):
pass
'''
# Configure and start the scan.
# Since the continuous option is being used, the samples_per_channel
# parameter is ignored if the value is less than the default internal
# buffer size (10000 * num_channels in this case). If a larger internal
# buffer size is desired, set the value of this parameter accordingly.
print('Starting scan ... Press Ctrl-C to stop\n')
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,options)
# ============================================================================================
try:
while True:
fna = str(datetime.datetime.now()) + ".txt"
#連續(xù)采樣10次
for i in range(c_time):
read_result = hat.a_in_scan_read(read_request_size, timeout) # read channel Data
arraydata[i]=read_result.data
if read_result.hardware_overrun:
print('\n\nHardware overrun\n')
break
elif read_result.buffer_overrun:
print('\n\nBuffer overrun\n')
break
hat.a_in_scan_stop()
hat.a_in_scan_cleanup()
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,options)
#調(diào)用分析進(jìn)程
arraydatat = arraydata
t2 = threading.Thread(target=analyse, args=(arraydatat, fna, ))
t2.start()
except KeyboardInterrupt:
# Clear the '^C' from the display.
print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
print('Stopping')
hat.a_in_scan_stop()
hat.a_in_scan_cleanup()
except (HatError, ValueError) as err:
print('\n', err)
if __name__ == '__main__':
#表用進(jìn)程實(shí)時(shí)監(jiān)測(cè)告警狀態(tài),判斷是否到時(shí)間消除告警
t4 = threading.Thread(target=led_detect)
t4.start()
main()
程序部署
參考這篇教程安裝好 MCC DAQ HATs 的 SDK 和代碼示例。
https://shumeipai.nxez.com/2018/10/18/get-started-with-the-evaluation-for-mcc-118.html
將上面的程序保存為 main.py 然后在程序所在目錄運(yùn)行下面的命令即可啟動(dòng)程序。
-
交流電
+關(guān)注
關(guān)注
14文章
651瀏覽量
33927 -
樹莓派
+關(guān)注
關(guān)注
116文章
1698瀏覽量
105524
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論