Python SDK
Last modified: 12 一月 2024本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。
paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库, 它在 Python 2.7.9+ 或 3.6+ 上为客户端类提供了对 MQTT v5.0,v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。
前置准备
获取连接参数
通过登录WaveshareCloud 设备|属性列表创建一个可用的设备之后,点击右侧黄色: 查看地址获得相关信息
warning
下述示例已不再使用,请登录平台创建自己的设备查看相关参数
示例:
MqttPath: mqtt.waveshare.cloud
Port: 1883
Client ID: 052405b2
Pub Topic: Pub/10/16/052405b2
Sub Topic: Sub/10/16/052405b2
确认Python版本
本项目使用Python 3.10.9进行测试:
→ ~ python -V
Python 3.10.9
安装Paho-mqtt
pip是Python包管理工具,该工具提供了对Python包的查找、下载、安装、卸载的功能,使用以下命令安装 paho-mqtt。
pip install paho-mqtt
导入Paho-mqtt
import paho.mqtt.client as mqtt
连接方式
tip
具体参数已经在上述步骤获取到了,TCP Port已经在文章: 客户端接入 说明
设置MQTT 连接地址、端口、主题、客户端ID
HOST = "mqtt.waveshare.cloud"
PORT = 1883
Client_ID = "052405b2" # 填入client id
Publish_Topic = 'Pub/10/16/052405b2' # 填入上报主题
Subscribe_Topic = 'Sub/10/16/052405b2' # 填入订阅主题
warning
证书内容、SSL/TSL Port已经在文章: 客户端接入 说明
设置MQTTS 连接地址、端口、主题、客户端ID
通过复制CA证书到本地在代码中加载证书
HOST = "mqtt.waveshare.cloud"
PORT = 8883
Client_ID = "052405b2" # 填入client id
Publish_Topic = 'Pub/10/16/052405b2' # 填入上报主题
Subscribe_Topic = 'Sub/10/16/052405b2' # 填入订阅主题
client = mqtt.Client(Client_ID)
# SSL/TSL配置
client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=mqtt.ssl.CERT_NONE, tls_version=mqtt.ssl.PROTOCOL_TLS)
warning
证书内容、SSL/TSL Port已经在文章: 客户端接入 说明
设置MQTTS 连接地址、端口、主题、客户端ID
通过复制CA证书到本地在代码中加载证书
HOST = "mqtt.waveshare.cloud"
PORT = 8884
Client_ID = "052405b2" # 填入client id
Publish_Topic = 'Pub/10/16/052405b2' # 填入上报主题
Subscribe_Topic = 'Sub/10/16/052405b2' # 填入订阅主题
client = mqtt.Client(Client_ID)
# SSL/TSL证书配置
ca_cert = "./cert/rootCA.crt" # 替换为你的CA证书路径
client_cert = "./cert/server.crt" # 替换为你的客户端证书路径
client_key = "./cert/server.key" # 替换为你的客户端密钥路径
client.tls_set(ca_certs=ca_cert, certfile=client_cert, keyfile=client_key, cert_reqs=mqtt.ssl.CERT_REQUIRED, tls_version=mqtt.ssl.PROTOCOL_TLS)
编写回调函数on_connect ,当进行MQTT连接之后会被直接调用,在函数中可以依据rc 来判断客户端连接是否成功,当rc为0时连接成功,此时调用paho.mqtt.client.subscribe来订阅消息
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
if rc == 0:
client.subscribe(Subscribe_Topic)
订阅和发布
订阅主题已在上一点说明中实现
取消订阅
通过以下代码取消订阅,此时应指定取消订阅的主题
def unsubscribe(client: mqtt_client):
client.on_message = None
client.unsubscribe(topic)
发布主题
使用while循环发布消息体message的内容
while True:
message = "Hello, MQTT!"
client.publish(Publish_Topic, message)
print("Published message: " + message)
time.sleep(1)
接收消息
通过以下代码指定客户端对消息事件进行监听,并在收到消息后执行回调函数,将接收到的消息打印到控制台
def on_message(client, userdata, msg):
print("Received message: " + msg.payload.decode())
断开连接
def disconnect(client: mqtt_client):
client.loop_stop()
client.disconnect()
完整代码
import paho.mqtt.client as mqtt
import time
HOST = "mqtt.waveshare.cloud"
PORT = 1883
Client_ID = "" # 填入client id
Publish_Topic = '' # 填入上报主题
Subscribe_Topic = '' # 填入订阅主题
client = mqtt.Client(Client_ID)
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
if rc == 0:
client.subscribe(Subscribe_Topic)
def on_message(client, userdata, msg):
print("Received message: " + msg.payload.decode())
def define():
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.loop_start()
if __name__ == '__main__':
define()
try:
while True:
message = "Hello, MQTT!"
client.publish(Publish_Topic, message)
print("Published message: " + message)
time.sleep(1)
except KeyboardInterrupt:
client.loop_stop()
client.disconnect()
print("MQTT client disconnected.")
import paho.mqtt.client as mqtt
import time
HOST = "mqtt.waveshare.cloud"
PORT = 8884 # 使用SSL连接的端口
Client_ID = "" # 填入client id
Publish_Topic = '' # 填入上报主题
Subscribe_Topic = '' # 填入订阅主题
client = mqtt.Client(Client_ID)
client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=mqtt.ssl.CERT_NONE, tls_version=mqtt.ssl.PROTOCOL_TLS)
connected = False
def on_connect(client, userdata, flags, rc):
global connected
print("Connected with result code " + str(rc))
if rc == 0:
client.subscribe(Subscribe_Topic)
connected = True
def on_message(client, userdata, msg):
print("Received message: " + msg.payload.decode())
def define():
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.loop_start()
if __name__ == '__main__':
define()
while not connected:
time.sleep(1)
try:
while True:
message = "Hello, MQTT!"
client.publish(Publish_Topic, message)
print("Published message: " + message)
time.sleep(1)
except KeyboardInterrupt:
client.loop_stop()
client.disconnect()
print("MQTT client disconnected.")
import paho.mqtt.client as mqtt
import time
HOST = "mqtt.waveshare.cloud"
PORT = 8884 # 使用SSL连接的端口
Client_ID = "" # 填入client id
Publish_Topic = '' # 填入上报主题
Subscribe_Topic = '' # 填入订阅主题
client = mqtt.Client(Client_ID)
ca_cert = "Path/cert/rootCA.crt" # 替换为你的CA证书路径
client_cert = "Path/cert/server.crt" # 替换为你的客户端证书路径
client_key = "Path/cert/server.key" # 替换为你的客户端密钥路径
client.tls_set(ca_certs=ca_cert, certfile=client_cert, keyfile=client_key, cert_reqs=mqtt.ssl.CERT_REQUIRED, tls_version=mqtt.ssl.PROTOCOL_TLS)
connected = False
def on_connect(client, userdata, flags, rc):
global connected
print("Connected with result code " + str(rc))
if rc == 0:
client.subscribe(Subscribe_Topic)
connected = True
def on_message(client, userdata, msg):
print("Received message: " + msg.payload.decode())
def define():
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.loop_start()
if __name__ == '__main__':
define()
while not connected:
time.sleep(1)
try:
while True:
message = "Hello, MQTT!"
client.publish(Publish_Topic, message)
print("Published message: " + message)
time.sleep(1)
except KeyboardInterrupt:
client.loop_stop()
client.disconnect()
print("MQTT client disconnected.")