Spring Boot 下优雅的 MQTT 消息通信封装工具
让你的开发像一杯玛奇朵一样顺滑
// 接收消息 - 仅需一行代码
MqttPut.of("rsp/")
.response((topic, message) -> {
System.out.println("收到消息: " + message);
}).start();
// 发布消息 - 简洁优雅
MqttPush mqttPush = new MqttPush();
mqttPush.push("your/topic", "Hello MQTT",
MQTTQos.AT_LEAST_ONCE);
仅需几行配置即可启动 MQTT 通信,告别繁琐的样板代码
屏蔽复杂的 API 调用,提供简洁明了的接口设计
支持多服务连接和手动配置,满足各种复杂场景
内置断线重连机制,让连接更加稳定可靠
<dependency>
<groupId>io.github.rururunu</groupId>
<artifactId>MQTT-Macchiatto</artifactId>
<version>0.1.5</version>
</dependency>
mto-mqtt:
host: tcp://your-host:1883
username: your-username
password: your-password
timeout: 10000
keepalive: 60
reconnect-frequency-ms: 5000
@SpringBootApplication(
scanBasePackages = {
"your.project.path",
"io.github.rururunu"
}
)
// 基础监听 - 一行代码搞定
MqttPut.of("rsp/")
.response((topic, message) -> {
System.out.println("收到消息: " + message);
// 在这里处理你的业务逻辑
}).start();
// 带配置的监听
MqttPut.of()
.topic("sensor/data")
.serviceId("sensor-service")
.cleanSession(true)
.response((message) -> {
// 处理传感器数据
processSensorData(message);
})
.start();
// 自定义服务监听
MqttPut.of("iot/devices/")
.host("tcp://iot.example.com:1883")
.username("iot-user")
.password("secure-password")
.timeout(10000)
.keepalive(60)
.cleanSession(false)
.reconnectFrequencyMs(5000)
.response((topic, msg) -> {
System.out.println("设备消息: " + topic + " -> " + msg);
})
.start();
// 简单发布
MqttPush mqttPush = new MqttPush();
mqttPush.push("device/status", "online", MQTTQos.AT_LEAST_ONCE);
// 带回调的发布
mqttPush.push("sensor/temperature", "25.6°C",
MQTTQos.AT_LEAST_ONCE,
token -> System.out.println("温度数据发送成功"),
(token, throwable) -> System.err.println("发送失败: " + throwable)
);
// 使用 Builder 模式自定义连接
MqttPush customPush = new MqttPush.builder()
.host("tcp://production.mqtt.com:1883")
.username("prod-user")
.password("prod-password")
.timeout(15000)
.keepalive(120)
.cleanSession(false)
.build()
.init((e) -> {
System.out.println("MQTT 连接失败: " + e);
});
// 发布关键业务数据
customPush.push("business/orders", orderJson,
MQTTQos.EXACTLY_ONCE,
(token) -> logSuccess("订单数据已发送"),
(token, error) -> handleError("订单发送失败", error)
);
// 长连接发布示例
@Component
public class DeviceController {
private MqttPush mqttPush = new MqttPush();
public void reportDeviceStatus(String deviceId, String status) {
mqttPush.push("devices/" + deviceId + "/status",
status, MQTTQos.AT_LEAST_ONCE);
}
}
// 多服务连接管理
@Service
public class MqttService {
// 生产环境连接
private MqttPush productionPush = new MqttPush()
.host("tcp://prod.mqtt.com:1883")
.username("prod-user")
.password("prod-pass")
.cleanSession(false)
.reconnectFrequencyMs(3000);
// 测试环境连接
private MqttPush testPush = new MqttPush()
.host("tcp://test.mqtt.com:1883")
.username("test-user")
.password("test-pass");
@PostConstruct
public void init() {
productionPush.start();
testPush.start();
}
}
// 使用 MQTTMonitor 进行高级监听
MQTTMonitor monitor = new MQTTMonitor();
monitor.setClientId("advanced-client-" + UUID.randomUUID());
monitor.setCleanSession(false);
monitor.setQos(MQTTQos.EXACTLY_ONCE);
monitor.setMqttCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.err.println("连接丢失,正在重连...");
monitor.reconnect(); // 自动重连
}
@Override
public void messageArrived(String topic, MqttMessage message) {
// 处理接收到的消息
String payload = new String(message.getPayload());
System.out.println("高级监听收到: " + topic + " -> " + payload);
// 根据主题路由到不同的处理器
if (topic.startsWith("alerts/")) {
handleAlert(payload);
} else if (topic.startsWith("metrics/")) {
handleMetrics(payload);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("消息投递完成");
}
});
// 启动高级监听
monitor.start("system/+/status"); // 使用通配符监听