Dutch manufacturing companies are embracing Industry 4.0, transforming traditional factories into smart, connected facilities. Companies like ASML, Philips, and DAF Trucks leverage Industrial IoT (IIoT) to optimize production, reduce downtime, and improve product quality. AWS provides a comprehensive IoT platform that enables manufacturers to collect, process, and analyze data from thousands of connected devices in real-time.
The Industry 4.0 Revolution
Industry 4.0 represents the fourth industrial revolution, characterized by the integration of cyber-physical systems, Internet of Things (IoT), cloud computing, and artificial intelligence into manufacturing processes. For Dutch manufacturers, this transformation is critical for maintaining competitiveness in the global market.
Business Drivers for IoT in Manufacturing
Predictive Maintenance: Traditional preventive maintenance schedules equipment servicing at fixed intervals, leading to either premature part replacement or unexpected failures. Predictive maintenance uses real-time sensor data and machine learning to predict failures before they occur, reducing downtime by up to 50% and maintenance costs by 20-30%.
Operational Efficiency: Real-time monitoring of production lines identifies bottlenecks, quality issues, and inefficiencies as they occur. Dutch manufacturers report 15-25% improvements in Overall Equipment Effectiveness (OEE) after implementing IoT monitoring; a quick worked OEE example follows this list.
Quality Control: Automated quality inspection using computer vision and sensor data catches defects earlier in the production process, reducing waste and improving customer satisfaction.
Supply Chain Optimization: IoT-enabled asset tracking provides real-time visibility into inventory levels, shipment locations, and supply chain disruptions, enabling faster response to changes.
Energy Management: Manufacturing accounts for significant energy consumption. IoT sensors identify energy waste and optimize consumption patterns, reducing costs and supporting sustainability goals.
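OEE, referenced above, is simply the product of availability, performance, and quality. A quick worked example (illustrative numbers only):

def oee(availability, performance, quality):
    """OEE = Availability x Performance x Quality, as a percentage"""
    return availability * performance * quality * 100

# A line that is up 90% of the time, runs at 95% of rated speed,
# and produces 99% good parts:
print(f"OEE: {oee(0.90, 0.95, 0.99):.1f}%")  # -> OEE: 84.6%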
AWS IoT Services Overview
AWS offers a complete IoT stack for manufacturing:
AWS IoT Core: Managed cloud service that connects billions of devices securely and routes messages to AWS services and applications.
AWS IoT Greengrass: Extends AWS capabilities to edge devices, enabling local processing, machine learning inference, and operation even when disconnected from the cloud.
AWS IoT SiteWise: Collects, organizes, and analyzes industrial equipment data at scale, providing insights into operational performance.
AWS IoT Events: Detects and responds to events from IoT sensors and applications, triggering automated actions.
AWS IoT Analytics: Processes, enriches, stores, and analyzes IoT data for decision-making.
Amazon Timestream: Purpose-built time-series database for IoT applications, storing and analyzing trillions of events per day.
AWS IoT Core Architecture for Manufacturing
A typical manufacturing IoT architecture on AWS consists of edge devices collecting sensor data, secure communication channels to the cloud, data processing pipelines, and analytics/ML services for insights.
Reference Architecture
Factory Floor
├── Sensors & PLCs
│ ├── Temperature sensors
│ ├── Vibration sensors
│ ├── Pressure sensors
│ └── Machine controllers
│
├── Edge Gateways (IoT Greengrass)
│ ├── Local data processing
│ ├── ML inference
│ ├── Protocol translation
│ └── Data aggregation
│
└── Network (Secure)
    └── TLS 1.2+ encryption
        │
        │ MQTT/HTTPS
        ▼
AWS Cloud
├── AWS IoT Core
│ ├── Device authentication
│ ├── Message routing
│ └── Rules engine
│
├── Data Processing
│ ├── AWS IoT Analytics
│ ├── Amazon Kinesis
│ └── AWS Lambda
│
├── Storage
│ ├── Amazon Timestream (time-series)
│ ├── Amazon S3 (raw data lake)
│ └── DynamoDB (metadata)
│
├── Analytics & ML
│ ├── Amazon SageMaker (predictive models)
│ ├── AWS IoT SiteWise (OEE metrics)
│ └── Amazon QuickSight (dashboards)
│
└── Applications
├── Real-time monitoring dashboards
├── Predictive maintenance alerts
└── Quality control systems
Implementing IoT Core with CDK
Here’s a complete CDK implementation for a manufacturing IoT infrastructure:
import * as cdk from 'aws-cdk-lib';
import * as iot from 'aws-cdk-lib/aws-iot';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as timestream from 'aws-cdk-lib/aws-timestream';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions';
import { Construct } from 'constructs';
export class ManufacturingIoTStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// S3 bucket for raw IoT data
const dataBucket = new s3.Bucket(this, 'IoTDataBucket', {
encryption: s3.BucketEncryption.S3_MANAGED,
versioned: true,
lifecycleRules: [
{
transitions: [
{
storageClass: s3.StorageClass.INTELLIGENT_TIERING,
transitionAfter: cdk.Duration.days(30),
},
{
storageClass: s3.StorageClass.GLACIER,
transitionAfter: cdk.Duration.days(90),
},
],
},
],
});
// Timestream database for time-series data
const timestreamDb = new timestream.CfnDatabase(this, 'IoTDatabase', {
databaseName: 'manufacturing-iot',
});
const timestreamTable = new timestream.CfnTable(this, 'SensorDataTable', {
databaseName: timestreamDb.databaseName!,
tableName: 'sensor-data',
retentionProperties: {
MemoryStoreRetentionPeriodInHours: '24',
MagneticStoreRetentionPeriodInDays: '365',
},
});
timestreamTable.addDependency(timestreamDb);
// Kinesis stream for real-time processing
const dataStream = new kinesis.Stream(this, 'IoTDataStream', {
streamName: 'manufacturing-iot-stream',
shardCount: 2,
retentionPeriod: cdk.Duration.days(7),
encryption: kinesis.StreamEncryption.MANAGED,
});
// SNS topic for alerts
const alertTopic = new sns.Topic(this, 'AlertTopic', {
displayName: 'Manufacturing IoT Alerts',
});
alertTopic.addSubscription(
new subscriptions.EmailSubscription('operations@forrict.nl')
);
// Lambda function for data processing
const dataProcessorFunction = new lambda.Function(this, 'DataProcessor', {
runtime: lambda.Runtime.PYTHON_3_11,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda/iot-processor'),
timeout: cdk.Duration.seconds(60),
memorySize: 512,
environment: {
TIMESTREAM_DB: timestreamDb.databaseName!,
TIMESTREAM_TABLE: timestreamTable.tableName!,
ALERT_TOPIC_ARN: alertTopic.topicArn,
},
});
// Grant permissions to write to Timestream
dataProcessorFunction.addToRolePolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'timestream:WriteRecords',
'timestream:DescribeEndpoints',
],
resources: ['*'],
}));
alertTopic.grantPublish(dataProcessorFunction);
// Lambda function for anomaly detection
const anomalyDetectorFunction = new lambda.Function(this, 'AnomalyDetector', {
runtime: lambda.Runtime.PYTHON_3_11,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda/anomaly-detector'),
timeout: cdk.Duration.seconds(60),
memorySize: 1024,
environment: {
ALERT_TOPIC_ARN: alertTopic.topicArn,
TIMESTREAM_DB: timestreamDb.databaseName!,
TIMESTREAM_TABLE: timestreamTable.tableName!,
},
});
anomalyDetectorFunction.addToRolePolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'timestream:Select',
'timestream:DescribeEndpoints',
],
resources: ['*'],
}));
alertTopic.grantPublish(anomalyDetectorFunction);
// IoT Rule to process sensor data
const iotRole = new iam.Role(this, 'IoTRuleRole', {
assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
});
dataBucket.grantWrite(iotRole);
dataStream.grantWrite(iotRole);
    // IoT rule Lambda actions invoke functions through a resource-based
    // permission on the function, not through the rule role
    dataProcessorFunction.addPermission('AllowIoTInvoke', {
      principal: new iam.ServicePrincipal('iot.amazonaws.com'),
    });
    anomalyDetectorFunction.addPermission('AllowIoTInvoke', {
      principal: new iam.ServicePrincipal('iot.amazonaws.com'),
    });
// IoT Topic Rule for sensor data
new iot.CfnTopicRule(this, 'SensorDataRule', {
topicRulePayload: {
sql: "SELECT * FROM 'factory/+/sensor/#'",
description: 'Route sensor data to processing pipeline',
actions: [
{
lambda: {
functionArn: dataProcessorFunction.functionArn,
},
},
{
kinesis: {
roleArn: iotRole.roleArn,
streamName: dataStream.streamName,
              partitionKey: '${topic(4)}', // Device ID (topic: factory/<factoryId>/sensor/<deviceId>)
},
},
{
s3: {
roleArn: iotRole.roleArn,
bucketName: dataBucket.bucketName,
              key: 'raw/${topic(2)}/${topic(4)}/${timestamp()}.json', // factoryId/deviceId/epoch-ms
},
},
],
},
});
// IoT Topic Rule for anomaly detection
new iot.CfnTopicRule(this, 'AnomalyDetectionRule', {
topicRulePayload: {
sql: "SELECT * FROM 'factory/+/sensor/#' WHERE temperature > 80 OR vibration > 10",
description: 'Detect anomalies in sensor data',
actions: [
{
lambda: {
functionArn: anomalyDetectorFunction.functionArn,
},
},
{
sns: {
roleArn: iotRole.roleArn,
targetArn: alertTopic.topicArn,
messageFormat: 'JSON',
},
},
],
},
});
alertTopic.grantPublish(iotRole);
// IoT Thing Type for manufacturing devices
new iot.CfnThingType(this, 'SensorThingType', {
thingTypeName: 'ManufacturingSensor',
thingTypeProperties: {
thingTypeDescription: 'Industrial sensor for manufacturing',
searchableAttributes: ['location', 'machineType', 'factory'],
},
});
// IoT Policy for devices
new iot.CfnPolicy(this, 'DevicePolicy', {
policyName: 'ManufacturingDevicePolicy',
policyDocument: {
Version: '2012-10-17',
Statement: [
{
Effect: 'Allow',
Action: ['iot:Connect'],
Resource: ['arn:aws:iot:*:*:client/${iot:ClientId}'],
},
{
Effect: 'Allow',
Action: ['iot:Publish'],
Resource: ['arn:aws:iot:*:*:topic/factory/*/sensor/*'],
},
{
Effect: 'Allow',
Action: ['iot:Subscribe'],
Resource: ['arn:aws:iot:*:*:topicfilter/factory/*/command/*'],
},
{
Effect: 'Allow',
Action: ['iot:Receive'],
Resource: ['arn:aws:iot:*:*:topic/factory/*/command/*'],
},
],
},
});
// Outputs
new cdk.CfnOutput(this, 'DataBucketName', {
value: dataBucket.bucketName,
description: 'S3 bucket for raw IoT data',
});
new cdk.CfnOutput(this, 'TimestreamDatabase', {
value: timestreamDb.databaseName!,
description: 'Timestream database name',
});
new cdk.CfnOutput(this, 'KinesisStreamName', {
value: dataStream.streamName,
description: 'Kinesis stream for real-time processing',
});
}
}
This infrastructure provides secure device connectivity, real-time data processing, time-series storage, and automated anomaly detection.
IoT Device Simulation and Testing
Before deploying physical sensors, simulate IoT devices to test your infrastructure:
import json
import time
import random
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
class ManufacturingSensorSimulator:
def __init__(self, device_id, factory_id, machine_type):
self.device_id = device_id
self.factory_id = factory_id
self.machine_type = machine_type
# Initialize MQTT client
self.mqtt_client = AWSIoTMQTTClient(device_id)
self.mqtt_client.configureEndpoint("your-iot-endpoint.iot.eu-west-1.amazonaws.com", 8883)
self.mqtt_client.configureCredentials(
"AmazonRootCA1.pem",
f"certs/{device_id}.private.key",
f"certs/{device_id}.cert.pem"
)
# Configure connection
self.mqtt_client.configureAutoReconnectBackoffTime(1, 32, 20)
self.mqtt_client.configureOfflinePublishQueueing(-1)
self.mqtt_client.configureDrainingFrequency(2)
self.mqtt_client.configureConnectDisconnectTimeout(10)
self.mqtt_client.configureMQTTOperationTimeout(5)
def connect(self):
"""Connect to AWS IoT Core"""
self.mqtt_client.connect()
print(f"Device {self.device_id} connected to AWS IoT Core")
def generate_sensor_data(self, introduce_anomaly=False):
"""Generate realistic sensor data"""
# Normal operating ranges
temperature = random.gauss(65, 5) # Mean 65°C, std dev 5
vibration = random.gauss(3, 0.5) # Mean 3 mm/s, std dev 0.5
pressure = random.gauss(100, 2) # Mean 100 PSI, std dev 2
rpm = random.gauss(1500, 50) # Mean 1500 RPM, std dev 50
power = random.gauss(75, 5) # Mean 75 kW, std dev 5
# Introduce anomalies for testing
if introduce_anomaly:
anomaly_type = random.choice(['overheat', 'vibration', 'pressure'])
if anomaly_type == 'overheat':
temperature = random.gauss(90, 10) # Overheating
elif anomaly_type == 'vibration':
vibration = random.gauss(12, 2) # Excessive vibration
elif anomaly_type == 'pressure':
pressure = random.gauss(120, 5) # High pressure
return {
'deviceId': self.device_id,
'factoryId': self.factory_id,
'machineType': self.machine_type,
'timestamp': int(time.time() * 1000),
'sensors': {
'temperature': round(temperature, 2),
'vibration': round(vibration, 2),
'pressure': round(pressure, 2),
'rpm': round(rpm, 0),
'power': round(power, 2),
},
'operationalStatus': 'running',
'productionCount': random.randint(0, 100),
}
def publish_data(self, data):
"""Publish sensor data to IoT Core"""
topic = f"factory/{self.factory_id}/sensor/{self.device_id}"
message = json.dumps(data)
self.mqtt_client.publish(topic, message, 1)
print(f"Published to {topic}: {message}")
def run_simulation(self, duration_seconds=3600, interval_seconds=5, anomaly_rate=0.01):
"""Run sensor simulation"""
self.connect()
start_time = time.time()
message_count = 0
try:
while (time.time() - start_time) < duration_seconds:
# Decide whether to introduce anomaly
introduce_anomaly = random.random() < anomaly_rate
# Generate and publish data
data = self.generate_sensor_data(introduce_anomaly)
self.publish_data(data)
message_count += 1
time.sleep(interval_seconds)
except KeyboardInterrupt:
print("\nSimulation stopped by user")
finally:
print(f"\nSimulation complete. Published {message_count} messages")
self.mqtt_client.disconnect()
# Usage example
if __name__ == "__main__":
# Simulate multiple machines in Amsterdam factory
simulators = [
ManufacturingSensorSimulator("CNC-001", "AMS-FACTORY-01", "CNC-Machine"),
ManufacturingSensorSimulator("PRESS-001", "AMS-FACTORY-01", "Hydraulic-Press"),
ManufacturingSensorSimulator("WELD-001", "AMS-FACTORY-01", "Welding-Robot"),
ManufacturingSensorSimulator("ASSY-001", "AMS-FACTORY-01", "Assembly-Line"),
]
import threading
# Run simulators in parallel
threads = []
for sim in simulators:
thread = threading.Thread(
target=sim.run_simulation,
args=(3600, 5, 0.02) # 1 hour, 5 second intervals, 2% anomaly rate
)
thread.start()
threads.append(thread)
# Wait for all threads to complete
for thread in threads:
thread.join()
print("All simulations complete")
This simulator creates realistic sensor data with occasional anomalies, perfect for testing your IoT pipeline.
AWS IoT Greengrass for Edge Computing
Manufacturing environments often require local processing due to network constraints, latency requirements, or operational continuity needs. AWS IoT Greengrass brings AWS capabilities to the factory floor.
Greengrass Use Cases
Local ML Inference: Run machine learning models directly on edge devices for real-time quality inspection, predictive maintenance, and process optimization without cloud round-trip latency.
Protocol Translation: Convert industrial protocols (Modbus, OPC UA, Profinet) to MQTT for cloud connectivity; a minimal translation sketch follows this list.
Data Filtering and Aggregation: Process and aggregate sensor data locally, reducing bandwidth costs by sending only relevant data to the cloud.
Offline Operation: Continue processing data and making decisions even when connectivity to AWS is interrupted.
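To give protocol translation some flavor, here is a minimal, hedged sketch that polls a Modbus TCP holding register with pymodbus and republishes the reading as MQTT JSON via paho-mqtt. The PLC address, register number, scaling factor, and broker host are all assumptions you would replace with your plant's actual register map:

# Minimal Modbus-to-MQTT bridge sketch (assumes pymodbus 3.x and paho-mqtt)
import json
import time

import paho.mqtt.client as mqtt
from pymodbus.client import ModbusTcpClient

PLC_HOST = "192.168.1.50"   # hypothetical PLC address
TEMP_REGISTER = 100         # hypothetical holding register for temperature
BROKER_HOST = "localhost"   # local MQTT broker on the Greengrass gateway

plc = ModbusTcpClient(PLC_HOST)
mqtt_client = mqtt.Client()
mqtt_client.connect(BROKER_HOST)
mqtt_client.loop_start()

while True:
    result = plc.read_holding_registers(TEMP_REGISTER, count=1)
    if not result.isError():
        # Assume the register holds temperature in tenths of a degree C
        temperature = result.registers[0] / 10.0
        payload = json.dumps({"sensors": {"temperature": temperature}})
        mqtt_client.publish("factory/AMS-FACTORY-01/sensor/PLC-001", payload)
    time.sleep(5)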
Implementing Greengrass Components
# Greengrass component for predictive maintenance
import json
import time
import numpy as np
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToIoTCoreRequest,
QOS
)
class PredictiveMaintenanceComponent:
def __init__(self):
self.ipc_client = awsiot.greengrasscoreipc.connect()
self.sensor_buffer = []
self.buffer_size = 100
# Load pre-trained model (simplified example)
self.failure_threshold = 0.85
def process_sensor_data(self, sensor_data):
"""Process incoming sensor data"""
self.sensor_buffer.append(sensor_data)
# Keep buffer at specified size
if len(self.sensor_buffer) > self.buffer_size:
self.sensor_buffer.pop(0)
# Run prediction when buffer is full
if len(self.sensor_buffer) == self.buffer_size:
prediction = self.predict_failure()
if prediction['failure_probability'] > self.failure_threshold:
self.send_alert(prediction)
def predict_failure(self):
"""Predict equipment failure based on sensor trends"""
# Extract sensor values
temperatures = [d['sensors']['temperature'] for d in self.sensor_buffer]
vibrations = [d['sensors']['vibration'] for d in self.sensor_buffer]
# Simple trend analysis (replace with actual ML model)
temp_trend = np.polyfit(range(len(temperatures)), temperatures, 1)[0]
vib_trend = np.polyfit(range(len(vibrations)), vibrations, 1)[0]
# Calculate failure probability
failure_prob = 0.0
if temp_trend > 0.1: # Temperature rising
failure_prob += 0.3
if vib_trend > 0.05: # Vibration increasing
failure_prob += 0.4
if temperatures[-1] > 80: # Current temp high
failure_prob += 0.2
if vibrations[-1] > 8: # Current vibration high
failure_prob += 0.2
return {
'failure_probability': min(failure_prob, 1.0),
'temperature_trend': temp_trend,
'vibration_trend': vib_trend,
'recommendation': self.get_recommendation(failure_prob),
'timestamp': int(time.time() * 1000),
}
def get_recommendation(self, failure_prob):
"""Get maintenance recommendation"""
if failure_prob > 0.9:
return "URGENT: Schedule immediate maintenance"
elif failure_prob > 0.7:
return "Schedule maintenance within 24 hours"
elif failure_prob > 0.5:
return "Schedule maintenance within 1 week"
else:
return "Continue monitoring"
def send_alert(self, prediction):
"""Send alert to IoT Core"""
message = {
'deviceId': self.sensor_buffer[-1]['deviceId'],
'alertType': 'predictive_maintenance',
'prediction': prediction,
}
        request = PublishToIoTCoreRequest()
        request.topic_name = "factory/alerts/maintenance"
        request.payload = json.dumps(message).encode('utf-8')
        request.qos = QOS.AT_LEAST_ONCE
        # The V1 Greengrass IPC client publishes through an operation object
        operation = self.ipc_client.new_publish_to_iot_core()
        operation.activate(request)
        operation.get_response().result(timeout=10)
        print(f"Alert sent: {prediction['recommendation']}")
# Greengrass component lifecycle
component = PredictiveMaintenanceComponent()
def handler(event):
"""Handle incoming sensor data"""
sensor_data = json.loads(event)
component.process_sensor_data(sensor_data)
Time-Series Data with Amazon Timestream
Manufacturing generates massive amounts of time-series data. Amazon Timestream is optimized for this workload, offering fast ingestion and query performance.
Writing Data to Timestream
import boto3
import time
timestream_write = boto3.client('timestream-write', region_name='eu-west-1')
timestream_query = boto3.client('timestream-query', region_name='eu-west-1')
DATABASE_NAME = 'manufacturing-iot'
TABLE_NAME = 'sensor-data'
def write_sensor_records(sensor_data_batch):
"""Write batch of sensor records to Timestream"""
records = []
for data in sensor_data_batch:
current_time = str(int(time.time() * 1000))
# Create records for each sensor measurement
for sensor_name, sensor_value in data['sensors'].items():
record = {
'Dimensions': [
{'Name': 'deviceId', 'Value': data['deviceId']},
{'Name': 'factoryId', 'Value': data['factoryId']},
{'Name': 'machineType', 'Value': data['machineType']},
{'Name': 'sensorType', 'Value': sensor_name},
],
'MeasureName': sensor_name,
'MeasureValue': str(sensor_value),
'MeasureValueType': 'DOUBLE',
'Time': current_time,
}
records.append(record)
try:
result = timestream_write.write_records(
DatabaseName=DATABASE_NAME,
TableName=TABLE_NAME,
Records=records
)
print(f"Wrote {len(records)} records to Timestream")
return result
except Exception as e:
print(f"Error writing to Timestream: {e}")
raise
def query_equipment_performance(device_id, hours=24):
"""Query equipment performance over time"""
query = f"""
SELECT
deviceId,
BIN(time, 1h) AS hour,
AVG(CASE WHEN sensorType = 'temperature' THEN measure_value::double END) AS avg_temperature,
MAX(CASE WHEN sensorType = 'temperature' THEN measure_value::double END) AS max_temperature,
AVG(CASE WHEN sensorType = 'vibration' THEN measure_value::double END) AS avg_vibration,
MAX(CASE WHEN sensorType = 'vibration' THEN measure_value::double END) AS max_vibration,
AVG(CASE WHEN sensorType = 'power' THEN measure_value::double END) AS avg_power
FROM "{DATABASE_NAME}"."{TABLE_NAME}"
WHERE deviceId = '{device_id}'
AND time > ago({hours}h)
GROUP BY deviceId, BIN(time, 1h)
ORDER BY hour DESC
"""
try:
result = timestream_query.query(QueryString=query)
return parse_query_result(result)
except Exception as e:
print(f"Error querying Timestream: {e}")
raise
def calculate_oee(factory_id, start_time, end_time):
"""Calculate Overall Equipment Effectiveness (OEE)"""
query = f"""
WITH production_data AS (
SELECT
deviceId,
COUNT(*) as total_readings,
SUM(CASE WHEN operationalStatus = 'running' THEN 1 ELSE 0 END) as running_count,
AVG(measure_value::double) as avg_production_rate
FROM "{DATABASE_NAME}"."{TABLE_NAME}"
WHERE factoryId = '{factory_id}'
AND time BETWEEN from_iso8601_timestamp('{start_time}')
AND from_iso8601_timestamp('{end_time}')
            AND measure_name = 'productionCount'
GROUP BY deviceId
)
SELECT
deviceId,
(running_count * 1.0 / total_readings) * 100 as availability_percent,
avg_production_rate
FROM production_data
"""
result = timestream_query.query(QueryString=query)
return parse_query_result(result)
def detect_anomalies(device_id, hours=1):
"""Detect anomalies using statistical methods"""
query = f"""
WITH sensor_stats AS (
SELECT
sensorType,
AVG(measure_value::double) as mean_value,
STDDEV(measure_value::double) as stddev_value
FROM "{DATABASE_NAME}"."{TABLE_NAME}"
WHERE deviceId = '{device_id}'
AND time > ago({hours}h)
GROUP BY sensorType
),
recent_readings AS (
SELECT
time,
sensorType,
measure_value::double as value
FROM "{DATABASE_NAME}"."{TABLE_NAME}"
WHERE deviceId = '{device_id}'
AND time > ago(5m)
)
SELECT
r.time,
r.sensorType,
r.value,
s.mean_value,
s.stddev_value,
        ABS(r.value - s.mean_value) / NULLIF(s.stddev_value, 0) as z_score
    FROM recent_readings r
    JOIN sensor_stats s ON r.sensorType = s.sensorType
    WHERE ABS(r.value - s.mean_value) / NULLIF(s.stddev_value, 0) > 3
ORDER BY r.time DESC
"""
result = timestream_query.query(QueryString=query)
anomalies = parse_query_result(result)
if anomalies:
print(f"Found {len(anomalies)} anomalies for device {device_id}")
return anomalies
def parse_query_result(result):
"""Parse Timestream query result"""
rows = []
for row in result['Rows']:
parsed_row = {}
for i, col in enumerate(row['Data']):
col_name = result['ColumnInfo'][i]['Name']
parsed_row[col_name] = col.get('ScalarValue', '')
rows.append(parsed_row)
return rows
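A minimal usage sketch tying these helpers together (the device and factory IDs reuse the simulated ones from earlier, and the Timestream resources from the CDK stack are assumed to exist):

if __name__ == "__main__":
    # Write one batch of readings, then pull hourly performance stats
    sample_batch = [{
        'deviceId': 'CNC-001',
        'factoryId': 'AMS-FACTORY-01',
        'machineType': 'CNC-Machine',
        'sensors': {'temperature': 66.2, 'vibration': 3.1, 'pressure': 99.5},
    }]
    write_sensor_records(sample_batch)
    for row in query_equipment_performance('CNC-001', hours=24):
        print(row)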
Predictive Maintenance with Machine Learning
Predictive maintenance is one of the most valuable IIoT applications. Using SageMaker, we can train models to predict equipment failures.
Training a Failure Prediction Model
import boto3
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import joblib
import tarfile
import sagemaker
from sagemaker.sklearn.model import SKLearnModel
# Prepare training data from Timestream
def prepare_training_data(factory_id, days=90):
"""Extract and prepare training data"""
timestream_query = boto3.client('timestream-query', region_name='eu-west-1')
# Query historical sensor data with labeled failures
query = f"""
SELECT
deviceId,
time,
AVG(CASE WHEN sensorType = 'temperature' THEN measure_value::double END) AS temperature,
AVG(CASE WHEN sensorType = 'vibration' THEN measure_value::double END) AS vibration,
AVG(CASE WHEN sensorType = 'pressure' THEN measure_value::double END) AS pressure,
AVG(CASE WHEN sensorType = 'rpm' THEN measure_value::double END) AS rpm,
AVG(CASE WHEN sensorType = 'power' THEN measure_value::double END) AS power
FROM "manufacturing-iot"."sensor-data"
WHERE factoryId = '{factory_id}'
AND time > ago({days}d)
GROUP BY deviceId, time
ORDER BY time
"""
result = timestream_query.query(QueryString=query)
# Convert to DataFrame
data = []
    for row in result['Rows']:
        data_row = {}
        for i, col in enumerate(row['Data']):
            col_name = result['ColumnInfo'][i]['Name']
            value = col.get('ScalarValue')
            # deviceId and time come back as strings; sensor columns are numeric
            if col_name in ('deviceId', 'time'):
                data_row[col_name] = value
            else:
                data_row[col_name] = float(value) if value is not None else np.nan
        data.append(data_row)
df = pd.DataFrame(data)
# Feature engineering
df = create_features(df)
return df
def create_features(df):
"""Create features for ML model"""
# Rolling statistics
for col in ['temperature', 'vibration', 'pressure', 'rpm', 'power']:
df[f'{col}_rolling_mean_1h'] = df.groupby('deviceId')[col].transform(
lambda x: x.rolling(window=12, min_periods=1).mean()
)
df[f'{col}_rolling_std_1h'] = df.groupby('deviceId')[col].transform(
lambda x: x.rolling(window=12, min_periods=1).std()
)
df[f'{col}_trend'] = df.groupby('deviceId')[col].transform(
lambda x: x.diff()
)
# Rate of change
df['temp_rate_of_change'] = df.groupby('deviceId')['temperature'].diff()
df['vibration_rate_of_change'] = df.groupby('deviceId')['vibration'].diff()
# Interaction features
df['temp_vibration_interaction'] = df['temperature'] * df['vibration']
df['pressure_rpm_ratio'] = df['pressure'] / (df['rpm'] + 1)
    # Time-based features (Timestream returns timestamps as strings)
    df['hour'] = pd.to_datetime(df['time']).dt.hour
    df['day_of_week'] = pd.to_datetime(df['time']).dt.dayofweek
    # Backfill NaN values left by the rolling and diff calculations
    df = df.bfill()
return df
def train_failure_prediction_model(training_data):
"""Train Random Forest model for failure prediction"""
    # Prepare features and target. The failure_within_24h label is assumed
    # to have been joined in from maintenance/failure records beforehand.
    feature_columns = [col for col in training_data.columns
                       if col not in ['deviceId', 'time', 'failure_within_24h']]
X = training_data[feature_columns]
y = training_data['failure_within_24h']
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Train model
model = RandomForestClassifier(
n_estimators=200,
max_depth=15,
min_samples_split=10,
min_samples_leaf=4,
class_weight='balanced',
random_state=42,
n_jobs=-1
)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
print("\nModel Performance:")
print(classification_report(y_test, y_pred))
print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))
# Feature importance
feature_importance = pd.DataFrame({
'feature': feature_columns,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
print("\nTop 10 Most Important Features:")
print(feature_importance.head(10))
return model, feature_columns
# Deploy model to SageMaker
def deploy_to_sagemaker(model, feature_columns):
    """Deploy the trained model to a SageMaker endpoint"""
    # Save model and feature columns, then package as model.tar.gz
    # (SageMaker expects a tar.gz model artifact, not a bare joblib file)
    joblib.dump(model, 'model.joblib')
    joblib.dump(feature_columns, 'feature_columns.joblib')
    with tarfile.open('model.tar.gz', 'w:gz') as tar:
        tar.add('model.joblib')
        tar.add('feature_columns.joblib')
    # Upload to S3
    sagemaker_session = sagemaker.Session()
    bucket = sagemaker_session.default_bucket()
    prefix = 'manufacturing-predictive-maintenance'
    model_data = sagemaker_session.upload_data(
        path='model.tar.gz',
        bucket=bucket,
        key_prefix=f'{prefix}/model'
    )
    # Create a SageMaker model from the pre-trained artifact
    # (SKLearnModel deploys an existing artifact; the SKLearn estimator is for training jobs)
    role = sagemaker.get_execution_role()
    sklearn_model = SKLearnModel(
        model_data=model_data,
        role=role,
        entry_point='inference.py',
        framework_version='1.2-1',
        py_version='py3',
    )
    # Deploy endpoint
    predictor = sklearn_model.deploy(
        initial_instance_count=1,
        instance_type='ml.m5.large',
        endpoint_name='manufacturing-maintenance-predictor'
    )
    print("Model deployed to endpoint: manufacturing-maintenance-predictor")
    return predictor
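The deploy step above references an inference.py entry point that is never shown. A minimal sketch following the SageMaker scikit-learn container conventions (model_fn and predict_fn), assuming model.joblib was packaged into model.tar.gz as above:

# inference.py - loaded by the SageMaker scikit-learn serving container
import os
import joblib
import pandas as pd

def model_fn(model_dir):
    """Load the model unpacked from model.tar.gz"""
    return joblib.load(os.path.join(model_dir, 'model.joblib'))

def predict_fn(input_data, model):
    """Return the failure probability for each feature row"""
    df = pd.DataFrame(input_data)
    return model.predict_proba(df)[:, 1]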
Real-Time Factory Monitoring Dashboard
Visualize factory operations in real-time using Amazon QuickSight or custom dashboards:
# Lambda function to aggregate metrics for dashboards
import boto3
import json
from datetime import datetime
from decimal import Decimal

timestream_query = boto3.client('timestream-query', region_name='eu-west-1')
dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
metrics_table = dynamodb.Table('factory-metrics')
def lambda_handler(event, context):
"""Aggregate factory metrics for dashboard"""
factory_id = event.get('factoryId', 'AMS-FACTORY-01')
# Calculate metrics
metrics = {
'timestamp': datetime.utcnow().isoformat(),
'factoryId': factory_id,
'oee': calculate_oee(factory_id),
'activeEquipment': count_active_equipment(factory_id),
'alertCount': count_recent_alerts(factory_id),
'productionRate': calculate_production_rate(factory_id),
'energyConsumption': calculate_energy_consumption(factory_id),
'equipmentStatus': get_equipment_status(factory_id),
}
    # Store in DynamoDB (boto3 requires Decimal instead of float)
    metrics_table.put_item(
        Item=json.loads(json.dumps(metrics, default=str), parse_float=Decimal)
    )
return {
'statusCode': 200,
'body': json.dumps(metrics, default=str)
}
def calculate_oee(factory_id):
"""Calculate Overall Equipment Effectiveness"""
query = f"""
SELECT
AVG(availability) * AVG(performance) * AVG(quality) as oee
FROM "manufacturing-iot"."oee-metrics"
WHERE factoryId = '{factory_id}'
AND time > ago(1h)
"""
result = timestream_query.query(QueryString=query)
if result['Rows']:
return float(result['Rows'][0]['Data'][0]['ScalarValue']) * 100
return 0
def count_active_equipment(factory_id):
"""Count active equipment"""
query = f"""
SELECT COUNT(DISTINCT deviceId) as active_count
FROM "manufacturing-iot"."sensor-data"
WHERE factoryId = '{factory_id}'
AND time > ago(5m)
"""
result = timestream_query.query(QueryString=query)
if result['Rows']:
return int(result['Rows'][0]['Data'][0]['ScalarValue'])
return 0
def calculate_production_rate(factory_id):
"""Calculate current production rate"""
query = f"""
SELECT SUM(measure_value::double) as total_production
FROM "manufacturing-iot"."sensor-data"
WHERE factoryId = '{factory_id}'
        AND measure_name = 'productionCount'
AND time > ago(1h)
"""
result = timestream_query.query(QueryString=query)
if result['Rows']:
return float(result['Rows'][0]['Data'][0]['ScalarValue'])
return 0
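A quick local smoke test for the aggregations defined above (assumes AWS credentials and that the Timestream tables from earlier exist):

if __name__ == "__main__":
    fid = 'AMS-FACTORY-01'
    print('OEE:', calculate_oee(fid))
    print('Active equipment:', count_active_equipment(fid))
    print('Production (last hour):', calculate_production_rate(fid))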
Supply Chain Optimization with IoT
Track assets throughout the supply chain with IoT-enabled tracking:
# Asset tracking with IoT
import json
import time
import boto3
from decimal import Decimal

def track_shipment(shipment_id, location_data):
    """Track shipment location and conditions"""
    dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
    shipments_table = dynamodb.Table('shipments')
# Update shipment location
shipments_table.update_item(
Key={'shipmentId': shipment_id},
UpdateExpression='SET currentLocation = :loc, lastUpdated = :time, temperature = :temp',
        ExpressionAttributeValues={
            # DynamoDB rejects Python floats; convert to Decimal
            ':loc': [Decimal(str(c)) for c in location_data['coordinates']],
            ':time': int(time.time()),
            ':temp': Decimal(str(location_data['temperature'])),
        }
)
# Check for temperature violations
if location_data['temperature'] < 2 or location_data['temperature'] > 8:
send_cold_chain_alert(shipment_id, location_data)
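track_shipment assumes a send_cold_chain_alert helper. A minimal sketch that publishes to an SNS topic (the topic ARN here is hypothetical; in practice you would reuse the alert topic from the CDK stack):

def send_cold_chain_alert(shipment_id, location_data):
    """Notify operations when a shipment leaves the 2-8 degree C range"""
    sns = boto3.client('sns', region_name='eu-west-1')
    sns.publish(
        TopicArn='arn:aws:sns:eu-west-1:123456789012:cold-chain-alerts',  # hypothetical ARN
        Subject=f'Cold chain violation: {shipment_id}',
        Message=json.dumps({
            'shipmentId': shipment_id,
            'temperature': location_data['temperature'],
            'coordinates': location_data['coordinates'],
        }),
    )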
Best Practices for Manufacturing IoT
1. Security
- Use X.509 certificates for device authentication (see the provisioning sketch after this list)
- Implement AWS IoT Device Defender for security monitoring
- Enable AWS IoT Secure Tunneling for remote device access
- Use VPN or AWS Direct Connect for factory connectivity
- Implement least-privilege IAM policies
- Enable AWS CloudTrail for audit logging
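For the certificate bullet above, a minimal provisioning sketch with boto3 that creates an X.509 certificate, attaches the ManufacturingDevicePolicy from the CDK stack, and binds it to a new thing (store the returned private key securely on the device; it is only returned once):

import boto3

iot = boto3.client('iot', region_name='eu-west-1')

def provision_device(thing_name):
    """Create an X.509 certificate and bind it to a new thing"""
    cert = iot.create_keys_and_certificate(setAsActive=True)
    iot.create_thing(thingName=thing_name, thingTypeName='ManufacturingSensor')
    iot.attach_policy(policyName='ManufacturingDevicePolicy',
                      target=cert['certificateArn'])
    iot.attach_thing_principal(thingName=thing_name,
                               principal=cert['certificateArn'])
    # Persist cert['certificatePem'] and cert['keyPair']['PrivateKey']
    # to the device's secure storage
    return cert['certificateArn']

provision_device('CNC-001')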
2. Scalability
- Use IoT Core message broker for millions of devices
- Implement data aggregation at edge with Greengrass
- Use Kinesis Data Streams for high-throughput ingestion
- Partition time-series data effectively in Timestream
- Implement caching strategies for frequently accessed data
3. Reliability
- Deploy Greengrass for offline operation capability
- Implement message buffering and retry logic
- Use DynamoDB for device state management
- Set up multi-region failover for critical systems
- Monitor device connectivity and health
4. Cost Optimization
- Filter and aggregate data at edge before sending to cloud
- Use appropriate Timestream retention policies
- Implement S3 lifecycle policies for archived data
- Right-size IoT Core message routing rules
- Use reserved capacity for predictable workloads
Conclusion
AWS provides a comprehensive platform for implementing Industry 4.0 initiatives in manufacturing. From device connectivity with IoT Core, to edge processing with Greengrass, to predictive maintenance with SageMaker, Dutch manufacturers can transform their operations to be more efficient, reliable, and competitive.
The combination of real-time monitoring, predictive analytics, and automated responses enables manufacturers to:
- Reduce unplanned downtime by 30-50%
- Improve OEE by 15-25%
- Decrease maintenance costs by 20-30%
- Enhance product quality through early defect detection
- Optimize energy consumption and reduce costs
At Forrict, we help Dutch manufacturing companies design and implement IoT solutions on AWS. Our team has experience across manufacturing sectors including high-tech equipment, automotive, and food processing. Contact us to discuss how we can help you implement Industry 4.0 technologies and transform your manufacturing operations.
Forrict Team
AWS expert and consultant at Forrict, specializing in cloud architecture and AWS best practices for Dutch businesses.