Healthcare Cloud Solutions with AWS in the Netherlands: NEN7510 & WZD Compliance | Forrict Skip to main content
Healthcare Compliance Security

Healthcare Cloud Solutions with AWS in the Netherlands: NEN7510 & WZD Compliance

Forrict Team
Healthcare Cloud Solutions with AWS in the Netherlands: NEN7510 & WZD Compliance
Build HIPAA-aligned, NEN7510-compliant healthcare systems on AWS for Dutch healthcare providers with patient data encryption and audit controls

Healthcare Cloud Solutions with AWS in the Netherlands: NEN7510 & WZD Compliance

Design secure, compliant AWS healthcare architectures meeting Dutch NEN7510 standards, WZD regulations, and GDPR requirements for patient data protection

Introduction

The Dutch healthcare sector operates under strict regulatory frameworks designed to protect patient privacy and ensure data security. Moving healthcare workloads to AWS requires compliance with NEN7510 (the Dutch healthcare information security standard), WZD (Wet Zorg en Dwang - Care and Coercion Act), GDPR, and alignment with international standards like HIPAA.

This comprehensive guide explores how to build AWS architectures specifically designed for Dutch healthcare providers, hospitals, and medical device manufacturers while meeting all regulatory obligations.

What You’ll Learn:

  • NEN7510 compliance requirements and AWS implementation
  • WZD regulations for patient data handling
  • Healthcare-specific security controls and encryption standards
  • Audit trails and access logging for medical records
  • Patient consent management systems
  • Infrastructure-as-Code for compliant healthcare deployments
  • Integration patterns for legacy hospital systems (HL7, FHIR)

Understanding Dutch Healthcare Regulations

NEN7510: Healthcare Information Security Standard

NEN7510 is the Dutch healthcare sector’s adaptation of ISO 27002, specifically tailored for healthcare organizations.

Core Requirements:

  1. Risk Assessment: Comprehensive information security risk analysis
  2. Access Control: Role-based access with least privilege principle
  3. Audit Logging: Detailed logging of all patient data access
  4. Encryption: Data encryption at rest and in transit
  5. Business Continuity: Healthcare-specific DR and backup requirements
  6. Incident Management: Security incident response procedures
  7. Third-Party Management: Due diligence on cloud service providers

NEN7510 Security Domains:

  • Organizational Security: Policies, procedures, and governance
  • Physical Security: Data center and facility security
  • Technical Security: Encryption, access control, monitoring
  • Operational Security: Change management, incident response

WZD (Wet Zorg en Dwang)

The Care and Coercion Act regulates the use of coercion and force in healthcare:

Key Technical Requirements:

  • Incident Registration: All coercion incidents must be logged
  • Data Retention: Minimum 15-year retention for coercion records
  • Access Controls: Restricted access to sensitive mental health data
  • Audit Trail: Complete audit trail of data access and modifications
  • Patient Rights: Support for patient data access requests

GDPR in Healthcare Context

Healthcare data is “special category data” under GDPR:

Additional Requirements:

  • Explicit Consent: Patient consent for data processing
  • Purpose Limitation: Data only used for specified medical purposes
  • Data Minimization: Collect only necessary medical data
  • Right to Erasure: Complex in healthcare due to retention requirements
  • Data Portability: Patients can request their medical records
  • Breach Notification: 72-hour breach notification to DPA

HIPAA Alignment

While not required in Netherlands, HIPAA alignment provides additional security:

HIPAA Principles Applied:

  • Administrative Safeguards: Security management processes
  • Physical Safeguards: Facility access controls, workstation security
  • Technical Safeguards: Access control, audit controls, integrity controls
  • Privacy Rule: Patient privacy protections
  • Security Rule: Electronic PHI (ePHI) protection

NEN7510-Compliant AWS Architecture

1. Healthcare Data Encryption Architecture

// lib/healthcare-encryption-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as kms from 'aws-cdk-lib/aws-kms';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as rds from 'aws-cdk-lib/aws-rds';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import { Construct } from 'constructs';

export class HealthcareEncryptionStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // KMS key for PHI encryption (NEN7510 requirement)
    const phiEncryptionKey = new kms.Key(this, 'PHIEncryptionKey', {
      description: 'KMS key for Protected Health Information encryption',
      enableKeyRotation: true, // NEN7510 requires key rotation
      removalPolicy: cdk.RemovalPolicy.RETAIN,
      policy: new iam.PolicyDocument({
        statements: [
          new iam.PolicyStatement({
            sid: 'Enable IAM User Permissions',
            effect: iam.Effect.ALLOW,
            principals: [new iam.AccountRootPrincipal()],
            actions: ['kms:*'],
            resources: ['*'],
          }),
          new iam.PolicyStatement({
            sid: 'Allow healthcare services',
            effect: iam.Effect.ALLOW,
            principals: [
              new iam.ServicePrincipal('s3.amazonaws.com'),
              new iam.ServicePrincipal('rds.amazonaws.com'),
              new iam.ServicePrincipal('lambda.amazonaws.com'),
            ],
            actions: [
              'kms:Decrypt',
              'kms:DescribeKey',
              'kms:CreateGrant',
            ],
            resources: ['*'],
            conditions: {
              StringEquals: {
                'kms:ViaService': [
                  `s3.${this.region}.amazonaws.com`,
                  `rds.${this.region}.amazonaws.com`,
                ],
              },
            },
          }),
        ],
      }),
    });

    // S3 bucket for medical imaging (DICOM files)
    const medicalImagingBucket = new s3.Bucket(this, 'MedicalImagingBucket', {
      bucketName: `medical-imaging-${this.account}-${this.region}`,
      encryption: s3.BucketEncryption.KMS,
      encryptionKey: phiEncryptionKey,
      versioned: true, // NEN7510 requires version control
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      enforceSSL: true,
      objectOwnership: s3.ObjectOwnership.BUCKET_OWNER_ENFORCED,
      lifecycleRules: [
        {
          id: 'TransitionOldScans',
          transitions: [
            {
              storageClass: s3.StorageClass.INTELLIGENT_TIERING,
              transitionAfter: cdk.Duration.days(90),
            },
            {
              storageClass: s3.StorageClass.GLACIER,
              transitionAfter: cdk.Duration.days(365),
            },
          ],
          // WZD requires 15-year retention for mental health records
          expiration: cdk.Duration.days(5475), // 15 years
        },
      ],
      serverAccessLogsPrefix: 'access-logs/',
      removalPolicy: cdk.RemovalPolicy.RETAIN,
    });

    // Enable S3 Object Lock for immutable medical records
    const cfnBucket = medicalImagingBucket.node.defaultChild as s3.CfnBucket;
    cfnBucket.objectLockEnabled = true;
    cfnBucket.objectLockConfiguration = {
      objectLockEnabled: 'Enabled',
      rule: {
        defaultRetention: {
          mode: 'GOVERNANCE',
          years: 15, // WZD requirement
        },
      },
    };

    // VPC for healthcare workloads
    const vpc = new ec2.Vpc(this, 'HealthcareVPC', {
      maxAzs: 3,
      natGateways: 2,
      subnetConfiguration: [
        {
          name: 'Public',
          subnetType: ec2.SubnetType.PUBLIC,
          cidrMask: 24,
        },
        {
          name: 'Private',
          subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS,
          cidrMask: 24,
        },
        {
          name: 'Isolated',
          subnetType: ec2.SubnetType.PRIVATE_ISOLATED,
          cidrMask: 24,
        },
      ],
      flowLogs: {
        's3': {
          destination: ec2.FlowLogDestination.toS3(medicalImagingBucket, 'vpc-flow-logs/'),
          trafficType: ec2.FlowLogTrafficType.ALL,
        },
      },
    });

    // Database credentials in Secrets Manager
    const dbCredentials = new secretsmanager.Secret(this, 'DBCredentials', {
      secretName: 'healthcare-db-credentials',
      description: 'RDS credentials for patient database',
      generateSecretString: {
        secretStringTemplate: JSON.stringify({ username: 'admin' }),
        generateStringKey: 'password',
        excludePunctuation: true,
        passwordLength: 32,
        requireEachIncludedType: true,
      },
      encryptionKey: phiEncryptionKey,
    });

    // RDS for patient records with encryption
    const patientDatabase = new rds.DatabaseInstance(this, 'PatientDatabase', {
      engine: rds.DatabaseInstanceEngine.postgres({
        version: rds.PostgresEngineVersion.VER_15_4,
      }),
      instanceType: ec2.InstanceType.of(
        ec2.InstanceClass.R6G,
        ec2.InstanceSize.XLARGE
      ),
      credentials: rds.Credentials.fromSecret(dbCredentials),
      vpc,
      vpcSubnets: {
        subnetType: ec2.SubnetType.PRIVATE_ISOLATED, // No internet access
      },
      multiAz: true, // High availability for healthcare
      storageEncrypted: true,
      storageEncryptionKey: phiEncryptionKey,
      // NEN7510 requires backup
      backupRetention: cdk.Duration.days(35),
      preferredBackupWindow: '03:00-04:00',
      deletionProtection: true,
      cloudwatchLogsExports: ['postgresql', 'upgrade'],
      // Enable Performance Insights with encryption
      enablePerformanceInsights: true,
      performanceInsightEncryptionKey: phiEncryptionKey,
      performanceInsightRetention: rds.PerformanceInsightRetention.MONTHS_6,
      // Audit logging
      parameterGroup: new rds.ParameterGroup(this, 'PatientDBParams', {
        engine: rds.DatabaseInstanceEngine.postgres({
          version: rds.PostgresEngineVersion.VER_15_4,
        }),
        parameters: {
          'log_connections': '1',
          'log_disconnections': '1',
          'log_duration': '1',
          'log_statement': 'all', // Log all SQL statements for audit
          'rds.force_ssl': '1', // Enforce SSL connections
        },
      }),
      removalPolicy: cdk.RemovalPolicy.RETAIN,
    });

    // Security group for database - very restrictive
    patientDatabase.connections.allowFrom(
      ec2.Peer.ipv4(vpc.vpcCidrBlock),
      ec2.Port.tcp(5432),
      'Allow PostgreSQL from VPC only'
    );

    // CloudWatch alarms for security monitoring
    const unauthorizedAccessAlarm = new cdk.aws_cloudwatch.Alarm(
      this,
      'UnauthorizedAccessAlarm',
      {
        alarmName: 'healthcare-unauthorized-access',
        metric: new cdk.aws_cloudwatch.Metric({
          namespace: 'AWS/RDS',
          metricName: 'FailedSQLServerAgentJobsCount',
          dimensionsMap: {
            DBInstanceIdentifier: patientDatabase.instanceIdentifier,
          },
        }),
        threshold: 5,
        evaluationPeriods: 1,
        datapointsToAlarm: 1,
      }
    );

    // Secrets rotation for NEN7510 compliance
    new secretsmanager.SecretRotation(this, 'SecretRotation', {
      application: secretsmanager.SecretRotationApplication.POSTGRES_ROTATION_SINGLE_USER,
      secret: dbCredentials,
      target: patientDatabase,
      vpc,
      automaticallyAfter: cdk.Duration.days(90), // Rotate every 90 days
    });

    // Outputs
    new cdk.CfnOutput(this, 'PHIEncryptionKeyArn', {
      value: phiEncryptionKey.keyArn,
      description: 'KMS key for PHI encryption',
    });

    new cdk.CfnOutput(this, 'MedicalImagingBucketName', {
      value: medicalImagingBucket.bucketName,
      description: 'S3 bucket for medical imaging (DICOM)',
    });

    new cdk.CfnOutput(this, 'PatientDatabaseEndpoint', {
      value: patientDatabase.dbInstanceEndpointAddress,
      description: 'Patient database endpoint (private)',
    });
  }
}

2. Audit Logging and Access Control

# scripts/healthcare_audit.py
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import hashlib

class HealthcareAuditLogger:
    """NEN7510-compliant audit logging for healthcare applications"""

    def __init__(self, region: str = 'eu-central-1'):
        self.cloudtrail = boto3.client('cloudtrail', region_name=region)
        self.cloudwatch = boto3.client('logs', region_name=region)
        self.dynamodb = boto3.resource('dynamodb', region_name=region)
        self.region = region

        # Audit log table (15-year retention for WZD)
        self.audit_table = self.dynamodb.Table('HealthcareAuditLog')

    def log_patient_access(
        self,
        user_id: str,
        patient_id: str,
        access_type: str,
        resource: str,
        purpose: str,
        ip_address: str,
        user_agent: str
    ) -> str:
        """
        Log patient data access (NEN7510 requirement)

        Args:
            user_id: Healthcare professional ID
            patient_id: Patient identifier (hashed)
            access_type: READ, WRITE, DELETE, EXPORT
            resource: Resource accessed (record type)
            purpose: Medical purpose for access
            ip_address: Source IP
            user_agent: User agent string

        Returns:
            Audit log ID
        """
        timestamp = datetime.utcnow()
        audit_id = hashlib.sha256(
            f"{user_id}{patient_id}{timestamp.isoformat()}".encode()
        ).hexdigest()

        # Hash patient ID for privacy (NEN7510 pseudonymization)
        patient_hash = hashlib.sha256(patient_id.encode()).hexdigest()

        audit_entry = {
            'auditId': audit_id,
            'timestamp': timestamp.isoformat(),
            'userId': user_id,
            'patientHash': patient_hash,  # Pseudonymized
            'accessType': access_type,
            'resource': resource,
            'purpose': purpose,
            'ipAddress': ip_address,
            'userAgent': user_agent,
            'ttl': int((timestamp + timedelta(days=5475)).timestamp())  # 15 years
        }

        # Store in DynamoDB with TTL
        self.audit_table.put_item(Item=audit_entry)

        # Also log to CloudWatch for real-time monitoring
        self._log_to_cloudwatch(audit_entry)

        return audit_id

    def log_consent_change(
        self,
        patient_id: str,
        consent_type: str,
        consent_given: bool,
        granted_by: str,
        effective_date: datetime
    ) -> str:
        """
        Log patient consent changes (GDPR requirement)

        Args:
            patient_id: Patient identifier
            consent_type: Type of consent (treatment, research, data sharing)
            consent_given: True if consent granted, False if withdrawn
            granted_by: Who granted/withdrew consent
            effective_date: When consent becomes effective

        Returns:
            Consent log ID
        """
        timestamp = datetime.utcnow()
        consent_id = hashlib.sha256(
            f"CONSENT_{patient_id}{timestamp.isoformat()}".encode()
        ).hexdigest()

        patient_hash = hashlib.sha256(patient_id.encode()).hexdigest()

        consent_entry = {
            'consentId': consent_id,
            'timestamp': timestamp.isoformat(),
            'patientHash': patient_hash,
            'consentType': consent_type,
            'consentGiven': consent_given,
            'grantedBy': granted_by,
            'effectiveDate': effective_date.isoformat(),
            'ttl': int((timestamp + timedelta(days=5475)).timestamp())
        }

        self.audit_table.put_item(Item=consent_entry)

        return consent_id

    def query_patient_access_history(
        self,
        patient_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> List[Dict]:
        """
        Query patient access history (for patient rights requests)

        Args:
            patient_id: Patient identifier
            start_date: Start of query period
            end_date: End of query period

        Returns:
            List of access events
        """
        patient_hash = hashlib.sha256(patient_id.encode()).hexdigest()

        # Query DynamoDB
        response = self.audit_table.query(
            IndexName='PatientHashIndex',
            KeyConditionExpression='patientHash = :ph AND timestamp BETWEEN :start AND :end',
            ExpressionAttributeValues={
                ':ph': patient_hash,
                ':start': start_date.isoformat(),
                ':end': end_date.isoformat()
            }
        )

        return response.get('Items', [])

    def generate_nen7510_report(
        self,
        start_date: datetime,
        end_date: datetime,
        output_file: str = 'nen7510_audit_report.json'
    ) -> Dict:
        """
        Generate NEN7510 compliance audit report

        Returns comprehensive audit report including:
        - Access patterns
        - Consent management
        - Security incidents
        - Encryption status
        """
        report = {
            'report_metadata': {
                'standard': 'NEN7510:2017',
                'generated_at': datetime.utcnow().isoformat(),
                'period_start': start_date.isoformat(),
                'period_end': end_date.isoformat(),
                'region': self.region
            },
            'access_audit': self._audit_access_patterns(start_date, end_date),
            'consent_audit': self._audit_consent_management(start_date, end_date),
            'security_audit': self._audit_security_controls(),
            'encryption_audit': self._audit_encryption_status(),
            'breach_incidents': self._check_security_breaches(start_date, end_date),
        }

        # Save report
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)

        print(f"NEN7510 audit report saved to {output_file}")
        return report

    def _audit_access_patterns(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Analyze access patterns for anomalies"""

        # Scan audit table for period
        response = self.audit_table.scan(
            FilterExpression='#ts BETWEEN :start AND :end',
            ExpressionAttributeNames={'#ts': 'timestamp'},
            ExpressionAttributeValues={
                ':start': start_date.isoformat(),
                ':end': end_date.isoformat()
            }
        )

        access_events = response.get('Items', [])

        # Analyze patterns
        access_by_type = {}
        access_by_user = {}
        unusual_access_times = []

        for event in access_events:
            # Count by access type
            access_type = event.get('accessType', 'UNKNOWN')
            access_by_type[access_type] = access_by_type.get(access_type, 0) + 1

            # Count by user
            user_id = event.get('userId', 'UNKNOWN')
            access_by_user[user_id] = access_by_user.get(user_id, 0) + 1

            # Check for unusual access times (outside business hours)
            event_time = datetime.fromisoformat(event['timestamp'])
            if event_time.hour < 7 or event_time.hour > 19:
                unusual_access_times.append({
                    'timestamp': event['timestamp'],
                    'user': user_id,
                    'resource': event.get('resource')
                })

        return {
            'total_access_events': len(access_events),
            'access_by_type': access_by_type,
            'top_users': sorted(
                access_by_user.items(),
                key=lambda x: x[1],
                reverse=True
            )[:10],
            'unusual_access_times': unusual_access_times[:50]
        }

    def _audit_consent_management(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Audit consent management compliance"""

        response = self.audit_table.scan(
            FilterExpression='begins_with(consentId, :prefix) AND #ts BETWEEN :start AND :end',
            ExpressionAttributeNames={'#ts': 'timestamp'},
            ExpressionAttributeValues={
                ':prefix': 'CONSENT_',
                ':start': start_date.isoformat(),
                ':end': end_date.isoformat()
            }
        )

        consent_events = response.get('Items', [])

        consents_granted = sum(1 for e in consent_events if e.get('consentGiven'))
        consents_withdrawn = sum(1 for e in consent_events if not e.get('consentGiven'))

        return {
            'total_consent_events': len(consent_events),
            'consents_granted': consents_granted,
            'consents_withdrawn': consents_withdrawn,
            'consent_types': list(set(e.get('consentType') for e in consent_events))
        }

    def _audit_security_controls(self) -> Dict:
        """Verify security controls are in place"""

        kms = boto3.client('kms', region_name=self.region)
        config = boto3.client('config', region_name=self.region)

        # Check KMS key rotation
        keys = kms.list_keys()
        key_rotation_status = []

        for key in keys.get('Keys', [])[:10]:  # Check first 10 keys
            key_id = key['KeyId']
            try:
                rotation = kms.get_key_rotation_status(KeyId=key_id)
                key_rotation_status.append({
                    'key_id': key_id,
                    'rotation_enabled': rotation.get('KeyRotationEnabled', False)
                })
            except:
                pass

        # Check AWS Config compliance
        compliance = config.describe_compliance_by_config_rule()

        compliant_rules = sum(
            1 for r in compliance.get('ComplianceByConfigRules', [])
            if r.get('Compliance', {}).get('ComplianceType') == 'COMPLIANT'
        )

        return {
            'kms_key_rotation': key_rotation_status,
            'config_compliant_rules': compliant_rules,
            'total_config_rules': len(compliance.get('ComplianceByConfigRules', []))
        }

    def _audit_encryption_status(self) -> Dict:
        """Verify encryption of all healthcare data"""

        s3 = boto3.client('s3', region_name=self.region)
        rds = boto3.client('rds', region_name=self.region)

        encryption_status = {
            's3_buckets': [],
            'rds_instances': []
        }

        # Check S3 encryption
        buckets = s3.list_buckets()
        for bucket in buckets.get('Buckets', []):
            bucket_name = bucket['Name']
            if 'healthcare' in bucket_name or 'medical' in bucket_name or 'patient' in bucket_name:
                try:
                    encryption = s3.get_bucket_encryption(Bucket=bucket_name)
                    encryption_status['s3_buckets'].append({
                        'bucket': bucket_name,
                        'encrypted': True,
                        'sse_algorithm': encryption['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
                    })
                except:
                    encryption_status['s3_buckets'].append({
                        'bucket': bucket_name,
                        'encrypted': False
                    })

        # Check RDS encryption
        db_instances = rds.describe_db_instances()
        for db in db_instances.get('DBInstances', []):
            if 'patient' in db['DBInstanceIdentifier'] or 'healthcare' in db['DBInstanceIdentifier']:
                encryption_status['rds_instances'].append({
                    'db_instance': db['DBInstanceIdentifier'],
                    'storage_encrypted': db['StorageEncrypted'],
                    'kms_key': db.get('KmsKeyId')
                })

        return encryption_status

    def _check_security_breaches(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> List[Dict]:
        """Check for potential security breaches (72-hour GDPR notification)"""

        guardduty = boto3.client('guardduty', region_name=self.region)
        breaches = []

        try:
            detectors = guardduty.list_detectors()
            for detector_id in detectors.get('DetectorIds', []):
                findings = guardduty.list_findings(
                    DetectorId=detector_id,
                    FindingCriteria={
                        'Criterion': {
                            'severity': {'Gte': 7},  # High and Critical only
                            'updatedAt': {
                                'Gte': int(start_date.timestamp() * 1000),
                                'Lte': int(end_date.timestamp() * 1000)
                            }
                        }
                    }
                )

                if findings.get('FindingIds'):
                    finding_details = guardduty.get_findings(
                        DetectorId=detector_id,
                        FindingIds=findings['FindingIds']
                    )

                    for finding in finding_details.get('Findings', []):
                        breaches.append({
                            'finding_id': finding['Id'],
                            'severity': finding['Severity'],
                            'type': finding['Type'],
                            'description': finding['Description'],
                            'created_at': finding['CreatedAt']
                        })
        except Exception as e:
            print(f"GuardDuty error: {str(e)}")

        return breaches

    def _log_to_cloudwatch(self, audit_entry: Dict) -> None:
        """Log audit entry to CloudWatch for real-time monitoring"""

        try:
            self.cloudwatch.put_log_events(
                logGroupName='/healthcare/audit-logs',
                logStreamName=datetime.utcnow().strftime('%Y-%m-%d'),
                logEvents=[
                    {
                        'timestamp': int(datetime.utcnow().timestamp() * 1000),
                        'message': json.dumps(audit_entry)
                    }
                ]
            )
        except self.cloudwatch.exceptions.ResourceNotFoundException:
            # Create log group if it doesn't exist
            self.cloudwatch.create_log_group(
                logGroupName='/healthcare/audit-logs'
            )
            self.cloudwatch.put_retention_policy(
                logGroupName='/healthcare/audit-logs',
                retentionInDays=5475  # 15 years for WZD
            )

# Example usage
if __name__ == '__main__':
    auditor = HealthcareAuditLogger(region='eu-central-1')

    # Log patient access
    audit_id = auditor.log_patient_access(
        user_id='DR_12345',
        patient_id='PATIENT_67890',
        access_type='READ',
        resource='MEDICAL_RECORD',
        purpose='Treatment consultation',
        ip_address='10.0.1.50',
        user_agent='Mozilla/5.0 (Hospital Workstation)'
    )

    print(f"Access logged: {audit_id}")

    # Log consent change
    consent_id = auditor.log_consent_change(
        patient_id='PATIENT_67890',
        consent_type='RESEARCH_PARTICIPATION',
        consent_given=True,
        granted_by='PATIENT_67890',
        effective_date=datetime.utcnow()
    )

    print(f"Consent logged: {consent_id}")

    # Generate compliance report
    report = auditor.generate_nen7510_report(
        start_date=datetime.utcnow() - timedelta(days=30),
        end_date=datetime.utcnow(),
        output_file='nen7510_compliance_report.json'
    )

    print("\n=== NEN7510 Compliance Report ===")
    print(f"Total access events: {report['access_audit']['total_access_events']}")
    print(f"Consent events: {report['consent_audit']['total_consent_events']}")
    print(f"Security breaches detected: {len(report['breach_incidents'])}")

3. HL7/FHIR Integration for Hospital Systems

// lib/healthcare-integration-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as healthlake from 'aws-cdk-lib/aws-healthlake';
import { Construct } from 'constructs';

export class HealthcareIntegrationStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // AWS HealthLake FHIR datastore
    const fhirDatastore = new healthlake.CfnFHIRDatastore(this, 'FHIRDatastore', {
      datastoreName: 'patient-records-fhir',
      datastoreTypeVersion: 'R4', // FHIR R4 standard
      preloadDataConfig: {
        preloadDataType: 'SYNTHEA', // Optional: Load synthetic data for testing
      },
    });

    // HL7 to FHIR conversion Lambda
    const hl7ToFhirConverter = new lambda.Function(this, 'HL7ToFHIRConverter', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromInline(`
import json
import boto3
from datetime import datetime

def handler(event, context):
    """
    Convert HL7 messages to FHIR resources
    Supports HL7 v2.x messages (ADT, ORU, ORM)
    """
    hl7_message = event.get('body', '')

    # Parse HL7 message
    segments = hl7_message.split('\\r')
    message_type = extract_message_type(segments)

    if message_type.startswith('ADT'):  # Admission/Discharge/Transfer
        fhir_resource = convert_adt_to_fhir(segments)
    elif message_type.startswith('ORU'):  # Observation Result
        fhir_resource = convert_oru_to_fhir(segments)
    elif message_type.startswith('ORM'):  # Order Message
        fhir_resource = convert_orm_to_fhir(segments)
    else:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': f'Unsupported message type: {message_type}'})
        }

    # Store in HealthLake
    healthlake = boto3.client('healthlake')
    # Note: Actual HealthLake API call would go here

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'HL7 message converted to FHIR',
            'fhir_resource': fhir_resource
        })
    }

def extract_message_type(segments):
    """Extract message type from MSH segment"""
    msh_segment = segments[0]
    fields = msh_segment.split('|')
    return fields[8] if len(fields) > 8 else 'UNKNOWN'

def convert_adt_to_fhir(segments):
    """Convert ADT message to FHIR Patient resource"""
    pid_segment = next((s for s in segments if s.startswith('PID')), '')
    fields = pid_segment.split('|')

    # Example FHIR Patient resource
    return {
        'resourceType': 'Patient',
        'id': fields[3] if len(fields) > 3 else 'unknown',
        'name': [{
            'family': fields[5].split('^')[0] if len(fields) > 5 else '',
            'given': [fields[5].split('^')[1]] if len(fields) > 5 else []
        }],
        'birthDate': fields[7] if len(fields) > 7 else '',
        'gender': 'male' if fields[8] == 'M' else 'female' if fields[8] == 'F' else 'unknown'
    }

def convert_oru_to_fhir(segments):
    """Convert ORU message to FHIR Observation resource"""
    obr_segment = next((s for s in segments if s.startswith('OBR')), '')
    obx_segment = next((s for s in segments if s.startswith('OBX')), '')

    return {
        'resourceType': 'Observation',
        'status': 'final',
        'code': {
            'coding': [{
                'system': 'http://loinc.org',
                'code': 'extracted_from_obr'
            }]
        },
        'valueQuantity': {
            'value': 'extracted_from_obx'
        }
    }

def convert_orm_to_fhir(segments):
    """Convert ORM message to FHIR ServiceRequest resource"""
    return {
        'resourceType': 'ServiceRequest',
        'status': 'active',
        'intent': 'order'
    }
      `),
      timeout: cdk.Duration.seconds(30),
    });

    // API Gateway for HL7 ingestion
    const hl7Api = new apigateway.RestApi(this, 'HL7IntegrationAPI', {
      restApiName: 'HL7 to FHIR Integration',
      description: 'Accepts HL7 messages and converts to FHIR',
      deployOptions: {
        stageName: 'prod',
        tracingEnabled: true,
      },
    });

    const hl7Resource = hl7Api.root.addResource('hl7');
    hl7Resource.addMethod(
      'POST',
      new apigateway.LambdaIntegration(hl7ToFhirConverter)
    );

    new cdk.CfnOutput(this, 'HL7APIEndpoint', {
      value: hl7Api.url,
      description: 'HL7 ingestion endpoint',
    });

    new cdk.CfnOutput(this, 'FHIRDatastoreId', {
      value: fhirDatastore.attrDatastoreId,
      description: 'HealthLake FHIR datastore ID',
    });
  }
}

Best Practices for Dutch Healthcare Providers

1. Data Protection

  • Encrypt all PHI/patient data at rest and in transit
  • Use KMS with automatic key rotation (NEN7510)
  • Implement pseudonymization for analytics
  • Enable S3 Object Lock for immutable records
  • 15-year retention for WZD-regulated data

2. Access Control

  • Role-based access control (RBAC) for healthcare staff
  • Multi-factor authentication for all users
  • Break-glass procedures for emergencies
  • Regular access reviews and certifications
  • Principle of least privilege

3. Audit and Compliance

  • Comprehensive audit logging of all patient data access
  • Real-time monitoring of unusual access patterns
  • 72-hour breach notification procedures (GDPR)
  • Regular NEN7510 compliance audits
  • Patient consent management system

4. Integration

  • Support for HL7 v2.x and FHIR standards
  • Secure integration with existing hospital systems
  • API gateway with healthcare-specific throttling
  • Message queue for reliable HL7 processing
  • Data validation and error handling

5. Business Continuity

  • Multi-AZ deployment for 99.99% availability
  • Cross-region backup within EU
  • Regular DR testing with healthcare scenarios
  • RTO < 1 hour for critical systems
  • Documented emergency procedures

Conclusion

Building compliant healthcare solutions on AWS in the Netherlands requires strict adherence to NEN7510, WZD, and GDPR regulations. By implementing proper encryption, comprehensive audit logging, role-based access controls, and healthcare-specific integration patterns, medical providers can leverage cloud benefits while protecting patient data and meeting regulatory obligations.

Key Takeaways:

  • Implement NEN7510-compliant encryption for all patient data
  • Maintain comprehensive 15-year audit logs (WZD requirement)
  • Use pseudonymization for patient privacy
  • Support HL7/FHIR standards for hospital integration
  • Regular compliance audits and breach detection
  • Multi-factor authentication and RBAC for all access

Ready to modernize your healthcare infrastructure? Forrict specializes in NEN7510-compliant AWS architectures for Dutch healthcare providers, hospitals, and medical device manufacturers.

Resources

F

Forrict Team

AWS expert and consultant at Forrict, specializing in cloud architecture and AWS best practices for Dutch businesses.

Tags

AWS Healthcare NEN7510 WZD Compliance Netherlands Security HIPAA Medical Data Privacy

Related Articles

Ready to Transform Your AWS Infrastructure?

Let's discuss how we can help optimize your cloud journey