/*
 * MQTT client thread.
 *
 * Waits for the network driver to report a TCP connection, performs the MQTT
 * CONNECT / SUBSCRIBE handshake, then loops forwarding inbound PUBLISH
 * payloads to the application (through gMqttDrv + gTrigEvents) and sending
 * outbound PUBLISH packets when the application raises EVENT_TRIG_MQTT_TX.
 */
#include "main.h"
#include "os-app.h"
/*
 * NOTE(review): the names of the <...> system headers were missing from the
 * text this file was recovered from (six bare "#include" directives). The
 * angle-bracket headers below are reconstructed from the symbols this file
 * uses (rt_*, strlen, MQTTPacket_*/MQTTSerialize_*, ef_get_env) - confirm
 * them against the project's build before merging.
 */
#include <rtthread.h>
#include <string.h>
#include "usart-board.h"
#include <MQTTPacket.h>
#include "transport.h"
#include <easyflash.h>
#include "mqtt-board.h"
#include "netdrv.h"
#include "stdlib.h"
#include "data.h"

#if !defined(LOG_TAG)
#define LOG_TAG "mqtt"
#endif
/*
 * NOTE(review): a logger header was included here after LOG_TAG; its name was
 * lost as well. No logging macro is used in this file, so it is left out -
 * restore it if the project requires it.
 */

/* Number of topic slots: slot 0 is the upgrade channel, the rest are user channels. */
#define MQTT_TOPIC_COUNT (SUBTOPIC_COUNT + 1)

ALIGN (RT_ALIGN_SIZE)
static char thread_stack[2048];
static struct rt_thread mqtt_thread;
static rt_timer_t timer;

static char mqtt_tx_buff[500];
static char mqtt_rx_buff[500];

/* Session state reported by MQTT_Subscribed(). */
static enum {CONN = 1, DISC = 0} State = DISC;

MqttDrv_t gMqttDrv =
{
    .State = DISC,
    .txbuff = mqtt_tx_buff,
    .tx_size = 0,
    .tx_index = 0,
    .rxbuff = mqtt_rx_buff,
    .rx_size = 0,
    .rx_index = 0
};

/* Scratch buffer used to serialize outgoing and hold incoming MQTT packets. */
#define MQTT_BUFF_LENGTH 1500
static uint8_t mqtt_buffer[MQTT_BUFF_LENGTH];

/* Length of the packet currently held in mqtt_buffer. */
static int mqtt_buflen = 0;

/* Requested QoS for every subscription (all zero). */
static int qos[MQTT_TOPIC_COUNT] = {0};

/* Last packet identifier handed out by next_message_id(). */
static int message_id = 0;

/* MQTT connection parameters and topic tables. */
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;

MQTTString subtopic[MQTT_TOPIC_COUNT] = {MQTTString_initializer};
MQTTString pubtopic[MQTT_TOPIC_COUNT] = {MQTTString_initializer};

char sub_topic_str[MQTT_TOPIC_COUNT][64] = {{0}};
char pub_topic_str[MQTT_TOPIC_COUNT][64] = {{0}};
char client_id_str[64] = {0};

static char device_id[32] = {0};   /* device code read from the "device_id" env */
static char user_topic[32] = {0};  /* topic segment read from the "mqtt_topic" env */

/* Seconds elapsed since the last inbound PUBLISH; incremented by timerouter(). */
static int timerout_timer = 0;

/* Signals EVENT_UART_ETH1_RX on gUartEvents. Not referenced elsewhere in this file. */
static void net_data_rxdone(void)
{
    rt_event_send(gUartEvents, EVENT_UART_ETH1_RX);
}

/*
 * Periodic timer callback (created with a period of 1000 ticks).
 * Requests an MQTT reset once more than 90 periods pass without
 * timerout_timer being cleared by the receive path.
 */
static void timerouter(void *parameter)
{
    (void) parameter;

    timerout_timer++;

    if (timerout_timer > 90)
    {
        rt_event_send(gTrigEvents, EVENT_TRIG_MQTT_RST);
        timerout_timer = 0;
    }
}

/*
 * Returns the next MQTT packet identifier in the range 1..65535.
 * MQTT 3.1.1 requires a non-zero identifier on SUBSCRIBE, so 0 is skipped.
 */
static unsigned short next_message_id(void)
{
    message_id = (message_id % 65535) + 1;
    return (unsigned short) message_id;
}

/*
 * Copies the environment value stored under `key` into dst (always
 * NUL-terminated, truncated to dst_size - 1). A missing key yields "".
 */
static void copy_env(char *dst, rt_size_t dst_size, const char *key)
{
    const char *value = ef_get_env(key);

    if (value == RT_NULL)
    {
        rt_kprintf("mqtt env %s not set\r\n", key);
        value = "";
    }

    rt_snprintf(dst, dst_size, "%s", value);
}

/*
 * Reads device_id / mqtt_topic from the environment and fills in the
 * connection parameters and the subscribe / publish topic tables.
 * Every formatted string is bounded by its destination size; an over-long
 * value is truncated instead of overrunning the buffer.
 */
static void mqtt_load_config(void)
{
    copy_env(device_id, sizeof device_id, "device_id");
    copy_env(user_topic, sizeof user_topic, "mqtt_topic");

    rt_snprintf(client_id_str, sizeof client_id_str,
                "GID_SLUICE_V2_220000%s", device_id);
    rt_kprintf("clinet:%s\r\n", client_id_str);

    connectData.clientID.cstring = client_id_str;
    connectData.keepAliveInterval = 120;
    connectData.cleansession = 1;
    connectData.username.cstring = "admin";
    connectData.password.cstring = "admin";

    /* Slot 0: upgrade channel. */
    rt_snprintf(sub_topic_str[0], sizeof sub_topic_str[0],
                "device/%s/tx/220000%s", "update", device_id);
    rt_snprintf(pub_topic_str[0], sizeof pub_topic_str[0],
                "device/%s/rx/220000%s", "update", device_id);

    /* Slot 1: user channel keyed by the device code. */
    rt_snprintf(sub_topic_str[1], sizeof sub_topic_str[1],
                "device/%s/tx/220000%s", user_topic, device_id);
    rt_snprintf(pub_topic_str[1], sizeof pub_topic_str[1],
                "device/%s/rx/220000%s", user_topic, device_id);

#if (SUBTOPIC_COUNT > 1)
    /* Slot 2: user channel with a fixed device code. */
    rt_snprintf(sub_topic_str[2], sizeof sub_topic_str[2],
                "device/%s/tx/2200002308170002", user_topic);
    rt_snprintf(pub_topic_str[2], sizeof pub_topic_str[2],
                "device/%s/rx/2200002308170002", user_topic);
#endif

    for (int i = 0; i < MQTT_TOPIC_COUNT; i++)
    {
        subtopic[i].cstring = sub_topic_str[i];
        pubtopic[i].cstring = pub_topic_str[i];
    }
}

/* Waits up to `timeout` ticks for EVENT_TRIG_NETMODE_RX1; returns 1 if it fired. */
static int mqtt_wait_rx(rt_int32_t timeout)
{
    return rt_event_recv(gTrigEvents, EVENT_TRIG_NETMODE_RX1,
                         RT_EVENT_FLAG_AND | RT_EVENT_FLAG_CLEAR,
                         timeout, RT_NULL) == RT_EOK;
}

/*
 * Sends CONNECT and waits for an accepting CONNACK.
 * Returns 1 on success, 0 on any failure (the caller retries from scratch).
 */
static int mqtt_connect(void)
{
    unsigned char session_present = 0;
    unsigned char connack_rc = 0;

    rt_kprintf("mqtt connected\r\n");

    mqtt_buflen = MQTTSerialize_connect(mqtt_buffer, MQTT_BUFF_LENGTH, &connectData);
    if (mqtt_buflen <= 0)
    {
        rt_kprintf("mqtt connect serialize failed %d\r\n", mqtt_buflen);
        rt_thread_delay(1000);   /* avoid spinning on a permanent failure */
        return 0;
    }
    netdrv_service_tx(mqtt_buffer, mqtt_buflen);

    if (!mqtt_wait_rx(10000))
    {
        return 0;
    }

    rt_thread_mdelay(10);

    if (MQTTPacket_read(mqtt_buffer, MQTT_BUFF_LENGTH, netdrv_getdata) != CONNACK)
    {
        rt_kprintf("mqtt error");
        netdrv_clear();
        return 0;
    }

    if (MQTTDeserialize_connack(&session_present, &connack_rc,
                                mqtt_buffer, MQTT_BUFF_LENGTH) != 1
            || connack_rc != 0)
    {
        rt_kprintf("Unable to connect, return code %d\n", connack_rc);
        rt_thread_delay(3000);
        netdrv_clear();
        return 0;
    }

    netdrv_clear();
    rt_kprintf("connect to mqtt successful\n");
    return 1;
}

/*
 * Subscribes to every entry of subtopic[] at QoS 0 and validates the SUBACK.
 * Returns 1 when all returned grants are QoS 0, 0 otherwise.
 */
static int mqtt_subscribe(void)
{
    unsigned short suback_id = 0;
    int granted_count = 0;
    /* One spare slot: guards against a decoder that stores one entry beyond
     * maxcount before reporting the overflow. */
    int granted_qos[MQTT_TOPIC_COUNT + 1] = {0};

    mqtt_buflen = MQTTSerialize_subscribe(mqtt_buffer, MQTT_BUFF_LENGTH, 0,
                                          next_message_id(), MQTT_TOPIC_COUNT,
                                          subtopic, qos);
    if (mqtt_buflen <= 0)
    {
        rt_kprintf("mqtt subscribe serialize failed %d\r\n", mqtt_buflen);
        rt_thread_delay(1000);   /* avoid spinning on a permanent failure */
        return 0;
    }
    netdrv_service_tx(mqtt_buffer, mqtt_buflen);

    if (!mqtt_wait_rx(10000))
    {
        netdrv_clear();
        return 0;
    }

    if (MQTTPacket_read(mqtt_buffer, MQTT_BUFF_LENGTH, netdrv_getdata) != SUBACK)
    {
        rt_kprintf("sub topic faild\r\n");
        rt_thread_delay(1000);
        netdrv_clear();
        return 0;
    }

    if (MQTTDeserialize_suback(&suback_id, MQTT_TOPIC_COUNT, &granted_count,
                               granted_qos, mqtt_buffer, MQTT_BUFF_LENGTH) != 1
            || granted_count < 1)
    {
        rt_kprintf("suback decode failed\r\n");
        netdrv_clear();
        return 0;
    }

    for (int i = 0; i < granted_count && i < MQTT_TOPIC_COUNT + 1; i++)
    {
        /* Anything other than the requested QoS 0 (including the 0x80
         * failure code) is treated as a failed subscription. */
        if (granted_qos[i] != 0)
        {
            rt_kprintf("granted qos != 0, %d\n", granted_qos[i]);
            netdrv_clear();
            return 0;
        }
    }

    netdrv_clear();
    return 1;
}

/*
 * Returns the index of the subscribed topic that equals `topic` exactly
 * (same length and bytes), or -1 when none matches.
 */
static int mqtt_match_topic(const MQTTString *topic)
{
    const char *name = topic->lenstring.data;
    int name_len = topic->lenstring.len;

    if (name == RT_NULL || name_len <= 0)
    {
        return -1;
    }

    for (int i = 0; i < MQTT_TOPIC_COUNT; i++)
    {
        const char *filter = subtopic[i].cstring;

        if (filter != RT_NULL
                && strlen(filter) == (size_t) name_len
                && rt_memcmp(filter, name, (rt_ubase_t) name_len) == 0)
        {
            return i;
        }
    }

    return -1;
}

/*
 * Decodes the PUBLISH packet held in mqtt_buffer, copies its payload into
 * gMqttDrv.rxbuff and notifies the application. Malformed packets, payloads
 * larger than the receive buffer and unknown topics are dropped with a log line.
 */
static void mqtt_handle_publish(void)
{
    unsigned char dup = 0;
    unsigned char retained = 0;
    unsigned short msgid = 0;
    int qos_in = 0;
    int payload_len = 0;
    unsigned char *payload = RT_NULL;
    MQTTString topic = MQTTString_initializer;
    int topic_index;

    if (MQTTDeserialize_publish(&dup, &qos_in, &retained, &msgid, &topic,
                                &payload, &payload_len,
                                mqtt_buffer, MQTT_BUFF_LENGTH) != 1)
    {
        rt_kprintf("mqtt publish decode failed\r\n");
        return;
    }

    /* rxbuff is backed by mqtt_rx_buff, which is smaller than mqtt_buffer. */
    if (payload == RT_NULL || payload_len < 0
            || (size_t) payload_len > sizeof mqtt_rx_buff)
    {
        rt_kprintf("mqtt payload dropped, len %d\r\n", payload_len);
        return;
    }

    topic_index = mqtt_match_topic(&topic);
    if (topic_index < 0)
    {
        rt_kprintf("mqtt unknown topic dropped\r\n");
        return;
    }

    rt_memcpy(gMqttDrv.rxbuff, payload, (rt_ubase_t) payload_len);
    gMqttDrv.rx_size = payload_len;
    gMqttDrv.rx_index = topic_index;

    /* "%.*s": the topic is length-delimited, not NUL-terminated. */
    rt_kprintf("recv from :%d,%.*s\r\n",
               topic_index, topic.lenstring.len, topic.lenstring.data);

    if (topic_index == 0)
    {
        /* Slot 0 is the upgrade channel. */
        rt_event_send(gTrigEvents, EVENT_TRIG_UPGRADE_RX);
    }
    else
    {
        rt_event_send(gTrigEvents, EVENT_TRIG_COMM_RX);
    }
}

/* Publishes gMqttDrv.txbuff (tx_size bytes) at QoS 0 on pubtopic[tx_index]. */
static void mqtt_publish_pending(void)
{
    int topic_index = (int) gMqttDrv.tx_index;

    if (topic_index < 0 || topic_index >= MQTT_TOPIC_COUNT)
    {
        rt_kprintf("mqtt tx index %d out of range\r\n", topic_index);
        return;
    }

    mqtt_buflen = MQTTSerialize_publish(mqtt_buffer, MQTT_BUFF_LENGTH, 0, 0, 0, 0,
                                        pubtopic[topic_index],
                                        (uint8_t *) gMqttDrv.txbuff,
                                        gMqttDrv.tx_size);
    if (mqtt_buflen <= 0)
    {
        /* e.g. payload does not fit into mqtt_buffer */
        rt_kprintf("mqtt publish serialize failed %d\r\n", mqtt_buflen);
        return;
    }

    netdrv_service_tx(mqtt_buffer, mqtt_buflen);
}

/*
 * Services an established session: polls (without blocking) for inbound
 * data, pending transmissions and reset requests every 100 ticks.
 * Returns once the session must be re-established.
 */
static void mqtt_session_loop(void)
{
    while (1)
    {
        /* Inbound data. */
        if (mqtt_wait_rx(0))
        {
            if (MQTTPacket_read(mqtt_buffer, MQTT_BUFF_LENGTH, netdrv_getdata) == PUBLISH)
            {
                mqtt_handle_publish();
                timerout_timer = 0;   /* any PUBLISH shows the link is alive */
            }
            netdrv_clear();
        }

        /* Outbound data. */
        if (RT_EOK == rt_event_recv(gTrigEvents, EVENT_TRIG_MQTT_TX,
                                    RT_EVENT_FLAG_AND | RT_EVENT_FLAG_CLEAR,
                                    0, RT_NULL))
        {
            mqtt_publish_pending();
        }

        /* Reset requested (e.g. by the inactivity timer) or TCP link lost. */
        if (RT_EOK == rt_event_recv(gTrigEvents, EVENT_TRIG_MQTT_RST,
                                    RT_EVENT_FLAG_AND | RT_EVENT_FLAG_CLEAR,
                                    0, RT_NULL)
                || g_netDrv->State != Conn)
        {
            rt_kprintf("mqtt conn lost\r\n");
            netdrv_reset();
            State = DISC;
            rt_event_send(gTrigEvents, EVENT_TRIG_COMM_RST);
            return;
        }

        rt_thread_delay(100);
    }
}

/*
 * Thread body: loads the configuration, starts the inactivity timer and then
 * repeats connect -> subscribe -> serve for as long as the system runs.
 */
static void thread_entry(void *argument)
{
    (void) argument;

    rt_kprintf("%s was start!\r\n", __FILE__);

    /* Inactivity watchdog, see timerouter(). */
    timer = rt_timer_create("timer", timerouter, RT_NULL, 1000, RT_TIMER_FLAG_PERIODIC);

    mqtt_load_config();

    if (timer != RT_NULL)
    {
        rt_timer_start(timer);
    }
    else
    {
        rt_kprintf("mqtt timer create failed\r\n");
    }

    while (1)
    {
        if (g_netDrv->State != Conn)
        {
            /* Wait 1000 ticks for the module to establish the TCP connection. */
            rt_thread_delay(1000);
            continue;
        }

        if (!mqtt_connect() || !mqtt_subscribe())
        {
            continue;
        }

        rt_kprintf("subtopic topic %s\r\n", subtopic[0].cstring);
        rt_event_send(gTrigEvents, EVENT_TRIG_MQTT_SUB);
        netdrv_clear();
        rt_thread_delay(500);

        State = CONN;
        mqtt_session_loop();
    }
}

/*
 * Requests transmission of the data already placed in gMqttDrv
 * (txbuff / tx_size / tx_index) by raising EVENT_TRIG_MQTT_TX.
 */
void MQTT_SendData(void)
{
    rt_event_send(gTrigEvents, EVENT_TRIG_MQTT_TX);
}

/* Returns 1 (CONN) while the session is connected and subscribed, 0 (DISC) otherwise. */
int MQTT_Subscribed(void)
{
    return State;
}

/* Initializes the statically allocated MQTT thread and starts it. */
void MQTTThreadStart(void)
{
    if (rt_thread_init(&mqtt_thread,
                       "mqtt_thread",
                       thread_entry,
                       RT_NULL,
                       &thread_stack[0],
                       sizeof(thread_stack),
                       MQTT_THREAD_PRIORITY,
                       MQTT_THREAD_TIMESLICE) != RT_EOK)
    {
        rt_kprintf("mqtt thread init failed\r\n");
        return;
    }

    rt_thread_startup(&mqtt_thread);
}

/* Shell helper: prints the current value of the inactivity counter. */
void shell_mqtt_time(void)
{
    rt_kprintf("lost_timer:%d\r\n", timerout_timer);
}