
Manufacturing and IoT with AWS: Industry 4.0 Implementation Guide

Forrict Team
Complete guide to implementing Industrial IoT on AWS with IoT Core, Greengrass, predictive maintenance, and real-time factory monitoring solutions.

Dutch manufacturing companies are embracing Industry 4.0, transforming traditional factories into smart, connected facilities. Companies like ASML, Philips, and DAF Trucks leverage Industrial IoT (IIoT) to optimize production, reduce downtime, and improve product quality. AWS provides a comprehensive IoT platform that enables manufacturers to collect, process, and analyze data from thousands of connected devices in real time.

The Industry 4.0 Revolution

Industry 4.0 represents the fourth industrial revolution, characterized by the integration of cyber-physical systems, Internet of Things (IoT), cloud computing, and artificial intelligence into manufacturing processes. For Dutch manufacturers, this transformation is critical for maintaining competitiveness in the global market.

Business Drivers for IoT in Manufacturing

Predictive Maintenance: Traditional preventive maintenance schedules equipment servicing at fixed intervals, leading to either premature part replacement or unexpected failures. Predictive maintenance uses real-time sensor data and machine learning to predict failures before they occur, reducing downtime by up to 50% and maintenance costs by 30%.

Operational Efficiency: Real-time monitoring of production lines identifies bottlenecks, quality issues, and inefficiencies instantly. Dutch manufacturers report 15-25% improvements in Overall Equipment Effectiveness (OEE) after implementing IoT monitoring.
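For reference, OEE is conventionally defined as the product of three factors, each expressed as a percentage:

OEE = Availability × Performance × Quality

A machine that is available 90% of scheduled time, runs at 95% of its ideal cycle rate, and yields 99% good parts therefore scores roughly 0.90 × 0.95 × 0.99 ≈ 84.6% OEE.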

Quality Control: Automated quality inspection using computer vision and sensor data catches defects earlier in the production process, reducing waste and improving customer satisfaction.

Supply Chain Optimization: IoT-enabled asset tracking provides real-time visibility into inventory levels, shipment locations, and supply chain disruptions, enabling faster response to changes.

Energy Management: Manufacturing accounts for significant energy consumption. IoT sensors identify energy waste and optimize consumption patterns, reducing costs and supporting sustainability goals.

AWS IoT Services Overview

AWS offers a complete IoT stack for manufacturing:

AWS IoT Core: Managed cloud service that connects billions of devices securely and routes messages to AWS services and applications.

AWS IoT Greengrass: Extends AWS capabilities to edge devices, enabling local processing, machine learning inference, and operation even when disconnected from the cloud.

AWS IoT SiteWise: Collects, organizes, and analyzes industrial equipment data at scale, providing insights into operational performance.

AWS IoT Events: Detects and responds to events from IoT sensors and applications, triggering automated actions.

AWS IoT Analytics: Processes, enriches, stores, and analyzes IoT data for decision-making.

Amazon Timestream: Purpose-built time-series database for IoT applications, storing and analyzing trillions of events per day.

AWS IoT Core Architecture for Manufacturing

A typical manufacturing IoT architecture on AWS consists of edge devices collecting sensor data, secure communication channels to the cloud, data processing pipelines, and analytics/ML services for insights.

Reference Architecture

Factory Floor
├── Sensors & PLCs
│   ├── Temperature sensors
│   ├── Vibration sensors
│   ├── Pressure sensors
│   └── Machine controllers

├── Edge Gateways (IoT Greengrass)
│   ├── Local data processing
│   ├── ML inference
│   ├── Protocol translation
│   └── Data aggregation

└── Network (Secure)
    └── TLS 1.2+ encryption
          │
          │ MQTT/HTTPS
          ▼
AWS Cloud
├── AWS IoT Core
│   ├── Device authentication
│   ├── Message routing
│   └── Rules engine

├── Data Processing
│   ├── AWS IoT Analytics
│   ├── Amazon Kinesis
│   └── AWS Lambda

├── Storage
│   ├── Amazon Timestream (time-series)
│   ├── Amazon S3 (raw data lake)
│   └── DynamoDB (metadata)

├── Analytics & ML
│   ├── Amazon SageMaker (predictive models)
│   ├── AWS IoT SiteWise (OEE metrics)
│   └── Amazon QuickSight (dashboards)

└── Applications
    ├── Real-time monitoring dashboards
    ├── Predictive maintenance alerts
    └── Quality control systems

Implementing IoT Core with CDK

Here’s a complete CDK implementation for a manufacturing IoT infrastructure:

import * as cdk from 'aws-cdk-lib';
import * as iot from 'aws-cdk-lib/aws-iot';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as timestream from 'aws-cdk-lib/aws-timestream';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions';
import { Construct } from 'constructs';

export class ManufacturingIoTStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // S3 bucket for raw IoT data
    const dataBucket = new s3.Bucket(this, 'IoTDataBucket', {
      encryption: s3.BucketEncryption.S3_MANAGED,
      versioned: true,
      lifecycleRules: [
        {
          transitions: [
            {
              storageClass: s3.StorageClass.INTELLIGENT_TIERING,
              transitionAfter: cdk.Duration.days(30),
            },
            {
              storageClass: s3.StorageClass.GLACIER,
              transitionAfter: cdk.Duration.days(90),
            },
          ],
        },
      ],
    });

    // Timestream database for time-series data
    const timestreamDb = new timestream.CfnDatabase(this, 'IoTDatabase', {
      databaseName: 'manufacturing-iot',
    });

    const timestreamTable = new timestream.CfnTable(this, 'SensorDataTable', {
      databaseName: timestreamDb.databaseName!,
      tableName: 'sensor-data',
      retentionProperties: {
        MemoryStoreRetentionPeriodInHours: '24',
        MagneticStoreRetentionPeriodInDays: '365',
      },
    });
    timestreamTable.addDependency(timestreamDb);

    // Kinesis stream for real-time processing
    const dataStream = new kinesis.Stream(this, 'IoTDataStream', {
      streamName: 'manufacturing-iot-stream',
      shardCount: 2,
      retentionPeriod: cdk.Duration.days(7),
      encryption: kinesis.StreamEncryption.MANAGED,
    });

    // SNS topic for alerts
    const alertTopic = new sns.Topic(this, 'AlertTopic', {
      displayName: 'Manufacturing IoT Alerts',
    });
    alertTopic.addSubscription(
      new subscriptions.EmailSubscription('operations@forrict.nl')
    );

    // Lambda function for data processing
    const dataProcessorFunction = new lambda.Function(this, 'DataProcessor', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lambda/iot-processor'),
      timeout: cdk.Duration.seconds(60),
      memorySize: 512,
      environment: {
        TIMESTREAM_DB: timestreamDb.databaseName!,
        TIMESTREAM_TABLE: timestreamTable.tableName!,
        ALERT_TOPIC_ARN: alertTopic.topicArn,
      },
    });

    // Grant permissions to write to Timestream
    dataProcessorFunction.addToRolePolicy(new iam.PolicyStatement({
      effect: iam.Effect.ALLOW,
      actions: [
        'timestream:WriteRecords',
        'timestream:DescribeEndpoints',
      ],
      resources: ['*'],
    }));

    alertTopic.grantPublish(dataProcessorFunction);

    // Lambda function for anomaly detection
    const anomalyDetectorFunction = new lambda.Function(this, 'AnomalyDetector', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lambda/anomaly-detector'),
      timeout: cdk.Duration.seconds(60),
      memorySize: 1024,
      environment: {
        ALERT_TOPIC_ARN: alertTopic.topicArn,
        TIMESTREAM_DB: timestreamDb.databaseName!,
        TIMESTREAM_TABLE: timestreamTable.tableName!,
      },
    });

    anomalyDetectorFunction.addToRolePolicy(new iam.PolicyStatement({
      effect: iam.Effect.ALLOW,
      actions: [
        'timestream:Select',
        'timestream:DescribeEndpoints',
      ],
      resources: ['*'],
    }));

    alertTopic.grantPublish(anomalyDetectorFunction);

    // IoT Rule to process sensor data
    const iotRole = new iam.Role(this, 'IoTRuleRole', {
      assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
    });

    dataBucket.grantWrite(iotRole);
    dataStream.grantWrite(iotRole);
    // Topic rule Lambda actions invoke through the function's resource
    // policy, so grant the AWS IoT service principal (not the rule role)
    dataProcessorFunction.grantInvoke(new iam.ServicePrincipal('iot.amazonaws.com'));
    anomalyDetectorFunction.grantInvoke(new iam.ServicePrincipal('iot.amazonaws.com'));

    // IoT Topic Rule for sensor data
    new iot.CfnTopicRule(this, 'SensorDataRule', {
      topicRulePayload: {
        sql: "SELECT * FROM 'factory/+/sensor/#'",
        description: 'Route sensor data to processing pipeline',
        actions: [
          {
            lambda: {
              functionArn: dataProcessorFunction.functionArn,
            },
          },
          {
            kinesis: {
              roleArn: iotRole.roleArn,
              streamName: dataStream.streamName,
              partitionKey: '${topic(4)}', // device ID (topic: factory/<factoryId>/sensor/<deviceId>)
            },
          },
          {
            s3: {
              roleArn: iotRole.roleArn,
              bucketName: dataBucket.bucketName,
              key: 'raw/${topic(2)}/${topic(4)}/${timestamp()}.json', // partitioned by factory and device
            },
          },
        ],
      },
    });

    // IoT Topic Rule for anomaly detection
    new iot.CfnTopicRule(this, 'AnomalyDetectionRule', {
      topicRulePayload: {
        sql: "SELECT * FROM 'factory/+/sensor/#' WHERE temperature > 80 OR vibration > 10",
        description: 'Detect anomalies in sensor data',
        actions: [
          {
            lambda: {
              functionArn: anomalyDetectorFunction.functionArn,
            },
          },
          {
            sns: {
              roleArn: iotRole.roleArn,
              targetArn: alertTopic.topicArn,
              messageFormat: 'JSON',
            },
          },
        ],
      },
    });

    alertTopic.grantPublish(iotRole);

    // IoT Thing Type for manufacturing devices
    new iot.CfnThingType(this, 'SensorThingType', {
      thingTypeName: 'ManufacturingSensor',
      thingTypeProperties: {
        thingTypeDescription: 'Industrial sensor for manufacturing',
        searchableAttributes: ['location', 'machineType', 'factory'],
      },
    });

    // IoT Policy for devices
    new iot.CfnPolicy(this, 'DevicePolicy', {
      policyName: 'ManufacturingDevicePolicy',
      policyDocument: {
        Version: '2012-10-17',
        Statement: [
          {
            Effect: 'Allow',
            Action: ['iot:Connect'],
            Resource: ['arn:aws:iot:*:*:client/${iot:ClientId}'],
          },
          {
            Effect: 'Allow',
            Action: ['iot:Publish'],
            Resource: ['arn:aws:iot:*:*:topic/factory/*/sensor/*'],
          },
          {
            Effect: 'Allow',
            Action: ['iot:Subscribe'],
            Resource: ['arn:aws:iot:*:*:topicfilter/factory/*/command/*'],
          },
          {
            Effect: 'Allow',
            Action: ['iot:Receive'],
            Resource: ['arn:aws:iot:*:*:topic/factory/*/command/*'],
          },
        ],
      },
    });

    // Outputs
    new cdk.CfnOutput(this, 'DataBucketName', {
      value: dataBucket.bucketName,
      description: 'S3 bucket for raw IoT data',
    });

    new cdk.CfnOutput(this, 'TimestreamDatabase', {
      value: timestreamDb.databaseName!,
      description: 'Timestream database name',
    });

    new cdk.CfnOutput(this, 'KinesisStreamName', {
      value: dataStream.streamName,
      description: 'Kinesis stream for real-time processing',
    });
  }
}

This infrastructure provides secure device connectivity, real-time data processing, time-series storage, and automated anomaly detection.
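Before a device can connect, it needs a registered identity and an X.509 certificate. A minimal provisioning sketch with boto3 follows; the thing name and certificate output paths are illustrative, chosen to match what the simulator below expects:

import boto3

iot = boto3.client('iot', region_name='eu-west-1')

def provision_device(device_id):
    """Create a thing, certificate, and policy attachment for one sensor."""
    # Register the device against the thing type from the CDK stack
    iot.create_thing(thingName=device_id, thingTypeName='ManufacturingSensor')

    # Generate an X.509 certificate and key pair for mutual-TLS authentication
    cert = iot.create_keys_and_certificate(setAsActive=True)

    # Bind the certificate to the device and to the least-privilege policy
    iot.attach_thing_principal(thingName=device_id, principal=cert['certificateArn'])
    iot.attach_policy(policyName='ManufacturingDevicePolicy', target=cert['certificateArn'])

    # Persist credentials for installation on the device (illustrative paths)
    with open(f'certs/{device_id}.cert.pem', 'w') as f:
        f.write(cert['certificatePem'])
    with open(f'certs/{device_id}.private.key', 'w') as f:
        f.write(cert['keyPair']['PrivateKey'])

provision_device('CNC-001')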

IoT Device Simulation and Testing

Before deploying physical sensors, simulate IoT devices to test your infrastructure:

import json
import time
import random

# AWSIoTPythonSDK is the legacy v1 device SDK; the v2 SDK (awsiotsdk) is the
# current recommendation, but v1 keeps this example compact
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

class ManufacturingSensorSimulator:
    def __init__(self, device_id, factory_id, machine_type):
        self.device_id = device_id
        self.factory_id = factory_id
        self.machine_type = machine_type

        # Initialize MQTT client
        self.mqtt_client = AWSIoTMQTTClient(device_id)
        # Find your account-specific endpoint with:
        #   aws iot describe-endpoint --endpoint-type iot:Data-ATS
        self.mqtt_client.configureEndpoint("your-iot-endpoint.iot.eu-west-1.amazonaws.com", 8883)
        self.mqtt_client.configureCredentials(
            "AmazonRootCA1.pem",
            f"certs/{device_id}.private.key",
            f"certs/{device_id}.cert.pem"
        )

        # Configure connection
        self.mqtt_client.configureAutoReconnectBackoffTime(1, 32, 20)
        self.mqtt_client.configureOfflinePublishQueueing(-1)
        self.mqtt_client.configureDrainingFrequency(2)
        self.mqtt_client.configureConnectDisconnectTimeout(10)
        self.mqtt_client.configureMQTTOperationTimeout(5)

    def connect(self):
        """Connect to AWS IoT Core"""
        self.mqtt_client.connect()
        print(f"Device {self.device_id} connected to AWS IoT Core")

    def generate_sensor_data(self, introduce_anomaly=False):
        """Generate realistic sensor data"""
        # Normal operating ranges
        temperature = random.gauss(65, 5)  # Mean 65°C, std dev 5
        vibration = random.gauss(3, 0.5)   # Mean 3 mm/s, std dev 0.5
        pressure = random.gauss(100, 2)    # Mean 100 PSI, std dev 2
        rpm = random.gauss(1500, 50)       # Mean 1500 RPM, std dev 50
        power = random.gauss(75, 5)        # Mean 75 kW, std dev 5

        # Introduce anomalies for testing
        if introduce_anomaly:
            anomaly_type = random.choice(['overheat', 'vibration', 'pressure'])
            if anomaly_type == 'overheat':
                temperature = random.gauss(90, 10)  # Overheating
            elif anomaly_type == 'vibration':
                vibration = random.gauss(12, 2)     # Excessive vibration
            elif anomaly_type == 'pressure':
                pressure = random.gauss(120, 5)     # High pressure

        return {
            'deviceId': self.device_id,
            'factoryId': self.factory_id,
            'machineType': self.machine_type,
            'timestamp': int(time.time() * 1000),
            'sensors': {
                'temperature': round(temperature, 2),
                'vibration': round(vibration, 2),
                'pressure': round(pressure, 2),
                'rpm': round(rpm, 0),
                'power': round(power, 2),
            },
            'operationalStatus': 'running',
            'productionCount': random.randint(0, 100),
        }

    def publish_data(self, data):
        """Publish sensor data to IoT Core"""
        topic = f"factory/{self.factory_id}/sensor/{self.device_id}"
        message = json.dumps(data)
        self.mqtt_client.publish(topic, message, 1)
        print(f"Published to {topic}: {message}")

    def run_simulation(self, duration_seconds=3600, interval_seconds=5, anomaly_rate=0.01):
        """Run sensor simulation"""
        self.connect()

        start_time = time.time()
        message_count = 0

        try:
            while (time.time() - start_time) < duration_seconds:
                # Decide whether to introduce anomaly
                introduce_anomaly = random.random() < anomaly_rate

                # Generate and publish data
                data = self.generate_sensor_data(introduce_anomaly)
                self.publish_data(data)

                message_count += 1
                time.sleep(interval_seconds)

        except KeyboardInterrupt:
            print("\nSimulation stopped by user")

        finally:
            print(f"\nSimulation complete. Published {message_count} messages")
            self.mqtt_client.disconnect()

# Usage example
if __name__ == "__main__":
    # Simulate multiple machines in Amsterdam factory
    simulators = [
        ManufacturingSensorSimulator("CNC-001", "AMS-FACTORY-01", "CNC-Machine"),
        ManufacturingSensorSimulator("PRESS-001", "AMS-FACTORY-01", "Hydraulic-Press"),
        ManufacturingSensorSimulator("WELD-001", "AMS-FACTORY-01", "Welding-Robot"),
        ManufacturingSensorSimulator("ASSY-001", "AMS-FACTORY-01", "Assembly-Line"),
    ]

    import threading

    # Run simulators in parallel
    threads = []
    for sim in simulators:
        thread = threading.Thread(
            target=sim.run_simulation,
            args=(3600, 5, 0.02)  # 1 hour, 5 second intervals, 2% anomaly rate
        )
        thread.start()
        threads.append(thread)

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    print("All simulations complete")

This simulator creates realistic sensor data with occasional anomalies, perfect for testing your IoT pipeline.

AWS IoT Greengrass for Edge Computing

Manufacturing environments often require local processing due to network constraints, latency requirements, or operational continuity needs. AWS IoT Greengrass brings AWS capabilities to the factory floor.

Greengrass Use Cases

Local ML Inference: Run machine learning models directly on edge devices for real-time quality inspection, predictive maintenance, and process optimization without cloud round-trip latency.

Protocol Translation: Convert industrial protocols (Modbus, OPC-UA, Profinet) to MQTT for cloud connectivity (a sketch follows this list).

Data Filtering and Aggregation: Process and aggregate sensor data locally, reducing bandwidth costs by sending only relevant data to the cloud.

Offline Operation: Continue processing data and making decisions even when connectivity to AWS is interrupted.
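The protocol-translation case is worth making concrete. Below is a minimal gateway sketch that polls holding registers from a Modbus TCP PLC and republishes readings as MQTT. It assumes pymodbus (3.x) and paho-mqtt (1.x API) are installed; the PLC address, register map, scaling, and topic are illustrative. In production, an AWS-provided protocol adapter component or a custom Greengrass component would play this role:

import json
import time

import paho.mqtt.client as mqtt
from pymodbus.client import ModbusTcpClient  # pymodbus 3.x

# Illustrative register map: sensor name -> holding register address
REGISTERS = {'temperature': 0, 'pressure': 1}

plc = ModbusTcpClient('192.168.1.50', port=502)  # PLC on the factory LAN
plc.connect()

mqttc = mqtt.Client('edge-gateway-01')  # paho-mqtt 1.x style constructor
mqttc.connect('localhost', 1883)        # local broker on the edge gateway

while True:
    reading = {'timestamp': int(time.time() * 1000)}
    for name, address in REGISTERS.items():
        result = plc.read_holding_registers(address, count=1)
        if not result.isError():
            reading[name] = result.registers[0] / 10.0  # scale per PLC datasheet
    mqttc.publish('factory/AMS-FACTORY-01/sensor/PLC-001', json.dumps(reading))
    time.sleep(5)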

Implementing Greengrass Components

# Greengrass component for predictive maintenance
import json
import time
import numpy as np
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
    PublishToIoTCoreRequest,
    QOS
)

class PredictiveMaintenanceComponent:
    def __init__(self):
        self.ipc_client = awsiot.greengrasscoreipc.connect()
        self.sensor_buffer = []
        self.buffer_size = 100

        # Load pre-trained model (simplified example)
        self.failure_threshold = 0.85

    def process_sensor_data(self, sensor_data):
        """Process incoming sensor data"""
        self.sensor_buffer.append(sensor_data)

        # Keep buffer at specified size
        if len(self.sensor_buffer) > self.buffer_size:
            self.sensor_buffer.pop(0)

        # Run prediction when buffer is full
        if len(self.sensor_buffer) == self.buffer_size:
            prediction = self.predict_failure()

            if prediction['failure_probability'] > self.failure_threshold:
                self.send_alert(prediction)

    def predict_failure(self):
        """Predict equipment failure based on sensor trends"""
        # Extract sensor values
        temperatures = [d['sensors']['temperature'] for d in self.sensor_buffer]
        vibrations = [d['sensors']['vibration'] for d in self.sensor_buffer]

        # Simple trend analysis (replace with actual ML model)
        temp_trend = np.polyfit(range(len(temperatures)), temperatures, 1)[0]
        vib_trend = np.polyfit(range(len(vibrations)), vibrations, 1)[0]

        # Calculate failure probability
        failure_prob = 0.0

        if temp_trend > 0.1:  # Temperature rising
            failure_prob += 0.3
        if vib_trend > 0.05:  # Vibration increasing
            failure_prob += 0.4
        if temperatures[-1] > 80:  # Current temp high
            failure_prob += 0.2
        if vibrations[-1] > 8:  # Current vibration high
            failure_prob += 0.2

        return {
            'failure_probability': min(failure_prob, 1.0),
            'temperature_trend': temp_trend,
            'vibration_trend': vib_trend,
            'recommendation': self.get_recommendation(failure_prob),
            'timestamp': int(time.time() * 1000),
        }

    def get_recommendation(self, failure_prob):
        """Get maintenance recommendation"""
        if failure_prob > 0.9:
            return "URGENT: Schedule immediate maintenance"
        elif failure_prob > 0.7:
            return "Schedule maintenance within 24 hours"
        elif failure_prob > 0.5:
            return "Schedule maintenance within 1 week"
        else:
            return "Continue monitoring"

    def send_alert(self, prediction):
        """Send alert to IoT Core"""
        message = {
            'deviceId': self.sensor_buffer[-1]['deviceId'],
            'alertType': 'predictive_maintenance',
            'prediction': prediction,
        }

        request = PublishToIoTCoreRequest(
            topic_name="factory/alerts/maintenance",
            qos=QOS.AT_LEAST_ONCE,
            payload=json.dumps(message).encode('utf-8'),
        )

        # V1 IPC pattern: create the operation, activate it with the request,
        # then block on the response future
        operation = self.ipc_client.new_publish_to_iot_core()
        operation.activate(request)
        operation.get_response().result(timeout=10)
        print(f"Alert sent: {prediction['recommendation']}")

# Greengrass component lifecycle: instantiate once at startup, then feed each
# incoming sensor message into the handler (wiring sketched below)
component = PredictiveMaintenanceComponent()

def handler(event):
    """Handle an incoming sensor data message (JSON string)"""
    sensor_data = json.loads(event)
    component.process_sensor_data(sensor_data)
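How the handler receives messages depends on how the component is wired up. One option, sketched here on the assumption that the component recipe grants local pub/sub access and that sensor readings arrive on an illustrative local topic local/sensors, is a Greengrass IPC subscription:

import time

import awsiot.greengrasscoreipc.client as ipc_client_module
from awsiot.greengrasscoreipc.model import (
    SubscribeToTopicRequest,
    SubscriptionResponseMessage,
)

class SensorStreamHandler(ipc_client_module.SubscribeToTopicStreamHandler):
    """Forward each local pub/sub message to the component handler."""
    def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
        handler(event.binary_message.message.decode('utf-8'))

    def on_stream_error(self, error: Exception) -> bool:
        print(f"Subscription stream error: {error}")
        return False  # keep the stream open

# Subscribe, then idle; Greengrass restarts the component if it exits
operation = component.ipc_client.new_subscribe_to_topic(SensorStreamHandler())
operation.activate(SubscribeToTopicRequest(topic='local/sensors'))
operation.get_response().result(timeout=10)

while True:
    time.sleep(60)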

Time-Series Data with Amazon Timestream

Manufacturing generates massive amounts of time-series data. Amazon Timestream is optimized for this workload, offering fast ingestion and query performance.

Writing Data to Timestream

import boto3
import time
from datetime import datetime

timestream_write = boto3.client('timestream-write', region_name='eu-west-1')
timestream_query = boto3.client('timestream-query', region_name='eu-west-1')

DATABASE_NAME = 'manufacturing-iot'
TABLE_NAME = 'sensor-data'

def write_sensor_records(sensor_data_batch):
    """Write batch of sensor records to Timestream"""
    records = []

    for data in sensor_data_batch:
        current_time = str(int(time.time() * 1000))

        # Create records for each sensor measurement
        for sensor_name, sensor_value in data['sensors'].items():
            record = {
                'Dimensions': [
                    {'Name': 'deviceId', 'Value': data['deviceId']},
                    {'Name': 'factoryId', 'Value': data['factoryId']},
                    {'Name': 'machineType', 'Value': data['machineType']},
                    {'Name': 'sensorType', 'Value': sensor_name},
                ],
                'MeasureName': sensor_name,
                'MeasureValue': str(sensor_value),
                'MeasureValueType': 'DOUBLE',
                'Time': current_time,
            }
            records.append(record)

    try:
        result = timestream_write.write_records(
            DatabaseName=DATABASE_NAME,
            TableName=TABLE_NAME,
            Records=records
        )
        print(f"Wrote {len(records)} records to Timestream")
        return result

    except Exception as e:
        print(f"Error writing to Timestream: {e}")
        raise

def query_equipment_performance(device_id, hours=24):
    """Query equipment performance over time"""
    query = f"""
        SELECT
            deviceId,
            BIN(time, 1h) AS hour,
            AVG(CASE WHEN sensorType = 'temperature' THEN measure_value::double END) AS avg_temperature,
            MAX(CASE WHEN sensorType = 'temperature' THEN measure_value::double END) AS max_temperature,
            AVG(CASE WHEN sensorType = 'vibration' THEN measure_value::double END) AS avg_vibration,
            MAX(CASE WHEN sensorType = 'vibration' THEN measure_value::double END) AS max_vibration,
            AVG(CASE WHEN sensorType = 'power' THEN measure_value::double END) AS avg_power
        FROM "{DATABASE_NAME}"."{TABLE_NAME}"
        WHERE deviceId = '{device_id}'
            AND time > ago({hours}h)
        GROUP BY deviceId, BIN(time, 1h)
        ORDER BY hour DESC
    """

    try:
        result = timestream_query.query(QueryString=query)
        return parse_query_result(result)

    except Exception as e:
        print(f"Error querying Timestream: {e}")
        raise

def calculate_oee(factory_id, start_time, end_time):
    """Calculate Overall Equipment Effectiveness (OEE)"""
    # Assumes operationalStatus is ingested as a dimension and productionCount
    # as a measure (extend write_sensor_records accordingly)
    query = f"""
        WITH production_data AS (
            SELECT
                deviceId,
                COUNT(*) as total_readings,
                SUM(CASE WHEN operationalStatus = 'running' THEN 1 ELSE 0 END) as running_count,
                AVG(measure_value::double) as avg_production_rate
            FROM "{DATABASE_NAME}"."{TABLE_NAME}"
            WHERE factoryId = '{factory_id}'
                AND time BETWEEN from_iso8601_timestamp('{start_time}')
                AND from_iso8601_timestamp('{end_time}')
                AND measure_name = 'productionCount'
            GROUP BY deviceId
        )
        SELECT
            deviceId,
            (running_count * 1.0 / total_readings) * 100 as availability_percent,
            avg_production_rate
        FROM production_data
    """

    result = timestream_query.query(QueryString=query)
    return parse_query_result(result)

def detect_anomalies(device_id, hours=1):
    """Detect anomalies using statistical methods"""
    query = f"""
        WITH sensor_stats AS (
            SELECT
                sensorType,
                AVG(measure_value::double) as mean_value,
                STDDEV(measure_value::double) as stddev_value
            FROM "{DATABASE_NAME}"."{TABLE_NAME}"
            WHERE deviceId = '{device_id}'
                AND time > ago({hours}h)
            GROUP BY sensorType
        ),
        recent_readings AS (
            SELECT
                time,
                sensorType,
                measure_value::double as value
            FROM "{DATABASE_NAME}"."{TABLE_NAME}"
            WHERE deviceId = '{device_id}'
                AND time > ago(5m)
        )
        SELECT
            r.time,
            r.sensorType,
            r.value,
            s.mean_value,
            s.stddev_value,
            ABS(r.value - s.mean_value) / s.stddev_value as z_score
        FROM recent_readings r
        JOIN sensor_stats s ON r.sensorType = s.sensorType
        WHERE ABS(r.value - s.mean_value) / s.stddev_value > 3
        ORDER BY r.time DESC
    """

    result = timestream_query.query(QueryString=query)
    anomalies = parse_query_result(result)

    if anomalies:
        print(f"Found {len(anomalies)} anomalies for device {device_id}")

    return anomalies

def parse_query_result(result):
    """Parse Timestream query result"""
    rows = []
    for row in result['Rows']:
        parsed_row = {}
        for i, col in enumerate(row['Data']):
            col_name = result['ColumnInfo'][i]['Name']
            parsed_row[col_name] = col.get('ScalarValue', '')
        rows.append(parsed_row)
    return rows
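As a quick usage example, pulling the last 24 hours of hourly statistics for one of the simulated machines:

hourly_stats = query_equipment_performance('CNC-001', hours=24)
for row in hourly_stats[:3]:
    print(row['hour'], row['avg_temperature'], row['max_vibration'])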

Predictive Maintenance with Machine Learning

Predictive maintenance is one of the most valuable IIoT applications. Using SageMaker, we can train models to predict equipment failures.

Training a Failure Prediction Model

import boto3
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import joblib
import sagemaker
from sagemaker.sklearn.model import SKLearnModel

# Prepare training data from Timestream
def prepare_training_data(factory_id, days=90):
    """Extract and prepare training data"""
    timestream_query = boto3.client('timestream-query', region_name='eu-west-1')

    # Query historical sensor data with labeled failures
    query = f"""
        SELECT
            deviceId,
            time,
            AVG(CASE WHEN sensorType = 'temperature' THEN measure_value::double END) AS temperature,
            AVG(CASE WHEN sensorType = 'vibration' THEN measure_value::double END) AS vibration,
            AVG(CASE WHEN sensorType = 'pressure' THEN measure_value::double END) AS pressure,
            AVG(CASE WHEN sensorType = 'rpm' THEN measure_value::double END) AS rpm,
            AVG(CASE WHEN sensorType = 'power' THEN measure_value::double END) AS power
        FROM "manufacturing-iot"."sensor-data"
        WHERE factoryId = '{factory_id}'
            AND time > ago({days}d)
        GROUP BY deviceId, time
        ORDER BY time
    """

    result = timestream_query.query(QueryString=query)

    # Convert to DataFrame (keep identifier columns as strings)
    data = []
    for row in result['Rows']:
        data_row = {}
        for i, col in enumerate(row['Data']):
            col_name = result['ColumnInfo'][i]['Name']
            value = col.get('ScalarValue')
            if col_name in ('deviceId', 'time'):
                data_row[col_name] = value
            else:
                data_row[col_name] = float(value) if value is not None else 0.0
        data.append(data_row)

    df = pd.DataFrame(data)

    # Feature engineering
    df = create_features(df)

    return df

def create_features(df):
    """Create features for ML model"""
    # Rolling statistics
    for col in ['temperature', 'vibration', 'pressure', 'rpm', 'power']:
        df[f'{col}_rolling_mean_1h'] = df.groupby('deviceId')[col].transform(
            lambda x: x.rolling(window=12, min_periods=1).mean()
        )
        df[f'{col}_rolling_std_1h'] = df.groupby('deviceId')[col].transform(
            lambda x: x.rolling(window=12, min_periods=1).std()
        )
        df[f'{col}_trend'] = df.groupby('deviceId')[col].transform(
            lambda x: x.diff()
        )

    # Rate of change
    df['temp_rate_of_change'] = df.groupby('deviceId')['temperature'].diff()
    df['vibration_rate_of_change'] = df.groupby('deviceId')['vibration'].diff()

    # Interaction features
    df['temp_vibration_interaction'] = df['temperature'] * df['vibration']
    df['pressure_rpm_ratio'] = df['pressure'] / (df['rpm'] + 1)

    # Time-based features (Timestream returns time as a timestamp string)
    df['hour'] = pd.to_datetime(df['time']).dt.hour
    df['day_of_week'] = pd.to_datetime(df['time']).dt.dayofweek

    # Backfill NaN values left by the rolling/diff calculations
    df = df.bfill()

    return df

def train_failure_prediction_model(training_data):
    """Train Random Forest model for failure prediction"""
    # Assumes a binary 'failure_within_24h' label has been joined onto the
    # sensor features, e.g. from CMMS maintenance and failure logs
    feature_columns = [col for col in training_data.columns
                       if col not in ['deviceId', 'time', 'failure_within_24h']]

    X = training_data[feature_columns]
    y = training_data['failure_within_24h']

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Train model
    model = RandomForestClassifier(
        n_estimators=200,
        max_depth=15,
        min_samples_split=10,
        min_samples_leaf=4,
        class_weight='balanced',
        random_state=42,
        n_jobs=-1
    )

    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    print("\nModel Performance:")
    print(classification_report(y_test, y_pred))
    print("\nConfusion Matrix:")
    print(confusion_matrix(y_test, y_pred))

    # Feature importance
    feature_importance = pd.DataFrame({
        'feature': feature_columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)

    print("\nTop 10 Most Important Features:")
    print(feature_importance.head(10))

    return model, feature_columns

# Deploy model to SageMaker
def deploy_to_sagemaker(model, feature_columns):
    """Deploy the trained model to a SageMaker endpoint"""
    import tarfile

    # SageMaker expects a model.tar.gz archive containing the artifacts
    joblib.dump(model, 'model.joblib')
    joblib.dump(feature_columns, 'feature_columns.joblib')
    with tarfile.open('model.tar.gz', 'w:gz') as tar:
        tar.add('model.joblib')
        tar.add('feature_columns.joblib')

    # Upload to S3
    sagemaker_session = sagemaker.Session()
    bucket = sagemaker_session.default_bucket()
    prefix = 'manufacturing-predictive-maintenance'

    model_data = sagemaker_session.upload_data(
        path='model.tar.gz',
        bucket=bucket,
        key_prefix=f'{prefix}/model'
    )

    # Wrap the pre-trained artifact with SKLearnModel (the SKLearn estimator
    # is for training jobs, not for hosting an already-trained model)
    role = sagemaker.get_execution_role()

    sklearn_model = SKLearnModel(
        model_data=model_data,
        role=role,
        entry_point='inference.py',  # loads model.joblib, implements model_fn/predict_fn
        framework_version='1.2-1',
        py_version='py3',
    )

    # Deploy endpoint
    predictor = sklearn_model.deploy(
        initial_instance_count=1,
        instance_type='ml.m5.large',
        endpoint_name='manufacturing-maintenance-predictor'
    )

    print("Model deployed to endpoint: manufacturing-maintenance-predictor")

    return predictor
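Once the endpoint is live, it can be invoked from a processing Lambda. A brief sketch, assuming inference.py accepts a JSON payload of engineered feature values:

import json

import boto3

runtime = boto3.client('sagemaker-runtime', region_name='eu-west-1')

def predict_failure_probability(feature_vector):
    """Invoke the deployed predictor with one row of engineered features."""
    response = runtime.invoke_endpoint(
        EndpointName='manufacturing-maintenance-predictor',
        ContentType='application/json',
        Body=json.dumps(feature_vector),
    )
    return json.loads(response['Body'].read())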

Real-Time Factory Monitoring Dashboard

Visualize factory operations in real-time using Amazon QuickSight or custom dashboards:

# Lambda function to aggregate metrics for dashboards
import boto3
import json
from datetime import datetime
from decimal import Decimal

timestream_query = boto3.client('timestream-query', region_name='eu-west-1')
dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
metrics_table = dynamodb.Table('factory-metrics')

def lambda_handler(event, context):
    """Aggregate factory metrics for dashboard"""
    factory_id = event.get('factoryId', 'AMS-FACTORY-01')

    # Calculate metrics
    metrics = {
        'timestamp': datetime.utcnow().isoformat(),
        'factoryId': factory_id,
        'oee': calculate_oee(factory_id),
        'activeEquipment': count_active_equipment(factory_id),
        'alertCount': count_recent_alerts(factory_id),
        'productionRate': calculate_production_rate(factory_id),
        'energyConsumption': calculate_energy_consumption(factory_id),
        'equipmentStatus': get_equipment_status(factory_id),
        # count_recent_alerts, calculate_energy_consumption and
        # get_equipment_status follow the same Timestream query pattern as
        # the helpers below (definitions omitted for brevity)
    }

    # Store in DynamoDB (floats must be converted to Decimal for DynamoDB)
    metrics_table.put_item(
        Item=json.loads(json.dumps(metrics, default=str), parse_float=Decimal)
    )

    return {
        'statusCode': 200,
        'body': json.dumps(metrics, default=str)
    }

def calculate_oee(factory_id):
    """Calculate Overall Equipment Effectiveness"""
    # Assumes hourly availability/performance/quality factors are pre-computed
    # into an 'oee-metrics' table (e.g. by a scheduled aggregation job)
    query = f"""
        SELECT
            AVG(availability) * AVG(performance) * AVG(quality) as oee
        FROM "manufacturing-iot"."oee-metrics"
        WHERE factoryId = '{factory_id}'
            AND time > ago(1h)
    """
    result = timestream_query.query(QueryString=query)

    if result['Rows']:
        return float(result['Rows'][0]['Data'][0]['ScalarValue']) * 100
    return 0

def count_active_equipment(factory_id):
    """Count active equipment"""
    query = f"""
        SELECT COUNT(DISTINCT deviceId) as active_count
        FROM "manufacturing-iot"."sensor-data"
        WHERE factoryId = '{factory_id}'
            AND time > ago(5m)
    """
    result = timestream_query.query(QueryString=query)

    if result['Rows']:
        return int(result['Rows'][0]['Data'][0]['ScalarValue'])
    return 0

def calculate_production_rate(factory_id):
    """Calculate current production rate"""
    query = f"""
        SELECT SUM(measure_value::double) as total_production
        FROM "manufacturing-iot"."sensor-data"
        WHERE factoryId = '{factory_id}'
            AND measure_name = 'productionCount'
            AND time > ago(1h)
    """
    result = timestream_query.query(QueryString=query)

    if result['Rows']:
        return float(result['Rows'][0]['Data'][0]['ScalarValue'])
    return 0

Supply Chain Optimization with IoT

Track assets throughout the supply chain with IoT-enabled tracking:

# Asset tracking with IoT
import time

import boto3

def track_shipment(shipment_id, location_data):
    """Track shipment location and conditions"""
    dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
    shipments_table = dynamodb.Table('shipments')

    # Update shipment location
    shipments_table.update_item(
        Key={'shipmentId': shipment_id},
        UpdateExpression='SET currentLocation = :loc, lastUpdated = :time, temperature = :temp',
        ExpressionAttributeValues={
            ':loc': location_data['coordinates'],
            ':time': int(time.time()),
            ':temp': location_data['temperature'],
        }
    )

    # Check for cold-chain violations (2-8°C is the typical pharma/food range)
    if location_data['temperature'] < 2 or location_data['temperature'] > 8:
        send_cold_chain_alert(shipment_id, location_data)
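The send_cold_chain_alert helper is not defined above; a minimal sketch publishes the violation to SNS (the topic ARN is illustrative):

import json

import boto3

sns = boto3.client('sns', region_name='eu-west-1')
COLD_CHAIN_TOPIC_ARN = 'arn:aws:sns:eu-west-1:123456789012:cold-chain-alerts'  # illustrative

def send_cold_chain_alert(shipment_id, location_data):
    """Notify operations of a temperature excursion on a tracked shipment."""
    sns.publish(
        TopicArn=COLD_CHAIN_TOPIC_ARN,
        Subject=f'Cold chain violation: {shipment_id}',
        Message=json.dumps({
            'shipmentId': shipment_id,
            'temperature': location_data['temperature'],
            'coordinates': location_data['coordinates'],
        }),
    )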

Best Practices for Manufacturing IoT

1. Security

  • Use X.509 certificates for device authentication
  • Implement AWS IoT Device Defender for security monitoring
  • Enable AWS IoT Secure Tunneling for remote device access
  • Use VPN or AWS Direct Connect for factory connectivity
  • Implement least-privilege IAM policies
  • Enable AWS CloudTrail for audit logging

2. Scalability

  • Use IoT Core message broker for millions of devices
  • Implement data aggregation at edge with Greengrass
  • Use Kinesis Data Streams for high-throughput ingestion
  • Partition time-series data effectively in Timestream
  • Implement caching strategies for frequently accessed data

3. Reliability

  • Deploy Greengrass for offline operation capability
  • Implement message buffering and retry logic (see the sketch after this list)
  • Use DynamoDB for device state management
  • Set up multi-region failover for critical systems
  • Monitor device connectivity and health
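To illustrate the buffering-and-retry point, here is a minimal client-side sketch; it is illustrative only, and the AWS IoT device SDKs also provide built-in offline publish queueing, as configured in the simulator earlier:

import json
import time
from collections import deque

class BufferedPublisher:
    """Buffer messages locally and retry publishing them in order."""
    def __init__(self, mqtt_client, maxlen=10000):
        self.mqtt_client = mqtt_client
        self.buffer = deque(maxlen=maxlen)  # oldest messages drop when full

    def publish(self, topic, payload):
        self.buffer.append((topic, json.dumps(payload)))
        self.flush()

    def flush(self):
        while self.buffer:
            topic, message = self.buffer[0]
            try:
                self.mqtt_client.publish(topic, message, 1)  # QoS 1
                self.buffer.popleft()  # drop only after a successful publish
            except Exception:
                time.sleep(1)  # back off; retry on the next flush
                break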

4. Cost Optimization

  • Filter and aggregate data at edge before sending to cloud
  • Use appropriate Timestream retention policies
  • Implement S3 lifecycle policies for archived data
  • Right-size IoT Core message routing rules
  • Use reserved capacity for predictable workloads

Conclusion

AWS provides a comprehensive platform for implementing Industry 4.0 initiatives in manufacturing. From device connectivity with IoT Core, to edge processing with Greengrass, to predictive maintenance with SageMaker, Dutch manufacturers can transform their operations to be more efficient, reliable, and competitive.

The combination of real-time monitoring, predictive analytics, and automated responses enables manufacturers to:

  • Reduce unplanned downtime by 30-50%
  • Improve OEE by 15-25%
  • Decrease maintenance costs by 20-30%
  • Enhance product quality through early defect detection
  • Optimize energy consumption and reduce costs

At Forrict, we help Dutch manufacturing companies design and implement IoT solutions on AWS. Our team has experience across manufacturing sectors including high-tech equipment, automotive, and food processing. Contact us to discuss how we can help you implement Industry 4.0 technologies and transform your manufacturing operations.
