|
楼主 |
发表于 2024-8-6 18:08
|
显示全部楼层
本帖最后由 浅冰Column 于 2024-8-12 11:07 编辑
基于 Python 的气象要素数据获取程序,通过 Modbus-RTU 协议在 RS-485 上与传感器通信。
异步采集和保存传感器的风速、风向、温度、湿度、气压和降水量数据,并于每分钟、每小时产生时段数据。
记录日志,各传感器配置可独立自定义,程序模块化。
(24/08/12:更新程序)
程序代码
代码内关于单位矢量平均法计算平均风速的部分,我在教程区新开了一帖《使用单位矢量平均法计算平均风向(公式、代码)》,供各位参考。
其他详见代码内注释。
- import os
- import json
- import signal
- import logging
- import asyncio
- from logging.handlers import TimedRotatingFileHandler
- from datetime import datetime, timedelta
- from pymodbus.client import AsyncModbusSerialClient as ModbusClient
- from pymodbus.exceptions import ModbusException, ConnectionException
- from dataclasses import dataclass, asdict
- import math
- # Modbus 连接配置
- PORTS = {
- "wind": {
- "port": "/dev/ttyUSB0",
- "baudrate": 4800,
- "stopbits": 1,
- "bytesize": 8,
- "timeout": 3,
- },
- "wind_dir": {
- "port": "/dev/ttyUSB1",
- "baudrate": 4800,
- "stopbits": 2,
- "bytesize": 7,
- "timeout": 3,
- },
- "thp": {
- "port": "/dev/ttyUSB2",
- "baudrate": 9600,
- "stopbits": 1,
- "bytesize": 8,
- "timeout": 3,
- },
- "rain": {
- "port": "/dev/ttyUSB3",
- "baudrate": 4800,
- "stopbits": 1,
- "bytesize": 8,
- "timeout": 3,
- },
- }
- LOG_FILE = "sensors.log"
- MINUTE_DATA_DIR = "/data/minute"
- HOUR_DATA_DIR = "/data/hour"
- # 日志配置,使用按日切割的文件处理器记录日志
- logging.basicConfig(
- handlers=[TimedRotatingFileHandler(LOG_FILE, when="D")],
- level=logging.DEBUG,
- format="[%(asctime)s][%(name)s][%(levelname)s] %(message)s",
- )
- # SIGINT 退出信号捕获配置
- def handle_sigint(signal_number, frame):
- for task in asyncio.all_tasks():
- task.cancel()
- loop = asyncio.get_event_loop()
- loop.stop()
- # 数据类,用于存储传感器数据
- @dataclass
- class SensorData:
- wind_speed_avg: float
- wind_direction_avg: float
- max_wind_speed: float
- max_wind_speed_direction: float
- temperature: float
- humidity: float
- pressure: float
- rainfall: float
- # Modbus 客户端交互类
- class ModbusClientHandler:
- def __init__(self, port, baudrate, stopbits, bytesize, timeout):
- self.port = port
- self.baudrate = baudrate
- self.stopbits = stopbits
- self.bytesize = bytesize
- self.timeout = timeout
- self.client = None
- # 异步上下文管理器,用于连接和断开 Modbus 客户端
- async def __aenter__(self):
- self.client = await ModbusClient(
- port=self.port,
- baudrate=self.baudrate,
- stopbits=self.stopbits,
- bytesize=self.bytesize,
- timeout=self.timeout,
- method="rtu",
- )
- if not self.client.protocol:
- raise ConnectionException(f"无法连接到 {self.port}")
- return self
- async def __aexit__(self, exc_type, exc, tb):
- if self.client:
- await self.client.close()
- # 通用读寄存器方法
- async def read_registers(self, address, count, slave=1):
- response = await self.client.read_holding_registers(address, count, slave)
- if response.isError():
- logging.error(f"Modbus 错误: {response}")
- return ["/"] * count
- return response.registers
- # 获取风速和风级数据
- async def get_wind_speed_scale(self):
- data = await self.read_registers(address=0x00, count=0x02)
- return {"wind_speed": data[0] * 0.1, "wind_scale": data[1]}
- # 获取风向数据
- async def get_wind_direction(self):
- data = await self.read_registers(address=0x00, count=0x02)
- return {"wind_direction": data[0]}
- # 获取温度、湿度、气压数据
- async def get_temperature_humidity_pressure(self):
- data = await self.read_registers(address=0x00, count=0x03)
- return {
- "temperature": data[0] * 0.1,
- "humidity": data[1] * 0.1,
- "pressure": data[2] * 0.1,
- }
- # 获取降雨量数据
- async def get_rainfall(self):
- data = await self.read_registers(address=0x00, count=0x01)
- return {"rainfall": data[0] * 0.1}
- # 单位矢量平均法计算平均风向
- def calculate_average_wind_direction(wind_directions):
- # 转为弧度制
- wind_directions_rad = [math.radians(d) for d in wind_directions]
- # 计算平均值
- X_avg = sum(math.sin(d) for d in wind_directions_rad) / len(wind_directions)
- Y_avg = sum(math.cos(d) for d in wind_directions_rad) / len(wind_directions)
- # 计算风向平均值的弧度
- average_wind_direction_rad = math.atan2(X_avg, Y_avg)
- # 转回角度制
- average_wind_direction_deg = math.degrees(average_wind_direction_rad)
- # 出界修正
- if average_wind_direction_deg < 0:
- average_wind_direction_deg += 360
- return average_wind_direction_deg
- # 获取并处理分钟实况数据
- async def collect_minute_data(clients):
- wind_speed_data, wind_direction_data = [], []
- temperature_data, humidity_data, pressure_data, rainfall_data = [], [], [], []
- for c in range(60):
- wind = await clients["wind"].get_wind_speed_scale() # 每秒采样一次风速和风级数据
- wind_speed_data.append(wind["wind_speed"])
- wind_dir = await clients["wind_dir"].get_wind_direction() # 每秒采样一次风向数据
- wind_direction_data.append(wind_dir["wind_direction"])
- if c % 10 == 0: # 每6秒采样一次温度、湿度、气压数据
- thp = await clients["thp"].get_temperature_humidity_pressure()
- temperature_data.append(thp["temperature"])
- humidity_data.append(thp["humidity"])
- pressure_data.append(thp["pressure"])
- if c == 0: # 每分钟采样一次降雨量数据
- rainfall = await clients["rain"].get_rainfall()
- rainfall_data.append(rainfall["rainfall"])
- await asyncio.sleep(1)
- # 计算各种平均值和最大值
- wind_speed_avg = sum(wind_speed_data) / len(wind_speed_data)
- wind_direction_avg = calculate_average_wind_direction(wind_direction_data)
- max_wind_speed = max(wind_speed_data)
- max_wind_speed_direction = wind_direction_data[wind_speed_data.index(max_wind_speed)]
- # 返回分钟数据
- return SensorData(
- wind_speed_avg=wind_speed_avg,
- wind_direction_avg=wind_direction_avg,
- max_wind_speed=max_wind_speed,
- max_wind_speed_direction=max_wind_speed_direction,
- temperature=sum(temperature_data) / len(temperature_data),
- humidity=sum(humidity_data) / len(humidity_data),
- pressure=sum(pressure_data) / len(pressure_data),
- rainfall=sum(rainfall_data) / len(rainfall_data),
- )
- # 获取并保存分钟实况数据到本地
- async def save_minute_data(clients):
- while True:
- data = await collect_minute_data(clients) # 获取分钟数据
- now = datetime.now()
- file_name = now.strftime("%Y-%m-%d %H:%M") # 生成文件名
- directory = MINUTE_DATA_DIR
- if not os.path.exists(directory):
- os.makedirs(directory) # 如果目录不存在,则创建目录
- try:
- # 将数据保存为JSON格式文件
- with open(os.path.join(directory, file_name), "w") as file:
- json.dump(asdict(data), file, indent=4)
- except Exception as e:
- logging.error(f"文件错误: {e}")
- await asyncio.sleep(60) # 每分钟获取并保存一次
- # 根据时段内分钟实况数据产生小时数据
- async def collect_hourly_data():
- now = datetime.now()
- hour_start = now.replace(minute=0, second=0, microsecond=0) # 获取当前小时的起始时间
- file_names = [
- (hour_start + timedelta(minutes=i)).strftime("%Y-%m-%d %H:%M") for i in range(60)
- ] # 生成当前小时内所有分钟文件的名称
- directory = MINUTE_DATA_DIR
- data_points = []
- for file_name in file_names:
- file_path = os.path.join(directory, file_name)
- if os.path.exists(file_path):
- try:
- with open(file_path, "r") as file:
- data_points.append(json.load(file)) # 读取并加载分钟数据
- except Exception as e:
- logging.error(f"文件错误: {e}")
- if len(data_points) < 60:
- return None # 如果数据点不足60个,则跳过本次(不产生小时数据)
- # 计算各种数据的平均值和最大值
- wind_speed = [d["wind_speed_avg"] for d in data_points]
- wind_direction = [d["wind_direction_avg"] for d in data_points]
- max_wind_speed = max([d["max_wind_speed"] for d in data_points])
- max_wind_speed_direction = data_points[[d["max_wind_speed"] for d in data_points].index(max_wind_speed)][
- "max_wind_speed_direction"
- ]
- # 返回小时数据
- return {
- "wind_speed_avg_2min": (sum(wind_speed[-2:]) / 2 if len(wind_speed) >= 2 else "/"),
- "wind_speed_avg_10min": (sum(wind_speed[-10:]) / 10 if len(wind_speed) >= 10 else "/"),
- "wind_direction_avg_2min": (
- calculate_average_wind_direction(wind_direction[-2:]) if len(wind_direction) >= 2 else "/"
- ),
- "wind_direction_avg_10min": (
- calculate_average_wind_direction(wind_direction[-10:]) if len(wind_direction) >= 10 else "/"
- ),
- "max_wind_speed": max_wind_speed,
- "max_wind_speed_direction": max_wind_speed_direction,
- "temperature_avg": sum([d["temperature"] for d in data_points]) / len(data_points),
- "humidity_avg": sum([d["humidity"] for d in data_points]) / len(data_points),
- "pressure_avg": sum([d["pressure"] for d in data_points]) / len(data_points),
- "rainfall_avg": sum([d["rainfall"] for d in data_points]) / len(data_points),
- }
- # 获取并保存小时数据到本地
- async def save_hourly_data():
- while True:
- await asyncio.sleep(3600) # 每小时获取并保存一次
- data = await collect_hourly_data()
- if data is None:
- logging.warning("数据点不足,跳过本次小时数据生成。")
- continue
- now = datetime.now()
- file_name = now.strftime("%Y-%m-%d %H") # 生成小时文件名
- directory = HOUR_DATA_DIR
- if not os.path.exists(directory):
- os.makedirs(directory) # 如果目录不存在,则创建目录
- try:
- # 将小时数据保存为JSON格式文件
- with open(os.path.join(directory, file_name), "w") as file:
- json.dump(data, file, indent=4)
- except Exception as e:
- logging.error(f"文件错误: {e}")
- # 主函数
- async def main():
- # 注册 SIGINT 退出信号处理器
- signal.signal(signal.SIGINT, handle_sigint)
- # 采用异步上下文管理器
- async with asyncio.TaskGroup() as tg:
- clients = {key: await tg.start(ModbusClientHandler(**port)) for key, port in PORTS.items()}
- # 创建并启动任务
- minute_task = asyncio.create_task(save_minute_data(clients))
- hourly_task = asyncio.create_task(save_hourly_data())
- # 等待所有任务完成
- try:
- await asyncio.gather(minute_task, hourly_task)
- except asyncio.CancelledError:
- logging.info("正在退出")
- if __name__ == "__main__":
- try:
- asyncio.run(main()) # 运行主程序
- except (ConnectionException, ModbusException) as e:
- logging.error(f"Modbus 错误: {e}")
复制代码 |
|