WaveshareCloud Wiki Help

Python SDK

本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。

paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库, 它在 Python 2.7.9+ 或 3.6+ 上为客户端类提供了对 MQTT v5.0,v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。

前置准备

获取连接参数

  • 通过登录WaveshareCloud 设备|属性列表创建一个可用的设备之后,点击右侧黄色: 查看地址获得相关信息

示例:

  • MqttPath: mqtt.waveshare.cloud

  • Port: 1883

  • Client ID: 052405b2

  • Pub Topic: Pub/10/16/052405b2

  • Sub Topic: Sub/10/16/052405b2

确认Python版本

本项目使用Python 3.10.9进行测试:

→ ~ python -V Python 3.10.9

安装Paho-mqtt

  1. pip是Python包管理工具,该工具提供了对Python包的查找、下载、安装、卸载的功能,使用以下命令安装 paho-mqtt。

pip install paho-mqtt
  1. 导入Paho-mqtt

import paho.mqtt.client as mqtt

连接方式

  • 设置MQTT 连接地址、端口、主题、客户端ID

HOST = "mqtt.waveshare.cloud" PORT = 1883 Client_ID = "052405b2" # 填入client id Publish_Topic = 'Pub/10/16/052405b2' # 填入上报主题 Subscribe_Topic = 'Sub/10/16/052405b2' # 填入订阅主题
  • 设置MQTTS 连接地址、端口、主题、客户端ID

  • 通过复制CA证书到本地在代码中加载证书

HOST = "mqtt.waveshare.cloud" PORT = 8883 Client_ID = "052405b2" # 填入client id Publish_Topic = 'Pub/10/16/052405b2' # 填入上报主题 Subscribe_Topic = 'Sub/10/16/052405b2' # 填入订阅主题 client = mqtt.Client(Client_ID) # SSL/TSL配置 client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=mqtt.ssl.CERT_NONE, tls_version=mqtt.ssl.PROTOCOL_TLS)
  • 设置MQTTS 连接地址、端口、主题、客户端ID

  • 通过复制CA证书到本地在代码中加载证书

HOST = "mqtt.waveshare.cloud" PORT = 8884 Client_ID = "052405b2" # 填入client id Publish_Topic = 'Pub/10/16/052405b2' # 填入上报主题 Subscribe_Topic = 'Sub/10/16/052405b2' # 填入订阅主题 client = mqtt.Client(Client_ID) # SSL/TSL证书配置 ca_cert = "./cert/rootCA.crt" # 替换为你的CA证书路径 client_cert = "./cert/server.crt" # 替换为你的客户端证书路径 client_key = "./cert/server.key" # 替换为你的客户端密钥路径 client.tls_set(ca_certs=ca_cert, certfile=client_cert, keyfile=client_key, cert_reqs=mqtt.ssl.CERT_REQUIRED, tls_version=mqtt.ssl.PROTOCOL_TLS)
  • 编写回调函数on_connect ,当进行MQTT连接之后会被直接调用,在函数中可以依据rc 来判断客户端连接是否成功,当rc为0时连接成功,此时调用paho.mqtt.client.subscribe来订阅消息

def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) if rc == 0: client.subscribe(Subscribe_Topic)

订阅和发布

订阅主题已在上一点说明中实现

取消订阅

通过以下代码取消订阅,此时应指定取消订阅的主题

def unsubscribe(client: mqtt_client): client.on_message = None client.unsubscribe(topic)

发布主题

使用while循环发布消息体message的内容

while True: message = "Hello, MQTT!" client.publish(Publish_Topic, message) print("Published message: " + message) time.sleep(1)

接收消息

通过以下代码指定客户端对消息事件进行监听,并在收到消息后执行回调函数,将接收到的消息打印到控制台

def on_message(client, userdata, msg): print("Received message: " + msg.payload.decode())

断开连接

def disconnect(client: mqtt_client): client.loop_stop() client.disconnect()

完整代码

通过TCP端口连接

import paho.mqtt.client as mqtt import time HOST = "mqtt.waveshare.cloud" PORT = 1883 Client_ID = "" # 填入client id Publish_Topic = '' # 填入上报主题 Subscribe_Topic = '' # 填入订阅主题 client = mqtt.Client(Client_ID) def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) if rc == 0: client.subscribe(Subscribe_Topic) def on_message(client, userdata, msg): print("Received message: " + msg.payload.decode()) def define(): client.on_connect = on_connect client.on_message = on_message client.connect(HOST, PORT, 60) client.loop_start() if __name__ == '__main__': define() try: while True: message = "Hello, MQTT!" client.publish(Publish_Topic, message) print("Published message: " + message) time.sleep(1) except KeyboardInterrupt: client.loop_stop() client.disconnect() print("MQTT client disconnected.")

通过SSL/TSL端口连接(免证书)

import paho.mqtt.client as mqtt import time HOST = "mqtt.waveshare.cloud" PORT = 8884 # 使用SSL连接的端口 Client_ID = "" # 填入client id Publish_Topic = '' # 填入上报主题 Subscribe_Topic = '' # 填入订阅主题 client = mqtt.Client(Client_ID) client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=mqtt.ssl.CERT_NONE, tls_version=mqtt.ssl.PROTOCOL_TLS) connected = False def on_connect(client, userdata, flags, rc): global connected print("Connected with result code " + str(rc)) if rc == 0: client.subscribe(Subscribe_Topic) connected = True def on_message(client, userdata, msg): print("Received message: " + msg.payload.decode()) def define(): client.on_connect = on_connect client.on_message = on_message client.connect(HOST, PORT, 60) client.loop_start() if __name__ == '__main__': define() while not connected: time.sleep(1) try: while True: message = "Hello, MQTT!" client.publish(Publish_Topic, message) print("Published message: " + message) time.sleep(1) except KeyboardInterrupt: client.loop_stop() client.disconnect() print("MQTT client disconnected.")

通过SSL/TSL端口连接(携带证书)

import paho.mqtt.client as mqtt import time HOST = "mqtt.waveshare.cloud" PORT = 8884 # 使用SSL连接的端口 Client_ID = "" # 填入client id Publish_Topic = '' # 填入上报主题 Subscribe_Topic = '' # 填入订阅主题 client = mqtt.Client(Client_ID) ca_cert = "Path/cert/rootCA.crt" # 替换为你的CA证书路径 client_cert = "Path/cert/server.crt" # 替换为你的客户端证书路径 client_key = "Path/cert/server.key" # 替换为你的客户端密钥路径 client.tls_set(ca_certs=ca_cert, certfile=client_cert, keyfile=client_key, cert_reqs=mqtt.ssl.CERT_REQUIRED, tls_version=mqtt.ssl.PROTOCOL_TLS) connected = False def on_connect(client, userdata, flags, rc): global connected print("Connected with result code " + str(rc)) if rc == 0: client.subscribe(Subscribe_Topic) connected = True def on_message(client, userdata, msg): print("Received message: " + msg.payload.decode()) def define(): client.on_connect = on_connect client.on_message = on_message client.connect(HOST, PORT, 60) client.loop_start() if __name__ == '__main__': define() while not connected: time.sleep(1) try: while True: message = "Hello, MQTT!" client.publish(Publish_Topic, message) print("Published message: " + message) time.sleep(1) except KeyboardInterrupt: client.loop_stop() client.disconnect() print("MQTT client disconnected.")
Last modified: 12 一月 2024