跳转至

MQTT 连接与消息传递概述

MQTT 集成允许 WebView(JavaScript)层通过原生 Android 服务(BleService)连接到 MQTT 代理、发布消息,以及订阅/取消订阅主题。所有交互均通过 bridgeWebView 处理器完成。

MQTT 通信工作流程

┌──────────┐
│   START  │
└────┬─────┘
     │
     ▼
┌─────────────────────────┐
│ Register Callbacks      │
│ - connectCallback       │
│ - messageArrivedCallback│
└────┬────────────────────┘
     │
     ▼
┌─────────────────────────┐
│ Connect to Broker       │ ──────> connectMqtt({
└────┬────────────────────┘           hostname,
     │                                 port,
     │ (Async Connection)              clientId
     │                               })
     ▼
┌─────────────────────────┐
│  connectCallback        │
│  Called                 │
└────┬────────────────────┘
     │
     ├───────────┬───────────┐
     ▼           ▼           ▼
  Success    Failed      Timeout
     │
     ▼
┌─────────────────────────┐
│ Subscribe to Topics     │ ──────> mqttSubTopic({
└────┬────────────────────┘           topic,
     │                                 qos
     │                               })
     │
     ├─────────────────────────────┐
     │                             │
     │                             ▼
     │                    ┌─────────────────┐
     │                    │ Publish Message │
     │                    │                 │
     │                    │ mqttPublishMsg({│
     │                    │   topic,        │
     │                    │   content,      │
     │                    │   qos           │
     │                    │ })              │
     │                    └─────────────────┘
     │
     ▼
┌─────────────────────────┐
│ Wait for Messages       │
│                         │
│ messageArrivedCallback  │ <──── (Messages arrive)
│ fired when msg received │
└────┬────────────────────┘
     │
     │ (Process messages...)
     │
     ▼
┌─────────────────────────┐
│ Unsubscribe             │ ──────> mqttUnSubTopic()
└────┬────────────────────┘
     │
     ▼
┌──────────┐
│   END    │
└──────────┘

从整体来看,MQTT 流程如下:

  1. 通过 connectMqtt 连接到 MQTT 代理
  2. 通过 mqttSubTopic 订阅主题
  3. 通过 mqttMsgArrivedCallBack JavaScript 回调接收消息
  4. 通过 mqttPublishMsg 发布消息
  5. 不再需要时通过 mqttUnSubTopic 取消订阅

在内部,MQTT 连接由 BleService 管理,它封装了一个 MqttClientUtil 实例。

暴露给 JavaScript 的 MQTT 处理器

所有 MQTT 操作都作为桥接处理器注册在 BaseWebViewActivity 内的 bridgeWebView 上:

  • connectMqtt
  • mqttSubTopic
  • mqttUnSubTopic
  • mqttPublishMsg

此外,来自代理的消息通过以下回调从原生推送到 JavaScript:mqttMsgArrivedCallBack

每个处理器接受 JSON 字符串格式的参数,并向 JavaScript 回调返回 JSON 编码的 Result 对象。

在 JS 中,通常使用:

const result = JSON.parse(response);
if (result.success) {
  // ...
}

1. 连接到 MQTT 代理 – connectMqtt

目的: - 使用 JavaScript 提供的配置建立与 MQTT 代理的连接。

桥接处理器

bridgeWebView.registerHandler("connectMqtt", new BridgeHandler() {
    @Override
    public void handler(String data, CallBackFunction function) {
        // ...
    }
});

输入(来自 JavaScript)

  • data:表示 MqttConfig 对象的 JSON 字符串。

最小必需字段(在 BaseWebViewActivity 中验证): - hostname – 代理主机/IP - port – 代理端口(如 1883 / 8883) - clientId – 唯一的 MQTT 客户端标识符

JS 示例负载:

const config = {
  hostname: "mqtt.example.com",
  port: 1883,
  clientId: "my-client-id-123",
  // Possible additional fields depending on MqttConfig:
  // username, password, ssl, keepAlive, etc.
};

bridgeWebView.callHandler("connectMqtt", JSON.stringify(config), function (response) {
  const result = JSON.parse(response);
  if (result.success) {
    console.log("MQTT connect request accepted, waiting for final callback...");
  } else {
    console.error("Invalid MQTT config:", result);
  }
});

原生逻辑(简化)

1. 验证负载

  • 如果 data 为空 → 返回 Result.fail(PARAMETER_ERROR, false)
  • 将 JSON 解析为 MqttConfig mqttConfig
  • 检查必需字段:hostnameportclientId

如果任何必需字段缺失或无效 → 返回 Result.fail(PARAMETER_ERROR, false)

2. 立即响应 JS

如果验证通过,原生立即向调用的 JS 回调返回 Result.ok(true)。 实际连接在后台线程中异步完成。

3. 异步连接

  • ThreadPool.getExecutor().execute(() -> { boolean b = bleService.connectMqtt(mqttConfig); ... })
  • bleService.connectMqtt(mqttConfig) 完成后,结果被传回 UI 线程。

4. 向 JS 的最终结果回调

  • 成功时:
    bridgeWebView.callHandler("connectMqttCallBack",
        gson.toJson(Result.ok(true)), ...);
    
  • 失败时(如代理不可达、认证失败):
    bridgeWebView.callHandler("connectMqttCallBack",
        gson.toJson(Result.fail(MQTT_CONNECT_FAIL, false)), ...);
    

JavaScript 处理方式

您必须在 JS 侧定义 connectMqttCallBack 处理器,以了解连接实际成功或失败的情况:

bridgeWebView.registerHandler("connectMqttCallBack", function (data) {
  const result = JSON.parse(data);
  if (result.success) {
    console.log("MQTT connected successfully");
    // Now it's safe to subscribe and publish
  } else {
    console.error("MQTT connection failed:", result);
  }
});

2. 订阅主题 – mqttSubTopic

目的

以指定的 QoS 级别为当前 MQTT 客户端订阅给定主题。

桥接处理器

bridgeWebView.registerHandler("mqttSubTopic", new BridgeHandler() {
    @Override
    public void handler(String data, CallBackFunction function) {
        // ...
    }
});

输入(来自 JavaScript)

  • data 是包含以下内容的 JSON 字符串:
    • topic – MQTT 主题字符串(如 "devices/abc123/status"
    • qos – QoS 级别(012

示例调用:

const payload = {
  topic: "devices/abc123/status",
  qos: 1,
};

bridgeWebView.callHandler("mqttSubTopic", JSON.stringify(payload), function (response) {
  const result = JSON.parse(response);
  if (result.success) {
    console.log("Subscribed to topic successfully");
  } else {
    console.error("Failed to subscribe:", result);
  }
});
示例调用:
const payload = {
  topic: "devices/abc123/status",
  qos: 1,
};

bridgeWebView.callHandler("mqttSubTopic", JSON.stringify(payload), function (response) {
  const result = JSON.parse(response);
  if (result.success) {
    console.log("Subscribed to topic successfully");
  } else {
    console.error("Failed to subscribe:", result);
  }
});

原生逻辑(简化)

  1. data 解析为 JSONObject
  2. 提取 topicqos
  3. 获取 MQTT 客户端:

    MqttClientUtil mqttClientUtil = bleService.getMqttClientUtil();
    

  4. 检查连接:

    • 如果 mqttClientUtil == null 或 !mqttClientUtil.isConnected()Result.fail(MQTT_CURRENT_NOT_CONNECTED, false)

调用:

boolean subscribe = mqttClientUtil.subscribe(topic, qos);
6. 返回: - 如果 subscribe == trueResult.ok(true) - 否则 →Result.fail(FAIL, false)

发生任何 exceptionResult.fail(PARAMETER_ERROR, false)

3. 取消订阅主题 – mqttUnSubTopic

目的

取消订阅之前已订阅的主题。

桥接处理器

bridgeWebView.registerHandler("mqttUnSubTopic", new BridgeHandler() {
    @Override
    public void handler(String data, CallBackFunction function) {
        // ...
    }
});

输入(来自 JavaScript)

data 是包含以下内容的 JSON 字符串: - topic – 要取消订阅的 MQTT 主题

示例调用:

const payload = {
  topic: "devices/abc123/status",
};

bridgeWebView.callHandler("mqttUnSubTopic", JSON.stringify(payload), function (response) {
  const result = JSON.parse(response);
  if (result.success) {
    console.log("Unsubscribed from topic");
  } else {
    console.error("Failed to unsubscribe:", result);
  }
});

原生逻辑(简化)

  1. 从 JSON 解析 topic
  2. 通过 bleService.getMqttClientUtil() 获取 MqttClientUtil
  3. 如果 MQTT 未连接 → Result.fail(MQTT_CURRENT_NOT_CONNECTED, false)
  4. 调用:
    boolean ok = mqttClientUtil.unSubscribe(topic);
    
  5. 如果 okResult.ok(true) 否则 → Result.fail(FAIL, false)
  6. 发生 exceptionResult.fail(PARAMETER_ERROR, false)

4. 发布消息 – mqttPublishMsg

目的

以指定 QoS 级别将消息发布到特定 MQTT 主题。

桥接处理器

bridgeWebView.registerHandler("mqttPublishMsg", new BridgeHandler() {
    @Override
    public void handler(String data, CallBackFunction function) {
        // ...
    }
});

输入(来自 JavaScript)

  • data 是 JSON 字符串:
  • topic – 要发布的 MQTT 主题
  • content – 消息的负载/正文(字符串)
  • qos – QoS 级别(012

示例调用:

const payload = {
  topic: "devices/abc123/commands",
  content: JSON.stringify({ action: "reboot" }),
  qos: 1,
};

bridgeWebView.callHandler("mqttPublishMsg", JSON.stringify(payload), function (response) {
  const result = JSON.parse(response);
  if (result.success) {
    console.log("Message published");
  } else {
    console.error("Failed to publish:", result);
  }
});

原生逻辑(简化)

  1. 从 JSON 解析 topiccontentqos
  2. 通过 bleService.getMqttClientUtil() 获取 MqttClientUtil
  3. 检查连接;如果已断开 → Result.fail(MQTT_CURRENT_NOT_CONNECTED, false)
  4. 发布:
    boolean ok = mqttClientUtil.publish(topic, qos, content.getBytes(StandardCharsets.UTF_8));
    

5. 接收 MQTT 消息 – mqttMsgArrivedCallBack

目的

将来自原生(通过 EventBus)的传入 MQTT 消息传递给 JavaScript。

原生流程

BleService(通过 MqttClientUtil)接收 MQTT 消息,并将其发布到标记为 EventBusEnum.MQTT_MSG_ARRIVED 的 EventBus 通道。BaseWebViewActivity 监听这些事件:

@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(EventBusMsg message) {
    // ...
    } else if (message.getTagEnum() == EventBusEnum.MQTT_MSG_ARRIVED) {
        bridgeWebView.callHandler("mqttMsgArrivedCallBack",
            (String) message.getT(),
            new CallBackFunction() {
                @Override
                public void onCallBack(String data) { }
            });
    }
    // ...
}
- message.getT() 预期是 MQTT 负载的 String 表示形式(可能还有元数据,取决于 MqttClientUtil 的格式化方式)。

JavaScript 处理方式

您必须为 mqttMsgArrivedCallBack 注册处理器以接收和解析传入消息:

bridgeWebView.registerHandler("mqttMsgArrivedCallBack", function (payload) {
  // `payload` is a string coming from native.
  // It might be plain text or JSON-encoded, depending on your MQTT publisher.

  try {
    const parsed = JSON.parse(payload);
    console.log("MQTT message (parsed):", parsed);
    // Handle structured message here
  } catch (e) {
    console.log("MQTT message (raw):", payload);
    // Handle plain string payload
  }
});
典型流程: 1. 使用 connectMqtt 连接。 2. 使用 mqttSubTopic 订阅。 3. 已订阅主题的传入消息通过 mqttMsgArrivedCallBack 传递。

6. 错误码与结果处理

MQTT 相关处理器使用 Result.ok(...)Result.fail(code, ...) 来包装响应。相关错误码包括:

  • MQTT_CONNECT_FAIL – 连接代理的尝试失败(配置无效、网络错误、认证错误等)
  • MQTT_CURRENT_NOT_CONNECTED – 在客户端为 null 或已断开连接时调用了需要活跃 MQTT 连接的操作(订阅、取消订阅、发布)。
  • PARAMETER_ERROR – 请求负载缺少必需字段或无法解析。
  • FAIL – 通用失败(如订阅/发布返回 false)。

在 JS 侧,您应始终检查:

const result = JSON.parse(response);
if (!result.success) {
  console.error("MQTT error:", result.code, result.message);
}

7. 典型 MQTT 使用流程(来自 JavaScript)

  1. 配置并连接
    • 构建一个 MqttConfig 对象。
    • 使用 JSON 配置调用 connectMqtt
    • 等待 connectMqttCallBack 返回 success = true
  2. 订阅主题
    • 对于每个主题,使用 { topic, qos } 调用 mqttSubTopic
  3. 注册消息处理器
    • 注册 mqttMsgArrivedCallBack 以处理传入消息。
  4. 发布消息
    • 使用 mqttPublishMsg 向代理发送命令或数据。
  5. 取消订阅与清理
    • 离开界面或不再需要某个主题时,调用 mqttUnSubTopic
    • MQTT 断开连接目前在原生层内部处理(BaseWebViewActivity 中未暴露显式的 disconnectMqtt 桥接处理器)。

8. 状态更新

所有处理器注册完成后,dispatch({ type: "SET_BRIDGE_INITIALIZED", payload: true }) 更新应用程序状态,表示桥接现已初始化。此标志防止在后续调用中重新初始化,确保应用程序维持 JavaScript 与 WebView 之间的单一通信点。


9. 函数摘要

setupBridge 函数提供了一种有组织且安全的方法,用于处理应用程序内各种基于蓝牙、QR 码和 MQTT 的交互。每个处理器验证并处理数据,确保一致的状态管理,并减少异步数据处理的潜在问题。这种结构使 WebView 与应用程序状态之间的无缝通信成为可能,允许通过单次初始化调用进行复杂的多源数据交换。

总结

此处使用的 BLE 技术似乎是:

  • Central-Peripheral with GATT Profile:应用程序(中心设备)扫描、连接外围设备并与之交互,检索 GATT 服务和特征。
  • 通过 MQTT 进行 IoT 集成:BLE 数据或状态更新可能使用 MQTT 发送到服务器,表明与更广泛的 IoT 生态系统集成。
  • 混合应用结构WebViewJavascriptBridge 的存在表明这是一个 WebView 或混合应用设置,能够在跨平台环境中实现 BLE 功能。

此设置将允许移动应用程序在 IoT 上下文中扫描、连接、初始化并与 BLE 设备交互,可能使用 MQTT 将数据发送到云服务或后端。

这种分离将 MQTT 连接和底层处理保留在原生代码中,而 WebView/JavaScript 层只需使用一小组桥接方法和回调即可将 MQTT 集成到 UI 和业务逻辑中。