Healthcare Cloud Solutions with AWS in the Netherlands: NEN7510 & WZD Compliance
Design secure, compliant AWS healthcare architectures meeting Dutch NEN7510 standards, WZD regulations, and GDPR requirements for patient data protection
Introduction
The Dutch healthcare sector operates under strict regulatory frameworks designed to protect patient privacy and ensure data security. Moving healthcare workloads to AWS requires compliance with NEN7510 (the Dutch healthcare information security standard), the WZD (Wet zorg en dwang, the Care and Coercion Act), and GDPR. Organizations that also operate internationally often align with the safeguards of the US HIPAA rules as well.
This guide shows how to build AWS architectures for Dutch healthcare providers, hospitals, and medical device manufacturers that address these regulatory obligations.
What You’ll Learn:
- NEN7510 compliance requirements and AWS implementation
- WZD regulations for patient data handling
- Healthcare-specific security controls and encryption standards
- Audit trails and access logging for medical records
- Patient consent management systems
- Infrastructure-as-Code for compliant healthcare deployments
- Integration patterns for legacy hospital systems (HL7, FHIR)
Understanding Dutch Healthcare Regulations
NEN7510: Healthcare Information Security Standard
NEN7510 is the Dutch information security standard for healthcare. It builds on ISO 27001 and ISO 27002 and tailors their controls to healthcare organizations.
Core Requirements:
- Risk Assessment: Comprehensive information security risk analysis
- Access Control: Role-based access with least privilege principle
- Audit Logging: Detailed logging of all patient data access
- Encryption: Data encryption at rest and in transit
- Business Continuity: Healthcare-specific DR and backup requirements
- Incident Management: Security incident response procedures
- Third-Party Management: Due diligence on cloud service providers
NEN7510 Security Domains:
- Organizational Security: Policies, procedures, and governance
- Physical Security: Data center and facility security
- Technical Security: Encryption, access control, monitoring
- Operational Security: Change management, incident response
WZD (Wet Zorg en Dwang)
The Care and Coercion Act regulates involuntary care for people with an intellectual disability or a psychogeriatric condition such as dementia:
Key Technical Requirements:
- Incident Registration: All coercion incidents must be logged
- Data Retention: Minimum 15-year retention for coercion records
- Access Controls: Restricted access to sensitive records on involuntary care
- Audit Trail: Complete audit trail of data access and modifications
- Patient Rights: Support for patient data access requests
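The 15-year retention period listed above is often approximated as 15 × 365 = 5475 days, which falls a few days short because it ignores leap days. The sketch below computes the retention end date in calendar years instead. Whether the period must be counted in calendar years is an assumption to confirm with legal counsel, and `retention_end` is a hypothetical helper name.

```python
from datetime import date


def retention_end(record_date: date, years: int = 15) -> date:
    """Return the calendar date on which the retention period ends."""
    try:
        return record_date.replace(year=record_date.year + years)
    except ValueError:
        # 29 February has no counterpart in a non-leap target year,
        # so roll forward to 1 March.
        return date(record_date.year + years, 3, 1)


if __name__ == "__main__":
    created = date(2020, 1, 15)
    end = retention_end(created)
    print(f"Retain until: {end}")
    print(f"Days retained: {(end - created).days} (15 * 365 = {15 * 365})")
```

Storing the computed end date alongside each record avoids repeating the day-count approximation in every lifecycle rule or TTL value.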
GDPR in Healthcare Context
Health data is a special category of personal data under GDPR (Article 9):
Additional Requirements:
- Legal Basis: Explicit patient consent or another Article 9(2) ground, such as the provision of healthcare
- Purpose Limitation: Data only used for specified medical purposes
- Data Minimization: Collect only necessary medical data
- Right to Erasure: Complex in healthcare due to retention requirements
- Data Portability: Patients can request their medical records
- Breach Notification: Notify the Dutch DPA (Autoriteit Persoonsgegevens) within 72 hours of becoming aware of a breach
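The 72-hour notification window is easy to miscount when incident timestamps mix time zones. The following minimal sketch derives the deadline from a timezone-aware detection time. The function names are hypothetical, and it assumes the clock starts when the organization becomes aware of the breach.

```python
from datetime import datetime, timedelta, timezone

NOTIFICATION_WINDOW = timedelta(hours=72)


def notification_deadline(detected_at: datetime) -> datetime:
    """Return the UTC deadline for notifying the supervisory authority."""
    if detected_at.tzinfo is None:
        raise ValueError("detected_at must be timezone-aware")
    return detected_at.astimezone(timezone.utc) + NOTIFICATION_WINDOW


def hours_remaining(detected_at: datetime, now: datetime) -> float:
    """Return the hours left before the deadline (negative once it has passed)."""
    return (notification_deadline(detected_at) - now).total_seconds() / 3600


if __name__ == "__main__":
    detected = datetime(2024, 5, 1, 9, 0, tzinfo=timezone.utc)
    print(notification_deadline(detected).isoformat())
    print(hours_remaining(detected, datetime(2024, 5, 2, 9, 0, tzinfo=timezone.utc)))
```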
HIPAA Alignment
While HIPAA does not apply in the Netherlands, aligning with it provides additional security assurance:
HIPAA Principles Applied:
- Administrative Safeguards: Security management processes
- Physical Safeguards: Facility access controls, workstation security
- Technical Safeguards: Access control, audit controls, integrity controls
- Privacy Rule: Patient privacy protections
- Security Rule: Electronic PHI (ePHI) protection
NEN7510-Compliant AWS Architecture
1. Healthcare Data Encryption Architecture
// lib/healthcare-encryption-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as kms from 'aws-cdk-lib/aws-kms';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as rds from 'aws-cdk-lib/aws-rds';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import { Construct } from 'constructs';
export class HealthcareEncryptionStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // KMS key for PHI encryption (NEN7510 requirement)
    const phiEncryptionKey = new kms.Key(this, 'PHIEncryptionKey', {
      description: 'KMS key for Protected Health Information encryption',
      enableKeyRotation: true, // NEN7510 requires key rotation
      removalPolicy: cdk.RemovalPolicy.RETAIN,
      policy: new iam.PolicyDocument({
        statements: [
          new iam.PolicyStatement({
            sid: 'Enable IAM User Permissions',
            effect: iam.Effect.ALLOW,
            principals: [new iam.AccountRootPrincipal()],
            actions: ['kms:*'],
            resources: ['*'],
          }),
          new iam.PolicyStatement({
            sid: 'Allow healthcare services',
            effect: iam.Effect.ALLOW,
            principals: [
              new iam.ServicePrincipal('s3.amazonaws.com'),
              new iam.ServicePrincipal('rds.amazonaws.com'),
              new iam.ServicePrincipal('lambda.amazonaws.com'),
            ],
            actions: [
              'kms:Decrypt',
              'kms:DescribeKey',
              'kms:CreateGrant',
            ],
            resources: ['*'],
            conditions: {
              StringEquals: {
                'kms:ViaService': [
                  `s3.${this.region}.amazonaws.com`,
                  `rds.${this.region}.amazonaws.com`,
                ],
              },
            },
          }),
        ],
      }),
    });

    // S3 bucket for medical imaging (DICOM files)
    const medicalImagingBucket = new s3.Bucket(this, 'MedicalImagingBucket', {
      bucketName: `medical-imaging-${this.account}-${this.region}`,
      encryption: s3.BucketEncryption.KMS,
      encryptionKey: phiEncryptionKey,
      versioned: true, // NEN7510 requires version control
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      enforceSSL: true,
      objectOwnership: s3.ObjectOwnership.BUCKET_OWNER_ENFORCED,
      lifecycleRules: [
        {
          id: 'TransitionOldScans',
          transitions: [
            {
              storageClass: s3.StorageClass.INTELLIGENT_TIERING,
              transitionAfter: cdk.Duration.days(90),
            },
            {
              storageClass: s3.StorageClass.GLACIER,
              transitionAfter: cdk.Duration.days(365),
            },
          ],
          // WZD requires 15-year retention for mental health records
          expiration: cdk.Duration.days(5475), // 15 years
        },
      ],
      serverAccessLogsPrefix: 'access-logs/',
      removalPolicy: cdk.RemovalPolicy.RETAIN,
    });

    // Enable S3 Object Lock for immutable medical records
    const cfnBucket = medicalImagingBucket.node.defaultChild as s3.CfnBucket;
    cfnBucket.objectLockEnabled = true;
    cfnBucket.objectLockConfiguration = {
      objectLockEnabled: 'Enabled',
      rule: {
        defaultRetention: {
          mode: 'GOVERNANCE',
          years: 15, // WZD requirement
        },
      },
    };

    // VPC for healthcare workloads
    const vpc = new ec2.Vpc(this, 'HealthcareVPC', {
      maxAzs: 3,
      natGateways: 2,
      subnetConfiguration: [
        {
          name: 'Public',
          subnetType: ec2.SubnetType.PUBLIC,
          cidrMask: 24,
        },
        {
          name: 'Private',
          subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS,
          cidrMask: 24,
        },
        {
          name: 'Isolated',
          subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
          cidrMask: 24,
        },
      ],
      flowLogs: {
        's3': {
          destination: ec2.FlowLogDestination.toS3(medicalImagingBucket, 'vpc-flow-logs/'),
          trafficType: ec2.FlowLogTrafficType.ALL,
        },
      },
    });

    // Database credentials in Secrets Manager
    const dbCredentials = new secretsmanager.Secret(this, 'DBCredentials', {
      secretName: 'healthcare-db-credentials',
      description: 'RDS credentials for patient database',
      generateSecretString: {
        // 'admin' is a reserved word in RDS for PostgreSQL and is rejected as a master username
        secretStringTemplate: JSON.stringify({ username: 'patientdb_admin' }),
        generateStringKey: 'password',
        excludePunctuation: true,
        passwordLength: 32,
        requireEachIncludedType: true,
      },
      encryptionKey: phiEncryptionKey,
    });

    // RDS for patient records with encryption
    const patientDatabase = new rds.DatabaseInstance(this, 'PatientDatabase', {
      engine: rds.DatabaseInstanceEngine.postgres({
        version: rds.PostgresEngineVersion.VER_15_4,
      }),
      instanceType: ec2.InstanceType.of(
        ec2.InstanceClass.R6G,
        ec2.InstanceSize.XLARGE
      ),
      credentials: rds.Credentials.fromSecret(dbCredentials),
      vpc,
      vpcSubnets: {
        subnetType: ec2.SubnetType.PRIVATE_ISOLATED, // No internet access
      },
      multiAz: true, // High availability for healthcare
      storageEncrypted: true,
      storageEncryptionKey: phiEncryptionKey,
      // NEN7510 requires backup
      backupRetention: cdk.Duration.days(35),
      preferredBackupWindow: '03:00-04:00',
      deletionProtection: true,
      cloudwatchLogsExports: ['postgresql', 'upgrade'],
      // Enable Performance Insights with encryption
      enablePerformanceInsights: true,
      performanceInsightEncryptionKey: phiEncryptionKey,
      performanceInsightRetention: rds.PerformanceInsightRetention.MONTHS_6,
      // Audit logging
      parameterGroup: new rds.ParameterGroup(this, 'PatientDBParams', {
        engine: rds.DatabaseInstanceEngine.postgres({
          version: rds.PostgresEngineVersion.VER_15_4,
        }),
        parameters: {
          'log_connections': '1',
          'log_disconnections': '1',
          'log_duration': '1',
          'log_statement': 'all', // Log all SQL statements for audit
          'rds.force_ssl': '1', // Enforce SSL connections
        },
      }),
      removalPolicy: cdk.RemovalPolicy.RETAIN,
    });

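    // Security group for database: only traffic originating inside the VPC is allowed.
    // For tighter control, replace the VPC-wide CIDR with the application's security group.
    patientDatabase.connections.allowFrom(
      ec2.Peer.ipv4(vpc.vpcCidrBlock),
      ec2.Port.tcp(5432),
      'Allow PostgreSQL from VPC only'
    );
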
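    // CloudWatch alarm for failed database logins.
    // Amazon RDS publishes no failed-login metric for PostgreSQL (the previous
    // FailedSQLServerAgentJobsCount metric applies to SQL Server only), so this sketch
    // derives one from the exported PostgreSQL log. RDS creates the log group when the
    // instance first publishes logs; create it explicitly if deployment races that step.
    const postgresLogGroup = cdk.aws_logs.LogGroup.fromLogGroupName(
      this,
      'PatientDBPostgresLogs',
      `/aws/rds/instance/${patientDatabase.instanceIdentifier}/postgresql`
    );
    const failedLoginFilter = new cdk.aws_logs.MetricFilter(this, 'FailedLoginFilter', {
      logGroup: postgresLogGroup,
      filterPattern: cdk.aws_logs.FilterPattern.anyTerm('password authentication failed'),
      metricNamespace: 'Healthcare/Security',
      metricName: 'FailedDatabaseLogins',
      metricValue: '1',
    });
    const unauthorizedAccessAlarm = new cdk.aws_cloudwatch.Alarm(
      this,
      'UnauthorizedAccessAlarm',
      {
        alarmName: 'healthcare-unauthorized-access',
        metric: failedLoginFilter.metric({
          statistic: 'Sum',
          period: cdk.Duration.minutes(5),
        }),
        threshold: 5,
        evaluationPeriods: 1,
        datapointsToAlarm: 1,
        treatMissingData: cdk.aws_cloudwatch.TreatMissingData.NOT_BREACHING,
      }
    );
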
    // Secrets rotation for NEN7510 compliance
    new secretsmanager.SecretRotation(this, 'SecretRotation', {
      application: secretsmanager.SecretRotationApplication.POSTGRES_ROTATION_SINGLE_USER,
      secret: dbCredentials,
      target: patientDatabase,
      vpc,
      automaticallyAfter: cdk.Duration.days(90), // Rotate every 90 days
    });

    // Outputs
    new cdk.CfnOutput(this, 'PHIEncryptionKeyArn', {
      value: phiEncryptionKey.keyArn,
      description: 'KMS key for PHI encryption',
    });
    new cdk.CfnOutput(this, 'MedicalImagingBucketName', {
      value: medicalImagingBucket.bucketName,
      description: 'S3 bucket for medical imaging (DICOM)',
    });
    new cdk.CfnOutput(this, 'PatientDatabaseEndpoint', {
      value: patientDatabase.dbInstanceEndpointAddress,
      description: 'Patient database endpoint (private)',
    });
  }
}
2. Audit Logging and Access Control
# scripts/healthcare_audit.py
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import hashlib


class HealthcareAuditLogger:
    """NEN7510-compliant audit logging for healthcare applications"""

    def __init__(self, region: str = 'eu-central-1'):
        self.cloudtrail = boto3.client('cloudtrail', region_name=region)
        self.cloudwatch = boto3.client('logs', region_name=region)
        self.dynamodb = boto3.resource('dynamodb', region_name=region)
        self.region = region

        # Audit log table (15-year retention for WZD)
        self.audit_table = self.dynamodb.Table('HealthcareAuditLog')

    def log_patient_access(
        self,
        user_id: str,
        patient_id: str,
        access_type: str,
        resource: str,
        purpose: str,
        ip_address: str,
        user_agent: str
    ) -> str:
        """
        Log patient data access (NEN7510 requirement)

        Args:
            user_id: Healthcare professional ID
            patient_id: Patient identifier (hashed before storage)
            access_type: READ, WRITE, DELETE, EXPORT
            resource: Resource accessed (record type)
            purpose: Medical purpose for access
            ip_address: Source IP
            user_agent: User agent string

        Returns:
            Audit log ID
        """
        timestamp = datetime.utcnow()
        audit_id = hashlib.sha256(
            f"{user_id}{patient_id}{timestamp.isoformat()}".encode()
        ).hexdigest()

        # Hash patient ID for privacy (NEN7510 pseudonymization)
        patient_hash = hashlib.sha256(patient_id.encode()).hexdigest()

        audit_entry = {
            'auditId': audit_id,
            'timestamp': timestamp.isoformat(),
            'userId': user_id,
            'patientHash': patient_hash,  # Pseudonymized
            'accessType': access_type,
            'resource': resource,
            'purpose': purpose,
            'ipAddress': ip_address,
            'userAgent': user_agent,
            'ttl': int((timestamp + timedelta(days=5475)).timestamp())  # 15 years
        }

        # Store in DynamoDB with TTL
        self.audit_table.put_item(Item=audit_entry)

        # Also log to CloudWatch for real-time monitoring
        self._log_to_cloudwatch(audit_entry)
        return audit_id

    def log_consent_change(
        self,
        patient_id: str,
        consent_type: str,
        consent_given: bool,
        granted_by: str,
        effective_date: datetime
    ) -> str:
        """
        Log patient consent changes (GDPR requirement)

        Args:
            patient_id: Patient identifier
            consent_type: Type of consent (treatment, research, data sharing)
            consent_given: True if consent granted, False if withdrawn
            granted_by: Who granted/withdrew consent
            effective_date: When consent becomes effective

        Returns:
            Consent log ID
        """
        timestamp = datetime.utcnow()
        # Keep a readable prefix on the ID: the consent audit filters on
        # begins_with(consentId, 'CONSENT_'), which a bare hex digest never matches
        consent_id = 'CONSENT_' + hashlib.sha256(
            f"CONSENT_{patient_id}{timestamp.isoformat()}".encode()
        ).hexdigest()
        patient_hash = hashlib.sha256(patient_id.encode()).hexdigest()

        consent_entry = {
            # Assumes auditId is the table's partition key, as in log_patient_access;
            # put_item rejects items that lack the key attribute
            'auditId': consent_id,
            'consentId': consent_id,
            'timestamp': timestamp.isoformat(),
            'patientHash': patient_hash,
            'consentType': consent_type,
            'consentGiven': consent_given,
            'grantedBy': granted_by,
            'effectiveDate': effective_date.isoformat(),
            'ttl': int((timestamp + timedelta(days=5475)).timestamp())
        }
        self.audit_table.put_item(Item=consent_entry)
        return consent_id

    def query_patient_access_history(
        self,
        patient_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> List[Dict]:
        """
        Query patient access history (for patient rights requests)

        Args:
            patient_id: Patient identifier
            start_date: Start of query period
            end_date: End of query period

        Returns:
            List of access events
        """
        patient_hash = hashlib.sha256(patient_id.encode()).hexdigest()

        # Query DynamoDB. "timestamp" is a DynamoDB reserved word, so it must be
        # referenced through an expression attribute name (#ts).
        response = self.audit_table.query(
            IndexName='PatientHashIndex',
            KeyConditionExpression='patientHash = :ph AND #ts BETWEEN :start AND :end',
            ExpressionAttributeNames={'#ts': 'timestamp'},
            ExpressionAttributeValues={
                ':ph': patient_hash,
                ':start': start_date.isoformat(),
                ':end': end_date.isoformat()
            }
        )
        return response.get('Items', [])

    def generate_nen7510_report(
        self,
        start_date: datetime,
        end_date: datetime,
        output_file: str = 'nen7510_audit_report.json'
    ) -> Dict:
        """
        Generate NEN7510 compliance audit report

        Returns comprehensive audit report including:
        - Access patterns
        - Consent management
        - Security incidents
        - Encryption status
        """
        report = {
            'report_metadata': {
                'standard': 'NEN7510:2017',
                'generated_at': datetime.utcnow().isoformat(),
                'period_start': start_date.isoformat(),
                'period_end': end_date.isoformat(),
                'region': self.region
            },
            'access_audit': self._audit_access_patterns(start_date, end_date),
            'consent_audit': self._audit_consent_management(start_date, end_date),
            'security_audit': self._audit_security_controls(),
            'encryption_audit': self._audit_encryption_status(),
            'breach_incidents': self._check_security_breaches(start_date, end_date),
        }

        # Save report
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        print(f"NEN7510 audit report saved to {output_file}")
        return report

    def _audit_access_patterns(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Analyze access patterns for anomalies"""
        # Scan audit table for period. Consent events share the table, so keep
        # only items that carry an accessType.
        scan_kwargs = {
            'FilterExpression': 'attribute_exists(accessType) AND #ts BETWEEN :start AND :end',
            'ExpressionAttributeNames': {'#ts': 'timestamp'},
            'ExpressionAttributeValues': {
                ':start': start_date.isoformat(),
                ':end': end_date.isoformat()
            }
        }
        # A single scan call returns at most 1 MB of data, so follow LastEvaluatedKey
        access_events = []
        while True:
            response = self.audit_table.scan(**scan_kwargs)
            access_events.extend(response.get('Items', []))
            if 'LastEvaluatedKey' not in response:
                break
            scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

        # Analyze patterns
        access_by_type = {}
        access_by_user = {}
        unusual_access_times = []
        for event in access_events:
            # Count by access type
            access_type = event.get('accessType', 'UNKNOWN')
            access_by_type[access_type] = access_by_type.get(access_type, 0) + 1

            # Count by user
            user_id = event.get('userId', 'UNKNOWN')
            access_by_user[user_id] = access_by_user.get(user_id, 0) + 1

            # Check for unusual access times (outside business hours).
            # Timestamps are stored in UTC, so these bounds are UTC hours.
            event_time = datetime.fromisoformat(event['timestamp'])
            if event_time.hour < 7 or event_time.hour > 19:
                unusual_access_times.append({
                    'timestamp': event['timestamp'],
                    'user': user_id,
                    'resource': event.get('resource')
                })

        return {
            'total_access_events': len(access_events),
            'access_by_type': access_by_type,
            'top_users': sorted(
                access_by_user.items(),
                key=lambda x: x[1],
                reverse=True
            )[:10],
            'unusual_access_times': unusual_access_times[:50]
        }

    def _audit_consent_management(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Audit consent management compliance"""
        response = self.audit_table.scan(
            FilterExpression='begins_with(consentId, :prefix) AND #ts BETWEEN :start AND :end',
            ExpressionAttributeNames={'#ts': 'timestamp'},
            ExpressionAttributeValues={
                ':prefix': 'CONSENT_',
                ':start': start_date.isoformat(),
                ':end': end_date.isoformat()
            }
        )
        consent_events = response.get('Items', [])
        consents_granted = sum(1 for e in consent_events if e.get('consentGiven'))
        consents_withdrawn = sum(1 for e in consent_events if not e.get('consentGiven'))
        return {
            'total_consent_events': len(consent_events),
            'consents_granted': consents_granted,
            'consents_withdrawn': consents_withdrawn,
            'consent_types': list(set(e.get('consentType') for e in consent_events))
        }

    def _audit_security_controls(self) -> Dict:
        """Verify security controls are in place"""
        kms = boto3.client('kms', region_name=self.region)
        config = boto3.client('config', region_name=self.region)

        # Check KMS key rotation
        keys = kms.list_keys()
        key_rotation_status = []
        for key in keys.get('Keys', [])[:10]:  # Check first 10 keys
            key_id = key['KeyId']
            try:
                rotation = kms.get_key_rotation_status(KeyId=key_id)
                key_rotation_status.append({
                    'key_id': key_id,
                    'rotation_enabled': rotation.get('KeyRotationEnabled', False)
                })
            except kms.exceptions.ClientError:
                # Some keys (for example, keys this caller may not inspect) reject the call; skip them
                continue

        # Check AWS Config compliance
        compliance = config.describe_compliance_by_config_rule()
        compliant_rules = sum(
            1 for r in compliance.get('ComplianceByConfigRules', [])
            if r.get('Compliance', {}).get('ComplianceType') == 'COMPLIANT'
        )
        return {
            'kms_key_rotation': key_rotation_status,
            'config_compliant_rules': compliant_rules,
            'total_config_rules': len(compliance.get('ComplianceByConfigRules', []))
        }

    def _audit_encryption_status(self) -> Dict:
        """Verify encryption of all healthcare data"""
        s3 = boto3.client('s3', region_name=self.region)
        rds = boto3.client('rds', region_name=self.region)
        encryption_status = {
            's3_buckets': [],
            'rds_instances': []
        }

        # Check S3 encryption
        buckets = s3.list_buckets()
        for bucket in buckets.get('Buckets', []):
            bucket_name = bucket['Name']
            if 'healthcare' in bucket_name or 'medical' in bucket_name or 'patient' in bucket_name:
                try:
                    encryption = s3.get_bucket_encryption(Bucket=bucket_name)
                    rule = encryption['ServerSideEncryptionConfiguration']['Rules'][0]
                    encryption_status['s3_buckets'].append({
                        'bucket': bucket_name,
                        'encrypted': True,
                        'sse_algorithm': rule['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
                    })
                except s3.exceptions.ClientError as e:
                    error_code = e.response['Error']['Code']
                    # Only a missing configuration means "not encrypted"; other
                    # errors (such as AccessDenied) leave the status unknown (None)
                    not_configured = error_code == 'ServerSideEncryptionConfigurationNotFoundError'
                    encryption_status['s3_buckets'].append({
                        'bucket': bucket_name,
                        'encrypted': False if not_configured else None,
                        'error': error_code
                    })

        # Check RDS encryption
        db_instances = rds.describe_db_instances()
        for db in db_instances.get('DBInstances', []):
            if 'patient' in db['DBInstanceIdentifier'] or 'healthcare' in db['DBInstanceIdentifier']:
                encryption_status['rds_instances'].append({
                    'db_instance': db['DBInstanceIdentifier'],
                    'storage_encrypted': db['StorageEncrypted'],
                    'kms_key': db.get('KmsKeyId')
                })
        return encryption_status

    def _check_security_breaches(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> List[Dict]:
        """Check for potential security breaches (72-hour GDPR notification)"""
        guardduty = boto3.client('guardduty', region_name=self.region)
        breaches = []
        try:
            detectors = guardduty.list_detectors()
            for detector_id in detectors.get('DetectorIds', []):
                findings = guardduty.list_findings(
                    DetectorId=detector_id,
                    FindingCriteria={
                        'Criterion': {
                            'severity': {'Gte': 7},  # High and Critical only
                            'updatedAt': {
                                'Gte': int(start_date.timestamp() * 1000),
                                'Lte': int(end_date.timestamp() * 1000)
                            }
                        }
                    }
                )
                if findings.get('FindingIds'):
                    finding_details = guardduty.get_findings(
                        DetectorId=detector_id,
                        FindingIds=findings['FindingIds']
                    )
                    for finding in finding_details.get('Findings', []):
                        breaches.append({
                            'finding_id': finding['Id'],
                            'severity': finding['Severity'],
                            'type': finding['Type'],
                            'description': finding['Description'],
                            'created_at': finding['CreatedAt']
                        })
        except Exception as e:
            print(f"GuardDuty error: {str(e)}")
        return breaches

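    def _log_to_cloudwatch(self, audit_entry: Dict) -> None:
        """Log audit entry to CloudWatch for real-time monitoring"""
        log_group = '/healthcare/audit-logs'
        log_stream = datetime.utcnow().strftime('%Y-%m-%d')
        log_events = [
            {
                # datetime.now().timestamp() yields the correct epoch; calling
                # .timestamp() on a naive utcnow() value is off by the local UTC offset
                'timestamp': int(datetime.now().timestamp() * 1000),
                'message': json.dumps(audit_entry)
            }
        ]
        try:
            self.cloudwatch.put_log_events(
                logGroupName=log_group,
                logStreamName=log_stream,
                logEvents=log_events
            )
        except self.cloudwatch.exceptions.ResourceNotFoundException:
            # The log group or the daily log stream does not exist yet:
            # create whichever is missing, then retry the write.
            # No retention policy is set: CloudWatch Logs accepts at most 3653 days
            # (about 10 years), so 5475 is rejected. Without a policy, events never
            # expire, which covers the 15-year WZD period.
            try:
                self.cloudwatch.create_log_group(logGroupName=log_group)
            except self.cloudwatch.exceptions.ResourceAlreadyExistsException:
                pass
            try:
                self.cloudwatch.create_log_stream(
                    logGroupName=log_group,
                    logStreamName=log_stream
                )
            except self.cloudwatch.exceptions.ResourceAlreadyExistsException:
                pass
            self.cloudwatch.put_log_events(
                logGroupName=log_group,
                logStreamName=log_stream,
                logEvents=log_events
            )
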
# Example usage
if __name__ == '__main__':
    auditor = HealthcareAuditLogger(region='eu-central-1')

    # Log patient access
    audit_id = auditor.log_patient_access(
        user_id='DR_12345',
        patient_id='PATIENT_67890',
        access_type='READ',
        resource='MEDICAL_RECORD',
        purpose='Treatment consultation',
        ip_address='10.0.1.50',
        user_agent='Mozilla/5.0 (Hospital Workstation)'
    )
    print(f"Access logged: {audit_id}")

    # Log consent change
    consent_id = auditor.log_consent_change(
        patient_id='PATIENT_67890',
        consent_type='RESEARCH_PARTICIPATION',
        consent_given=True,
        granted_by='PATIENT_67890',
        effective_date=datetime.utcnow()
    )
    print(f"Consent logged: {consent_id}")

    # Generate compliance report
    report = auditor.generate_nen7510_report(
        start_date=datetime.utcnow() - timedelta(days=30),
        end_date=datetime.utcnow(),
        output_file='nen7510_compliance_report.json'
    )
    print("\n=== NEN7510 Compliance Report ===")
    print(f"Total access events: {report['access_audit']['total_access_events']}")
    print(f"Consent events: {report['consent_audit']['total_consent_events']}")
    print(f"Security breaches detected: {len(report['breach_incidents'])}")
3. HL7/FHIR Integration for Hospital Systems
// lib/healthcare-integration-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as healthlake from 'aws-cdk-lib/aws-healthlake';
import { Construct } from 'constructs';
export class HealthcareIntegrationStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // AWS HealthLake FHIR datastore
    // Check that HealthLake is offered in the target EU region before relying on it.
    const fhirDatastore = new healthlake.CfnFHIRDatastore(this, 'FHIRDatastore', {
      datastoreName: 'patient-records-fhir',
      datastoreTypeVersion: 'R4', // FHIR R4 standard
      // Test environments only: preloads synthetic Synthea data.
      // Remove this block for a datastore that will hold real patient records.
      preloadDataConfig: {
        preloadDataType: 'SYNTHEA',
      },
    });

    // HL7 to FHIR conversion Lambda
    const hl7ToFhirConverter = new lambda.Function(this, 'HL7ToFHIRConverter', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromInline(`
import json


def handler(event, context):
    """
    Convert HL7 messages to FHIR resources
    Supports HL7 v2.x messages (ADT, ORU, ORM)
    """
    hl7_message = event.get('body') or ''

    # Parse HL7 message (segments are separated by carriage returns)
    segments = [s.strip() for s in hl7_message.split('\\r') if s.strip()]
    if not segments:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Empty HL7 message'})
        }
    message_type = extract_message_type(segments)

    if message_type.startswith('ADT'):  # Admission/Discharge/Transfer
        fhir_resource = convert_adt_to_fhir(segments)
    elif message_type.startswith('ORU'):  # Observation Result
        fhir_resource = convert_oru_to_fhir(segments)
    elif message_type.startswith('ORM'):  # Order Message
        fhir_resource = convert_orm_to_fhir(segments)
    else:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': f'Unsupported message type: {message_type}'})
        }

    # Store in HealthLake
    # Note: resources are written through the datastore's FHIR REST endpoint with
    # SigV4-signed HTTPS requests; the boto3 healthlake client only covers
    # management operations. The actual call would go here.
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'HL7 message converted to FHIR',
            'fhir_resource': fhir_resource
        })
    }


def extract_message_type(segments):
    """Extract message type (MSH-9) from MSH segment"""
    msh_segment = segments[0]
    fields = msh_segment.split('|')
    return fields[8] if len(fields) > 8 else 'UNKNOWN'


def convert_adt_to_fhir(segments):
    """Convert ADT message to FHIR Patient resource"""
    pid_segment = next((s for s in segments if s.startswith('PID')), '')
    fields = pid_segment.split('|')

    def field(index):
        # Missing trailing fields are treated as empty instead of raising IndexError
        return fields[index] if len(fields) > index else ''

    name_parts = field(5).split('^')
    patient = {
        'resourceType': 'Patient',
        # PID-3 may carry components (ID^^^authority); keep the ID itself
        'id': field(3).split('^')[0] or 'unknown',
        'name': [{
            'family': name_parts[0],
            'given': [name_parts[1]] if len(name_parts) > 1 and name_parts[1] else []
        }],
        'gender': {'M': 'male', 'F': 'female'}.get(field(8), 'unknown')
    }
    # HL7 dates are YYYYMMDD; FHIR birthDate expects YYYY-MM-DD
    birth_date = field(7)[:8]
    if len(birth_date) == 8 and birth_date.isdigit():
        patient['birthDate'] = birth_date[:4] + '-' + birth_date[4:6] + '-' + birth_date[6:8]
    return patient


def convert_oru_to_fhir(segments):
    """Convert ORU message to FHIR Observation resource"""
    obx_segment = next((s for s in segments if s.startswith('OBX')), '')
    fields = obx_segment.split('|')
    code_parts = fields[3].split('^') if len(fields) > 3 else ['']
    raw_value = fields[5] if len(fields) > 5 else ''
    observation = {
        'resourceType': 'Observation',
        'status': 'final',
        'code': {
            'coding': [{
                # Assumes OBX-3 carries a LOINC code; map local codes first if not
                'system': 'http://loinc.org',
                'code': code_parts[0]
            }]
        }
    }
    try:
        observation['valueQuantity'] = {'value': float(raw_value)}
    except ValueError:
        # Non-numeric results are kept as text
        observation['valueString'] = raw_value
    return observation


def convert_orm_to_fhir(segments):
    """Convert ORM message to FHIR ServiceRequest resource"""
    return {
        'resourceType': 'ServiceRequest',
        'status': 'active',
        'intent': 'order'
    }
`),
      timeout: cdk.Duration.seconds(30),
    });

    // API Gateway for HL7 ingestion
    const hl7Api = new apigateway.RestApi(this, 'HL7IntegrationAPI', {
      restApiName: 'HL7 to FHIR Integration',
      description: 'Accepts HL7 messages and converts to FHIR',
      deployOptions: {
        stageName: 'prod',
        tracingEnabled: true,
      },
    });

    const hl7Resource = hl7Api.root.addResource('hl7');
    hl7Resource.addMethod(
      'POST',
      new apigateway.LambdaIntegration(hl7ToFhirConverter),
      {
        // Require SigV4-signed requests so the endpoint is not open to anonymous callers
        authorizationType: apigateway.AuthorizationType.IAM,
      }
    );

    new cdk.CfnOutput(this, 'HL7APIEndpoint', {
      value: hl7Api.url,
      description: 'HL7 ingestion endpoint',
    });
    new cdk.CfnOutput(this, 'FHIRDatastoreId', {
      value: fhirDatastore.attrDatastoreId,
      description: 'HealthLake FHIR datastore ID',
    });
  }
}
Best Practices for Dutch Healthcare Providers
1. Data Protection
- Encrypt all PHI/patient data at rest and in transit
- Use KMS with automatic key rotation (NEN7510)
- Implement pseudonymization for analytics
- Enable S3 Object Lock for immutable records
- 15-year retention for WZD-regulated data
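Pseudonymization with a plain SHA-256 hash can be reversed by hashing every plausible patient identifier, because identifiers are short and structured. A keyed hash (HMAC) avoids this as long as the key stays secret. The sketch below is a minimal illustration: `pseudonymize` is a hypothetical helper, and the hard-coded key stands in for one that would be loaded from a secrets store such as AWS Secrets Manager.

```python
import hashlib
import hmac


def pseudonymize(patient_id: str, secret_key: bytes) -> str:
    """Return a keyed, deterministic pseudonym for a patient identifier."""
    if not secret_key:
        raise ValueError("secret_key must not be empty")
    return hmac.new(secret_key, patient_id.encode("utf-8"), hashlib.sha256).hexdigest()


if __name__ == "__main__":
    # Demo key only (assumption): load the real key from a secrets store.
    demo_key = b"replace-with-key-from-secrets-store"
    print(pseudonymize("PATIENT_67890", demo_key))
```

The pseudonym is stable for a given key, so records can still be joined for analytics, while rotating or destroying the key breaks the link back to the patient.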
2. Access Control
- Role-based access control (RBAC) for healthcare staff
- Multi-factor authentication for all users
- Break-glass procedures for emergencies
- Regular access reviews and certifications
- Principle of least privilege
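Role-based access and break-glass procedures can be combined in one authorization check, as sketched below. The roles, permissions, and the `authorize` function are hypothetical and serve only as an illustration; a real deployment would take them from the organization's identity provider and would write every break-glass decision to the audit log for later review.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical role model for illustration only
ROLE_PERMISSIONS = {
    "physician": {"READ", "WRITE"},
    "nurse": {"READ"},
    "billing": set(),
}
# Only clinical roles may invoke the emergency override
BREAK_GLASS_ROLES = {"physician", "nurse"}


@dataclass(frozen=True)
class AccessDecision:
    allowed: bool
    break_glass: bool
    reason: str


def authorize(role: str, action: str, justification: Optional[str] = None) -> AccessDecision:
    """Grant by role first; fall back to break-glass only with a justification."""
    if action in ROLE_PERMISSIONS.get(role, set()):
        return AccessDecision(True, False, "granted by role")
    if role in BREAK_GLASS_ROLES and justification and justification.strip():
        return AccessDecision(True, True, "break-glass: " + justification.strip())
    return AccessDecision(False, False, "denied: not permitted for role")


if __name__ == "__main__":
    print(authorize("nurse", "READ"))
    print(authorize("nurse", "WRITE", "Patient unresponsive, physician unavailable"))
    print(authorize("billing", "READ", "Invoice question"))
```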
3. Audit and Compliance
- Comprehensive audit logging of all patient data access
- Real-time monitoring of unusual access patterns
- 72-hour breach notification procedures (GDPR)
- Regular NEN7510 compliance audits
- Patient consent management system
4. Integration
- Support for HL7 v2.x and FHIR standards
- Secure integration with existing hospital systems
- API gateway with healthcare-specific throttling
- Message queue for reliable HL7 processing
- Data validation and error handling
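Validation of incoming HL7 v2 messages can start with a few structural checks before any conversion to FHIR. The sketch below assumes pipe-delimited HL7 v2 messages with an MSH segment first and an optional PID segment. `parse_hl7_v2` and `HL7ValidationError` are hypothetical names, and the sample message is invented for illustration.

```python
from datetime import datetime


class HL7ValidationError(ValueError):
    """Raised when a message fails basic structural validation."""


def parse_hl7_v2(message: str) -> dict:
    """Validate an HL7 v2 message and extract a few header and patient fields."""
    # Normalize line endings: HL7 v2 separates segments with carriage returns
    normalized = message.replace("\r\n", "\r").replace("\n", "\r")
    segments = [s for s in normalized.split("\r") if s]
    if not segments or not segments[0].startswith("MSH|"):
        raise HL7ValidationError("message must start with an MSH segment")

    msh = segments[0].split("|")
    if len(msh) < 10 or not msh[8] or not msh[9]:
        raise HL7ValidationError("MSH-9 (message type) and MSH-10 (control ID) are required")
    result = {"message_type": msh[8], "control_id": msh[9]}

    pid = next((s.split("|") for s in segments if s.startswith("PID|")), None)
    if pid is not None:
        if len(pid) < 4 or not pid[3]:
            raise HL7ValidationError("PID-3 (patient identifier) is required")
        result["patient_id"] = pid[3].split("^")[0]
        if len(pid) > 7 and pid[7]:
            try:
                # HL7 dates are YYYYMMDD; FHIR expects YYYY-MM-DD
                result["birth_date"] = datetime.strptime(pid[7][:8], "%Y%m%d").date().isoformat()
            except ValueError as exc:
                raise HL7ValidationError("PID-7 is not a valid date: " + pid[7]) from exc
    return result


if __name__ == "__main__":
    sample = (
        "MSH|^~\\&|HIS|HOSPITAL|FHIR|AWS|202401151030||ADT^A01|MSG00001|P|2.5\r"
        "PID|1||123456^^^HOSPITAL||Jansen^Anna||19800315|F"
    )
    print(parse_hl7_v2(sample))
```

Rejecting malformed messages at this stage keeps invalid data out of the FHIR store and gives the sending system a clear error to act on.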
5. Business Continuity
- Multi-AZ deployment, targeting 99.99% availability for critical systems
- Cross-region backup within EU
- Regular DR testing with healthcare scenarios
- RTO < 1 hour for critical systems
- Documented emergency procedures
Conclusion
Building compliant healthcare solutions on AWS in the Netherlands requires strict adherence to NEN7510, WZD, and GDPR regulations. By implementing proper encryption, comprehensive audit logging, role-based access controls, and healthcare-specific integration patterns, medical providers can leverage cloud benefits while protecting patient data and meeting regulatory obligations.
Key Takeaways:
- Implement NEN7510-compliant encryption for all patient data
- Maintain comprehensive 15-year audit logs (WZD requirement)
- Use pseudonymization for patient privacy
- Support HL7/FHIR standards for hospital integration
- Regular compliance audits and breach detection
- Multi-factor authentication and RBAC for all access
Ready to modernize your healthcare infrastructure? Forrict specializes in NEN7510-compliant AWS architectures for Dutch healthcare providers, hospitals, and medical device manufacturers.
Forrict Team
AWS expert and consultant at Forrict, specializing in cloud architecture and AWS best practices for Dutch businesses.