# HiveMQ Edge-to-Cloud AI Pipeline

This article explains the full data pipeline shown in the diagram. It starts from a PLC on the shop floor, sends data through HiveMQ Edge and HiveMQ Cloud into an AI model built with **Python + PyOD**, and finally sends the anomaly result back to HiveMQ Edge for visualization.

{% embed url="<https://files.gitbook.com/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2FN7nrvFnAq1yuJpxLOFcm%2FHiveMQ_Post3.mp4?alt=media&token=fdabc463-593a-40d1-9092-f67facd0188d>" %}

{% hint style="info" %}
Refer to the previous article to learn how to send factory data to HiveMQ Cloud. This article continues from there and forwards that data to an ML model for anomaly detection.
{% endhint %}

## System Overview

This architecture demonstrates how vibration data from a machine is collected at the **OT (Operational Technology)** level, transported securely to the **Cloud**, evaluated using an AI model, and returned to the shop-floor for further action.

It consists of:

1. **Input Layer** – Reading vibration data from a PLC (via OPC UA)
2. **Transport Layer** – Sending data from HiveMQ Edge → HiveMQ Cloud (via MQTT)
3. **AI Layer** – ML server running Python + Flask + PyOD locally on the computer
4. **Output Layer** – Returning anomaly alerts via MQTT → HiveMQ Edge → PLC (optional)

***

## Step-by-Step Breakdown of the Pipeline

### **1. Input Layer — PLC to HiveMQ Edge (OPC UA)**

* A Siemens S7-1500 PLC measures **vibration (simulated)** from the machine.
* This data is exposed through **OPC UA**.
* HiveMQ Edge connects to the PLC's OPC UA server.
* It maps the OPC UA tag (e.g., `vibration`) to an MQTT topic:

```
machine/s71500/rVib
```

> Industrial data becomes IIoT data.
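Downstream consumers read the vibration reading from the MQTT payload. The Node-RED flow later in this article reads it as `msg.payload.value`, so the payload is assumed here to be JSON with a `value` field; other fields, such as a timestamp, are ignored. A minimal Python sketch of extracting the reading (the function name `extract_vibration` is hypothetical). It also accepts bare numeric payloads like the ones the Node-RED inject nodes send:

```python
import json


def extract_vibration(payload: bytes) -> float:
    """Return the vibration reading from an MQTT payload.

    Assumes either a JSON object with a "value" field, e.g. b'{"value": 0.27}',
    or a bare JSON number, e.g. b'0.27'.
    """
    data = json.loads(payload.decode("utf-8"))
    if isinstance(data, dict):
        if "value" not in data:
            raise ValueError("payload has no 'value' field")
        data = data["value"]
    # Reject booleans and non-numeric values
    if isinstance(data, bool) or not isinstance(data, (int, float)):
        raise ValueError(f"expected a number, got {data!r}")
    return float(data)


print(extract_vibration(b'{"value": 0.27, "timestamp": 1700000000000}'))
```

Keeping the parsing in one place makes it easy to reject malformed messages before they reach the model.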

***

### **2. Transport Layer — HiveMQ Edge to HiveMQ Cloud (MQTT)**

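HiveMQ Edge publishes the MQTT message (`machine/s71500/rVib`) to **HiveMQ Cloud** through an MQTT bridge. HiveMQ Cloud acts as the central cloud broker. In this setup, the data arrives there on the topic `hivemq-edge/machine/s71500/rVib`, which is the topic subscribed to later in this article.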

Benefits:

* Low latency
* Very small message size → suitable for industrial IoT
* Secure TLS transport and easy to scale
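To put "very small message size" into numbers: under MQTT 3.1.1 (the version configured in this article's Node-RED broker node, `protocolVersion: 4`), a PUBLISH packet consists of a 1-byte fixed header, a variable-length "remaining length" field, a 2-byte topic length prefix, the topic itself, a 2-byte packet identifier (QoS 1 and 2 only), and the payload. The sketch below computes that size. The payload `{"value":0.27}` is an illustrative assumption, not a captured message:

```python
def encode_remaining_length(n: int) -> bytes:
    """Encode MQTT's 'remaining length': 7 bits per byte, high bit = more bytes follow."""
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n > 0:
            byte |= 0x80  # continuation bit
        out.append(byte)
        if n == 0:
            return bytes(out)


def publish_packet_size(topic: str, payload: bytes, qos: int = 0) -> int:
    """Total size in bytes of an MQTT 3.1.1 PUBLISH packet."""
    topic_bytes = topic.encode("utf-8")
    remaining = 2 + len(topic_bytes)   # topic length prefix + topic
    remaining += 2 if qos > 0 else 0   # packet identifier for QoS 1/2
    remaining += len(payload)
    return 1 + len(encode_remaining_length(remaining)) + remaining


print(publish_packet_size("machine/s71500/rVib", b'{"value":0.27}'))  # 37
```

So one reading costs a few dozen bytes per message; TLS record overhead comes on top of this.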

{% hint style="info" %}
For the complete workflow, see our earlier post on HiveMQ: [plc-to-cloud-via-hivemq-edge-+-cloud-opc-ua-mqtt-confluent](https://wiki.codeandcompile.com/product-reviews/smart-platforms/hivemq/plc-to-cloud-via-hivemq-edge-+-cloud-opc-ua-mqtt-confluent "mention")
{% endhint %}

***

### **3. AI Layer — ML server running Python + Flask + PyOD locally on the computer**

This is where anomaly detection happens. First, let's look at what anomaly detection is and why we need it.

#### What is Anomaly detection?

**Anomaly detection** is the process of identifying data points or behavior that deviate from what’s considered *normal*. In simple terms, it spots when something unusual is happening.

In **Machine Learning**, anomaly detection lets models learn what normal patterns look like and flag unexpected behavior, typically without needing labeled examples of faults. In **IIoT**, it's essential because machines generate huge amounts of real-time data, and even small deviations can signal issues such as equipment failure, quality defects, cyber-attacks, or unsafe operating conditions.
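The core idea of "deviating from normal" can be shown without any ML at all. The sketch below uses a simple 3-sigma rule, a classic statistical baseline and not the method used in this article: learn the mean and standard deviation of normal readings, then flag any new reading more than three standard deviations away. The training values reuse the sample vibration data from the PyOD example below:

```python
import statistics


def fit_baseline(normal_readings):
    """Learn what 'normal' looks like: mean and sample standard deviation."""
    return statistics.mean(normal_readings), statistics.stdev(normal_readings)


def is_anomaly(value, mean, std, k=3.0):
    """Flag values more than k standard deviations away from the mean."""
    return abs(value - mean) > k * std


mean, std = fit_baseline([0.23, 0.25, 0.21, 0.29])
print(is_anomaly(0.25, mean, std))  # False
print(is_anomaly(0.45, mean, std))  # True
```

This rule implicitly assumes roughly bell-shaped data. Isolation Forest, used later in this article, makes no such assumption about the shape of the distribution.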

#### **Why we need it:**

* Detect problems early before they become costly
* Improve uptime and reliability
* Enhance safety
* Reduce maintenance costs

#### 🧠 PyOD — Python Outlier Detection Library

**PyOD** is a widely used Python library that provides 40+ algorithms for anomaly (outlier) detection, such as:

* Isolation Forest (IForest)
* AutoEncoders
* One-Class SVM
* LOF (Local Outlier Factor)

In the example, **Isolation Forest (IForest)** is used. It learns what “normal vibration” looks like from training data:

```python
import numpy as np
from pyod.models.iforest import IForest

train = np.array([[0.23], [0.25], [0.21], [0.29]])  # normal vibration history
model = IForest()
model.fit(train)
```

When new data arrives, `model.predict()` returns:

* **0 → Normal**
* **1 → Anomaly**

**Learn more about PyOD here:** [**https://pyod.readthedocs.io/en/latest/**](https://pyod.readthedocs.io/en/latest/)

#### **Installing PyOD and Testing Anomaly Detection**

In our example, PyOD is installed on a Windows computer. The steps are as follows:

{% stepper %}
{% step %}

#### Install PyOD on a Windows PC

{% hint style="info" %}
Make sure Python 3 is installed on your computer. To install Python, visit: <https://www.python.org/downloads/>
{% endhint %}

```shellscript
pip3 install pyod
```

{% endstep %}

{% step %}

#### Create a Python file for testing

`test_pyod.py`
{% endstep %}

{% step %}

#### Add the following code to it

```python
from pyod.models.iforest import IForest
import numpy as np

# training data: only normal vibration history
train = np.array([[0.23], [0.25], [0.21], [0.29]])

model = IForest()
model.fit(train)

# real-time vibration input
value = 0.45  # change this value to see a different result
prediction = model.predict([[value]])[0]

if prediction == 1:
    print("Anomaly detected!")
else:
    print("Normal")

```

{% endstep %}

{% step %}

#### Run the Python file

```shellscript
python test_pyod.py
```

{% endstep %}

{% step %}

#### Validate the output

The script prints `Anomaly detected!` or `Normal`, depending on `value` (0.45 in the code above). Change `value` and rerun the script to compare results.
{% endstep %}
{% endstepper %}

#### Anomaly Detection Using PyOD and Node-RED

Next, we test the model from Node-RED. Instead of running the Python file directly, we wrap the model in a small Flask server running alongside Node-RED, and Node-RED calls it over HTTP. This makes it easy to send data to the model and receive anomaly results as feedback.

{% hint style="info" %}
Make sure recent versions of Node-RED and Python are installed on your system.

* To install Node-RED, visit: <https://nodered.org/>
* To install Python, visit: <https://www.python.org/downloads/>
  {% endhint %}

Once Node-RED is installed, proceed with the following steps:

{% stepper %}
{% step %}

#### Convert Your Python Code into a Simple API Server.

Rather than running the Python file directly from Node-RED, we wrap it in a small Python web API that Node-RED can call for data exchange.

Create a new Python file named '**server.py**' with the following code:

{% code overflow="wrap" lineNumbers="true" expandable="true" %}

```python
from flask import Flask, request, jsonify
from pyod.models.iforest import IForest
import numpy as np

app = Flask(__name__)

# Train model once at startup
train = np.array([[0.23], [0.25], [0.21], [0.29]])
model = IForest()
model.fit(train)

@app.route("/predict", methods=["POST"])
def predict():
    data = request.json
    value = float(data["value"])  # vibration input

    prediction = model.predict([[value]])[0]

    return jsonify({
        "value": value,
        "prediction": int(prediction),
        "status": "Anomaly" if prediction == 1 else "Normal"
    })

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
```

{% endcode %}
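As written, `/predict` fails with an unhandled exception (Flask then answers with HTTP 500) if the JSON body has no `value` field or the value is not numeric. A minimal sketch of a validation helper you could call at the top of `predict()` before running the model. The name `parse_value` and the error messages are hypothetical, not part of the original server:

```python
import math


def parse_value(data):
    """Validate a /predict request body such as {"value": 0.45}.

    Returns (value, None) on success or (None, error_message) on failure.
    """
    if not isinstance(data, dict) or "value" not in data:
        return None, "request body must be a JSON object with a 'value' field"
    try:
        value = float(data["value"])
    except (TypeError, ValueError):
        return None, "'value' must be a number"
    if not math.isfinite(value):  # reject NaN and infinity
        return None, "'value' must be a finite number"
    return value, None
```

Inside the route, you would call it as `value, error = parse_value(request.get_json(silent=True))` and return `jsonify({"error": error}), 400` when `error` is set, so a bad message from Node-RED produces a clear client error instead of a server crash.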
{% endstep %}

{% step %}

#### Install Flask

```shellscript
pip install flask
```

{% endstep %}

{% step %}

#### Run your Python Server

```shellscript
python server.py
```

{% endstep %}

{% step %}

#### Validate

You should see the following:

<figure><img src="https://1831238825-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2FwHRcXvldlLyrn9zbhTt2%2Fimage.png?alt=media&#x26;token=ada51f0f-5581-47bd-a273-6ea4cff8149f" alt=""><figcaption></figcaption></figure>

Your anomaly detection API is now LIVE at: <http://localhost:5000/predict>
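Before wiring up Node-RED, you can also check the endpoint from Python using only the standard library. A minimal sketch, assuming the server above is running at `http://localhost:5000`. The helper names are hypothetical:

```python
import json
import urllib.request

PREDICT_URL = "http://localhost:5000/predict"


def build_predict_request(value, url=PREDICT_URL):
    """Build a POST request with a JSON body like {"value": 0.45}."""
    body = json.dumps({"value": value}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},  # needed for Flask's request.json
        method="POST",
    )


def predict(value, url=PREDICT_URL, timeout=5.0):
    """Send one reading to the API and return the decoded JSON response."""
    with urllib.request.urlopen(build_predict_request(value, url), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the server running, `predict(0.45)` returns a dictionary with the `value`, `prediction`, and `status` keys defined in `server.py`.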
{% endstep %}

{% step %}

#### Start Node-RED and test the Anomaly detection

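Use the **http request** node in Node-RED to send sample data and fetch anomaly results, as shown below. A function node sets the `Content-Type: application/json` header and wraps the number as `{ value: ... }` before the request is sent.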

<figure><img src="https://1831238825-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2FDpn8aJR9C5tEd3V8TQxk%2Fimage.png?alt=media&#x26;token=55a65e47-0bdd-47f7-9cbe-db51b57aca7a" alt=""><figcaption></figcaption></figure>

Use the reference Node-RED flow below:

{% code overflow="wrap" expandable="true" %}

```json
[
    {
        "id": "53bf0f485d74978b",
        "type": "tab",
        "label": "Flow 3",
        "disabled": false,
        "info": "",
        "env": []
    },
    {
        "id": "05ded4d2341631a0",
        "type": "http request",
        "z": "53bf0f485d74978b",
        "name": "",
        "method": "POST",
        "ret": "obj",
        "paytoqs": "ignore",
        "url": "http://localhost:5000/predict",
        "tls": "",
        "persist": false,
        "proxy": "",
        "insecureHTTPParser": false,
        "authType": "",
        "senderr": false,
        "headers": [],
        "x": 530,
        "y": 80,
        "wires": [
            [
                "7b1ef153885b1894"
            ]
        ]
    },
    {
        "id": "4abe4b4dca5e520e",
        "type": "function",
        "z": "53bf0f485d74978b",
        "name": "function 3",
        "func": "msg.headers = { \"Content-Type\": \"application/json\" };\nmsg.payload = { value: msg.payload };\nreturn msg;\n",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 340,
        "y": 80,
        "wires": [
            [
                "05ded4d2341631a0"
            ]
        ]
    },
    {
        "id": "7b1ef153885b1894",
        "type": "debug",
        "z": "53bf0f485d74978b",
        "name": "debug 1",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "false",
        "statusVal": "",
        "statusType": "auto",
        "x": 700,
        "y": 80,
        "wires": []
    },
    {
        "id": "301488acacb31208",
        "type": "inject",
        "z": "53bf0f485d74978b",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "0.45",
        "payloadType": "num",
        "x": 150,
        "y": 80,
        "wires": [
            [
                "4abe4b4dca5e520e"
            ]
        ]
    },
    {
        "id": "0f591249346fc88a",
        "type": "inject",
        "z": "53bf0f485d74978b",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "0.25",
        "payloadType": "num",
        "x": 150,
        "y": 120,
        "wires": [
            [
                "4abe4b4dca5e520e"
            ]
        ]
    }
]
```

{% endcode %}
{% endstep %}
{% endstepper %}

***

#### Testing Anomaly Detection with PLC Data

Now let's extend the example above to send live PLC data to the Python server and get anomaly results.

{% hint style="info" %}
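For demonstration purposes, we treat vibration values between $$0.0$$ and $$5.0$$ mm/s as normal and values outside this range as anomalies. The Python code therefore generates $$500$$ random samples uniformly distributed between $$0.0$$ and $$5.0$$, and the model learns from this dataset, as shown in the code below. Because the model uses `contamination=0.05`, it also treats roughly the most extreme 5% of the training values (those closest to $$0.0$$ and $$5.0$$) as anomalies, so the boundary is approximate.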
{% endhint %}
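The `contamination` parameter in the code below decides how many training samples end up on the anomaly side of the boundary: PyOD sets the threshold at the `(1 - contamination)` percentile of the training scores. A minimal numpy sketch of that rule. As a hypothetical stand-in score it uses each value's distance from the middle of the range (2.5), which is a simplification and not Isolation Forest's actual score:

```python
import numpy as np

rng = np.random.default_rng(42)
normal_values = rng.uniform(0.0, 5.0, 500)  # same range and size as server.py

# Hypothetical stand-in score: distance from the middle of the range.
# Isolation Forest computes its score differently.
scores = np.abs(normal_values - 2.5)

contamination = 0.05
# PyOD-style rule: threshold at the (1 - contamination) percentile of training scores
threshold = np.percentile(scores, 100 * (1 - contamination))
labels = (scores > threshold).astype(int)  # 1 = anomaly, 0 = normal

kept = normal_values[labels == 0]
print(labels.sum())            # 25 of the 500 training samples are flagged
print(kept.min(), kept.max())  # "normal" span is slightly narrower than 0 to 5
```

Lowering `contamination` narrows the band of flagged edge values; raising it widens it.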

{% stepper %}
{% step %}

#### Update your Python code in the server.py file

The code trains an anomaly detection model, exposes it through a REST API, and lets you retrain the model using real machine data.

{% code overflow="wrap" lineNumbers="true" expandable="true" %}

```python
from flask import Flask, request, jsonify
from pyod.models.iforest import IForest
from sklearn.preprocessing import StandardScaler
import numpy as np

app = Flask(__name__)

# =========================================================
# 1. TRAIN MODEL ON REALISTIC NORMAL VIBRATION DISTRIBUTION
# =========================================================

# Simulate "normal" vibration: 500 values drawn uniformly
# from the 0 to 5 mm/s range
normal_values = np.random.uniform(0.0, 5.0, 500)   # 500 samples
normal_values = normal_values.reshape(-1, 1)

# Standardize the data. A single-feature Isolation Forest does not need this,
# but distance-based detectors such as LOF or OCSVM do.
scaler = StandardScaler()
normal_scaled = scaler.fit_transform(normal_values)

# Train Isolation Forest
model = IForest(contamination=0.05)  # 5% anomaly expected
model.fit(normal_scaled)


# =========================================================
# 2. API ENDPOINT: PREDICT ANOMALY
# =========================================================
@app.route("/predict", methods=["POST"])
def predict():
    data = request.json
    value = float(data["value"])

    # Scale incoming data
    scaled_value = scaler.transform([[value]])

    # Raw prediction: 1 = anomaly, 0 = normal
    prediction = int(model.predict(scaled_value)[0])

    # Anomaly score (higher → more anomaly)
    score = float(model.decision_function(scaled_value)[0])

    return jsonify({
        "input_value": value,
        "scaled_value": scaled_value[0][0],
        "anomaly_score": score,
        "prediction": prediction,
        "status": "Anomaly Detected" if prediction == 1 else "Normal Behaviour"
    })


# =========================================================
# 3. OPTIONAL — RETRAIN USING DATA FROM CLIENT (Node-RED)
# =========================================================
@app.route("/retrain", methods=["POST"])
def retrain():
    global scaler, model  # replace the module-level scaler and model

    data = request.json
    new_samples = np.array(data["samples"]).reshape(-1, 1)

    scaler = StandardScaler()
    scaled = scaler.fit_transform(new_samples)

    model = IForest(contamination=0.05)
    model.fit(scaled)

    return jsonify({"message": "Model retrained successfully", "samples_used": len(new_samples)})


# =========================================================
# 4. RUN SERVER
# =========================================================
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

```

{% endcode %}

This script creates a small **AI-powered anomaly detection API** using Flask and the PyOD Isolation Forest model.

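1. **It generates simulated normal vibration data** (0–5 mm/s) and uses it to train an Isolation Forest model.
2. **It standardizes all vibration values** with StandardScaler (zero mean and unit variance, based on the training data). Isolation Forest itself is largely insensitive to this rescaling, but it keeps the pipeline ready for distance-based PyOD models such as LOF or One-Class SVM.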
3. **It exposes a `/predict` API endpoint** where you send one vibration value, and the server returns:
   * Normal or Anomaly
   * Anomaly score
   * Scaled value
4. **It provides a `/retrain` endpoint** that allows you to send new vibration samples (from Node-RED or a PLC) and retrain the model on the fly.
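5. **Flask runs the server on port 5000** on all network interfaces (`0.0.0.0`), making it easy to integrate with IIoT systems. Note that `app.run()` starts Flask's built-in development server, which is fine for this demo but not intended for production.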
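`StandardScaler` learns the mean $$\mu$$ and population standard deviation $$\sigma$$ of the training data and maps each value to $$z = (x - \mu) / \sigma$$. For the uniform 0 to 5 mm/s training data, $$\mu \approx 2.5$$ and $$\sigma \approx 5/\sqrt{12} \approx 1.44$$. A numpy sketch of the same arithmetic, so you can check the `scaled_value` field returned by `/predict` by hand (the helper name `standardize` is hypothetical):

```python
import numpy as np


def standardize(train, value):
    """Scale `value` the way a StandardScaler fitted on `train` would."""
    train = np.asarray(train, dtype=float)
    mean = train.mean()
    std = train.std()  # population std (ddof=0), as StandardScaler uses
    if std == 0:
        std = 1.0  # StandardScaler leaves constant features unscaled
    return (value - mean) / std


print(standardize([1.0, 2.0, 3.0, 4.0, 5.0], 5.0))  # about 1.414 (mean 3, std sqrt(2))
```

Because `server.py` draws a fresh random training set on every start, the exact `scaled_value` for a given input differs slightly between restarts.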

Learn more about this code here: [anomaly-detection-code-explanation](https://wiki.codeandcompile.com/product-reviews/smart-platforms/hivemq/hivemq-edge-to-cloud-ai-pipeline/anomaly-detection-code-explanation "mention")
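The `/retrain` endpoint expects a JSON body with a `samples` list. One way to build that list from live data is to keep a rolling window of recent readings. A minimal sketch; the class name `RetrainBuffer` and its default sizes (500 to match the server's training set, 50 as an arbitrary minimum) are hypothetical. Retrain only on readings you believe reflect normal operation, since the model treats most of whatever it is trained on as normal:

```python
from collections import deque


class RetrainBuffer:
    """Keep the most recent readings and build a /retrain request body."""

    def __init__(self, max_samples=500, min_samples=50):
        self.readings = deque(maxlen=max_samples)  # oldest readings drop out
        self.min_samples = min_samples

    def add(self, value):
        self.readings.append(float(value))

    def ready(self):
        return len(self.readings) >= self.min_samples

    def payload(self):
        if not self.ready():
            raise ValueError(
                f"need {self.min_samples} samples, have {len(self.readings)}"
            )
        return {"samples": list(self.readings)}


buffer = RetrainBuffer(max_samples=500, min_samples=50)
for reading in [0.21, 0.25, 0.23, 0.29] * 20:  # stand-in for live MQTT readings
    buffer.add(reading)
print(buffer.ready(), len(buffer.payload()["samples"]))  # True 80
```

The resulting dictionary can be POSTed to `/retrain` as JSON, for example from an **http request** node configured like the one used for `/predict`.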
{% endstep %}

{% step %}

#### Update the Node-RED flow

Next, subscribe to the following HiveMQ Cloud topic to receive the live vibration data:

```
hivemq-edge/machine/s71500/rVib
```

<figure><img src="https://1831238825-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2FAjQgm0t1ZXGDv7hG6AAB%2Fimage.png?alt=media&#x26;token=f15099ba-799b-486b-8804-93014f96760a" alt=""><figcaption></figcaption></figure>
{% endstep %}
{% endstepper %}

> Every time vibration data arrives from HiveMQ Cloud, Node-RED forwards it to the ML server, which runs inference and returns the result to Node-RED.

***

### **4. Output Layer – Returning anomaly alerts via MQTT → HiveMQ Edge → PLC (optional)**

**Now we send the result back to HiveMQ Edge, where it can be visualized on a dashboard or forwarded to the PLC for further action.**

{% stepper %}
{% step %}

#### Add an MQTT out node to the flow

A function node extracts the `status` field from the API response, and an MQTT out node publishes it to a separate topic:

```
hivemq-edge/machine/s71500/anomaly
```

<figure><img src="https://1831238825-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2F43gRI8hmJY6DGWsQIAPD%2Fimage.png?alt=media&#x26;token=cb6eb521-125c-464e-93b4-fe04847e6f20" alt=""><figcaption></figcaption></figure>

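The following flow can be used as a reference. Replace the broker address with your own HiveMQ Cloud cluster URL and enter your credentials in the broker node, since Node-RED does not include credentials in exported flows.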

{% code overflow="wrap" expandable="true" %}

```json
[
    {
        "id": "1ad2c5724dad60ec",
        "type": "tab",
        "label": "Flow 1",
        "disabled": false,
        "info": "",
        "env": []
    },
    {
        "id": "2e10e2177a83dbbf",
        "type": "http request",
        "z": "1ad2c5724dad60ec",
        "name": "",
        "method": "POST",
        "ret": "obj",
        "paytoqs": "ignore",
        "url": "http://localhost:5000/predict",
        "tls": "",
        "persist": false,
        "proxy": "",
        "insecureHTTPParser": false,
        "authType": "",
        "senderr": false,
        "headers": [],
        "x": 570,
        "y": 80,
        "wires": [
            [
                "c3746a81c58e797a"
            ]
        ]
    },
    {
        "id": "1d57a237256e69e7",
        "type": "function",
        "z": "1ad2c5724dad60ec",
        "name": "function 1",
        "func": "msg.headers = { \"Content-Type\": \"application/json\" };\nmsg.payload = { value: msg.payload.value };\nreturn msg;\n",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 400,
        "y": 80,
        "wires": [
            [
                "2e10e2177a83dbbf"
            ]
        ]
    },
    {
        "id": "ecd705d7eabc8086",
        "type": "mqtt in",
        "z": "1ad2c5724dad60ec",
        "name": "",
        "topic": "hivemq-edge/machine/s71500/rVib",
        "qos": "2",
        "datatype": "auto-detect",
        "broker": "08f8856327f4fe37",
        "nl": false,
        "rap": true,
        "rh": 0,
        "inputs": 0,
        "x": 160,
        "y": 80,
        "wires": [
            [
                "1d57a237256e69e7"
            ]
        ]
    },
    {
        "id": "67d96c79df053896",
        "type": "mqtt out",
        "z": "1ad2c5724dad60ec",
        "name": "",
        "topic": "hivemq-edge/machine/s71500/anomaly",
        "qos": "",
        "retain": "",
        "respTopic": "",
        "contentType": "",
        "userProps": "",
        "correl": "",
        "expiry": "",
        "broker": "08f8856327f4fe37",
        "x": 600,
        "y": 160,
        "wires": []
    },
    {
        "id": "c3746a81c58e797a",
        "type": "function",
        "z": "1ad2c5724dad60ec",
        "name": "function 2",
        "func": "msg.payload = msg.payload.status;\nreturn msg;",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 300,
        "y": 160,
        "wires": [
            [
                "67d96c79df053896"
            ]
        ]
    },
    {
        "id": "08f8856327f4fe37",
        "type": "mqtt-broker",
        "name": "",
        "broker": "1eba33276f3f4d2b9852f78bf6ab2880.s1.eu.hivemq.cloud",
        "port": "8883",
        "tls": "",
        "clientid": "",
        "autoConnect": true,
        "usetls": true,
        "protocolVersion": 4,
        "keepalive": 60,
        "cleansession": true,
        "autoUnsubscribe": true,
        "birthTopic": "",
        "birthQos": "0",
        "birthRetain": "false",
        "birthPayload": "",
        "birthMsg": {},
        "closeTopic": "",
        "closeQos": "0",
        "closeRetain": "false",
        "closePayload": "",
        "closeMsg": {},
        "willTopic": "",
        "willQos": "0",
        "willRetain": "false",
        "willPayload": "",
        "willMsg": {},
        "userProps": "",
        "sessionExpiry": ""
    }
]
```

{% endcode %}
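The flow above chains `mqtt in` → `function 1` → `http request` → `function 2` → `mqtt out`. The same logic can be sketched as a single Python function, which may help if you later move this step out of Node-RED. This is a hypothetical sketch: the predictor is passed in as a callable (for example, a function that POSTs to `/predict`), and actual MQTT publishing is left to whichever client library you use:

```python
import json

ANOMALY_TOPIC = "hivemq-edge/machine/s71500/anomaly"


def handle_vibration_message(payload: bytes, predict):
    """Mirror the Node-RED flow for one message on .../rVib.

    payload: raw MQTT payload, assumed to be JSON like {"value": 0.45}
    predict: callable taking a float and returning the /predict JSON response
    Returns the (topic, payload) pair that the mqtt out node would publish.
    """
    value = float(json.loads(payload)["value"])  # function 1: msg.payload.value
    response = predict(value)                     # http request node
    status = response["status"]                   # function 2: msg.payload.status
    return ANOMALY_TOPIC, status.encode("utf-8")


def fake_predict(value):
    """Simplified stand-in for the real API, using the status strings from server.py."""
    status = "Anomaly Detected" if value > 5.0 else "Normal Behaviour"
    return {"input_value": value, "status": status}


print(handle_vibration_message(b'{"value": 7.2}', fake_predict))
```

Injecting the predictor keeps the message handling testable without a running Flask server or broker.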
{% endstep %}

{% step %}

#### Bridge the HiveMQ Cloud MQTT topic to HiveMQ Edge

In HiveMQ Edge, open the MQTT bridge settings and add a remote subscription to your existing bridge, as shown below:

<figure><img src="https://1831238825-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2F6dsLevpxweEJggPefsGl%2Fimage.png?alt=media&#x26;token=827fc4c3-7676-4c87-ae86-3d47f87ac94f" alt=""><figcaption></figcaption></figure>

<figure><img src="https://1831238825-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2FQrKQVQllTD4ajg0Pi6Vk%2Fimage.png?alt=media&#x26;token=d568f6f5-435a-45be-bf1f-af057e4eaeb3" alt="" width="375"><figcaption></figcaption></figure>

Next, navigate to Broker Configuration and enable **Loop prevention**. This stops messages from being forwarded back and forth between HiveMQ Edge and HiveMQ Cloud indefinitely.

<figure><img src="https://1831238825-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2Ff50UAlCyi6c5fwQ9fYCf%2Fimage.png?alt=media&#x26;token=090fbfcd-5e7b-4181-8e0f-f6734e4bbf51" alt="" width="375"><figcaption></figcaption></figure>
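Whether a loop can form depends on your bridge's topic filters. If a filter that forwards Edge topics to HiveMQ Cloud also matches a topic that the remote subscription writes into HiveMQ Edge, the same message can bounce between the two brokers. A minimal sketch of MQTT 3.1.1 topic-filter matching (`+` matches exactly one level; `#` matches all remaining levels, including the parent level) that you can use to check your own filters. The filters in the example are hypothetical, not this article's bridge configuration:

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    # Wildcards at the first level never match topics starting with '$'
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False

    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard: matches this level and below
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    return len(filter_levels) == len(topic_levels)


anomaly_topic = "hivemq-edge/machine/s71500/anomaly"
print(topic_matches("hivemq-edge/#", anomaly_topic))          # True
print(topic_matches("machine/+/rVib", "machine/s71500/rVib"))  # True
print(topic_matches("machine/+/rVib", anomaly_topic))          # False
```

For example, if an Edge-to-Cloud forwarding filter were `hivemq-edge/#`, it would also pick up the anomaly topic arriving from the cloud, which is exactly the kind of overlap loop prevention guards against.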
{% endstep %}

{% step %}

#### Visualize Anomaly in HiveMQ Edge

Now that the anomaly result arrives in HiveMQ Edge, you can visualize it with tools such as Node-RED, as shown below:

<figure><img src="https://1831238825-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2Ft2uKg7nB5W5OTRC0nGP3%2Fimage.png?alt=media&#x26;token=1d4c1529-1cd8-4aaa-ba2e-e351d10bf0b5" alt=""><figcaption></figcaption></figure>

<figure><img src="https://1831238825-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2FCgm4f1cEtmphGG1eKsx2%2Fimage.png?alt=media&#x26;token=c2416802-6777-41e1-af90-87a0df6d1568" alt="" width="321"><figcaption></figcaption></figure>

{% hint style="info" %}
You can also send the anomaly results back to the PLC via OPC UA.
{% endhint %}
{% endstep %}
{% endstepper %}

***

## Complete Workflow

{% embed url="<https://files.gitbook.com/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FLd2M9UNfMTnw9DjDdZJz%2Fuploads%2FvPGMMRYPhiqsUXnsEKhl%2FHiveMQ%20Anomaly%20detection.mp4?alt=media&token=730bd29b-d752-4e25-976d-e49e406924bd>" %}

## ♥️ Work With Me

I regularly test **industrial automation and IIoT devices**. If you’d like me to **review your product** or showcase it in my courses and YouTube channel:

📧 Email: <rajvir@codeandcompile.com> or drop me a message on [LinkedIn](https://www.linkedin.com/in/singhrajvir/)
