# HiveMQ Edge-to-Cloud AI Pipeline

This article walks through the full data pipeline shown in the diagram: starting from a PLC on the shop floor, sending data through HiveMQ Edge and HiveMQ Cloud into an AI model built with **Python + PyOD**, and finally sending the anomaly result back to HiveMQ Edge for visualization.

{% embed url="<https://files.gitbook.com/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2FN7nrvFnAq1yuJpxLOFcm%2FHiveMQ_Post3.mp4?alt=media&token=fdabc463-593a-40d1-9092-f67facd0188d>" %}

{% hint style="info" %}
Refer to the previous article to learn how to send factory data to HiveMQ Cloud. This article continues from there and sends that data to an ML model for anomaly detection.
{% endhint %}

## System Overview

This architecture demonstrates how vibration data from a machine is collected at the **OT (Operational Technology)** level, transported securely to the **Cloud**, evaluated using an AI model, and returned to the shop-floor for further action.

It consists of:

1. **Input Layer** – Reading vibration data from a PLC (via OPC UA)
2. **Transport Layer** – Sending data from HiveMQ Edge → HiveMQ Cloud (via MQTT)
3. **AI Layer** – ML server running Python + Flask + PyOD locally on the computer
4. **Output Layer** – Returning anomaly alerts via MQTT → HiveMQ Edge → PLC (optional)

***

## Step-by-Step Breakdown of the Pipeline

### **1. Input Layer — PLC to HiveMQ Edge (OPC UA)**

* A Siemens S7-1500 PLC measures **vibration (simulated)** from the machine.
* This data is exposed through **OPC UA**.
* HiveMQ Edge connects to the PLC's OPC UA server.
* It maps the OPC UA tag (e.g., `vibration`) to an MQTT topic:

```
machine/s71500/rVib
```
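To make the tag-to-topic mapping concrete, here is a minimal Python sketch of the idea. It is not how HiveMQ Edge is implemented or configured; the OPC UA node ID is a placeholder, and the JSON payload with a `value` field is an assumption based on the Node-RED function node later in this article, which reads `msg.payload.value`.

```python
import json

# Hypothetical mapping table: OPC UA node ID -> MQTT topic.
# The node ID is a placeholder, not taken from the article's PLC project.
TAG_TO_TOPIC = {
    'ns=3;s="Data"."rVib"': "machine/s71500/rVib",
}

def to_mqtt_message(node_id: str, value: float) -> tuple[str, str]:
    """Return (topic, JSON payload) for one OPC UA tag reading."""
    topic = TAG_TO_TOPIC[node_id]
    payload = json.dumps({"value": value})  # assumed payload shape
    return topic, payload

topic, payload = to_mqtt_message('ns=3;s="Data"."rVib"', 0.27)
print(topic, payload)
```

In the real setup this mapping is configured in the HiveMQ Edge UI rather than written in code.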

> Industrial data becomes IIoT data.

***

### **2. Transport Layer — HiveMQ Edge to HiveMQ Cloud (MQTT)**

HiveMQ Edge publishes the MQTT message (`machine/s71500/rVib`) to **HiveMQ Cloud**, which acts as a central cloud broker.

Benefits:

* Low latency
* Very small message size → suitable for industrial IoT
* Secure TLS transport and easy to scale

{% hint style="info" %}
Learn more in our earlier post on HiveMQ, which covers the complete workflow: [PLC to Cloud via HiveMQ (Edge + Cloud) — OPC UA → MQTT → Confluent](/product-reviews/smart-platforms/hivemq/plc-to-cloud-via-hivemq-edge-+-cloud-opc-ua-mqtt-confluent.md)
{% endhint %}

***

### 3. AI Layer — ML server running Python + Flask + PyOD locally on the computer

This is where the anomaly detection happens. Let's first understand what anomaly detection is and why we need it.

#### What is Anomaly detection?

**Anomaly detection** is the process of identifying data points or behavior that deviate from what’s considered *normal*. In simple terms, it spots when something unusual is happening.

In **Machine Learning**, anomaly detection helps models understand patterns and flag unexpected behaviour without needing labels. In **IIoT**, it's essential because machines generate huge amounts of real-time data, and even small deviations can signal issues like equipment failure, quality defects, cyber-attacks, or unsafe operating conditions.

#### **Why we need it:**

* Detect problems early before they become costly
* Improve uptime and reliability
* Enhance safety and reduce maintenance costs

#### 🧠 PyOD — Python Outlier Detection Library

**PyOD** is a widely-used ML library that provides 40+ algorithms for anomaly detection, such as:

* Isolation Forest (IForest)
* AutoEncoders
* One-Class SVM
* LOF (Local Outlier Factor)

In the example, **Isolation Forest (IForest)** is used. It learns what “normal vibration” looks like from training data:

```python
train = np.array([[0.23], [0.25], [0.21], [0.29]])
model = IForest()
model.fit(train)
```

When new data arrives, the model's `predict()` returns:

* **0 → Normal**
* **1 → Anomaly**
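Behind the 0/1 label there is a continuous outlier score that is compared with a threshold learned during `fit()`; the threshold depends on the `contamination` setting. The sketch below illustrates that score-to-label step with the standard library only. The scoring function (distance from the training mean) and the helper names are stand-ins chosen for illustration; this is not the Isolation Forest algorithm or PyOD's actual code.

```python
import math

def toy_scores(train, values):
    """Toy outlier score: absolute distance from the training mean."""
    center = sum(train) / len(train)
    return [abs(v - center) for v in values]

def fit_threshold(train_scores, contamination=0.1):
    """Nearest-rank cut-off: about (1 - contamination) of training scores are at or below it."""
    ordered = sorted(train_scores)
    rank = math.ceil((1.0 - contamination) * len(ordered))
    return ordered[max(rank, 1) - 1]

def predict(train, value, contamination=0.1):
    """Return 1 (anomaly) if the score exceeds the threshold, else 0 (normal)."""
    threshold = fit_threshold(toy_scores(train, train), contamination)
    score = toy_scores(train, [value])[0]
    return 1 if score > threshold else 0

train = [0.23, 0.25, 0.21, 0.29]
print(predict(train, 0.45))  # far from the training data
print(predict(train, 0.25))  # inside the training data
```

A larger `contamination` lowers the threshold, so more readings are labelled as anomalies.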

**Learn more about PyOD here:** [**https://pyod.readthedocs.io/en/latest/**](https://pyod.readthedocs.io/en/latest/)

#### **Installing PyOD and testing anomaly detection**

In our example, PyOD is installed on a Windows computer. The steps are as follows:

{% stepper %}
{% step %}

#### Install pyod on Windows PC

{% hint style="info" %}
Make sure Python is installed on your computer. In our case, we are using Python 3. To install Python, visit: <https://www.python.org/downloads/>
{% endhint %}

```
pip3 install pyod
```

{% endstep %}

{% step %}

#### Create a Python file for testing:

`test_pyod.py`
{% endstep %}

{% step %}

#### Add the following code to the file

```python
from pyod.models.iforest import IForest
import numpy as np

# training data: only normal vibration history
train = np.array([[0.23], [0.25], [0.21], [0.29]])

model = IForest()
model.fit(train)

# real-time vibration input
value = 0.45  # change this value to see a different result
prediction = model.predict([[value]])[0]

if prediction == 1:
    print("Anomaly detected!")
else:
    print("Normal")

```

{% endstep %}

{% step %}

#### Run the Python file

```shellscript
python test_pyod.py
```

{% endstep %}

{% step %}

#### Validate the output

The script prints ‘Anomaly detected!’ or ‘Normal’ depending on the value set in the code (0.45 in this example).
{% endstep %}
{% endstepper %}

#### Anomaly detection using PyOD and Node-RED

Now we are going to test the code using Node-RED. In this case, we set up a Flask server alongside Node-RED so that Node-RED can call the Python code through an HTTP API. This makes it easy to send data to the Python code and get anomaly results back.

{% hint style="info" %}
Make sure the latest versions of Node-RED and Python are installed on your system.

* To install Node-RED visit here: <https://nodered.org/>
* To install Python visit here: <https://www.python.org/downloads/>
  {% endhint %}

Once Node-RED is installed, proceed with the following steps:

{% stepper %}
{% step %}

#### Convert Your Python Code into a Simple API Server

We will not run the Python file directly from Node-RED. Instead, we will create a Python web API for data exchange.

Create a new Python file named '**server.py**' with the following code:

{% code overflow="wrap" lineNumbers="true" expandable="true" %}

```python
from flask import Flask, request, jsonify
from pyod.models.iforest import IForest
import numpy as np

app = Flask(__name__)

# Train model once at startup
train = np.array([[0.23], [0.25], [0.21], [0.29]])
model = IForest()
model.fit(train)

@app.route("/predict", methods=["POST"])
def predict():
    data = request.json
    value = float(data["value"])  # vibration input

    prediction = model.predict([[value]])[0]

    return jsonify({
        "value": value,
        "prediction": int(prediction),
        "status": "Anomaly" if prediction == 1 else "Normal"
    })

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
```

{% endcode %}
{% endstep %}

{% step %}

#### Install Flask

```shellscript
pip install flask
```

{% endstep %}

{% step %}

#### Run your Python Server

```shellscript
python server.py
```

{% endstep %}

{% step %}

#### Validate

You should see the following:

<figure><img src="/files/zMyDf7tFLRaaojkKEtiA" alt=""><figcaption></figcaption></figure>

Your anomaly detection API is now LIVE at: <http://localhost:5000/predict>
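Before wiring up Node-RED, you may want to call the endpoint from Python. The following is a minimal sketch using only the standard library; the helper names `build_predict_request` and `predict` are made up for this example, and the URL assumes `server.py` is running on the same machine.

```python
import json
import urllib.request

PREDICT_URL = "http://localhost:5000/predict"  # endpoint exposed by server.py

def build_predict_request(value: float, url: str = PREDICT_URL) -> urllib.request.Request:
    """Build the POST request that carries one vibration value as JSON."""
    body = json.dumps({"value": value}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def predict(value: float, timeout: float = 5.0) -> dict:
    """Send the request and decode the JSON reply (server.py must be running)."""
    with urllib.request.urlopen(build_predict_request(value), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))

# Building the request does not touch the network, so it is safe to inspect:
req = build_predict_request(0.45)
print(req.get_method(), req.full_url, req.data)
```

With the server running, `predict(0.45)` should return a dictionary with the `value`, `prediction`, and `status` keys defined in `server.py`.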
{% endstep %}

{% step %}

#### Start Node-RED and test the Anomaly detection

You can use the '**http request**' node in Node-RED to send sample data and fetch anomaly results as shown below.

<figure><img src="/files/7XhteKdDw8faP3PWJeij" alt=""><figcaption></figcaption></figure>

See the reference Node-RED flow below:

{% code overflow="wrap" expandable="true" %}

```json
[
    {
        "id": "53bf0f485d74978b",
        "type": "tab",
        "label": "Flow 3",
        "disabled": false,
        "info": "",
        "env": []
    },
    {
        "id": "05ded4d2341631a0",
        "type": "http request",
        "z": "53bf0f485d74978b",
        "name": "",
        "method": "POST",
        "ret": "obj",
        "paytoqs": "ignore",
        "url": "http://localhost:5000/predict",
        "tls": "",
        "persist": false,
        "proxy": "",
        "insecureHTTPParser": false,
        "authType": "",
        "senderr": false,
        "headers": [],
        "x": 530,
        "y": 80,
        "wires": [
            [
                "7b1ef153885b1894"
            ]
        ]
    },
    {
        "id": "4abe4b4dca5e520e",
        "type": "function",
        "z": "53bf0f485d74978b",
        "name": "function 3",
        "func": "msg.headers = { \"Content-Type\": \"application/json\" };\nmsg.payload = { value: msg.payload };\nreturn msg;\n",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 340,
        "y": 80,
        "wires": [
            [
                "05ded4d2341631a0"
            ]
        ]
    },
    {
        "id": "7b1ef153885b1894",
        "type": "debug",
        "z": "53bf0f485d74978b",
        "name": "debug 1",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "false",
        "statusVal": "",
        "statusType": "auto",
        "x": 700,
        "y": 80,
        "wires": []
    },
    {
        "id": "301488acacb31208",
        "type": "inject",
        "z": "53bf0f485d74978b",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "0.45",
        "payloadType": "num",
        "x": 150,
        "y": 80,
        "wires": [
            [
                "4abe4b4dca5e520e"
            ]
        ]
    },
    {
        "id": "0f591249346fc88a",
        "type": "inject",
        "z": "53bf0f485d74978b",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "0.25",
        "payloadType": "num",
        "x": 150,
        "y": 120,
        "wires": [
            [
                "4abe4b4dca5e520e"
            ]
        ]
    }
]
```

{% endcode %}
{% endstep %}
{% endstepper %}

***

#### Testing Anomaly detection with PLC Data

Now, let's use the above example and send the PLC data to the Python server to get anomaly results.

{% hint style="info" %}
For the sake of demonstration, we treat values in the range $$0.0$$ to $$5.0$$ as normal and values outside this range as anomalies. In the Python code, we therefore generate a dataset of $$500$$ samples in the range $$0.0$$ to $$5.0$$, and the model learns from this dataset, as shown in the code below.
{% endhint %}
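To get a feel for this dataset, here is a standard-library sketch that draws 500 uniform samples and standardizes them by subtracting the mean and dividing by the population standard deviation, which is the same formula `StandardScaler` applies in the server code of the next step. The seed and the `standardize` helper are assumptions added for repeatability and illustration.

```python
import random
import statistics

random.seed(42)  # fixed seed so the sketch is repeatable

# 500 "normal" vibration samples spread uniformly over 0.0 to 5.0
samples = [random.uniform(0.0, 5.0) for _ in range(500)]

mean = statistics.fmean(samples)
std = statistics.pstdev(samples)  # population standard deviation

def standardize(value: float) -> float:
    """Express a reading as 'standard deviations away from the training mean'."""
    return (value - mean) / std

scaled = [standardize(v) for v in samples]
print(round(mean, 2), round(std, 2))
print(round(standardize(2.5), 2), round(standardize(9.0), 2))
```

A reading such as 9.0 lands well outside the scaled range of the training data, which is what lets the model separate it from normal values.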

{% stepper %}
{% step %}

#### Update your Python code in the server.py file

The code trains an anomaly detection model, exposes it through a REST API, and lets you retrain the model using real machine data.

{% code overflow="wrap" lineNumbers="true" expandable="true" %}

```python
from flask import Flask, request, jsonify
from pyod.models.iforest import IForest
from sklearn.preprocessing import StandardScaler
import numpy as np

app = Flask(__name__)

# =========================================================
# 1. TRAIN MODEL ON REALISTIC NORMAL VIBRATION DISTRIBUTION
# =========================================================

# Simulate "normal" vibration: uniformly distributed values
# between 0.0 and 5.0 mm/s
normal_values = np.random.uniform(0.0, 5.0, 500)   # 500 samples
normal_values = normal_values.reshape(-1, 1)

# Scale the training data (important for broader vibration ranges)
scaler = StandardScaler()
normal_scaled = scaler.fit_transform(normal_values)

# Train Isolation Forest
model = IForest(contamination=0.05)  # 5% anomaly expected
model.fit(normal_scaled)


# =========================================================
# 2. API ENDPOINT: PREDICT ANOMALY
# =========================================================
@app.route("/predict", methods=["POST"])
def predict():
    data = request.json
    value = float(data["value"])

    # Scale incoming data
    scaled_value = scaler.transform([[value]])

    # Raw prediction: 1 = anomaly, 0 = normal
    prediction = int(model.predict(scaled_value)[0])

    # Anomaly score (higher → more anomaly)
    score = float(model.decision_function(scaled_value)[0])

    return jsonify({
        "input_value": value,
        "scaled_value": scaled_value[0][0],
        "anomaly_score": score,
        "prediction": prediction,
        "status": "Anomaly Detected" if prediction == 1 else "Normal Behaviour"
    })


# =========================================================
# 3. OPTIONAL — RETRAIN USING DATA FROM CLIENT (Node-RED)
# =========================================================
@app.route("/retrain", methods=["POST"])
def retrain():
    data = request.json
    new_samples = np.array(data["samples"]).reshape(-1, 1)

    global scaler, model

    scaler = StandardScaler()
    scaled = scaler.fit_transform(new_samples)

    model = IForest(contamination=0.05)
    model.fit(scaled)

    return jsonify({"message": "Model retrained successfully", "samples_used": len(new_samples)})


# =========================================================
# 4. RUN SERVER
# =========================================================
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

```

{% endcode %}

This script creates a small **AI-powered anomaly detection API** using Flask and the PyOD Isolation Forest model.

1. **It generates normal vibration data** (0–5 mm/s) and uses it to train an Isolation Forest model.
2. **It scales all vibration values** using StandardScaler so the model can understand them correctly.
3. **It exposes a `/predict` API endpoint** where you send one vibration value, and the server returns:
   * Normal or Anomaly
   * Anomaly score
   * Scaled value
4. **It provides a `/retrain` endpoint** that allows you to send new vibration samples (from Node-RED or a PLC) and retrain the model on the fly.
5. **Flask runs the server on port 5000**, making it easy to integrate with IIoT systems.

Learn more about this code here: [Anomaly detection Code Explanation](/product-reviews/smart-platforms/hivemq/hivemq-edge-to-cloud-ai-pipeline/anomaly-detection-code-explanation.md)
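One way to feed the `/retrain` endpoint is to keep a rolling window of recent readings and send it as the `samples` list. The sketch below shows only the buffering and the request body; `SampleBuffer` is a hypothetical helper, not part of the server code, and the window size is an arbitrary choice.

```python
import json
from collections import deque

class SampleBuffer:
    """Keeps the most recent readings for a later /retrain call (hypothetical helper)."""

    def __init__(self, max_samples: int = 500):
        self._values = deque(maxlen=max_samples)  # oldest readings drop out first

    def add(self, value: float) -> None:
        self._values.append(float(value))

    def retrain_body(self) -> str:
        """JSON body in the shape server.py reads: {"samples": [...]}."""
        return json.dumps({"samples": list(self._values)})

buffer = SampleBuffer(max_samples=3)
for reading in [1.2, 2.8, 3.1, 0.9]:
    buffer.add(reading)

print(buffer.retrain_body())
```

Because the model treats its training data as the baseline for normal behaviour, it is probably safest to buffer only readings taken while the machine is known to be healthy.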
{% endstep %}

{% step %}

#### Update the Node-RED flow

Now we need to subscribe to the following HiveMQ Cloud topic to get the live vibration data:

```
hivemq-edge/machine/s71500/rVib
```

<figure><img src="/files/M6ovmudQeJq1mKxjt6Ey" alt=""><figcaption></figcaption></figure>
{% endstep %}
{% endstepper %}

> Every time vibration data arrives from HiveMQ Cloud, Node-RED forwards it to the ML server, which runs inference and returns the result to Node-RED.

***

### **4. Output Layer** – Returning anomaly alerts via MQTT → HiveMQ Edge → PLC (optional)

**Now we need to send the result back to HiveMQ Edge, where it can be visualized on a dashboard or sent to the PLC for further action.**

{% stepper %}
{% step %}

#### Add an MQTT out node to the flow

We will use a separate MQTT topic to publish the result:

```
hivemq-edge/machine/s71500/anomaly
```

<figure><img src="/files/OOaPUFdbwXR3t2129bIE" alt=""><figcaption></figcaption></figure>

The following flow can be used as a reference.

{% code overflow="wrap" expandable="true" %}

```json
[
    {
        "id": "1ad2c5724dad60ec",
        "type": "tab",
        "label": "Flow 1",
        "disabled": false,
        "info": "",
        "env": []
    },
    {
        "id": "2e10e2177a83dbbf",
        "type": "http request",
        "z": "1ad2c5724dad60ec",
        "name": "",
        "method": "POST",
        "ret": "obj",
        "paytoqs": "ignore",
        "url": "http://localhost:5000/predict",
        "tls": "",
        "persist": false,
        "proxy": "",
        "insecureHTTPParser": false,
        "authType": "",
        "senderr": false,
        "headers": [],
        "x": 570,
        "y": 80,
        "wires": [
            [
                "c3746a81c58e797a"
            ]
        ]
    },
    {
        "id": "1d57a237256e69e7",
        "type": "function",
        "z": "1ad2c5724dad60ec",
        "name": "function 1",
        "func": "msg.headers = { \"Content-Type\": \"application/json\" };\nmsg.payload = { value: msg.payload.value };\nreturn msg;\n",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 400,
        "y": 80,
        "wires": [
            [
                "2e10e2177a83dbbf"
            ]
        ]
    },
    {
        "id": "ecd705d7eabc8086",
        "type": "mqtt in",
        "z": "1ad2c5724dad60ec",
        "name": "",
        "topic": "hivemq-edge/machine/s71500/rVib",
        "qos": "2",
        "datatype": "auto-detect",
        "broker": "08f8856327f4fe37",
        "nl": false,
        "rap": true,
        "rh": 0,
        "inputs": 0,
        "x": 160,
        "y": 80,
        "wires": [
            [
                "1d57a237256e69e7"
            ]
        ]
    },
    {
        "id": "67d96c79df053896",
        "type": "mqtt out",
        "z": "1ad2c5724dad60ec",
        "name": "",
        "topic": "hivemq-edge/machine/s71500/anomaly",
        "qos": "",
        "retain": "",
        "respTopic": "",
        "contentType": "",
        "userProps": "",
        "correl": "",
        "expiry": "",
        "broker": "08f8856327f4fe37",
        "x": 600,
        "y": 160,
        "wires": []
    },
    {
        "id": "c3746a81c58e797a",
        "type": "function",
        "z": "1ad2c5724dad60ec",
        "name": "function 2",
        "func": "msg.payload = msg.payload.status;\nreturn msg;",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 300,
        "y": 160,
        "wires": [
            [
                "67d96c79df053896"
            ]
        ]
    },
    {
        "id": "08f8856327f4fe37",
        "type": "mqtt-broker",
        "name": "",
        "broker": "1eba33276f3f4d2b9852f78bf6ab2880.s1.eu.hivemq.cloud",
        "port": "8883",
        "tls": "",
        "clientid": "",
        "autoConnect": true,
        "usetls": true,
        "protocolVersion": 4,
        "keepalive": 60,
        "cleansession": true,
        "autoUnsubscribe": true,
        "birthTopic": "",
        "birthQos": "0",
        "birthRetain": "false",
        "birthPayload": "",
        "birthMsg": {},
        "closeTopic": "",
        "closeQos": "0",
        "closeRetain": "false",
        "closePayload": "",
        "closeMsg": {},
        "willTopic": "",
        "willQos": "0",
        "willRetain": "false",
        "willPayload": "",
        "willMsg": {},
        "userProps": "",
        "sessionExpiry": ""
    }
]
```

{% endcode %}
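The two function nodes in the flow above do very little: the first keeps only the `value` field of the incoming MQTT payload, and the second keeps only the `status` field of the API reply. For readers more comfortable with Python than with Node-RED JavaScript, the same logic can be sketched as follows. The sample payload (including its `timestamp` field) and the numbers in the reply are made up for illustration.

```python
import json

def to_predict_body(mqtt_payload: str) -> dict:
    """Mirror of 'function 1': keep only the numeric value from the MQTT payload."""
    message = json.loads(mqtt_payload)
    return {"value": float(message["value"])}

def to_status(api_response: dict) -> str:
    """Mirror of 'function 2': keep only the status text from the API reply."""
    return api_response["status"]

# Assumed payload shape; only the "value" field is actually used by the flow.
body = to_predict_body('{"value": 7.3, "timestamp": 1700000000000}')

# Example reply shaped like the /predict response; the numbers are made up.
reply = {"input_value": 7.3, "anomaly_score": 0.12, "prediction": 1,
         "status": "Anomaly Detected"}
print(body, to_status(reply))
```

Publishing only the status string keeps the message on the `anomaly` topic small and easy to display on a dashboard.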
{% endstep %}

{% step %}

#### Bridge the HiveMQ Cloud MQTT topic with HiveMQ Edge MQTT topic

Go to MQTT bridge in HiveMQ Edge and add a remote subscription in your current bridge as shown below:

<figure><img src="/files/PEjrCUv9Ua55ocSswdft" alt=""><figcaption></figcaption></figure>

<figure><img src="/files/THKGag1h29KxsJeaQa7H" alt="" width="375"><figcaption></figcaption></figure>

Now navigate to Broker Configuration and turn on 'Enable loop prevention'. This prevents messages from looping between the two brokers.

<figure><img src="/files/QR9Gffxrya8GEdhq11ct" alt="" width="375"><figcaption></figcaption></figure>
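Why loops can happen: a bridge forwards messages in both directions, so a message sent to the cloud could be picked up by the remote subscription and forwarded again. A common way to stop this is a hop counter, sketched below in generic Python. This illustrates the idea only; the `Message` class, the `hops` field, and the limit of 1 are assumptions and do not describe HiveMQ Edge's internal implementation.

```python
from dataclasses import dataclass

@dataclass
class Message:
    topic: str
    payload: str
    hops: int = 0  # how many times a bridge has already forwarded this message

def forward(message: Message, hop_limit: int = 1):
    """Forward a message across a bridge unless it has reached the hop limit."""
    if message.hops >= hop_limit:
        return None  # drop: the message has already crossed a bridge
    return Message(message.topic, message.payload, message.hops + 1)

first = forward(Message("hivemq-edge/machine/s71500/anomaly", "Anomaly Detected"))
second = forward(first)  # a second crossing would be the start of a loop
print(first, second)
```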
{% endstep %}

{% step %}

#### Visualize Anomaly in HiveMQ Edge

Now that the anomaly result arrives at HiveMQ Edge, you can visualize it with tools like Node-RED as shown below:

<figure><img src="/files/dRlF8zLznYXiPHae86WK" alt=""><figcaption></figcaption></figure>

<figure><img src="/files/ZJS6Oi6FSFX8XzHcoMKu" alt="" width="321"><figcaption></figcaption></figure>

{% hint style="info" %}
You can also send the anomaly results back to the PLC via OPC UA.
{% endhint %}
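A PLC tag is usually easier to work with as a boolean than as free text. As a minimal sketch, the status strings returned by `server.py` could be translated like this before writing them to the PLC; the `status_to_flag` helper and the choice of a boolean tag are assumptions for illustration.

```python
# Status strings as returned by the /predict endpoint in server.py
STATUS_TO_FLAG = {
    "Normal Behaviour": False,
    "Anomaly Detected": True,
}

def status_to_flag(status: str) -> bool:
    """Translate the status text into a boolean suitable for a PLC tag."""
    try:
        return STATUS_TO_FLAG[status]
    except KeyError:
        # Fail loudly instead of silently writing a wrong value to the PLC
        raise ValueError(f"unexpected status: {status!r}") from None

print(status_to_flag("Anomaly Detected"), status_to_flag("Normal Behaviour"))
```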
{% endstep %}
{% endstepper %}

***

## Complete Workflow

{% embed url="<https://files.gitbook.com/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2FvPGMMRYPhiqsUXnsEKhl%2FHiveMQ%20Anomaly%20detection.mp4?alt=media&token=730bd29b-d752-4e25-976d-e49e406924bd>" %}

## ♥️ Work With Me

I regularly test **industrial automation and IIoT devices**. If you’d like me to **review your product** or showcase it in my courses and YouTube channel:

📧 Email: <rajvir@codeandcompile.com> or drop me a message on [LinkedIn](https://www.linkedin.com/in/singhrajvir/)


