外观
使用Docker Compose构建MQTT消息记录服务
2025-06-19
思路
使用两个 Broker：将主 Broker 的消息单向桥接至存储 Broker，再挂载一个中间件订阅存储 Broker 收到的所有消息，并将其存入数据库。
实现
主Broker
mosquitto.conf
# Main broker: accepts client traffic and forwards every message to the
# storage broker over a one-way bridge.

# Keep broker state on disk so it survives restarts.
persistence true
# Written with a trailing slash, the form the mosquitto.conf documentation uses.
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log

# NOTE(review): no "protocol websockets" follows this listener, so 9001 speaks
# plain MQTT. Confirm whether a WebSocket listener was intended here.
listener 9001
# Explicit listener instead of the deprecated "port" directive.
listener 1883
allow_anonymous true

# One-way bridge: publish everything ("#") out to the storage broker at QoS 0.
connection mosquitto_storage
address mosquitto_storage:1883
topic # out 0
start_type automatic
存储Broker
mosquitto.conf
# Storage broker: receives the bridged messages from the main broker; the
# middleware subscribes here.

# Keep broker state on disk so it survives restarts.
persistence true
# Written with a trailing slash, the form the mosquitto.conf documentation uses.
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log

# NOTE(review): no "protocol websockets" follows this listener, so 9001 speaks
# plain MQTT. Confirm whether a WebSocket listener was intended here.
listener 9001
# Explicit listener instead of the deprecated "port" directive.
listener 1883
allow_anonymous true
数据库
init.sh
#!/bin/bash
# Creates the mqtt_data database used by the middleware.
# Intended to be mounted into /docker-entrypoint-initdb.d of the postgres image.

# Abort on the first failing command.
set -e

# Use the superuser configured for the container; fall back to "postgres",
# which is the value the script previously hard-coded.
psql -v ON_ERROR_STOP=1 --username "${POSTGRES_USER:-postgres}" <<-EOSQL
CREATE DATABASE mqtt_data;
EOSQL
中间件
main.py
import paho.mqtt.client as mqtt
import psycopg2
from psycopg2 import sql
import logging
import time
class Middleware:
    """Persist every message seen on the storage MQTT broker into PostgreSQL.

    The client subscribes to "#" and writes each non-empty UTF-8 payload into a
    table named "<first topic level>_<YYMM>", creating the table on demand.
    """

    def __init__(self, main_broker, storage_broker, db_name, db_user, db_password,
                 db_host, db_port, blank_topic_name, storage_port=1883):
        """Store the settings, configure logging, connect to the database and build the MQTT client.

        Args:
            main_broker: Host name of the main broker. Stored only; this class never connects to it.
            storage_broker: Host name of the broker to subscribe to.
            db_name: PostgreSQL database name.
            db_user: PostgreSQL user.
            db_password: PostgreSQL password.
            db_host: PostgreSQL host.
            db_port: PostgreSQL port.
            blank_topic_name: Table-name prefix used when the first topic level is empty.
            storage_port: TCP port of the storage broker. Defaults to 1883, the
                value that used to be hard-coded in start().

        Raises:
            psycopg2.OperationalError: If the database is still unreachable after all retries.
        """
        self.main_broker = main_broker
        self.storage_broker = storage_broker
        self.storage_port = storage_port
        self.db_name = db_name
        self.db_user = db_user
        self.db_password = db_password
        self.db_host = db_host
        self.db_port = db_port
        self.blank_topic_name = blank_topic_name

        # Logging goes to the console stream.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

        # Database connection (retried inside connect_to_database).
        self.postgres_conn = None
        self.connect_to_database()

        # MQTT client using the version-2 callback signatures.
        self.storage_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.storage_client.on_connect = self.on_connect
        self.storage_client.on_message = self.on_message
        self.storage_client.on_disconnect = self.on_disconnect

    def connect_to_database(self):
        """Open the PostgreSQL connection, retrying with exponential backoff.

        Makes up to five attempts, sleeping 5 s after the first failure and
        doubling the delay after each further one.

        Raises:
            psycopg2.OperationalError: Re-raised from the last failed attempt.
        """
        max_retries = 5
        retry_delay = 5  # seconds
        for attempt in range(max_retries):
            try:
                self.postgres_conn = psycopg2.connect(
                    dbname=self.db_name,
                    user=self.db_user,
                    password=self.db_password,
                    host=self.db_host,
                    port=self.db_port
                )
                self.logger.info("Successfully connected to PostgreSQL database")
                return
            except psycopg2.OperationalError as e:
                self.logger.warning(f"Database connection failed (attempt {attempt+1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    self.logger.error("Failed to connect to database after multiple attempts")
                    raise

    def start(self):
        """Connect to the storage broker and run the network loop until it ends.

        Errors from the connect call or the loop are logged, not raised.
        cleanup() always runs on the way out.
        """
        try:
            self.logger.info(f"Connecting to MQTT broker: {self.storage_broker}")
            self.storage_client.connect(self.storage_broker, self.storage_port, 60)
            self.storage_client.loop_forever()
        except Exception as e:
            self.logger.error(f"Error in MQTT loop: {e}")
        finally:
            self.cleanup()

    def on_connect(self, client, userdata, flags, reason_code, properties):
        """Subscribe to every topic once the broker accepts the connection.

        Subscribing here (rather than once in start) means the subscription is
        re-issued each time this callback fires.
        """
        if reason_code.is_failure:
            self.logger.error(f"Failed to connect to MQTT broker: {reason_code}")
            return
        self.logger.info(f"Connected to MQTT broker with code {reason_code}")
        client.subscribe("#")
        self.logger.info("Subscribed to all topics (#)")

    def on_disconnect(self, client, userdata, disconnect_flags, reason_code=None, properties=None):
        """Log unexpected disconnects.

        The client is created with CallbackAPIVersion.VERSION2, which invokes
        this callback as (client, userdata, disconnect_flags, reason_code,
        properties). The previous three-argument form is still accepted: when
        only three arguments arrive, the third one is taken as the reason code.

        No manual reconnect is attempted here: start() runs loop_forever(),
        which reconnects on its own, and sleeping inside a callback would
        stall the network loop.
        """
        if reason_code is None:
            # Legacy call shape: (client, userdata, reason_code).
            reason_code = disconnect_flags
        if reason_code != 0:
            self.logger.warning(f"Unexpected MQTT disconnection: {reason_code}")

    def on_message(self, client, userdata, msg):
        """Decode an incoming message and hand it to store_message.

        Payloads that are empty or whitespace-only are skipped with a warning.
        Payloads that are not valid UTF-8 are logged and dropped. Any other
        exception is logged so that the MQTT loop keeps running.
        """
        try:
            payload = msg.payload.decode('utf-8')
            self.logger.info(f"Received message from topic: {msg.topic}")

            if not payload.strip():
                self.logger.warning(f"Empty payload received on topic: {msg.topic}")
                return

            table_name = self._table_name_for(msg.topic)
            self.store_message(table_name, msg.topic, payload)
        except UnicodeDecodeError:
            self.logger.error(f"Failed to decode message payload from topic: {msg.topic}")
        except Exception as e:
            self.logger.error(f"Error processing message: {e}")

    def _table_name_for(self, topic):
        """Build the monthly table name "<prefix>_<YYMM>" for a topic.

        The prefix is the first topic level, or blank_topic_name when that
        level is empty (for example a topic that starts with "/"). Slashes,
        spaces and dots become underscores and the prefix is cut to 50
        characters. The month comes from the local clock.
        """
        # str.split always returns at least one element, so test the element
        # itself for emptiness rather than the list.
        prefix = topic.split("/")[0] or self.blank_topic_name
        prefix = prefix.replace("/", "_").replace(" ", "_").replace(".", "_")[:50]
        now = time.localtime()
        return f"{prefix}_{now.tm_year % 100:02d}{now.tm_mon:02d}"

    def store_message(self, table_name, topic, payload):
        """Insert one message, creating the target table if needed.

        The "value" column receives float(payload) when the payload parses as
        a number and NULL otherwise. If the connection turns out to be lost,
        it is re-established and the insert is retried once, so the message
        that revealed the outage is not dropped. Other database errors are
        logged and the transaction is rolled back.

        Raises:
            psycopg2.OperationalError: If reconnecting fails after all retries.
        """
        # Try to read the payload as a number; leave NULL when it is not one.
        value_num = None
        try:
            value_num = float(payload)
            self.logger.info(f"Payload converted to numeric value: {value_num}")
        except (ValueError, TypeError):
            pass

        for attempt in (1, 2):
            try:
                self._insert_row(table_name, topic, payload, value_num)
                self.logger.info(f"Stored message in table '{table_name}'")
                return
            except psycopg2.Error as e:
                # A nonzero "closed" flag marks a closed or broken connection.
                # InterfaceError is what psycopg2 raises when using one.
                connection_lost = (
                    isinstance(e, psycopg2.InterfaceError)
                    or self.postgres_conn is None
                    or bool(self.postgres_conn.closed)
                )
                if connection_lost and attempt == 1:
                    self.logger.warning("Database connection lost, attempting to reconnect...")
                    self.connect_to_database()
                    continue
                self.logger.error(f"Database error: {e}")
                self._rollback_quietly()
                return
            except Exception as e:
                self.logger.error(f"Unexpected error: {e}")
                self._rollback_quietly()
                return

    def _insert_row(self, table_name, topic, payload, value_num):
        """Run CREATE TABLE IF NOT EXISTS plus the INSERT, then commit."""
        create_table_query = sql.SQL("""
            CREATE TABLE IF NOT EXISTS {} (
                id SERIAL PRIMARY KEY,
                topic TEXT NOT NULL,
                payload TEXT NOT NULL,
                value NUMERIC,
                timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        """).format(sql.Identifier(table_name))
        insert_query = sql.SQL("""
            INSERT INTO {} (topic, payload, value)
            VALUES (%s, %s, %s)
        """).format(sql.Identifier(table_name))

        cursor = self.postgres_conn.cursor()
        try:
            cursor.execute(create_table_query)
            cursor.execute(insert_query, (topic, payload, value_num))
            self.postgres_conn.commit()
        finally:
            cursor.close()

    def _rollback_quietly(self):
        """Roll back the open transaction; log instead of raising if that fails."""
        try:
            if self.postgres_conn is not None and not self.postgres_conn.closed:
                self.postgres_conn.rollback()
        except Exception as e:
            self.logger.error(f"Rollback failed: {e}")

    def cleanup(self):
        """Disconnect the MQTT client and close the database connection.

        Each step is guarded separately so a failure in one does not skip the other.
        """
        self.logger.info("Cleaning up resources...")
        try:
            if self.storage_client.is_connected():
                self.storage_client.disconnect()
        except Exception as e:
            self.logger.error(f"Error disconnecting MQTT client: {e}")
        try:
            if self.postgres_conn and not self.postgres_conn.closed:
                self.postgres_conn.close()
        except Exception as e:
            self.logger.error(f"Error closing database connection: {e}")
if __name__ == "__main__":
    # Local import: only the script entry point needs it.
    import os

    # Settings. Each value can be overridden through an environment variable of
    # the same name; the defaults are the values that used to be hard-coded, so
    # running without any of these variables behaves as before.
    MAIN_BROKER = os.environ.get("MAIN_BROKER", "mosquitto_main")
    STORAGE_BROKER = os.environ.get("STORAGE_BROKER", "mosquitto_storage")
    DB_NAME = os.environ.get("DB_NAME", "mqtt_data")
    DB_USER = os.environ.get("DB_USER", "postgres")
    DB_PASSWORD = os.environ.get("DB_PASSWORD", "mosquitto")
    DB_HOST = os.environ.get("DB_HOST", "postgresql")
    DB_PORT = os.environ.get("DB_PORT", "5432")
    BLANK_TOPIC_NAME = os.environ.get("BLANK_TOPIC_NAME", "blank_topic")

    # Build the middleware and run it until it stops or is interrupted.
    middleware = Middleware(
        main_broker=MAIN_BROKER,
        storage_broker=STORAGE_BROKER,
        db_name=DB_NAME,
        db_user=DB_USER,
        db_password=DB_PASSWORD,
        db_host=DB_HOST,
        db_port=DB_PORT,
        blank_topic_name=BLANK_TOPIC_NAME
    )
    try:
        middleware.start()
    except KeyboardInterrupt:
        middleware.logger.info("Shutting down by user request")
        middleware.cleanup()
    except Exception as e:
        middleware.logger.error(f"Unexpected shutdown: {e}")
        middleware.cleanup()
Docker Compose
# Placeholders: replace every "custom_port" with a free host port and every
# "/custom_path" with the host directory that holds the configs and data.
services:
  mosquitto_main:
    # Pinned to the 2.x line so a new major release is not pulled in silently.
    image: eclipse-mosquitto:2
    container_name: mosquitto_main
    ports:
      - "custom_port:1883"
      - "custom_port:9001"
    volumes:
      - /custom_path/mosquitto/main_broker/config:/mosquitto/config
      - /custom_path/mosquitto/main_broker/data:/mosquitto/data
      - /custom_path/mosquitto/main_broker/log:/mosquitto/log
    environment:
      - TZ=Asia/Shanghai
    restart: always

  mosquitto_storage:
    image: eclipse-mosquitto:2
    container_name: mosquitto_storage
    ports:
      - "custom_port:1883"
      - "custom_port:9001"
    volumes:
      - /custom_path/mosquitto/storage_broker/config:/mosquitto/config
      - /custom_path/mosquitto/storage_broker/data:/mosquitto/data
      - /custom_path/mosquitto/storage_broker/log:/mosquitto/log
    environment:
      - TZ=Asia/Shanghai
    restart: always

  postgresql:
    # Pinned major version: a PostgreSQL data directory is tied to its major
    # version, and the PGDATA path and data mount below use the
    # /var/lib/postgresql/data layout. "latest" can move to a release that
    # does not accept either.
    image: postgres:17
    container_name: mosquitto_postgresql
    environment:
      POSTGRES_USER: postgres
      # Must match the password the middleware connects with.
      POSTGRES_PASSWORD: mosquitto
      PGDATA: /var/lib/postgresql/data/pgdata
      TZ: Asia/Shanghai
    ports:
      - "custom_port:5432"
    volumes:
      - /custom_path/mosquitto/postgresql/init.sh:/docker-entrypoint-initdb.d/init-db.sh
      - postgresql_data:/var/lib/postgresql/data
    restart: always

  middleware:
    image: python:3.10-slim
    container_name: mqtt_middleware
    volumes:
      - /custom_path/mosquitto/middleware/requirements.txt:/requirements.txt
      - /custom_path/mosquitto/middleware/main.py:/middleware.py
    # Folded scalar: all lines share one indent so they join into one command.
    command: >
      sh -c "pip install -i https://pypi.mirrors.ustc.edu.cn/simple --upgrade pip &&
      pip install -i https://pypi.mirrors.ustc.edu.cn/simple -r /requirements.txt &&
      python /middleware.py"
    environment:
      - TZ=Asia/Shanghai
    # Controls start order only; it does not wait for the services to be ready.
    depends_on:
      - mosquitto_storage
      - postgresql
    restart: always

volumes:
  # Named volume bound to a fixed host directory.
  postgresql_data:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: /data/mosquitto_postgresql_data
将上文中的 custom_port、custom_path 占位符替换为实际的端口和路径后，直接执行 docker compose up -d 拉起即可。