Nederlandse productiebedrijven omarmen Industry 4.0 en transformeren traditionele fabrieken naar slimme, verbonden faciliteiten. Bedrijven zoals ASML, Philips en DAF Trucks maken gebruik van Industrial IoT (IIoT) om productie te optimaliseren, downtime te verminderen en productkwaliteit te verbeteren. AWS biedt een uitgebreid IoT-platform dat fabrikanten in staat stelt om real-time data van duizenden verbonden apparaten te verzamelen, verwerken en analyseren.
De Industry 4.0 Revolutie
Industry 4.0 vertegenwoordigt de vierde industriële revolutie, gekenmerkt door de integratie van cyber-fysieke systemen, Internet of Things (IoT), cloud computing en kunstmatige intelligentie in productieprocessen. Voor Nederlandse fabrikanten is deze transformatie cruciaal voor het behouden van concurrentievermogen in de wereldmarkt.
Zakelijke Drivers voor IoT in Manufacturing
Voorspellend Onderhoud: Traditioneel preventief onderhoud plant apparatuur service op vaste intervallen, wat leidt tot premature vervanging van onderdelen of onverwachte storingen. Voorspellend onderhoud gebruikt real-time sensordata en machine learning om storingen te voorspellen voordat ze optreden, waardoor downtime met 50% wordt verminderd en onderhoudskosten met 30% dalen.
Operationele Efficiëntie: Real-time monitoring van productielijnen identificeert knelpunten, kwaliteitsproblemen en inefficiënties direct. Nederlandse fabrikanten rapporteren 15-25% verbeteringen in Overall Equipment Effectiveness (OEE) na implementatie van IoT-monitoring.
Kwaliteitscontrole: Geautomatiseerde kwaliteitsinspectie met computer vision en sensordata vangt defecten eerder in het productieproces op, waardoor verspilling wordt verminderd en klanttevredenheid verbetert.
Supply Chain Optimalisatie: IoT-enabled asset tracking biedt real-time zicht op voorraadniveaus, zendingslocaties en supply chain verstoringen, wat snellere reactie op veranderingen mogelijk maakt.
Energiebeheer: Manufacturing is verantwoordelijk voor aanzienlijk energieverbruik. IoT-sensoren identificeren energieverspilling en optimaliseren verbruikspatronen, waardoor kosten worden verlaagd en duurzaamheidsdoelen worden ondersteund.
AWS IoT Services Overzicht
AWS biedt een complete IoT-stack voor manufacturing:
AWS IoT Core: Beheerde cloudservice die miljarden apparaten veilig verbindt en berichten routeert naar AWS services en applicaties.
AWS IoT Greengrass: Breidt AWS mogelijkheden uit naar edge-apparaten, waardoor lokale verwerking, machine learning inferentie en werking mogelijk is zelfs zonder verbinding met de cloud.
AWS IoT SiteWise: Verzamelt, organiseert en analyseert industriële apparatuurdata op schaal, en biedt inzichten in operationele prestaties.
AWS IoT Events: Detecteert en reageert op events van IoT-sensoren en applicaties, en triggert geautomatiseerde acties.
AWS IoT Analytics: Verwerkt, verrijkt, bewaart en analyseert IoT-data voor besluitvorming.
Amazon Timestream: Purpose-built time-series database voor IoT-applicaties, bewaart en analyseert triljoenen events per dag.
AWS IoT Core Architectuur voor Manufacturing
Een typische manufacturing IoT-architectuur op AWS bestaat uit edge-apparaten die sensordata verzamelen, veilige communicatiekanalen naar de cloud, data processing pipelines, en analytics/ML services voor inzichten.
Referentie Architectuur
Fabrieksvloer
├── Sensoren & PLCs
│ ├── Temperatuursensoren
│ ├── Vibratiesensoren
│ ├── Druksensoren
│ └── Machine controllers
│
├── Edge Gateways (IoT Greengrass)
│ ├── Lokale data verwerking
│ ├── ML inference
│ ├── Protocol vertaling
│ └── Data aggregatie
│
└── Netwerk (Beveiligd)
└── TLS 1.2+ encryptie
│
│ MQTT/HTTPS
▼
AWS Cloud
├── AWS IoT Core
│ ├── Apparaat authenticatie
│ ├── Bericht routing
│ └── Rules engine
│
├── Data Verwerking
│ ├── AWS IoT Analytics
│ ├── Amazon Kinesis
│ └── AWS Lambda
│
├── Opslag
│ ├── Amazon Timestream (time-series)
│ ├── Amazon S3 (raw data lake)
│ └── DynamoDB (metadata)
│
├── Analytics & ML
│ ├── Amazon SageMaker (predictive models)
│ ├── AWS IoT SiteWise (OEE metrics)
│ └── Amazon QuickSight (dashboards)
│
└── Applicaties
├── Real-time monitoring dashboards
├── Voorspellend onderhoud alerts
└── Kwaliteitscontrole systemen
Implementatie van IoT Core met CDK
Hier is een complete CDK-implementatie voor een manufacturing IoT-infrastructuur:
import * as cdk from 'aws-cdk-lib';
import * as iot from 'aws-cdk-lib/aws-iot';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as timestream from 'aws-cdk-lib/aws-timestream';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions';
import { Construct } from 'constructs';
export class ManufacturingIoTStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// S3 bucket voor ruwe IoT-data
const dataBucket = new s3.Bucket(this, 'IoTDataBucket', {
encryption: s3.BucketEncryption.S3_MANAGED,
versioned: true,
lifecycleRules: [
{
transitions: [
{
storageClass: s3.StorageClass.INTELLIGENT_TIERING,
transitionAfter: cdk.Duration.days(30),
},
{
storageClass: s3.StorageClass.GLACIER,
transitionAfter: cdk.Duration.days(90),
},
],
},
],
});
// Timestream database voor time-series data
const timestreamDb = new timestream.CfnDatabase(this, 'IoTDatabase', {
databaseName: 'manufacturing-iot',
});
const timestreamTable = new timestream.CfnTable(this, 'SensorDataTable', {
databaseName: timestreamDb.databaseName!,
tableName: 'sensor-data',
retentionProperties: {
MemoryStoreRetentionPeriodInHours: '24',
MagneticStoreRetentionPeriodInDays: '365',
},
});
timestreamTable.addDependency(timestreamDb);
// Kinesis stream voor real-time verwerking
const dataStream = new kinesis.Stream(this, 'IoTDataStream', {
streamName: 'manufacturing-iot-stream',
shardCount: 2,
retentionPeriod: cdk.Duration.days(7),
encryption: kinesis.StreamEncryption.MANAGED,
});
// SNS topic voor alerts
const alertTopic = new sns.Topic(this, 'AlertTopic', {
displayName: 'Manufacturing IoT Alerts',
});
alertTopic.addSubscription(
new subscriptions.EmailSubscription('operations@forrict.nl')
);
// Lambda functie voor data verwerking
const dataProcessorFunction = new lambda.Function(this, 'DataProcessor', {
runtime: lambda.Runtime.PYTHON_3_11,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda/iot-processor'),
timeout: cdk.Duration.seconds(60),
memorySize: 512,
environment: {
TIMESTREAM_DB: timestreamDb.databaseName!,
TIMESTREAM_TABLE: timestreamTable.tableName!,
ALERT_TOPIC_ARN: alertTopic.topicArn,
},
});
// Permissies geven om naar Timestream te schrijven
dataProcessorFunction.addToRolePolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'timestream:WriteRecords',
'timestream:DescribeEndpoints',
],
resources: ['*'],
}));
alertTopic.grantPublish(dataProcessorFunction);
// Lambda functie voor anomalie detectie
const anomalyDetectorFunction = new lambda.Function(this, 'AnomalyDetector', {
runtime: lambda.Runtime.PYTHON_3_11,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda/anomaly-detector'),
timeout: cdk.Duration.seconds(60),
memorySize: 1024,
environment: {
ALERT_TOPIC_ARN: alertTopic.topicArn,
TIMESTREAM_DB: timestreamDb.databaseName!,
TIMESTREAM_TABLE: timestreamTable.tableName!,
},
});
anomalyDetectorFunction.addToRolePolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'timestream:Select',
'timestream:DescribeEndpoints',
],
resources: ['*'],
}));
alertTopic.grantPublish(anomalyDetectorFunction);
// IoT Rule om sensordata te verwerken
const iotRole = new iam.Role(this, 'IoTRuleRole', {
assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
});
dataBucket.grantWrite(iotRole);
dataStream.grantWrite(iotRole);
dataProcessorFunction.grantInvoke(iotRole);
// IoT Topic Rule voor sensordata
new iot.CfnTopicRule(this, 'SensorDataRule', {
topicRulePayload: {
sql: "SELECT * FROM 'factory/+/sensor/#'",
description: 'Route sensordata naar verwerkingspipeline',
actions: [
{
lambda: {
functionArn: dataProcessorFunction.functionArn,
},
},
{
kinesis: {
roleArn: iotRole.roleArn,
streamName: dataStream.streamName,
partitionKey: '${topic(2)}', // Apparaat ID
},
},
{
s3: {
roleArn: iotRole.roleArn,
bucketName: dataBucket.bucketName,
key: 'raw/${timestamp()}/${topic(2)}.json',
},
},
],
},
});
// IoT Topic Rule voor anomalie detectie
new iot.CfnTopicRule(this, 'AnomalyDetectionRule', {
topicRulePayload: {
sql: "SELECT * FROM 'factory/+/sensor/#' WHERE temperature > 80 OR vibration > 10",
description: 'Detecteer anomalieën in sensordata',
actions: [
{
lambda: {
functionArn: anomalyDetectorFunction.functionArn,
},
},
{
sns: {
roleArn: iotRole.roleArn,
targetArn: alertTopic.topicArn,
messageFormat: 'JSON',
},
},
],
},
});
alertTopic.grantPublish(iotRole);
// IoT Thing Type voor manufacturing apparaten
new iot.CfnThingType(this, 'SensorThingType', {
thingTypeName: 'ManufacturingSensor',
thingTypeProperties: {
thingTypeDescription: 'Industriële sensor voor manufacturing',
searchableAttributes: ['locatie', 'machineType', 'fabriek'],
},
});
// IoT Policy voor apparaten
new iot.CfnPolicy(this, 'DevicePolicy', {
policyName: 'ManufacturingDevicePolicy',
policyDocument: {
Version: '2012-10-17',
Statement: [
{
Effect: 'Allow',
Action: ['iot:Connect'],
Resource: ['arn:aws:iot:*:*:client/${iot:ClientId}'],
},
{
Effect: 'Allow',
Action: ['iot:Publish'],
Resource: ['arn:aws:iot:*:*:topic/factory/*/sensor/*'],
},
{
Effect: 'Allow',
Action: ['iot:Subscribe'],
Resource: ['arn:aws:iot:*:*:topicfilter/factory/*/command/*'],
},
{
Effect: 'Allow',
Action: ['iot:Receive'],
Resource: ['arn:aws:iot:*:*:topic/factory/*/command/*'],
},
],
},
});
// Outputs
new cdk.CfnOutput(this, 'DataBucketName', {
value: dataBucket.bucketName,
description: 'S3 bucket voor ruwe IoT-data',
});
new cdk.CfnOutput(this, 'TimestreamDatabase', {
value: timestreamDb.databaseName!,
description: 'Timestream database naam',
});
new cdk.CfnOutput(this, 'KinesisStreamName', {
value: dataStream.streamName,
description: 'Kinesis stream voor real-time verwerking',
});
}
}
Deze infrastructuur biedt veilige apparaatconnectiviteit, real-time data verwerking, time-series opslag en geautomatiseerde anomalie detectie.
IoT Apparaat Simulatie en Testen
Voordat u fysieke sensoren implementeert, simuleer IoT-apparaten om uw infrastructuur te testen:
import json
import time
import random
import ssl
from datetime import datetime
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
class ManufacturingSensorSimulator:
def __init__(self, device_id, factory_id, machine_type):
self.device_id = device_id
self.factory_id = factory_id
self.machine_type = machine_type
# Initialiseer MQTT client
self.mqtt_client = AWSIoTMQTTClient(device_id)
self.mqtt_client.configureEndpoint("your-iot-endpoint.iot.eu-west-1.amazonaws.com", 8883)
self.mqtt_client.configureCredentials(
"AmazonRootCA1.pem",
f"certs/{device_id}.private.key",
f"certs/{device_id}.cert.pem"
)
# Configureer verbinding
self.mqtt_client.configureAutoReconnectBackoffTime(1, 32, 20)
self.mqtt_client.configureOfflinePublishQueueing(-1)
self.mqtt_client.configureDrainingFrequency(2)
self.mqtt_client.configureConnectDisconnectTimeout(10)
self.mqtt_client.configureMQTTOperationTimeout(5)
def connect(self):
"""Verbind met AWS IoT Core"""
self.mqtt_client.connect()
print(f"Apparaat {self.device_id} verbonden met AWS IoT Core")
def generate_sensor_data(self, introduce_anomaly=False):
"""Genereer realistische sensordata"""
# Normale bedrijfsbereiken
temperature = random.gauss(65, 5) # Gemiddeld 65°C, std dev 5
vibration = random.gauss(3, 0.5) # Gemiddeld 3 mm/s, std dev 0.5
pressure = random.gauss(100, 2) # Gemiddeld 100 PSI, std dev 2
rpm = random.gauss(1500, 50) # Gemiddeld 1500 RPM, std dev 50
power = random.gauss(75, 5) # Gemiddeld 75 kW, std dev 5
# Introduceer anomalieën voor testen
if introduce_anomaly:
anomaly_type = random.choice(['overheat', 'vibration', 'pressure'])
if anomaly_type == 'overheat':
temperature = random.gauss(90, 10) # Oververhitting
elif anomaly_type == 'vibration':
vibration = random.gauss(12, 2) # Excessieve vibratie
elif anomaly_type == 'pressure':
pressure = random.gauss(120, 5) # Hoge druk
return {
'deviceId': self.device_id,
'factoryId': self.factory_id,
'machineType': self.machine_type,
'timestamp': int(time.time() * 1000),
'sensors': {
'temperature': round(temperature, 2),
'vibration': round(vibration, 2),
'pressure': round(pressure, 2),
'rpm': round(rpm, 0),
'power': round(power, 2),
},
'operationalStatus': 'running',
'productionCount': random.randint(0, 100),
}
def publish_data(self, data):
"""Publiceer sensordata naar IoT Core"""
topic = f"factory/{self.factory_id}/sensor/{self.device_id}"
message = json.dumps(data)
self.mqtt_client.publish(topic, message, 1)
print(f"Gepubliceerd naar {topic}: {message}")
def run_simulation(self, duration_seconds=3600, interval_seconds=5, anomaly_rate=0.01):
"""Voer sensor simulatie uit"""
self.connect()
start_time = time.time()
message_count = 0
try:
while (time.time() - start_time) < duration_seconds:
# Besluit of anomalie wordt geïntroduceerd
introduce_anomaly = random.random() < anomaly_rate
# Genereer en publiceer data
data = self.generate_sensor_data(introduce_anomaly)
self.publish_data(data)
message_count += 1
time.sleep(interval_seconds)
except KeyboardInterrupt:
print("\nSimulatie gestopt door gebruiker")
finally:
print(f"\nSimulatie compleet. {message_count} berichten gepubliceerd")
self.mqtt_client.disconnect()
# Gebruiksvoorbeeld
if __name__ == "__main__":
# Simuleer meerdere machines in Amsterdam fabriek
simulators = [
ManufacturingSensorSimulator("CNC-001", "AMS-FABRIEK-01", "CNC-Machine"),
ManufacturingSensorSimulator("PERS-001", "AMS-FABRIEK-01", "Hydraulische-Pers"),
ManufacturingSensorSimulator("LAS-001", "AMS-FABRIEK-01", "Lasrobot"),
ManufacturingSensorSimulator("ASSY-001", "AMS-FABRIEK-01", "Assemblagelijn"),
]
import threading
# Voer simulatoren parallel uit
threads = []
for sim in simulators:
thread = threading.Thread(
target=sim.run_simulation,
args=(3600, 5, 0.02) # 1 uur, 5 seconden intervallen, 2% anomalie rate
)
thread.start()
threads.append(thread)
# Wacht tot alle threads klaar zijn
for thread in threads:
thread.join()
print("Alle simulaties compleet")
Deze simulator creëert realistische sensordata met occasionele anomalieën, perfect voor het testen van uw IoT-pipeline.
AWS IoT Greengrass voor Edge Computing
Manufacturing omgevingen vereisen vaak lokale verwerking vanwege netwerkbeperkingen, latency-eisen of operationele continuïteitsbehoeften. AWS IoT Greengrass brengt AWS-mogelijkheden naar de fabrieksvloer.
Greengrass Use Cases
Lokale ML Inferentie: Voer machine learning modellen direct uit op edge-apparaten voor real-time kwaliteitsinspectie, voorspellend onderhoud en procesoptimalisatie zonder cloud round-trip latency.
Protocol Vertaling: Converteer industriële protocollen (Modbus, OPC-UA, Profinet) naar MQTT voor cloudconnectiviteit.
Data Filtering en Aggregatie: Verwerk en aggregeer sensordata lokaal, waardoor bandbreedtekosten worden verlaagd door alleen relevante data naar de cloud te sturen.
Offline Operatie: Blijf data verwerken en beslissingen nemen zelfs wanneer connectiviteit met AWS is onderbroken.
Implementatie van Greengrass Componenten
# Greengrass component voor voorspellend onderhoud
import json
import time
import numpy as np
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToIoTCoreRequest,
QOS
)
class PredictiveMaintenanceComponent:
def __init__(self):
self.ipc_client = awsiot.greengrasscoreipc.connect()
self.sensor_buffer = []
self.buffer_size = 100
# Laad voorgetraind model (vereenvoudigd voorbeeld)
self.failure_threshold = 0.85
def process_sensor_data(self, sensor_data):
"""Verwerk binnenkomende sensordata"""
self.sensor_buffer.append(sensor_data)
# Houd buffer op gespecificeerde grootte
if len(self.sensor_buffer) > self.buffer_size:
self.sensor_buffer.pop(0)
# Voer voorspelling uit wanneer buffer vol is
if len(self.sensor_buffer) == self.buffer_size:
prediction = self.predict_failure()
if prediction['failure_probability'] > self.failure_threshold:
self.send_alert(prediction)
def predict_failure(self):
"""Voorspel apparatuur storing op basis van sensortrends"""
# Extract sensorwaarden
temperatures = [d['sensors']['temperature'] for d in self.sensor_buffer]
vibrations = [d['sensors']['vibration'] for d in self.sensor_buffer]
# Eenvoudige trendanalyse (vervang met daadwerkelijk ML model)
temp_trend = np.polyfit(range(len(temperatures)), temperatures, 1)[0]
vib_trend = np.polyfit(range(len(vibrations)), vibrations, 1)[0]
# Bereken storing kans
failure_prob = 0.0
if temp_trend > 0.1: # Temperatuur stijgt
failure_prob += 0.3
if vib_trend > 0.05: # Vibratie neemt toe
failure_prob += 0.4
if temperatures[-1] > 80: # Huidige temp hoog
failure_prob += 0.2
if vibrations[-1] > 8: # Huidige vibratie hoog
failure_prob += 0.2
return {
'failure_probability': min(failure_prob, 1.0),
'temperature_trend': temp_trend,
'vibration_trend': vib_trend,
'recommendation': self.get_recommendation(failure_prob),
'timestamp': int(time.time() * 1000),
}
def get_recommendation(self, failure_prob):
"""Krijg onderhoudsaanbeveling"""
if failure_prob > 0.9:
return "URGENT: Plan onmiddellijk onderhoud"
elif failure_prob > 0.7:
return "Plan onderhoud binnen 24 uur"
elif failure_prob > 0.5:
return "Plan onderhoud binnen 1 week"
else:
return "Blijf monitoren"
def send_alert(self, prediction):
"""Stuur alert naar IoT Core"""
message = {
'deviceId': self.sensor_buffer[-1]['deviceId'],
'alertType': 'predictive_maintenance',
'prediction': prediction,
}
request = PublishToIoTCoreRequest()
request.topic_name = f"factory/alerts/maintenance"
request.payload = json.dumps(message).encode('utf-8')
request.qos = QOS.AT_LEAST_ONCE
self.ipc_client.publish_to_iot_core(request)
print(f"Alert verzonden: {prediction['recommendation']}")
# Greengrass component lifecycle
component = PredictiveMaintenanceComponent()
def handler(event):
"""Handel binnenkomende sensordata af"""
sensor_data = json.loads(event)
component.process_sensor_data(sensor_data)
Time-Series Data met Amazon Timestream
Manufacturing genereert enorme hoeveelheden time-series data. Amazon Timestream is geoptimaliseerd voor deze workload en biedt snelle ingestion en query prestaties.
Data Schrijven naar Timestream
import boto3
import time
from datetime import datetime
timestream_write = boto3.client('timestream-write', region_name='eu-west-1')
timestream_query = boto3.client('timestream-query', region_name='eu-west-1')
DATABASE_NAME = 'manufacturing-iot'
TABLE_NAME = 'sensor-data'
def write_sensor_records(sensor_data_batch):
"""Schrijf batch sensorrecords naar Timestream"""
records = []
for data in sensor_data_batch:
current_time = str(int(time.time() * 1000))
# Creëer records voor elke sensormeting
for sensor_name, sensor_value in data['sensors'].items():
record = {
'Dimensions': [
{'Name': 'deviceId', 'Value': data['deviceId']},
{'Name': 'factoryId', 'Value': data['factoryId']},
{'Name': 'machineType', 'Value': data['machineType']},
{'Name': 'sensorType', 'Value': sensor_name},
],
'MeasureName': sensor_name,
'MeasureValue': str(sensor_value),
'MeasureValueType': 'DOUBLE',
'Time': current_time,
}
records.append(record)
try:
result = timestream_write.write_records(
DatabaseName=DATABASE_NAME,
TableName=TABLE_NAME,
Records=records
)
print(f"{len(records)} records geschreven naar Timestream")
return result
except Exception as e:
print(f"Fout bij schrijven naar Timestream: {e}")
raise
def query_equipment_performance(device_id, hours=24):
"""Query apparatuur prestaties over tijd"""
query = f"""
SELECT
deviceId,
BIN(time, 1h) AS hour,
AVG(CASE WHEN sensorType = 'temperature' THEN measure_value::double END) AS avg_temperature,
MAX(CASE WHEN sensorType = 'temperature' THEN measure_value::double END) AS max_temperature,
AVG(CASE WHEN sensorType = 'vibration' THEN measure_value::double END) AS avg_vibration,
MAX(CASE WHEN sensorType = 'vibration' THEN measure_value::double END) AS max_vibration,
AVG(CASE WHEN sensorType = 'power' THEN measure_value::double END) AS avg_power
FROM "{DATABASE_NAME}"."{TABLE_NAME}"
WHERE deviceId = '{device_id}'
AND time > ago({hours}h)
GROUP BY deviceId, BIN(time, 1h)
ORDER BY hour DESC
"""
try:
result = timestream_query.query(QueryString=query)
return parse_query_result(result)
except Exception as e:
print(f"Fout bij querying Timestream: {e}")
raise
def calculate_oee(factory_id, start_time, end_time):
"""Bereken Overall Equipment Effectiveness (OEE)"""
query = f"""
WITH production_data AS (
SELECT
deviceId,
COUNT(*) as total_readings,
SUM(CASE WHEN operationalStatus = 'running' THEN 1 ELSE 0 END) as running_count,
AVG(measure_value::double) as avg_production_rate
FROM "{DATABASE_NAME}"."{TABLE_NAME}"
WHERE factoryId = '{factory_id}'
AND time BETWEEN from_iso8601_timestamp('{start_time}')
AND from_iso8601_timestamp('{end_time}')
AND measureName = 'productionCount'
GROUP BY deviceId
)
SELECT
deviceId,
(running_count * 1.0 / total_readings) * 100 as availability_percent,
avg_production_rate
FROM production_data
"""
result = timestream_query.query(QueryString=query)
return parse_query_result(result)
def detect_anomalies(device_id, hours=1):
"""Detecteer anomalieën met statistische methoden"""
query = f"""
WITH sensor_stats AS (
SELECT
sensorType,
AVG(measure_value::double) as mean_value,
STDDEV(measure_value::double) as stddev_value
FROM "{DATABASE_NAME}"."{TABLE_NAME}"
WHERE deviceId = '{device_id}'
AND time > ago({hours}h)
GROUP BY sensorType
),
recent_readings AS (
SELECT
time,
sensorType,
measure_value::double as value
FROM "{DATABASE_NAME}"."{TABLE_NAME}"
WHERE deviceId = '{device_id}'
AND time > ago(5m)
)
SELECT
r.time,
r.sensorType,
r.value,
s.mean_value,
s.stddev_value,
ABS(r.value - s.mean_value) / s.stddev_value as z_score
FROM recent_readings r
JOIN sensor_stats s ON r.sensorType = s.sensorType
WHERE ABS(r.value - s.mean_value) / s.stddev_value > 3
ORDER BY r.time DESC
"""
result = timestream_query.query(QueryString=query)
anomalies = parse_query_result(result)
if anomalies:
print(f"{len(anomalies)} anomalieën gevonden voor apparaat {device_id}")
return anomalies
def parse_query_result(result):
"""Parse Timestream query resultaat"""
rows = []
for row in result['Rows']:
parsed_row = {}
for i, col in enumerate(row['Data']):
col_name = result['ColumnInfo'][i]['Name']
parsed_row[col_name] = col.get('ScalarValue', '')
rows.append(parsed_row)
return rows
Voorspellend Onderhoud met Machine Learning
Voorspellend onderhoud is een van de meest waardevolle IIoT-toepassingen. Met SageMaker kunnen we modellen trainen om apparatuur storingen te voorspellen.
Trainen van een Storing Voorspellingsmodel
import boto3
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import joblib
import sagemaker
from sagemaker.sklearn.estimator import SKLearn
# Bereid trainingsdata voor uit Timestream
def prepare_training_data(factory_id, days=90):
"""Extract en bereid trainingsdata voor"""
timestream_query = boto3.client('timestream-query', region_name='eu-west-1')
# Query historische sensordata met gelabelde storingen
query = f"""
SELECT
deviceId,
time,
AVG(CASE WHEN sensorType = 'temperature' THEN measure_value::double END) AS temperature,
AVG(CASE WHEN sensorType = 'vibration' THEN measure_value::double END) AS vibration,
AVG(CASE WHEN sensorType = 'pressure' THEN measure_value::double END) AS pressure,
AVG(CASE WHEN sensorType = 'rpm' THEN measure_value::double END) AS rpm,
AVG(CASE WHEN sensorType = 'power' THEN measure_value::double END) AS power
FROM "manufacturing-iot"."sensor-data"
WHERE factoryId = '{factory_id}'
AND time > ago({days}d)
GROUP BY deviceId, time
ORDER BY time
"""
result = timestream_query.query(QueryString=query)
# Converteer naar DataFrame
data = []
for row in result['Rows']:
data_row = {}
for i, col in enumerate(row['Data']):
col_name = result['ColumnInfo'][i]['Name']
data_row[col_name] = float(col.get('ScalarValue', 0))
data.append(data_row)
df = pd.DataFrame(data)
# Feature engineering
df = create_features(df)
return df
def create_features(df):
"""Creëer features voor ML model"""
# Rolling statistieken
for col in ['temperature', 'vibration', 'pressure', 'rpm', 'power']:
df[f'{col}_rolling_mean_1h'] = df.groupby('deviceId')[col].transform(
lambda x: x.rolling(window=12, min_periods=1).mean()
)
df[f'{col}_rolling_std_1h'] = df.groupby('deviceId')[col].transform(
lambda x: x.rolling(window=12, min_periods=1).std()
)
df[f'{col}_trend'] = df.groupby('deviceId')[col].transform(
lambda x: x.diff()
)
# Veranderingssnelheid
df['temp_rate_of_change'] = df.groupby('deviceId')['temperature'].diff()
df['vibration_rate_of_change'] = df.groupby('deviceId')['vibration'].diff()
# Interactie features
df['temp_vibration_interaction'] = df['temperature'] * df['vibration']
df['pressure_rpm_ratio'] = df['pressure'] / (df['rpm'] + 1)
# Tijd-gebaseerde features
df['hour'] = pd.to_datetime(df['time'], unit='ms').dt.hour
df['day_of_week'] = pd.to_datetime(df['time'], unit='ms').dt.dayofweek
# Drop NaN waarden van rolling berekeningen
df = df.fillna(method='bfill')
return df
def train_failure_prediction_model(training_data):
"""Train Random Forest model voor storing voorspelling"""
# Bereid features en target voor
feature_columns = [col for col in training_data.columns
if col not in ['deviceId', 'time', 'failure_within_24h']]
X = training_data[feature_columns]
y = training_data['failure_within_24h']
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Train model
model = RandomForestClassifier(
n_estimators=200,
max_depth=15,
min_samples_split=10,
min_samples_leaf=4,
class_weight='balanced',
random_state=42,
n_jobs=-1
)
model.fit(X_train, y_train)
# Evalueer
y_pred = model.predict(X_test)
print("\nModel Prestaties:")
print(classification_report(y_test, y_pred))
print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))
# Feature importance
feature_importance = pd.DataFrame({
'feature': feature_columns,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
print("\nTop 10 Belangrijkste Features:")
print(feature_importance.head(10))
return model, feature_columns
# Deploy model naar SageMaker
def deploy_to_sagemaker(model, feature_columns):
"""Deploy getraind model naar SageMaker endpoint"""
# Sla model en feature kolommen op
joblib.dump(model, 'model.joblib')
joblib.dump(feature_columns, 'feature_columns.joblib')
# Upload naar S3
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
prefix = 'manufacturing-predictive-maintenance'
model_data = sagemaker_session.upload_data(
path='model.joblib',
bucket=bucket,
key_prefix=f'{prefix}/model'
)
# Creëer SageMaker model
role = sagemaker.get_execution_role()
sklearn_model = SKLearn(
entry_point='inference.py',
role=role,
instance_type='ml.m5.large',
framework_version='1.2-1',
py_version='py3',
)
# Deploy endpoint
predictor = sklearn_model.deploy(
initial_instance_count=1,
instance_type='ml.t2.medium',
endpoint_name='manufacturing-maintenance-predictor'
)
print(f"Model gedeployed naar endpoint: manufacturing-maintenance-predictor")
return predictor
Real-Time Fabrieks Monitoring Dashboard
Visualiseer fabrieksoperaties real-time met Amazon QuickSight of custom dashboards.
Best Practices voor Manufacturing IoT
1. Beveiliging
- Gebruik X.509 certificaten voor apparaat authenticatie
- Implementeer AWS IoT Device Defender voor beveiligingsmonitoring
- Schakel AWS IoT Secure Tunneling in voor remote apparaat toegang
- Gebruik VPN of AWS Direct Connect voor fabriek connectiviteit
- Implementeer least-privilege IAM policies
- Schakel AWS CloudTrail in voor audit logging
2. Schaalbaarheid
- Gebruik IoT Core message broker voor miljoenen apparaten
- Implementeer data aggregatie aan de edge met Greengrass
- Gebruik Kinesis Data Streams voor high-throughput ingestion
- Partitioneer time-series data effectief in Timestream
- Implementeer caching strategieën voor veelvuldig opgevraagde data
3. Betrouwbaarheid
- Deploy Greengrass voor offline operatie mogelijkheid
- Implementeer message buffering en retry logica
- Gebruik DynamoDB voor apparaat state management
- Stel multi-region failover in voor kritische systemen
- Monitor apparaat connectiviteit en gezondheid
4. Kostenoptimalisatie
- Filter en aggregeer data aan de edge voordat naar cloud wordt gestuurd
- Gebruik geschikte Timestream retention policies
- Implementeer S3 lifecycle policies voor gearchiveerde data
- Right-size IoT Core message routing rules
- Gebruik reserved capacity voor voorspelbare workloads
Conclusie
AWS biedt een uitgebreid platform voor het implementeren van Industry 4.0-initiatieven in manufacturing. Van apparaatconnectiviteit met IoT Core, tot edge processing met Greengrass, tot voorspellend onderhoud met SageMaker - Nederlandse fabrikanten kunnen hun operaties transformeren om efficiënter, betrouwbaarder en concurrerender te worden.
De combinatie van real-time monitoring, predictive analytics en geautomatiseerde responses stelt fabrikanten in staat om:
- Ongeplande downtime met 30-50% te verminderen
- OEE met 15-25% te verbeteren
- Onderhoudskosten met 20-30% te verlagen
- Productkwaliteit te verbeteren door vroege defect detectie
- Energieverbruik te optimaliseren en kosten te verlagen
Bij Forrict helpen we Nederlandse productiebedrijven IoT-oplossingen op AWS te ontwerpen en implementeren. Ons team heeft ervaring in verschillende productiesectoren waaronder hightech apparatuur (ASML-stijl), automotive (DAF-geïnspireerd) en voedselverwerking. Neem contact met ons op om te bespreken hoe we u kunnen helpen Industry 4.0-technologieën te implementeren en uw productieoperaties te transformeren.
Forrict Team
AWS expert en consultant bij Forrict, gespecialiseerd in cloud architectuur en AWS best practices voor Nederlandse bedrijven.

