1. 概述
本篇的重点是解析paho mqtt官方例程,学习它的一些结构体、API和回调的处理,然后再学习下Linux中的线程和线程间数据通信,掌握相关API的使用,最后综合起来,使用多线程publish或处理subscribe的主题的消息,将订阅的温湿度消息解析出来在LVGL的表格中用折线图的方式显示出来。
温湿度监控系统应用开发所有文章
- 【嵌入式Linux应用开发】移植LVGL到Linux开发板
- 【嵌入式Linux应用开发】初步移植MQTT到Ubuntu和Linux开发板
- 【嵌入式Linux应用开发】SquareLine Studio与LVGL模拟器
- 【嵌入式Linux应用开发】温湿度监控系统——绘制温湿度折线图
- 【嵌入式Linux应用开发】温湿度监控系统——学习paho mqtt的基本操作
- 【嵌入式Linux应用开发】温湿度监控系统——多线程与温湿度的获取显示
- 【嵌入式Linux应用开发】设计温湿度采集MCU子系统
适用开发板
适用于百问网的STM32MP157开发板和IMX6ULL开发板及其对应的屏幕,需要注意的是编译链要对应更改。
2. paho mqtt基本操作
我们使用mqtt有如下几个操作:
- 创建/销毁客户端
- 连接/断开连接服务器
- 订阅/取消订阅主题
- 处理订阅消息
- 发布主题消息
这些操作在paho mqtt的源码中,基于同步处理方式和异步处理方式又有不同的API,所以我们在使用paho mqtt的时候要在一开始就要定位好要选择哪种方式。
2.1 创建客户端
2.1.1 创建同步客户端
- 客户端句柄:
MQTTClient
1 | // MQTTClient.h |
- 创建客户端
1 | int MQTTClient_create(MQTTClient* handle, |
可以看到这个函数调用了另一个函数MQTTClient_createWithOptions
,它的原型定义:
1 | int MQTTClient_createWithOptions(MQTTClient* handle, |
关于这个函数参数的释义:
参数名称 | 作用描述 |
---|---|
handle | MQTT客户端句柄 |
serverURI | MQTT服务器地址 |
clientId | 登录MQTT服务器时使用的客户端ID |
persistence_type | 客户端的持续类型: MQTTCLIENT_PERSISTENCE_NONE: 当客户端运行失败或者下线了,正在发布的消息会立刻丢失,即便是QOS1或QOS2类型的消息质量; MQTTCLIENT_PERSISTENCE_DEFAULT:客户端下线后正在发布的消息会被保存到指定的目录persistence_context中,此时persistence_context可以被指定为NULL; MQTTCLIENT_PERSISTENCE_USER:客户端下线后正在发布的消息会被保存到指定的目录persistence_context中,此时persistence_context必须被指定不能为NULL |
persistence_context | 保存消息的目录 |
options | 创建客户端的额外操作 |
返回值 | 如果成功返回MQTTCLIENT_SUCCESS 否则返回错误码 |
1 |
|
2.1.2 创建异步客户端
- 客户端句柄:
MQTTAsync
1 | // MQTTAsync.h |
- 创建客户端
1 | int MQTTAsync_create(MQTTAsync* handle, |
1 | int MQTTAsync_createWithOptions(MQTTAsync* handle, |
其实和同步的创建是差不多的,参数意义也几乎是一样的,就不在重复解释了,下面是应用:
1 |
|
2.2 销毁客户端
- 销毁同步客户端
1 | void MQTTClient_destroy(MQTTClient* handle) |
传入的参数就是同步客户端的句柄:
1 | MQTTClient client; |
- 销毁异步客户端
1 | void MQTTAsync_destroy(MQTTAsync* handle) |
传入的参数就是异步客户端的句柄:
1 | MQTTAsync client; |
2.3 回调函数设置
我们可以使用这个设置回调函数的API将MQTT的一些消息处理放到多线程里面,在paho mqtt中有个设置回调函数的API,在这个API中指定下面几个操作的处理回调函数:
- 和服务器的连接异常丢失了:这种情况通常会在发布过程、发送心跳包没有得到响应的情况下被发现和服务器的连接断开了;
- 处理订阅的消息;
- 成功发布消息后的处理;
函数原型:
- 同步客户端
1 | int MQTTClient_setCallbacks(MQTTClient handle, |
参数名称 | 描述 |
---|---|
handle | mqtt客户端 |
context | 用户自定义的背景信息处理回调函数:客户端ID、用户名和密码这些信息 |
cl | 连接丢失处理回调函数 |
ma | 处理订阅消息的回调函数 |
dc | 成功发布消息后的回到函数 |
返回值 | MQTTCLIENT_SUCCESS或者错误码 |
- 异步客户端
1 | int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, |
参数作用和同步客户端的是一样的。
2.4 和服务器建立连接
2.4.1 同步客户端建立连接
和服务器建立连接需要定义一个连接控制包,其结构体的定义如下(删掉了注释):
1 | typedef struct |
下表是对参数的解释:
参数名称 | 解释 |
---|---|
struct_id | 必须是’M’’Q’’T’’C’ |
struct_version | 0-没有SSL和多服务器; 1-没有多服务器; 2-没有MQTT版本; 3-建立连接后没有返回值; 4-不支持二进制密码操作; 5-不支持maxInflightMessages和清除开始标志; 6-不支持HTTP headers; 7-不支持HTTP代理 |
keepAliveInterval | 保活周期,客户端向服务器发送心跳包的周期,单位秒 |
cleansession | 会话清楚标志,用来告诉服务器是否清除当前客户端上一次建立连接后产生的会话消息,1清除,0保持 |
reliable | 是否支持发布多重消息;0支持同时最多10消息处于发布状态;1不支持多消息发布,必须等待上一次发布完成后才能发布下一条消息 |
will | 遗嘱操作,设置遗嘱topic和消息 |
username | 登录服务器用的用户名 |
password | 登录服务器用的密码 |
connectTimeout | 连接超时时间 |
retryInterval | 发布消息但没有收到服务器响应的话,重发retryInterval这么多秒;0关闭重发; |
ssl | ssl操作,设置SSL属性 |
serverURIcount | 服务器数量,默认是0 |
serverURIs | 服务器地址 |
MQTTVersion | MQTT版本: 0-默认版本3.1.1,连接失败的话回退使用3.1版本; 3-只会尝试用3.1版本和服务器建立连接; 4-只会尝试用3.1.1版本和服务器建立连接; 5-5.0版本 |
returned | 建立连接成功后服务器的响应信息 |
binarypwd | 登录服务器用的二进制密码,里面设置密码长度和密码内容 |
maxInflightMessages | 如果支持多消息一起发布的话,这个值设置多消息的最大数量 |
cleanstart | 5.0版本下的MQTT支持清除开始标志 |
httpHeaders | http头部信息 |
httpProxy | http代理 |
httpsProxy | https的代理 |
paho mqtt使用宏定义了几个初始化的连接控制包:
1 | 1. 通用3.x版本的无遗嘱无SSL的连接控制包 |
我们在初始化连接的时候,需要先定义连接控制包结构体变量,然后将例如用户名、密码等信息复制后在调用连接API登录服务器。和服务器建立连接的API原型:
1 | LIBMQTT_API int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options); |
第一个参数就是客户端句柄,第二个参数是连接控制包的结构体变量指针,所以我们的用法是:
1 | MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; |
2.4.2 异步客户端建立连接
异步客户端的连接方法和同步客户端几乎是一样的,只是连接控制包结构体的名称和连接函数名不一样,并且异步客户端可以将连接成功/失败的处理放入回调函数中,用异步的方式处理:
1 | typedef struct |
用法也类似:
1 | MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; |
2.5 和服务器断开连接
在断开连接上,同步客户端和异步客户端区别有点大,传入的参数有差异。
2.5.1 同步客户端断开连接
同步客户端断开服务器连接的API原型:
1 | LIBMQTT_API int MQTTClient_disconnect(MQTTClient handle, int timeout); |
只需要两个参数:MQTT客户端和断开连接超时时间:
1 | MQTTClient client; |
2.5.2 异步客户端断开连接
异步客户端断开和服务器的连接需要操作一个控制结构体:
1 | typedef struct |
结构体成员较少且英文也不难看懂,就不做翻译工作了,异步客户端断开服务器连接的API原型:
1 | LIBMQTT_API int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options); |
来看一下具体的用法:
1 | int disc_finished = 0; |
2.6 获取连接状态
如果客户端喝服务器处于连接状态啧返回true,否则返回false。
- 同步客户端:
LIBMQTT_API int MQTTClient_isConnected(MQTTClient handle);
- 异步客户端:
LIBMQTT_API int MQTTAsync_isConnected(MQTTAsync handle);
2.7 订阅主题
在订阅主题上,同步客户端喝异步客户端的差别就是异步客户端可以将订阅动作的结果放到异步回调函数中处理,而同步客户端没有这个功能。
2.7.1 同步客户端订阅
1 | LIBMQTT_API int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos); |
只需要传入客户端句柄、订阅的主题以及消息质量即可:
1 |
|
2.7.2 异步客户端订阅
1 | LIBMQTT_API int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, |
可以看到多了一个参数叫做响应操作,指向的就是对订阅操作动作的响应回调函数:
1 | typedef struct MQTTAsync_responseOptions |
用法:
1 |
|
消息到来后的处理是在设置回调函数的时候就设置了处理订阅消息的回调函数的:
1 | int rc; |
我们就可以在这个回调函数中处理服务器下发过来的消息了。
2.8 发布消息
在发布消息上,同步客户端既支持同步机制也支持异步机制,同步机制就是必须要等到上一次的消息发布完成才能发布下一次的消息,异步则是支持一次处理多条消息;而异步客户端则只有异步机制。某种程度上来说,异步发布是兼容同步发布的。
发布消息有一个消息结构体,将消息的长度、消息内容、消息质量等抽象集成到了一起,这个结构体在同步客户端喝异步客户端也是不同的。
2.8.1 同步客户端的同步发布
消息结构体:
1 | typedef struct |
发布消息的API:
1 | LIBMQTT_API int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* msg, |
最后个参数是发布消息的分发序号,在异步机制中起作用,同步机制中复制为NULL即可。来看一下具体的用法:
1 |
|
2.8.2 同步客户端的异步发布
消息包喝分发序号以及发布API就不多说了,区别在于用异步发布的时候,在设置回调函数的时候需要将发布动作的回调函数指定:
1 | MQTTClient_deliveryToken deliveredtoken; |
发布消息还是一样需要对消息包结构体进行赋值然后调用API发布出去:
1 | MQTTClient_message pubmsg = MQTTClient_message_initializer; |
但是异步机制下如果发布没有成功的话,是不会进入回调函数中的,而异步客户端则有对发布失败的回调处理。
2.8.3 异步客户端发布
在异步客户端中,它既支持同步客户端的异步机制中的分发回调函数处理,也支持将发布消息成功和失败的回调处理器,它将这一操作放到了响应操作结构体中:
1 | MQTTAsync_responseOptions |
通过对相应操作机构体的赋值以及发送消息的API调用,来进行回调处理设置:
1 | MQTTAsync client = (MQTTAsync)context; |
3. 示例代码
1 |
|
- 本文作者: 摘星星的小朋友
- 本文链接: http://slhking.github.io/2022/07/04/LinuxApp-5-MultiThreadAndShowTemp/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!