增加事件驱动和业务回滚处理方式

This commit is contained in:
冯佳
2026-03-04 08:50:04 +08:00
parent cc4c361df6
commit 47a9dff6ef
25 changed files with 2626 additions and 1780 deletions

350
app/tcp_server.c Normal file
View File

@ -0,0 +1,350 @@
/*
* tcp_server.c
*
* Created on: 2026-03-03
* Author: RT-Thread
*
* 功能: TCP服务器模块实现
* 依赖: lwIP网络栈、SHT40传感器、osal、事件驱动模块
* 跨平台适配: 基于RT-Thread Nano使用标准lwIP API
*/
#include <rtthread.h>
#include "osal.h"
#include "lwip/init.h"
#include "lwip/netif.h"
#include "lwip/tcpip.h"
#include "lwip/sockets.h"
#include "tcp_server.h"
#include "sht40.h"
#include "event_trigger.h"
#include "state_manager.h"
#include "transaction.h"
/* 外部变量声明 */
extern struct netif gnetif;
extern osal_sem_t sem_ip_ready;
/* 全局标志 */
volatile int data_upload_success = 0;
/* 客户端连接数组 */
static client_conn_t clients[TCP_SERVER_MAX_CLIENTS];
/* 辅助函数: 初始化客户端连接 */
static void init_client_connections(void)
{
for (int i = 0; i < TCP_SERVER_MAX_CLIENTS; i++) {
clients[i].sock = -1;
clients[i].connected = 0;
memset(&clients[i].addr, 0, sizeof(clients[i].addr));
}
}
/* 辅助函数: 查找空闲客户端槽位 */
static int find_free_client_slot(void)
{
for (int i = 0; i < TCP_SERVER_MAX_CLIENTS; i++) {
if (!clients[i].connected) {
return i;
}
}
return -1;
}
/* 辅助函数: 接受新客户端连接 */
static void accept_new_connection(int server_sock)
{
int client_sock;
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
client_sock = accept(server_sock, (struct sockaddr *)&client_addr, &client_addr_len);
if (client_sock < 0) {
if (errno != EWOULDBLOCK) {
osal_log_e("Accept error: %d", errno);
}
return;
}
/* 设置客户端socket为非阻塞模式 */
int flags = fcntl(client_sock, F_GETFL, 0);
if (fcntl(client_sock, F_SETFL, flags | O_NONBLOCK) < 0) {
osal_log_e("Failed to set non-blocking mode for client: %d", errno);
closesocket(client_sock);
return;
}
/* 查找空闲客户端槽位 */
int slot = find_free_client_slot();
if (slot < 0) {
osal_log_w("No free client slots available");
closesocket(client_sock);
return;
}
/* 将客户端添加到连接数组 */
clients[slot].sock = client_sock;
clients[slot].addr = client_addr;
clients[slot].connected = 1;
osal_log_i("Client connected from %s:%d",
inet_ntoa(client_addr.sin_addr),
ntohs(client_addr.sin_port));
/* 触发客户端连接事件 */
event_trigger(EVENT_TYPE_TCP_CLIENT_CONNECTED, EVENT_PRIORITY_NORMAL, NULL, 0);
}
/* 辅助函数: 向所有连接的客户端发送数据 */
static void send_data_to_all_clients(const char *data, int len)
{
for (int i = 0; i < TCP_SERVER_MAX_CLIENTS; i++) {
if (clients[i].connected) {
if (send(clients[i].sock, data, len, 0) < 0) {
if (errno != EWOULDBLOCK) {
osal_log_e("Send error to client %d: %d", i, errno);
closesocket(clients[i].sock);
clients[i].sock = -1;
clients[i].connected = 0;
}
}
}
}
}
/* 辅助函数: 检查客户端连接 */
static void check_client_connections(void)
{
char recv_buf[128];
for (int i = 0; i < TCP_SERVER_MAX_CLIENTS; i++) {
if (clients[i].connected) {
/* 检查 incoming data */
fd_set rset;
struct timeval tv;
FD_ZERO(&rset);
FD_SET(clients[i].sock, &rset);
tv.tv_sec = 0;
tv.tv_usec = 10000; // 10ms timeout
int n = select(clients[i].sock + 1, &rset, NULL, NULL, &tv);
if (n > 0) {
int bytes_received = recv(clients[i].sock, recv_buf, sizeof(recv_buf) - 1, 0);
if (bytes_received > 0) {
recv_buf[bytes_received] = '\0';
osal_log_i("Received from client %d: %s", i, recv_buf);
} else if (bytes_received == 0) {
osal_log_w("Client %d disconnected", i);
closesocket(clients[i].sock);
clients[i].sock = -1;
clients[i].connected = 0;
/* 触发客户端断开连接事件 */
event_trigger(EVENT_TYPE_TCP_CLIENT_DISCONNECTED, EVENT_PRIORITY_NORMAL, NULL, 0);
} else {
if (errno != EWOULDBLOCK) {
osal_log_e("Recv error from client %d: %d", i, errno);
closesocket(clients[i].sock);
clients[i].sock = -1;
clients[i].connected = 0;
/* 触发客户端断开连接事件 */
event_trigger(EVENT_TYPE_TCP_CLIENT_DISCONNECTED, EVENT_PRIORITY_NORMAL, NULL, 0);
}
}
} else if (n < 0) {
osal_log_e("Select error for client %d: %d", i, errno);
closesocket(clients[i].sock);
clients[i].sock = -1;
clients[i].connected = 0;
/* 触发客户端断开连接事件 */
event_trigger(EVENT_TYPE_TCP_CLIENT_DISCONNECTED, EVENT_PRIORITY_NORMAL, NULL, 0);
}
}
}
}
/* 辅助函数: 检查是否有客户端连接 */
static int has_client_connections(void)
{
for (int i = 0; i < TCP_SERVER_MAX_CLIENTS; i++) {
if (clients[i].connected) {
return 1;
}
}
return 0;
}
/* 辅助函数: 创建和配置socket */
static int create_and_configure_socket(void)
{
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
osal_log_e("Socket creation error: %d", errno);
return -1;
}
/* 设置非阻塞模式 */
int flags = fcntl(sock, F_GETFL, 0);
if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0)
{
osal_log_e("Failed to set non-blocking mode: %d", errno);
closesocket(sock);
return -1;
}
/* 设置socket选项以提高性能 */
int keepalive = 1;
int keepidle = 60;
int keepintvl = 10;
int keepcnt = 3;
setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive));
setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, &keepidle, sizeof(keepidle));
setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, &keepintvl, sizeof(keepintvl));
setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, &keepcnt, sizeof(keepcnt));
return sock;
}
/* TCP服务器线程 */
void tcp_server_entry(void *parameter)
{
int server_sock = -1;
struct sockaddr_in server_addr;
/* 等待IP地址就绪 */
if (osal_sem_take(sem_ip_ready, OSAL_WAIT_FOREVER) != OSAL_OK)
{
osal_log_e("Failed to take IP ready semaphore");
return;
}
osal_log_i("TCP Server Starting...");
/* 初始化客户端连接 */
init_client_connections();
while (1)
{
/* 检查链路状态 */
if (!netif_is_link_up(&gnetif)) {
osal_log_w("Link down, waiting for link up...");
osal_thread_mdelay(1000);
continue;
}
/* 创建和配置服务器socket */
if (server_sock < 0) {
server_sock = create_and_configure_socket();
if (server_sock < 0) {
osal_log_e("Failed to create server socket, retrying...");
osal_thread_mdelay(1000);
continue;
}
/* 准备服务器地址 */
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(TCP_SERVER_PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
memset(&(server_addr.sin_zero), 0, sizeof(server_addr.sin_zero));
/* 绑定socket */
if (bind(server_sock, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0) {
osal_log_e("Bind error: %d", errno);
closesocket(server_sock);
server_sock = -1;
osal_thread_mdelay(1000);
continue;
}
/* 开始监听 */
if (listen(server_sock, TCP_SERVER_MAX_CLIENTS) < 0) {
osal_log_e("Listen error: %d", errno);
closesocket(server_sock);
server_sock = -1;
osal_thread_mdelay(1000);
continue;
}
osal_log_i("TCP Server listening on port %d", TCP_SERVER_PORT);
}
/* 接受新连接 */
accept_new_connection(server_sock);
/* 检查客户端连接 */
check_client_connections();
/* 定期读取并发送温湿度数据 */
static osal_tick_t last_report_time = 0;
int report_interval = 5000; // 5秒上报一次温湿度数据
osal_tick_t current_time = osal_tick_get();
if ((unsigned int)(current_time - last_report_time) >= (unsigned int)report_interval)
{
/* 检查是否有客户端连接 */
if (has_client_connections())
{
/* 开始事务 */
int tx_id = transaction_begin(NULL, NULL);
if (tx_id > 0)
{
float temperature = 0.0f, humidity = 0.0f;
int result = sht40_read_temperature_humidity(&temperature, &humidity);
if (result == 0)
{
char sensor_data[64];
int temp_int = (int)(temperature * 100);
int hum_int = (int)(humidity * 100);
/* 处理负值温度显示 */
int temp_integer = temp_int / 100;
int temp_decimal = temp_int % 100;
if (temp_decimal < 0) temp_decimal = -temp_decimal;
int hum_integer = hum_int / 100;
int hum_decimal = hum_int % 100;
if (hum_decimal < 0) hum_decimal = -hum_decimal;
if (temp_integer >= 0)
{
snprintf(sensor_data, sizeof(sensor_data), "TEMP=+%d.%02d℃,HUM=%d.%02d%%RH\n",
temp_integer, temp_decimal, hum_integer, hum_decimal);
}
else
{
snprintf(sensor_data, sizeof(sensor_data), "TEMP=%d.%02d℃,HUM=%d.%02d%%RH\n",
temp_integer, temp_decimal, hum_integer, hum_decimal);
}
/* 发送给所有连接的客户端 */
send_data_to_all_clients(sensor_data, strlen(sensor_data));
osal_log_i("Sent sensor data: %s", sensor_data);
/* 设置数据上传成功标志 */
data_upload_success = 1;
/* 提交事务 */
transaction_commit(tx_id);
/* 触发传感器数据事件 */
event_trigger(EVENT_TYPE_SENSOR_DATA, EVENT_PRIORITY_NORMAL, NULL, 0);
}
else
{
osal_log_e("Failed to read sensor data, result: %d", result);
/* 回滚事务 */
transaction_rollback(tx_id);
}
}
}
last_report_time = current_time;
}
/* 向所有客户端发送心跳 */
send_data_to_all_clients("Heartbeat\n", 9);
osal_thread_mdelay(TCP_SERVER_HEARTBEAT_INTERVAL);
}
}