艾體寶干貨 | IOTA流量分析秘籍第三招:檢測(cè)黑名單上的IP地址
IOTA 設(shè)備提供 RESTful API,允許直接訪問(wèn)存儲(chǔ)在設(shè)備上的數(shù)據(jù)。這對(duì)于集成到各種場(chǎng)景中非常有用。在本例中,可以過(guò)濾當(dāng)前捕獲內(nèi)容中的黑名單 IP。用例可能是分析某些內(nèi)部 IP 是否未傳播到特定訪問(wèn)級(jí)別之外,或者某些設(shè)備是否從不應(yīng)訪問(wèn)的區(qū)域進(jìn)行訪問(wèn)。
IOTA簡(jiǎn)介
IOTA 是一款功能強(qiáng)大的網(wǎng)絡(luò)捕獲和分析解決方案,適用于邊緣和核心網(wǎng)絡(luò)。IOTA 系列包括便攜式 EDGE 型號(hào)、高速 CORE 型號(hào)和 IOTA CM 集中設(shè)備管理系統(tǒng)。IOTA 解決方案可為分支機(jī)構(gòu)、中小企業(yè)和核心網(wǎng)絡(luò)(如數(shù)據(jù)中心)提供快速高效的網(wǎng)絡(luò)分析和故障排除功能。
IOTA 設(shè)備提供 RESTful API,允許直接訪問(wèn)存儲(chǔ)在設(shè)備上的數(shù)據(jù)。這對(duì)于集成到各種場(chǎng)景中非常有用。在本例中,可以過(guò)濾當(dāng)前捕獲內(nèi)容中的黑名單 IP。用例可能是分析某些內(nèi)部 IP 是否未傳播到特定訪問(wèn)級(jí)別之外,或者某些設(shè)備是否從不應(yīng)訪問(wèn)的區(qū)域進(jìn)行訪問(wèn)。
一、前提條件
二、設(shè)置 IOTA 設(shè)備
- 連接 IOTA 設(shè)備
- 將 IOTA 設(shè)備連接到要監(jiān)控的網(wǎng)段。
- 確定正確的網(wǎng)段至關(guān)重要。通過(guò)目視檢查 IOTA 設(shè)備上的流量,確保在捕獲期間使用正確的網(wǎng)段。
- 訪問(wèn) IOTA 網(wǎng)絡(luò)界面
- 打開(kāi)網(wǎng)絡(luò)瀏覽器并輸入 IOTA 設(shè)備的 IP 地址。
- 例如:https://(用 IOTA 設(shè)備的實(shí)際 IP 地址替換)。
- 使用正確的憑證登錄。該腳本使用的用戶(hù)名/密碼組合應(yīng)允許它查看必要的信息。
- 配置捕獲設(shè)置
- 設(shè)置用于捕獲數(shù)據(jù)包的網(wǎng)絡(luò)接口。
- 從設(shè)備的 Web 界面選擇要監(jiān)控的接口。
- 根據(jù)需要配置過(guò)濾器,以關(guān)注特定流量類(lèi)型。
- 您可以根據(jù)需要設(shè)置過(guò)濾器,只捕獲特定類(lèi)型的流量,如 HTTP 或 FTP。
三、使用 IOTA 數(shù)據(jù)庫(kù)查詢(xún)引擎
查詢(xún)引擎允許任何用戶(hù)以機(jī)器可讀的方式與 IOTA 設(shè)備交互,以查詢(xún)?cè)獢?shù)據(jù)數(shù)據(jù)庫(kù)。用戶(hù)只需輸入用戶(hù)名/密碼,即可通過(guò) RESTful API 訪問(wèn)數(shù)據(jù)庫(kù)。
為確保敏感信息不被傳播,以及設(shè)備上的任何信息都不會(huì)被用戶(hù)篡改,建議創(chuàng)建一個(gè)新的特殊用戶(hù)。在本例中,創(chuàng)建了一個(gè)用戶(hù)名為 “apitest ”的用戶(hù)。請(qǐng)注意,該用戶(hù)已作為 “查看器 ”自動(dòng)添加到組織中。不需要配置任何其他功能,而且該角色只允許只讀訪問(wèn),可以防止任何誤傳。
四、安全考慮
這里使用的解決方案是為使用腳本創(chuàng)建一個(gè)專(zhuān)用的用戶(hù)/密碼組合。由于建議保護(hù) IOTA 的直接訪問(wèn)不受任何外部干擾,并使用 HTTPS 連接,因此這樣做沒(méi)有問(wèn)題。但請(qǐng)注意,良好的做法是確保該用戶(hù)/密碼組合不在任何其他情況下使用,并且該用戶(hù)僅具有查看器角色(新用戶(hù)默認(rèn)具有該角色)。
五、檢測(cè)黑名單 IP 的 Python 腳本
import requests
import re
import time
import urllib.parse
import json
import argparse
from urllib3.exceptions import InsecureRequestWarning
def main():
# CLI options
parser = argparse.ArgumentParser(description='Compare IP blacklist files against IOTA metadata. Blacklist files must contain a list of single IPv4/IPv6 addresses or IP subnet in CIDR notation.')
required_args = parser.add_argument_group('required')
required_args.add_argument('-d', '--device_ip', help='Device IP address')
required_args.add_argument('-i', '--infile', nargs='+', help = 'Blacklist file(s)')
parser.add_argument('-u', '--device_username', default='admin', help='Device IP username (default: admin)')
parser.add_argument('-p', '--device_password', default='admin', help='Device IP password (default: admin)')
parser.add_argument('-l', '--query_time_window_s', type=int, default=600, help='Number of seconds from the current time to query IOTA metadata (default: 600)')
args = parser.parse_args()
if args.device_ip:
iota_ip_address = args.device_ip
if args.device_username:
iota_username = args.device_username
if args.device_password:
iota_password = args.device_password
if args.query_time_window_s:
query_time_window_s = args.query_time_window_s
if args.infile:
blacklist_files = args.infile
query_time_window_end = int(time.time())
query_time_window_start = query_time_window_end - query_time_window_s
results = {}
session = requests.Session()
session.auth = (iota_username, iota_password)
# Suppress warnings from urllib3
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
# Open blacklist files, iterate over them
for blacklist_file in blacklist_files:
count_fail = 0
print(f' - {blacklist_file}')
with open(blacklist_file) as f:
for line in f:
# Skip comments or empty lines
if re.match(r"^[;#s]", line):
continue
# Current IP address or subnet
ip = line.split()[0]
print(f' - {ip:45}', end='')
# Handle CIDR subnet query
if re.match(r"^(?:(?:(?:[0-9]{1,3}.){3}[0-9]{1,3}/[0-9]{1,2})|(?:[0-9a-fA-F:]{2,39}/[0-9]{1,2}))$", ip):
query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (isIPAddressInRange(IP_SRC, '{ip}') OR isIPAddressInRange(IP_DST, '{ip}')) AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*')
response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False)
matching_flows = json.loads(response.content)['data'][0]['matching_flows']
if matching_flows == "0":
print('Ok')
else:
count_fail += 1
print(f'Match (flows: {matching_flows})')
# Handle single IP address query
elif re.match(r"^(?:(?:(?:[0-9]{1,3}.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,39}))$", ip):
query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (IP_SRC = '{ip}' OR IP_DST = '{ip}') AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*')
response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False)
matching_flows = json.loads(response.content)['data'][0]['matching_flows']
if matching_flows == "0":
print('Ok')
else:
count_fail += 1
print(f'Match (flows: {matching_flows})')
# Some unknown case
else:
print('Unhandled case')
# Record results after completing file
results[blacklist_file] = count_fail
# Print results
print('nResultsn===')
for file in results:
print(f' - {file:47}{str(results[file])} matches')
if __name__ == '__main__':
main()
六、腳本分步說(shuō)明
1.導(dǎo)入必要的庫(kù)
import requests
import re
import time
import urllib.parse
import json
import argparse
from urllib3.exceptions import InsecureRequestWarning
2.設(shè)置配置
# CLI options
parser = argparse.ArgumentParser(description='Compare IP blacklist files against IOTA metadata. Blacklist files must contain a list of single IPv4/IPv6 addresses or IP subnet in CIDR notation.')
required_args = parser.add_argument_group('required')
required_args.add_argument('-d', '--device_ip', help='Device IP address')
required_args.add_argument('-i', '--infile', nargs='+', help = 'Blacklist file(s)')
parser.add_argument('-u', '--device_username', default='apitest', help='Device IP username (default: apitest)')
parser.add_argument('-p', '--device_password', default='apitest', help='Device IP password (default: apitest)')
parser.add_argument('-l', '--query_time_window_s', type=int, default=600, help='Number of seconds from the current time to query IOTA metadata (default: 600)')
- -d / -device_ip
- IOTA 設(shè)備的 IP。
- -i / -infile
- 包含要查找的 IP 地址的文本文件。也可以提供地址范圍的 CIDR 子網(wǎng)查詢(xún)。文件可能包含以 ; 或 # 為前綴的注釋。任何空行或以空格開(kāi)頭的行都將被省略。通過(guò)多次提供此參數(shù),可以向腳本提交多個(gè)輸入文件。
- -U / -device_username
- 上一步中定義的用戶(hù)名。
- -p / -device_password
- 上述步驟中定義的密碼。
- -l / -query_time_window_s
- 查詢(xún)應(yīng)在過(guò)去多少時(shí)間內(nèi)進(jìn)行。默認(rèn)值為 600 秒,相當(dāng)于 10 分鐘。
輸入文件:
以輸入文件為例,它將搜索特定子網(wǎng)以及內(nèi)部 DNS 服務(wù)器的所有 IP 地址。
# IP Address of DNS server
192.168.1.250
# Host range for internal testing
10.40.0.0/24
3.檢測(cè)黑名單 IP 的主邏輯
主邏輯打開(kāi)黑名單文件,逐步進(jìn)行解析,并查詢(xún)內(nèi)部數(shù)據(jù)庫(kù),進(jìn)行相應(yīng)的子網(wǎng)查詢(xún)或直接 IP 地址查詢(xún)。
query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (isIPAddressInRange(IP_SRC, '{ip}') OR isIPAddressInRange(IP_DST, '{ip}')) AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*')
response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False)
matching_flows = json.loads(response.content)['data'][0]['matching_flows']
首先,設(shè)置查詢(xún)。在本例中,我們只需要查詢(xún)找到的條目的數(shù)量,這就是我們執(zhí)行 COUNT 操作的原因。數(shù)據(jù)庫(kù)以 SQL 格式進(jìn)行查詢(xún)。
創(chuàng)建查詢(xún)后,將建立一個(gè)會(huì)話,該會(huì)話將使用 HTTP GET 操作來(lái)查詢(xún)信息。查詢(xún)結(jié)果以 JSON 數(shù)據(jù)形式顯示,目前正在讀取第一個(gè)條目。
對(duì)于找到的每個(gè) IP 地址,將打印匹配條目的數(shù)量,對(duì)于每個(gè)文件,將打印匹配條目的總數(shù)。
七、運(yùn)行腳本
建議在 Python 環(huán)境中運(yùn)行腳本。可以使用以下命令來(lái)實(shí)現(xiàn):
python3 -m venv ./Development/iota
source ./Development/iota/bin/activate
python3 -m pip install argparse requests
這將創(chuàng)建一個(gè) Python3 venv 環(huán)境(更多信息請(qǐng)參見(jiàn)此處),并安裝默認(rèn)環(huán)境中不存在的所需庫(kù)。
然后,假設(shè) Python 腳本和 infile 文件都在當(dāng)前目錄下,下面的命令將運(yùn)行該腳本:
在這種情況下,IOTA 的定位是在出站廣域網(wǎng)連接上進(jìn)行捕獲。在最后 10 分鐘內(nèi),內(nèi)部 DNS 服務(wù)器沒(méi)有進(jìn)行任何出站 DNS 查詢(xún),但內(nèi)部測(cè)試網(wǎng)絡(luò)通過(guò)廣域網(wǎng)連接發(fā)送了 4 次數(shù)據(jù)。
在本例中,腳本被用作白名單測(cè)試,但也可輕松配置為黑名單測(cè)試。
結(jié)論
通過(guò)本指南,您可以有效地利用 IOTA 設(shè)備及其 REST API 來(lái)監(jiān)控網(wǎng)絡(luò)流量,并使用 Python 檢測(cè)黑名單 IP 地址。這種方法為在自動(dòng)化監(jiān)控環(huán)境中維護(hù)網(wǎng)絡(luò)安全提供了強(qiáng)大的工具。
審核編輯 黃宇
-
API
+關(guān)注
關(guān)注
2文章
1452瀏覽量
61418 -
網(wǎng)絡(luò)安全
+關(guān)注
關(guān)注
10文章
3032瀏覽量
59050 -
Iota
+關(guān)注
關(guān)注
0文章
25瀏覽量
8346
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論