Socket.io

We need the frontend and the backend to communicate over WebSocket.
Socket.IO is a good choice for this.

server

We can deploy the Socket.IO server and the React app on the same server, so that they share the same domain and port:

Create a file named server.js:

const path = require("path");
const express = require("express");
const { createServer } = require("http");
const { Server } = require("socket.io");
const { createAdapter } = require("@socket.io/cluster-adapter");
const { setupWorker } = require("@socket.io/sticky");
const dotenv = require("dotenv");

// Express application that serves the production React bundle.
const app = express();

// React app: serve the static build output and fall back to index.html for
// every other GET, so client-side routes keep working after a page reload.
const buildDir = path.join(__dirname, "../build");
app.use(express.static(buildDir));
app.get("/*", (req, res) => {
  res.sendFile(path.join(buildDir, "index.html"));
});

// Socket.IO is attached to the same HTTP server as Express, so both share
// one domain and one port.
const server = createServer(app);
const io = new Server(server, {
  // Maximum size in bytes of a single incoming message (1e8 = 100 MB).
  maxHttpBufferSize: 1e8,
  // Only the WebSocket transport is enabled (no HTTP long-polling).
  transports: ["websocket"],
  cors: {
    // `origin` is the option that sets Access-Control-Allow-Origin; the
    // former `accessControlAllowOrigin` key is not a recognised CORS option
    // and had no effect, so it was dropped.
    origin: "*",
  },
});

// Cluster adapter from @socket.io/cluster-adapter, for running several
// workers (see the PM2 cluster setup below in this document).
io.adapter(createAdapter());

// Worker-side setup from @socket.io/sticky.
setupWorker(io);

// Per-connection wiring: log the connection, relay "surge" messages, and log
// the disconnect.
io.on("connection", (socket) => {
  console.log(`A user connected: ${socket.id}`);

  // Listen for messages sent by a client on the "surge" event.
  // Expected payload: { key, message, sid?, ... } where `key` is the event
  // name the message is re-broadcast on and `sid` (optional) is the event
  // name the sender listens on for an acknowledgement.
  socket.on("surge", (msg) => {
    console.log("Message from client: ", socket.id, msg);

    // A null/undefined payload used to throw on `msg.key` and take the
    // worker down; ignore anything that is not a usable message.
    if (!msg || !msg.key || !msg.message) {
      return;
    }

    console.log("server emit: ", msg.key, msg.message);
    io.emit(msg.key, msg);

    // Only acknowledge when the sender asked for it; without this check a
    // message lacking `sid` was broadcast on an `undefined` event name.
    if (msg.sid) {
      io.emit(msg.sid, `received ${msg.sid}`);
    }
  });

  // Listen for the client disconnect event.
  socket.on("disconnect", () => {
    console.log("A user disconnected");
  });
});

// Load environment variables from ../.env.<NODE_ENV> (relative to this file);
// NODE_ENV falls back to "development" when it is unset or empty.
const envName = process.env.NODE_ENV || "development";
const envPath = path.join(__dirname, `../.env.${envName}`);
dotenv.config({ path: envPath });

// Port to listen on: REACT_APP_NODE_PORT when set, otherwise 3000.
const port = process.env.REACT_APP_NODE_PORT || 3000;

// Logs the local URL once the HTTP server is accepting connections.
function onListening() {
  console.log(`Server listening on http://localhost:${port}`);
}

server.listen(port, onListening);

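We can deploy with pm2 in cluster mode. pm2 must be Socket.IO's fork of pm2 (`@socket.io/pm2`), see: https://socket.io/docs/v4/pm2/
If you have already installed the stock pm2, remove it first (afterwards install the fork with `npm install -g @socket.io/pm2`):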

# Uninstall the globally installed pm2 package.
npm remove -g pm2
# Delete the ~/.pm2 directory in the current user's home.
rm -rf ~/.pm2

Create a file named ecosystem.config.js (the deploy command below expects it at server/ecosystem.config.js):

module.exports = {
  apps: [
    {
      name: "surge-frontend",
      script: "server/server.js",
      instances: "max",
      exec_mode: "cluster",
      watch: true,
      merge_logs: true,
    },
  ],
};

Then deploy with this command:

# NODE_ENV selects the env file that server.js loads (here ../.env.production.local).
NODE_ENV=production.local pm2 start server/ecosystem.config.js

client

react client

import { useEffect, useState } from "react";

/**
 * Root component: opens a Socket.IO connection when it mounts, listens on a
 * per-page token event, and closes the connection when it unmounts.
 *
 * Note: `io` and `uuidv4` are used here but not imported in this snippet;
 * `io` presumably comes from `socket.io-client` and `uuidv4` from the `uuid`
 * package, so import both in the real file.
 */
const App: React.FC = () => {
  // Token for this page, kept in state so the rest of the component can send
  // it to the backend.
  const [websocketToken, setWebsocketToken] = useState("");

  useEffect(() => {
    // The React app and the Socket.IO server share the same domain, so no
    // server address is needed here.
    const socket = io({
      transports: ["websocket"],
    });

    // Generate a unique id and send it to the backend server (Python, see
    // below). The backend puts this id in the messages it sends to the
    // Socket.IO server, and the server emits those messages on an event named
    // after the id. This gives a one-to-one channel between this page and the
    // backend.
    const token = uuidv4();
    setWebsocketToken(token);
    console.log("token: ", token);

    socket.on("connect", () => {
      console.log("websocket connect success", token);
    });

    // Messages addressed to this page arrive on the event named by the token.
    socket.on(token, (data) => {
      console.log(data);
    });

    // Close the connection when the component unmounts.
    return () => {
      socket.disconnect();
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);

  // The original returned nothing, which does not satisfy `React.FC`; return
  // null here until the real UI is rendered.
  return null;
};

python client

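We use the python-socketio library. (The snippets below also use `threading`, `subprocess` and a module-level `LOGGER`, which must be imported/defined in the real file.)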

import socketio

def send_websocket_message(message):
    """Send a message to the websocket server from a background thread.

    A new Socket.IO client is connected for the message. The payload is sent
    on the "surge" event together with the client's sid, and the connection
    is closed once the server has acknowledged it on the event named by that
    sid.

    Args:
        message: dict
            all keys are required
            {
                type: string, "progress" | "error" | "rate"
                key: string, websocket key
                message: string
            }
            The dict is copied before the sid is added, so the caller's
            object is not modified.
    """
    def send(message):
        sio = socketio.Client()
        sio.connect("http://localhost:3000", transports="websocket")

        # Work on a copy: the caller's dict must not be mutated from this
        # worker thread.
        payload = dict(message)
        payload["sid"] = sio.sid

        # This event listener is very important.
        # sio.emit is asynchronous: if this function returned right away, sio
        # would disconnect and the message might not have been sent yet. So we
        # send our sid to the server, the server emits a message on that sid
        # once it has received ours, and only then do we disconnect.
        # It is registered BEFORE emitting so that a fast acknowledgement
        # cannot arrive while no handler exists (which would leave sio.wait()
        # blocked forever).
        # Note: websocket_cmd below runs for a longer time, so it does not
        # need this. Its last message may fail to be emitted, but that does
        # not matter.
        @sio.on(sio.sid)
        def handle_echo(reply):
            print(reply)
            sio.disconnect()

        sio.emit('surge', payload)

        # wait() blocks the current thread until sio.disconnect() is called.
        sio.wait()

    t = threading.Thread(target=send, args=(message, ))
    t.start()


def websocket_cmd(cmd_str, progress_token):
    """Run a command and send its output to the websocket server.

    Each line the command writes to stdout is emitted on the "surge" event as
    a "rate" message whose key is `progress_token`.

    Args:
        cmd_str: string, command line executed through the shell
        progress_token: string, websocket key the output lines are sent under

    Raises:
        Exception: if the command exits with a non-zero status (a dedicated
            message is used for exit status 255).
    """
    LOGGER.info('websocket_cmd: %s', cmd_str)
    sio = socketio.Client()
    sio.connect("http://localhost:3000", transports="websocket")
    try:
        # NOTE(review): shell=True executes cmd_str through the shell; callers
        # must never build cmd_str from untrusted input.
        with subprocess.Popen(cmd_str,
                              shell=True,
                              stdout=subprocess.PIPE,
                              bufsize=1,
                              universal_newlines=True) as process:
            # Forward every stdout line as it is produced.
            for line in process.stdout:
                line = line.rstrip()
                sio.emit("surge", {
                    "type": "rate",
                    "key": progress_token,
                    "message": line
                })
    finally:
        # Always release the connection, even if starting the command or
        # emitting a line raised.
        sio.disconnect()
    if process.returncode != 0:
        if process.returncode == 255:
            raise Exception(f"车辆/服务器不在线, Command '{cmd_str}' returned 255")
        raise Exception(
            f"Command '{cmd_str}' returned non-zero exit status {process.returncode}."
        )

nginx

If we want to use nginx as a reverse proxy, we should add the following config:

# Reverse proxy in front of the Socket.IO server running on localhost:3000.
server {
    listen 8802;
    server_name socketio;

    # Forward requests under /socket.io to the Node server.
    location /socket.io {
        proxy_pass http://localhost:3000/socket.io;
        # HTTP/1.1 plus the Upgrade/Connection headers below pass the
        # client's WebSocket upgrade request through to the backend.
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        # Pass the original Host header on to the backend.
        proxy_set_header Host $host;
    }
}