Manufacturing IoT Implementation

15 min read · Specialized Domains · Intermediate

Why connected factories matter more than ever for developers building resilient systems

[Image: industrial edge gateway with ethernet cables and 24V DC power supply mounted inside a metal control cabinet on a factory floor]

The term "Manufacturing IoT" shows up in slide decks and vendor pitches everywhere, but the real work happens on dusty plant floors, inside control cabinets, and across networks that were designed decades before TLS was a thing. If you’ve ever tried to pull data from a PLC that only speaks Modbus TCP, or wrestled with an MQTT broker that keeps dropping connections because of poor Wi‑Fi near a CNC machine, you know this isn’t about shiny dashboards. It’s about making reliable, maintainable systems that can survive power glitches, operator errors, and the reality of legacy hardware.

I’m not going to pretend this is simple. I’ve seen sensor streams collapse under backpressure, spent afternoons decoding ambiguous CSV exports from SCADA systems, and learned the hard way that adding one more edge gateway often means owning one more Linux box in a humid environment. But I’ve also seen the payoff: live OEE that actually reflects reality, anomaly detection that catches bearing failures weeks before downtime, and safety alerts that reach the right person in seconds instead of minutes.

This post is for developers and technically curious readers who want to build or maintain manufacturing IoT systems. We’ll talk about the landscape, pick a pragmatic stack, and walk through real patterns you’ll use in production. Code examples are Python based because it’s ubiquitous at the edge and in analytics. Along the way, I’ll share what tends to work, where it breaks, and how to avoid common pitfalls.

Where Manufacturing IoT stands today

Manufacturing IoT sits at the intersection of Operational Technology (OT) and Information Technology (IT). OT is the world of PLCs, drives, sensors, and hard real-time constraints. IT is the world of databases, APIs, microservices, and cloud scale. In modern plants, these two worlds must coexist, but they don’t always play nicely.

In real-world projects, teams typically deploy edge gateways that bridge protocols like Modbus TCP, OPC UA, or Ethernet/IP into MQTT or HTTP. The data lands in a time-series database, gets processed by stream jobs, and eventually feeds dashboards and alerting systems. Security is a growing focus: the Purdue model is still referenced, and Zero Trust is creeping into plant networks. Standards like OPC UA provide a path toward more secure, modelled data, but legacy gear remains dominant.

Who uses this? Manufacturers of all sizes, systems integrators, and platform vendors. Small teams might start with a single gateway and a Grafana dashboard. Larger organizations roll out fleets of gateways managed by Kubernetes at the edge, with cloud-based control planes. The choice between open-source stacks (Node‑RED, EMQX, Telegraf, InfluxDB, Grafana) and commercial platforms (AVEVA, PTC, AWS IoT SiteWise, Azure IoT Hub) often comes down to compliance, support, and integration with existing MES/ERP systems.

Compared to alternatives, direct database ingestion (e.g., Telegraf writing to InfluxDB) is simpler but less flexible. A message bus like MQTT decouples producers and consumers, which is crucial when you need to add a new analytics service without touching the edge. OPC UA shines for complex data models and security, but it’s heavier to implement. HTTP/REST is fine for intermittent batch uploads, but not ideal for streaming telemetry. For most plants, MQTT plus a time-series database is the pragmatic baseline.

Core concepts and practical patterns

Data flow architecture

Most manufacturing IoT systems follow a simple pattern:

  1. Collect: Read from PLCs, sensors, or existing SCADA systems.
  2. Normalize: Convert proprietary formats to a common schema.
  3. Transport: Publish to MQTT or buffer locally if the network fails.
  4. Process: Enrich, aggregate, and validate in a stream or microservice.
  5. Store: Write to a time-series database.
  6. Consume: Serve dashboards, alerts, and integrations.

At the edge, you’ll run a gateway that handles connectivity and buffering. In the cloud or on-prem, you’ll run services for processing and storage. Think in terms of buffers and backpressure: if the database slows down, the MQTT broker shouldn’t crash, and the edge should persist data locally.

Language choices and tradeoffs

  • Python: Great for edge logic and analytics due to rich libraries (pymodbus, opcua, paho-mqtt). Easy to prototype, but mind the GIL for CPU-bound tasks.
  • Node.js: Often used for MQTT services and lightweight APIs. Strong I/O performance, but less common for heavy data processing.
  • Go: Popular for high-throughput stream processing and gateway services. Lower memory footprint, strong concurrency model.
  • Rust: Gaining traction for safety-critical edge components. Steep learning curve but excellent for reliability.

For this post, we’ll stick with Python, as it’s the most accessible and widely used in real-world manufacturing IoT.

Telemetry schema and metadata

Avoid arbitrary key-value dumps. A stable schema reduces downstream breakage. Common fields:

  • device_id: unique machine identifier
  • timestamp: UTC ISO 8601
  • signal_name: e.g., temperature, vibration, cycle_count
  • value: numeric or string
  • quality: good/bad/uncertain (OPC UA concept)
  • tags: location, line, asset class

Example schema in JSON:

{
  "device_id": "CNC-01",
  "timestamp": "2025-10-15T13:45:02Z",
  "signal_name": "spindle_temperature",
  "value": 68.7,
  "quality": "good",
  "tags": {
    "line": "L1",
    "station": "ST3",
    "asset_class": "motor"
  }
}

Practical implementation: a minimal MQTT + Modbus gateway

Let’s build a small, realistic edge service. We’ll read Modbus registers from a machine, publish to MQTT, and include local buffering if the network is down. This is the kind of service you’ll deploy on an industrial PC or Raspberry Pi-class device.

Folder structure

manufacturing-iot-edge/
├── config/
│   └── gateway.yaml
├── src/
│   ├── modbus_reader.py
│   ├── mqtt_publisher.py
│   ├── buffer.py
│   └── main.py
├── data/
│   └── buffer.db
├── logs/
│   └── app.log
├── requirements.txt
└── README.md

Dependencies

pymodbus==3.5.4
paho-mqtt==2.1.0
pyyaml==6.0.1
tenacity==8.2.3

Configuration

# config/gateway.yaml
modbus:
  host: "192.168.1.50"
  port: 502
  unit_id: 1
  registers:
    - name: "spindle_temperature"
      address: 100
      count: 2   # float32
      type: "float32"
    - name: "cycle_count"
      address: 102
      count: 1
      type: "uint16"

mqtt:
  host: "mqtt.local"
  port: 1883
  topic: "factory/line1/cnc01/telemetry"
  qos: 1
  client_id: "edge-gateway-cnc01"

buffer:
  path: "data/buffer.db"
  max_rows: 10000
  retry_interval_seconds: 10

Buffering with SQLite

SQLite is a simple, reliable local buffer. It’s a good fit for edge devices with limited resources and helps survive network outages.

# src/buffer.py
import sqlite3
import json
import time
from pathlib import Path
from typing import List, Dict, Any

class LocalBuffer:
    def __init__(self, path: str, max_rows: int = 10000):
        self.path = Path(path)
        self.max_rows = max_rows
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.path)
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS telemetry (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    created_at REAL NOT NULL,
                    payload TEXT NOT NULL
                )
            """)
            conn.commit()
        finally:
            conn.close()

    def push(self, payload: Dict[str, Any]) -> None:
        conn = sqlite3.connect(self.path)
        try:
            conn.execute(
                "INSERT INTO telemetry (created_at, payload) VALUES (?, ?)",
                (time.time(), json.dumps(payload))
            )
            # Simple retention policy
            conn.execute("""
                DELETE FROM telemetry
                WHERE id IN (
                    SELECT id FROM telemetry
                    ORDER BY id ASC
                    LIMIT (SELECT MAX(0, COUNT(*) - ?) FROM telemetry)
                )
            """, (self.max_rows,))
            conn.commit()
        finally:
            conn.close()

    def drain(self, batch_size: int = 100) -> List[Dict[str, Any]]:
        conn = sqlite3.connect(self.path)
        conn.row_factory = sqlite3.Row
        try:
            rows = conn.execute(
                "SELECT id, payload FROM telemetry ORDER BY id ASC LIMIT ?",
                (batch_size,)
            ).fetchall()
            items = [json.loads(row["payload"]) for row in rows]
            if items:
                conn.execute(
                    "DELETE FROM telemetry WHERE id IN ({})".format(
                        ",".join(str(row["id"]) for row in rows)
                    )
                )
                conn.commit()
            return items
        finally:
            conn.close()

Reading Modbus

We’ll use pymodbus to read registers. Keep timeouts conservative and add retries.

# src/modbus_reader.py
from pymodbus.client import ModbusTcpClient
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
import logging

logger = logging.getLogger(__name__)

class ModbusReader:
    def __init__(self, host: str, port: int, unit_id: int):
        # pymodbus 3.x takes the unit/slave ID per request, not on the client
        self.unit_id = unit_id
        self.client = ModbusTcpClient(host=host, port=port, timeout=3.0)

    def connect(self):
        if not self.client.connect():
            raise RuntimeError("Failed to connect to Modbus server")

    def close(self):
        self.client.close()

    def _read_registers(self, address: int, count: int) -> BinaryPayloadDecoder:
        rr = self.client.read_holding_registers(address, count, slave=self.unit_id)
        if rr.isError():
            raise RuntimeError(f"Modbus error: {rr}")
        return BinaryPayloadDecoder.fromRegisters(
            rr.registers, byteorder=Endian.BIG, wordorder=Endian.BIG
        )

    def read_float32(self, address: int, count: int = 2) -> float:
        # count=2: a 32-bit float spans two 16-bit registers
        return self._read_registers(address, count).decode_32bit_float()

    def read_uint16(self, address: int, count: int = 1) -> int:
        return self._read_registers(address, count).decode_16bit_uint()

MQTT publisher with backoff

We use tenacity for retry logic and publish with QoS 1 for at-least-once delivery.

# src/mqtt_publisher.py
import paho.mqtt.client as mqtt
import json
import logging
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

class MQTTPublisher:
    def __init__(self, host: str, port: int, topic: str, qos: int = 1, client_id: str = "edge"):
        self.host = host
        self.port = port
        self.topic = topic
        self.qos = qos
        # paho-mqtt 2.x requires an explicit callback API version
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        if reason_code.is_failure:
            logger.warning(f"MQTT connect failed: {reason_code}")
        else:
            logger.info("MQTT connected")

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
        logger.warning(f"MQTT disconnected: {reason_code}")

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=10))
    def connect(self):
        self.client.connect(self.host, self.port, keepalive=60)
        self.client.loop_start()

    def publish(self, payload: dict):
        try:
            result = self.client.publish(
                self.topic, json.dumps(payload), qos=self.qos, retain=False
            )
            # Bound the wait so a dead connection can't stall the read loop forever
            result.wait_for_publish(timeout=5)
            if not result.is_published():
                raise RuntimeError("MQTT message not published")
        except Exception as e:
            logger.error(f"Publish failed: {e}")
            raise

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()

Wiring it together

This main loop demonstrates read, normalize, publish, and local buffer. Notice how we separate concerns: each module has a single responsibility, making it easier to test and maintain.

# src/main.py
import yaml
import time
import logging
from pathlib import Path
from datetime import datetime, timezone
from modbus_reader import ModbusReader
from mqtt_publisher import MQTTPublisher
from buffer import LocalBuffer

Path("logs").mkdir(exist_ok=True)  # FileHandler fails if the directory is missing

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[
        logging.FileHandler("logs/app.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def load_config(path: str):
    with open(path) as f:
        return yaml.safe_load(f)

def build_payload(device_id: str, readings: dict) -> dict:
    return {
        "device_id": device_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "readings": readings,
        "tags": {"line": "L1", "station": "ST3"}
    }

def main():
    cfg = load_config("config/gateway.yaml")

    reader = ModbusReader(
        host=cfg["modbus"]["host"],
        port=cfg["modbus"]["port"],
        unit_id=cfg["modbus"]["unit_id"]
    )
    reader.connect()

    publisher = MQTTPublisher(
        host=cfg["mqtt"]["host"],
        port=cfg["mqtt"]["port"],
        topic=cfg["mqtt"]["topic"],
        qos=cfg["mqtt"]["qos"],
        client_id=cfg["mqtt"]["client_id"]
    )
    publisher.connect()

    buffer = LocalBuffer(
        path=cfg["buffer"]["path"],
        max_rows=cfg["buffer"]["max_rows"]
    )

    try:
        while True:
            try:
                readings = {}
                for reg in cfg["modbus"]["registers"]:
                    if reg["type"] == "float32":
                        value = reader.read_float32(reg["address"], reg["count"])
                    elif reg["type"] == "uint16":
                        value = reader.read_uint16(reg["address"], reg["count"])
                    else:
                        raise ValueError(f"Unknown type {reg['type']}")
                    readings[reg["name"]] = value

                payload = build_payload("CNC-01", readings)

                # Try to publish; if it fails, push to local buffer
                try:
                    publisher.publish(payload)
                    logger.info("Published payload")
                except Exception as e:
                    logger.warning(f"Publish error, buffering: {e}")
                    buffer.push(payload)

                # Attempt to drain buffer if any
                drained = buffer.drain(batch_size=10)
                for item in drained:
                    try:
                        publisher.publish(item)
                        logger.info("Drained buffered payload")
                    except Exception as e:
                        # Re-buffer and stop draining; retry on the next cycle.
                        # Note: push() re-appends, so strict ordering isn't preserved.
                        logger.warning(f"Drain publish failed, re-buffering: {e}")
                        buffer.push(item)
                        break

            except Exception as e:
                logger.error(f"Read cycle error: {e}")
                time.sleep(5)

            time.sleep(1)

    except KeyboardInterrupt:
        logger.info("Shutting down")
    finally:
        reader.close()
        publisher.disconnect()

if __name__ == "__main__":
    main()

Running the service

python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python src/main.py

Observations and pitfalls

  • Time sync: Ensure NTP is running. Without accurate time, analytics become unreliable.
  • Modbus endianness: Different PLCs use different register orders. Test with known values.
  • MQTT QoS: QoS 1 is usually enough, but if you need exactly-once, add deduplication logic.
  • Backpressure: If your database lags, the MQTT broker can bloat. Use local buffering and set retention policies.
  • Security: Start with TLS and authentication. Even inside a plant network, breaches happen. If you can’t use TLS, segment networks and use client certs where possible.
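On the exactly-once point: a common pattern is to stamp each payload with a unique message ID at the producer and drop repeats at the consumer. Here is a minimal sketch, assuming you add a `message_id` field to the telemetry schema (the field name and the `Deduplicator` helper are illustrative, not part of any library):

```python
import uuid
from collections import OrderedDict

class Deduplicator:
    """Bounded LRU set of recently seen message IDs (consumer-side dedup)."""

    def __init__(self, max_ids: int = 10000):
        self.max_ids = max_ids
        self._seen = OrderedDict()

    def is_duplicate(self, message_id: str) -> bool:
        if message_id in self._seen:
            self._seen.move_to_end(message_id)
            return True
        self._seen[message_id] = True
        if len(self._seen) > self.max_ids:
            self._seen.popitem(last=False)  # evict the oldest ID
        return False

# Producer side: stamp each payload with a unique ID before publishing
payload = {"device_id": "CNC-01", "message_id": str(uuid.uuid4()), "value": 68.7}

# Consumer side: skip QoS 1 redeliveries
dedup = Deduplicator()
first = dedup.is_duplicate(payload["message_id"])   # False: process it
second = dedup.is_duplicate(payload["message_id"])  # True: redelivery, drop it
```

The bound matters: an unbounded seen-set is a slow memory leak on an edge box that runs for months.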

Real-world example: streaming OEE calculation

OEE (Overall Equipment Effectiveness) combines availability, performance, and quality. In practice, you’ll compute it from signals like cycle counts, runtime states, and defect counts.
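As a sanity check on the formula, here are the three factors computed for a hypothetical shift (all numbers invented for illustration):

```python
# Hypothetical 8-hour shift (illustrative numbers, not from a real machine)
planned_time = 8 * 3600          # 28800 s of planned production time
run_time = 7 * 3600              # 25200 s actually running
ideal_cycle_time = 1.2           # seconds per part at rated speed
total_parts = 19000
good_parts = 18500

availability = run_time / planned_time                      # 0.875
performance = (total_parts * ideal_cycle_time) / run_time   # ~0.905
quality = good_parts / total_parts                          # ~0.974
oee = availability * performance * quality                  # ~0.771

print(f"OEE = {oee:.1%}")  # OEE = 77.1%
```

Note how the multiplication punishes you: three factors that each look respectable on their own compound into an OEE well below any single one of them.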

Let’s stream OEE using a small service that subscribes to telemetry and publishes OEE metrics.

# src/oee_service.py
import json
import logging
import paho.mqtt.client as mqtt
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OEEAggregator:
    def __init__(self):
        self.state = {
            "planned_time": 0.0,    # seconds
            "run_time": 0.0,        # seconds
            "ideal_cycle_time": 1.2, # seconds per part
            "total_parts": 0,
            "good_parts": 0
        }
        self.last_tick = None

    def on_telemetry(self, payload: dict):
        # Assume telemetry includes "run_status" (1 = running, 0 = stopped)
        # and "cycle_count" and "defect_count"
        ts = payload.get("timestamp")
        readings = payload.get("readings", {})
        status = readings.get("run_status", 0)
        cycle_count = readings.get("cycle_count", 0)
        defect_count = readings.get("defect_count", 0)

        now = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        if self.last_tick:
            dt = (now - self.last_tick).total_seconds()
            self.state["planned_time"] += dt
            if status == 1:
                self.state["run_time"] += dt

        self.state["total_parts"] = cycle_count
        self.state["good_parts"] = max(cycle_count - defect_count, 0)
        self.last_tick = now

    def compute_oee(self) -> dict:
        availability = self.state["run_time"] / max(self.state["planned_time"], 1e-9)
        performance = (self.state["total_parts"] * self.state["ideal_cycle_time"]) / max(self.state["run_time"], 1e-9)
        quality = self.state["good_parts"] / max(self.state["total_parts"], 1e-9)
        oee = availability * performance * quality
        return {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "availability": availability,
            "performance": performance,
            "quality": quality,
            "oee": oee
        }

def on_connect(client, userdata, flags, rc, properties=None):
    logger.info(f"OEE service connected: {rc}")
    client.subscribe("factory/line1/cnc01/telemetry")

def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode())
        aggregator = userdata
        aggregator.on_telemetry(payload)
        oee = aggregator.compute_oee()
        # Publish OEE to another topic
        client.publish("factory/line1/cnc01/oee", json.dumps(oee), qos=1)
        logger.info(f"OEE published: {oee['oee']:.3f}")
    except Exception as e:
        logger.error(f"OEE service error: {e}")

def main():
    aggregator = OEEAggregator()
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="oee-service")  # paho-mqtt 2.x
    client.user_data_set(aggregator)
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect("mqtt.local", 1883, 60)
    client.loop_forever()

if __name__ == "__main__":
    main()

Why this pattern works in production:

  • State lives in memory for speed. For durability, back it up to Redis or a local file periodically.
  • OEE is computed per device and per line. You can add tags for aggregation.
  • The service is decoupled from the edge gateway. You can restart it without affecting data collection.
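For the file-backed durability mentioned above, one minimal sketch is an atomic JSON snapshot: write to a temp file, then rename over the target. Paths and function names here are illustrative:

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional

def save_state(state: dict, path: str = "data/oee_state.json") -> None:
    """Snapshot aggregator state atomically: temp file + rename."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
        os.replace(tmp, target)  # atomic: readers never see a half-written file
    except Exception:
        os.unlink(tmp)
        raise

def load_state(path: str = "data/oee_state.json") -> Optional[dict]:
    """Restore the last snapshot, or None if there isn't a valid one."""
    try:
        with open(path) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
```

Call `save_state` on a timer (every 30 to 60 seconds is plenty for OEE) and `load_state` at startup; worst case after a crash, you lose one interval of accumulated time.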

Honest evaluation: strengths and tradeoffs

When to use this stack:

  • You have legacy PLCs that speak Modbus or Ethernet/IP.
  • You need a simple, scalable way to ingest and process telemetry.
  • You want to prototype quickly and iterate as requirements evolve.

Where it struggles:

  • Hard real-time control: This stack is for monitoring and analytics, not safety-critical control.
  • Very high frequency data: If you’re sampling at >1kHz, consider edge preprocessing and efficient serialization (e.g., Protocol Buffers).
  • Highly regulated environments: You may need formal validation and vendor-supported platforms.

Alternatives to consider:

  • OPC UA: Strong security and rich data modelling, but heavier to implement. Best when you need complex metadata or certified interoperability.
  • Vendor platforms: Faster to deploy, often with built-in connectors and compliance. Lock-in and licensing are tradeoffs.
  • Time-series DB only: Simpler, but you lose decoupling and streaming transformations.

Personal experience: learning curves and common mistakes

A few lessons learned the hard way:

  • Don’t trust wall time. Use UTC everywhere. I once debugged a “performance drop” that was actually a timezone issue in Grafana.
  • Start with QoS 1. QoS 0 is tempting for throughput, but dropped messages hurt trust. QoS 2 rarely adds value at the edge due to overhead.
  • Buffer locally. Networks fail. Power cycles happen. If you don’t buffer, you’ll lose data and lose stakeholder confidence.
  • Model your data. Random keys in JSON turn into maintenance nightmares. Pick names once and stick to them.
  • Keep the loop simple. Avoid heavy processing in the main loop. Use background threads or separate services.

Moments where this stack shines:

  • Catching a bearing failure: Vibration slowly increased over a week. The trend in InfluxDB + Grafana made it obvious before the machine tripped.
  • Operator buy-in: When the dashboard shows real cycle counts rather than manual logs, operators trust the system and start using it.
  • Maintenance scheduling: OEE highlighted downtime patterns, allowing planned interventions during breaks rather than during production.

Getting started: workflow and mental model

Focus on workflow over commands:

  • Decide your gateway hardware: Industrial PC for harsh environments, Raspberry Pi for pilots.
  • Choose protocol bridges: Modbus for legacy, OPC UA for newer machines. Prefer a single MQTT interface for downstream services.
  • Define your schema: device_id, timestamp, signal_name, value, quality, tags. Document it.
  • Add local persistence: SQLite buffer is fine for small sites; consider a lightweight queue (e.g., NanoMQ) for larger flows.
  • Deploy as services: Use systemd for edge services. Keep logs structured and rotation enabled.
  • Start small: One machine, one dashboard. Prove reliability. Then scale.
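To make the schema step concrete, a small validator can reject malformed payloads before they ever reach the broker. This is a sketch against the example schema from earlier; the field names match the post, but the helper itself is hypothetical:

```python
from datetime import datetime

REQUIRED_FIELDS = {"device_id", "timestamp", "signal_name", "value", "quality"}
VALID_QUALITY = {"good", "bad", "uncertain"}

def validate_telemetry(payload: dict) -> list:
    """Return a list of problems; an empty list means the payload is accepted."""
    problems = []
    missing = REQUIRED_FIELDS - payload.keys()
    if missing:
        problems.append(f"missing fields: {sorted(missing)}")
    if "quality" in payload and payload["quality"] not in VALID_QUALITY:
        problems.append(f"unknown quality: {payload['quality']!r}")
    try:
        # Accept the trailing-Z form used in the schema example
        datetime.fromisoformat(str(payload.get("timestamp", "")).replace("Z", "+00:00"))
    except ValueError:
        problems.append(f"unparseable timestamp: {payload.get('timestamp')!r}")
    return problems

good = {"device_id": "CNC-01", "timestamp": "2025-10-15T13:45:02Z",
        "signal_name": "spindle_temperature", "value": 68.7, "quality": "good"}
print(validate_telemetry(good))  # []
```

Run it in the gateway just before publish: a rejected payload goes to a dead-letter log instead of silently corrupting downstream aggregates.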

Directory mental model:

  • config: One file per device class or site.
  • src: Small, single-purpose modules.
  • data: Persistent buffers; set retention limits.
  • logs: Keep at most 7 days locally; ship to a central log service.
  • tests: Unit tests for parsing and serialization; integration tests with a local MQTT broker.
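For the local log retention point, Python's standard library handles rotation without extra tooling. A sketch using `RotatingFileHandler` (the size and backup thresholds are illustrative):

```python
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path

def setup_logging(log_dir: str = "logs",
                  max_bytes: int = 5 * 1024 * 1024,
                  backups: int = 7) -> None:
    """Rotate at ~5 MB and keep 7 old files, bounding disk use on the gateway."""
    Path(log_dir).mkdir(exist_ok=True)
    handler = RotatingFileHandler(
        f"{log_dir}/app.log", maxBytes=max_bytes, backupCount=backups
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
    )
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)
    root.addHandler(logging.StreamHandler())
```

Time-based rotation (`TimedRotatingFileHandler`) fits the "7 days" rule more literally; size-based is safer on devices with small disks because a log storm can't fill the partition.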


Summary: who should use this and who might skip it

Use Manufacturing IoT with an MQTT + time-series approach if:

  • You’re integrating legacy PLCs into modern analytics.
  • You need a decoupled, maintainable system that can survive network hiccups.
  • You want fast iteration and a clear path to scale.

Consider skipping or adjusting if:

  • You’re building safety-critical control loops that require deterministic real-time behavior.
  • Your environment is heavily regulated and requires vendor-supported platforms.
  • Your data volume or frequency demands custom, high-performance pipelines at the edge.

A final thought: reliability beats features. Operators care that the system works and the numbers are right. Stakeholders care that alerts are timely and actionable. Build small, observable components, add buffers where you can, and keep the data model stable. That’s how Manufacturing IoT transitions from pilot to production without drama.
