MQTT Macchiatto

Spring Boot 下优雅的 MQTT 消息通信封装工具

让你的开发像一杯玛奇朵一样顺滑

Maven Central License Stars
MqttExample.java
// 接收消息 - 仅需一行代码
MqttPut.of("rsp/")
    .response((topic, message) -> {
        System.out.println("收到消息: " + message);
    }).start();

// 发布消息 - 简洁优雅
MqttPush mqttPush = new MqttPush();
mqttPush.push("your/topic", "Hello MQTT", 
              MQTTQos.AT_LEAST_ONCE);

为什么选择 MQTT Macchiatto?

快速集成

仅需几行配置即可启动 MQTT 通信,告别繁琐的样板代码

高度封装

屏蔽复杂的 API 调用,提供简洁明了的接口设计

灵活扩展

支持多服务连接和手动配置,满足各种复杂场景

自动重连

内置断线重连机制,让连接更加稳定可靠

使用原生 MQTT 😰

  • 多层回调配置
  • 错误处理繁琐
  • 多服务连接困难
  • 代码冗余复杂

使用 MQTT Macchiatto 😊

  • 一行配置连接服务
  • 高度封装的工具类
  • 清晰的代码组织
  • 优雅的响应处理

快速开始

1

添加依赖

<dependency>
    <groupId>io.github.rururunu</groupId>
    <artifactId>MQTT-Macchiatto</artifactId>
    <version>0.1.5</version>
</dependency>
2

配置 application.yml

mto-mqtt:
    host: tcp://your-host:1883
    username: your-username
    password: your-password
    timeout: 10000
    keepalive: 60
    reconnect-frequency-ms: 5000
3

启动类配置

@SpringBootApplication(
    scanBasePackages = {
        "your.project.path",
        "io.github.rururunu"
    }
)

优秀代码示例

消息监听 - 简洁优雅

// 基础监听 - 一行代码搞定
MqttPut.of("rsp/")
    .response((topic, message) -> {
        System.out.println("收到消息: " + message);
        // 在这里处理你的业务逻辑
    }).start();

// 带配置的监听
MqttPut.of()
    .topic("sensor/data")
    .serviceId("sensor-service")
    .cleanSession(true)
    .response((message) -> {
        // 处理传感器数据
        processSensorData(message);
    })
    .start();

// 自定义服务监听
MqttPut.of("iot/devices/")
    .host("tcp://iot.example.com:1883")
    .username("iot-user")
    .password("secure-password")
    .timeout(10000)
    .keepalive(60)
    .cleanSession(false)
    .reconnectFrequencyMs(5000)
    .response((topic, msg) -> {
        System.out.println("设备消息: " + topic + " -> " + msg);
    })
    .start();

消息发布 - 功能强大

// 简单发布
MqttPush mqttPush = new MqttPush();
mqttPush.push("device/status", "online", MQTTQos.AT_LEAST_ONCE);

// 带回调的发布
mqttPush.push("sensor/temperature", "25.6°C", 
    MQTTQos.AT_LEAST_ONCE,
    token -> System.out.println("温度数据发送成功"),
    (token, throwable) -> System.err.println("发送失败: " + throwable)
);

// 使用 Builder 模式自定义连接
MqttPush customPush = new MqttPush.builder()
    .host("tcp://production.mqtt.com:1883")
    .username("prod-user")
    .password("prod-password")
    .timeout(15000)
    .keepalive(120)
    .cleanSession(false)
    .build()
    .init((e) -> {
        System.out.println("MQTT 连接失败: " + e);
    });

// 发布关键业务数据
customPush.push("business/orders", orderJson, 
    MQTTQos.EXACTLY_ONCE,
    (token) -> logSuccess("订单数据已发送"),
    (token, error) -> handleError("订单发送失败", error)
);

// 长连接发布示例
@Component
public class DeviceController {
    private MqttPush mqttPush = new MqttPush();
    
    public void reportDeviceStatus(String deviceId, String status) {
        mqttPush.push("devices/" + deviceId + "/status", 
                     status, MQTTQos.AT_LEAST_ONCE);
    }
}

高级用法 - 企业级特性

// 多服务连接管理
@Service
public class MqttService {
    
    // 生产环境连接
    private MqttPush productionPush = new MqttPush()
        .host("tcp://prod.mqtt.com:1883")
        .username("prod-user")
        .password("prod-pass")
        .cleanSession(false)
        .reconnectFrequencyMs(3000);
    
    // 测试环境连接
    private MqttPush testPush = new MqttPush()
        .host("tcp://test.mqtt.com:1883")
        .username("test-user")
        .password("test-pass");
    
    @PostConstruct
    public void init() {
        productionPush.start();
        testPush.start();
    }
}

// 使用 MQTTMonitor 进行高级监听
MQTTMonitor monitor = new MQTTMonitor();
monitor.setClientId("advanced-client-" + UUID.randomUUID());
monitor.setCleanSession(false);
monitor.setQos(MQTTQos.EXACTLY_ONCE);

monitor.setMqttCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable cause) {
        System.err.println("连接丢失,正在重连...");
        monitor.reconnect(); // 自动重连
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // 处理接收到的消息
        String payload = new String(message.getPayload());
        System.out.println("高级监听收到: " + topic + " -> " + payload);
        
        // 根据主题路由到不同的处理器
        if (topic.startsWith("alerts/")) {
            handleAlert(payload);
        } else if (topic.startsWith("metrics/")) {
            handleMetrics(payload);
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("消息投递完成");
    }
});

// 启动高级监听
monitor.start("system/+/status"); // 使用通配符监听

联系我

邮箱

guolvaita@gmail.com

微信

AfterTheMoonlight

支持项目

如果你觉得这个项目对你有帮助,请不要吝啬点一个 ⭐ Star,这是我持续优化的最大动力!

给个 Star