"""Bridge MQTT alarm messages into SQLite and push them to browsers.

Messages received on the subscribed MQTT topic are parsed in the MQTT
callback, queued, written to the ``messages`` table by a background worker
thread, and broadcast to connected Socket.IO clients as ``new_message``.
"""
import json
import sqlite3
import sys
import threading
from contextlib import closing
from datetime import datetime
from queue import Empty, Queue

import paho.mqtt.client as mqtt
from flask import Flask, render_template, request
from flask_socketio import SocketIO, emit

app = Flask(__name__)
socketio = SocketIO(app)

# MQTT broker configuration.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a config file that is not committed.
mqtt_server = {
    "remote": "192.168.1.106",
    "port": 1883,
    "user_name": "admin",
    "password": "miler502"
}
topic = "measure/#"
database = "mqtt_messages.db"


def init_db():
    """Create the ``messages`` table if it does not exist yet."""
    # closing() is needed because sqlite3's own context manager only
    # commits/rolls back the transaction; it does not close the connection.
    with closing(sqlite3.connect(database)) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                topic TEXT NOT NULL,
                alarm_id TEXT NOT NULL,
                alarm_value TEXT NOT NULL,
                car_position_msg TEXT NOT NULL,
                timestamp DATETIME NOT NULL
            )
        ''')
        conn.commit()


# Initialise the database at import time.
init_db()

# Hand-off queue between the MQTT callback and the database worker thread.
message_queue = Queue()


def get_historical_data(topic):
    """Return every stored row whose ``topic`` column equals *topic*.

    Rows come back as tuples in table column order:
    (id, topic, alarm_id, alarm_value, car_position_msg, timestamp).
    """
    with closing(sqlite3.connect(database)) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM messages WHERE topic=?", (topic,))
        return cursor.fetchall()


# HTTP endpoints
@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


# The ``path`` converter is required because MQTT topics contain '/'.
# NOTE(review): the variable segment was missing from the rule although the
# view requires ``topic``; confirm the URL shape against the templates.
@app.route('/history/<path:topic>')
def history(topic):
    """Render the stored messages for *topic*."""
    topic = topic.replace('%2F', '/')  # undo a still-encoded slash
    historical_data = get_historical_data(topic)
    return render_template('history.html', topic=topic, historical_data=historical_data)


# Originally marked as "has problems, not implemented yet".
# NOTE(review): the trailing slash is kept from the original rule; confirm
# the URL the front end actually posts to.
@app.route('/clear_history/<path:topic>/', methods=['POST'])
def clear_history(topic):
    """Delete every stored message for *topic*.

    Returns a plain-text success message, or an error message with HTTP
    status 500 if the delete fails.
    """
    # '%2F' is the encoded slash; the previous '%3F' is an encoded '?'.
    topic = topic.replace('%2F', '/')
    try:
        with closing(sqlite3.connect(database)) as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM messages WHERE topic=?", (topic,))
            conn.commit()
        return 'History cleared successfully'
    except Exception as e:
        return f"Failed to clear history: {e}", 500


def process_messages():
    """Worker loop: persist queued messages and broadcast them.

    Runs forever. Each queued item is inserted into ``messages`` with the
    current local time and then emitted as a ``new_message`` Socket.IO event.
    Failures are reported on stderr and the loop continues.
    """
    while True:
        try:
            item = message_queue.get(timeout=3)
        except Empty:
            # Nothing arrived within the timeout; keep polling. The exception
            # lives in the ``queue`` module, not on the ``Queue`` class.
            continue

        try:
            topic, alarm_id, alarm_value, car_position_msg = item
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            with closing(sqlite3.connect(database)) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT INTO messages (topic, alarm_id, alarm_value, car_position_msg, timestamp)
                    VALUES (?, ?, ?, ?, ?)
                ''', (topic, alarm_id, alarm_value, car_position_msg, current_time))
                conn.commit()

            # Push the stored message to the front end over Socket.IO.
            socketio.emit('new_message', {
                'topic': topic,
                'alarm_id': alarm_id,
                'alarm_value': alarm_value,
                'car_position_msg': car_position_msg,
                'timestamp': current_time
            })
        except Exception as e:
            print(f"Failed to insert message into database: {e}", file=sys.stderr)


# Start the worker thread (daemon, so it does not block interpreter exit).
threading.Thread(target=process_messages, daemon=True).start()


# MQTT callbacks
def on_connect(client, userdata, flags, rc):
    """Subscribe to ``topic`` once the broker accepts the connection."""
    if rc == 0:
        print("Connected successfully")
        client.subscribe(topic)
    else:
        print(f"Connect failed with code {rc}")


def on_message(client, userdata, msg):
    """Parse a JSON payload and queue its fields for the worker thread.

    Reads ``AlarmId``, ``AlarmMessage.value`` and ``CarPositionMsg`` from the
    decoded payload, defaulting each to an empty string when absent. Any
    failure is reported on stderr and the message is dropped.
    """
    try:
        payload = msg.payload.decode()
        data = json.loads(payload)
        alarm_id = data.get("AlarmId", "")
        alarm_value = data.get("AlarmMessage", {}).get("value", "")
        car_position_msg = data.get("CarPositionMsg", "")
        message_queue.put((msg.topic, alarm_id, alarm_value, car_position_msg))
    except Exception as e:
        print(f"Failed to process message: {e}", file=sys.stderr)


# Configure the MQTT client.
client = mqtt.Client()
client.username_pw_set(mqtt_server["user_name"], mqtt_server["password"])
client.on_connect = on_connect
client.on_message = on_message

# Start the MQTT network loop and the Flask/Socket.IO server.
if __name__ == '__main__':
    try:
        # connect_async() only records the target; the network loop started
        # by loop_start() performs the connection. This replaces connecting
        # from an ad-hoc thread, which raced with loop_start().
        client.connect_async(mqtt_server["remote"], mqtt_server["port"], 60)
        client.loop_start()
        socketio.run(app, host='0.0.0.0', port=5000)
    except Exception as e:
        print(f"Failed to start server: {e}", file=sys.stderr)
    finally:
        client.disconnect()
        client.loop_stop()