mqtt mosquitto

mqtt is a lightweight messaging protocol for small sensors and mobile devices, optimized for high-latency or unreliable networks. it is widely used in the field of IoT(Internet of things 物联网).

mqtt is a protocal, it has many implementations, such as mosquitto, emqtt, etc.
mosquitto is a open source mqtt broker, it is written in C, and it is very small, it is easy to install and use.

it has broker and client. client can pub/sub message to broker.

let’s see how to install and use it.

broker

we can use docker to implent broker, it is very easy.

  1. create directory and config file in the home directory the file tree is like this:
    ├── mosquitto-mqtt
         ├── config
         │   └── mosquitto.conf
         ├── data
         └── log
    

    mosquitto.conf is the config file, it is like this:

    pid_file  /mosquitto/data/mosquitto.pid
    persistence true
    persistence_location /mosquitto/data/
    log_dest file /mosquitto/log/mosquitto.log
    log_type error
    log_type notice
    log_type information
    log_type debug
    log_type websockets
    connection_messages true
    log_dest topic
    log_dest stdout
    allow_anonymous true
    listener 1883
    
  2. docker run

    docker run -it --rm --name surge-mqtt -p 1883:1883  -v ~/mosquitto-mqtt:/mosquitto  eclipse-mosquitto
    

client

shell cli

https://hivemq.github.io/mqtt-cli/

# test
mqtt test -h 192.168.199.102 -p 8808
# sub
mqtt sub -h 192.168.199.102 -p 8808 -t test
# pub
mqtt pub -h 192.168.199.102 -p 8808 -t test -m test

client tools

mqttx is a elegent mqtt client.
you can use web or install in your computer.

python

install paho-mqtt client:

pip install paho-mqtt

we can pub and sub message to the above broker. this is pub code:

# python 3.6

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'localhost'
port = 1883
topic = "test"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = 'public'

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

sub code:

# python3.6

import random

from paho.mqtt import client as mqtt_client


# broker = 'broker.emqx.io'
broker = 'localhost'
port = 1883
topic = "test"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = 'emqx'
password = 'public'


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()

run these two file, we can see the effect.

js

https://github.com/mqttjs/MQTT.js

react code:

import React, { useEffect } from "react";
import * as mqtt from "mqtt/dist/mqtt";

const Map: React.FC = () => {

  useEffect(() => {
    const client = mqtt.connect(process.env.REACT_APP_MQTT_BROKER, {
      clientId: "mqttjs_" + Math.random().toString(16).substr(2, 8),
    });

    client.on("connect", function () {
      console.log("connected");
      client.subscribe("simu", function (err) {
        if (!err) {
          client.publish("simu", JSON.stringify({ message: "Hello mqtt" }));
        }
      });
    });

    client.on("message", function (topic, message) {
      const taskResult = JSON.parse(message.toString());
      console.log(taskResult);
      const record = tableData.find((item) => item.id === taskResult.task_id);
      if (record) {
        record.status = taskResult.status;
        record.statistics = taskResult.statistics;
        record.percent = taskResult.percent || 0;
        setTableData([...tableData]);
      }
    });

    return () => {
      client.end();
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);

  ...
}

maybe you will get an error: websocket connection failed
you should add protocol websockets to the mqtt broker config and restart.

node.js

create a file mqtt.js:

const mqtt = require("mqtt");
const client = mqtt.connect("mqtt://localhost:1883");

const EdgeServerMountpointStateTopic =
  "/surge/edge_server_monitor/disk/mountpoint_state_report";
const EdgeServerHotPlugEventTopic =
  "/surge/edge_server_monitor/disk/hot_plug_event_report";

client.on("connect", function () {
  client.subscribe(EdgeServerMountpointStateTopic, function (err) {
    if (err) {
      console.log("监控边缘服务器数据盘状态MQTT订阅失败");
    }
  });
  client.subscribe(EdgeServerHotPlugEventTopic, function (err) {
    if (err) {
      console.log("监控车辆数据盘是否插盘MQTT订阅失败");
    }
  });
});

client.on("message", function (topic, message) {
  // client.end();
  let data;
  switch (topic) {
    // 边缘服务器盘是否在线
    case EdgeServerMountpointStateTopic:
      try {
        data = JSON.parse(message.toString());
      } catch (e) {
        console.log(
          "监控边缘服务器数据盘状态MQTT消息解析失败: " + message.toString()
        );
        return;
      }
      break;
    // 车辆数据盘是否插入边缘服务器
    case EdgeServerHotPlugEventTopic:
      try {
        data = JSON.parse(message.toString());
      } catch (e) {
        console.log(
          "监控车辆数据盘是否插盘MQTT消息解析失败: " + message.toString()
        );
        return;
      }
      break;
    default:
      console.log("default");
      break;
  }
});

install mqtt.js:

npm install mqtt --save

use node.js to run:

node mqtt.js

references

https://www.emqx.com/en/blog/how-to-use-mqtt-in-python http://www.steves-internet-guide.com/running-the-mosquitto-mqtt-broker-in-docker-beginners-guide/