AWS for Financial Sector in the Netherlands: DNB Compliance & Secure Banking Architecture

Forrict Team
Complete guide to building DNB-compliant AWS architectures for Dutch financial institutions including PSD2, data residency, and regulatory requirements

Build secure, compliant AWS infrastructures for Dutch financial institutions meeting DNB regulations, PSD2 requirements, and data sovereignty mandates

Introduction

The Dutch financial sector faces stringent regulatory requirements from De Nederlandsche Bank (DNB), the European Banking Authority (EBA), and various EU directives. Moving financial workloads to AWS requires careful architectural design to meet compliance obligations while leveraging cloud benefits like scalability, resilience, and innovation.

This guide explains how to build AWS architectures that comply with Dutch and European financial regulations, including DNB’s cloud guidelines, the revised Payment Services Directive (PSD2), the Digital Operational Resilience Act (DORA), and MiFID II.

What You’ll Learn:

  • DNB cloud policy and regulatory requirements for Dutch banks
  • PSD2 compliance architecture for payment service providers
  • Data residency and sovereignty in EU regions (eu-central-1)
  • Financial-grade security controls and encryption
  • Audit logging and compliance reporting with CloudTrail
  • Real-world architecture patterns for banking applications
  • Infrastructure-as-Code examples for compliant deployments

Understanding Dutch Financial Regulations

De Nederlandsche Bank (DNB) Cloud Policy

DNB, the Dutch central bank and prudential supervisor, has established clear guidelines for financial institutions using cloud services:

Key Requirements:

  1. Risk Assessment: Comprehensive cloud risk assessment before adoption
  2. Data Location: Clear understanding of data storage locations
  3. Outsourcing Register: Registration of cloud providers in DNB’s outsourcing register
  4. Exit Strategy: Documented cloud exit and migration plans
  5. Audit Rights: Ability to audit cloud infrastructure and providers
  6. Business Continuity: Robust disaster recovery and business continuity plans
  7. Security Controls: Appropriate technical and organizational security measures

DNB’s Four Key Principles:

  • Controllability: Maintain control over outsourced activities
  • Integrity: Ensure data integrity and availability
  • Continuity: Guarantee business continuity
  • Compliance: Meet all regulatory obligations

European Financial Regulations

PSD2 (Payment Services Directive 2)

PSD2 revolutionized European payment services by requiring:

  • Strong Customer Authentication (SCA): Multi-factor authentication for payments
  • Open Banking APIs: Secure API access for third-party providers (TPPs)
  • Dynamic Linking: Authentication codes bound to the specific amount and payee of each transaction
  • Secure Communication: TLS 1.2+ for all API communications
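
Dynamic linking can be illustrated with a keyed hash over the transaction details. The following is a minimal sketch rather than a certified SCA implementation: the helper names `make_auth_code` and `verify_auth_code`, the message layout, and the 8-character truncation are assumptions made for this example.

```python
import hashlib
import hmac
import secrets


def make_auth_code(key: bytes, amount: str, currency: str, payee_iban: str) -> str:
    """Derive a short code bound to one amount and one payee (hypothetical helper)."""
    message = f"{amount}|{currency}|{payee_iban}".encode()
    return hmac.new(key, message, hashlib.sha256).hexdigest()[:8]


def verify_auth_code(key: bytes, code: str, amount: str, currency: str, payee_iban: str) -> bool:
    """Recompute the code from the submitted details and compare in constant time."""
    expected = make_auth_code(key, amount, currency, payee_iban)
    return hmac.compare_digest(expected.encode(), code.encode())


# Assumption: a per-session key; in practice it would come from a secrets store.
key = secrets.token_bytes(32)
code = make_auth_code(key, "150.00", "EUR", "NL91ABNA0417164300")

# The code verifies only against the details it was issued for.
print(verify_auth_code(key, code, "150.00", "EUR", "NL91ABNA0417164300"))
print(verify_auth_code(key, code, "950.00", "EUR", "NL91ABNA0417164300"))
```

Because the amount and payee are part of the hashed message, a code issued for one payment is rejected, with overwhelming probability, when either value is altered.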

DORA (Digital Operational Resilience Act)

Applicable from 17 January 2025, DORA mandates:

  • ICT Risk Management: Comprehensive IT risk management framework
  • Incident Reporting: Mandatory reporting of major ICT incidents to regulators
  • Operational Resilience Testing: Regular resilience and penetration testing
  • Third-Party Risk: Due diligence on ICT service providers (including cloud)
  • Information Sharing: Participation in cyber threat information sharing

MiFID II

For investment firms, MiFID II requires:

  • Transaction Reporting: Detailed logging of all transactions
  • Data Retention: Minimum 5-year retention of transaction data
  • Best Execution: Proof of best execution for client orders
  • Clock Synchronization: UTC timestamp accuracy requirements
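
The retention and timestamp requirements translate into two small checks. The sketch below is illustrative only: `utc_timestamp` and `earliest_deletion_date` are hypothetical helper names, and the microsecond precision is an assumption, not a statement of what the regulation prescribes for a given firm.

```python
from datetime import datetime, timedelta, timezone

RETENTION_YEARS = 5  # minimum stated above; a firm may choose a longer period


def utc_timestamp(moment: datetime) -> str:
    """Render a timezone-aware datetime as an ISO 8601 UTC string with microseconds."""
    if moment.tzinfo is None:
        raise ValueError("naive datetime: attach a timezone before recording")
    return moment.astimezone(timezone.utc).isoformat(timespec="microseconds")


def earliest_deletion_date(recorded_at: datetime, years: int = RETENTION_YEARS) -> datetime:
    """Return the first moment a record may be deleted under the retention period."""
    try:
        return recorded_at.replace(year=recorded_at.year + years)
    except ValueError:
        # 29 February has no counterpart in a non-leap year; roll forward to 1 March.
        return recorded_at.replace(year=recorded_at.year + years, month=3, day=1)


# A trade recorded in Amsterdam winter time (UTC+1) on a leap day.
trade_time = datetime(2024, 2, 29, 9, 30, 15, 123456, tzinfo=timezone(timedelta(hours=1)))
print(utc_timestamp(trade_time))           # 2024-02-29T08:30:15.123456+00:00
print(earliest_deletion_date(trade_time))  # 2029-03-01 09:30:15.123456+01:00
```

Rejecting naive datetimes up front keeps local-time values from being stored as if they were UTC.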

Data Residency and Sovereignty

EU Data Residency Requirements:

  • Financial data must remain within EU/EEA jurisdictions
  • Transfer to non-EU countries requires adequate safeguards
  • GDPR compliance for personal financial data
  • Schrems II implications for US cloud providers

AWS EU Regions:

  • eu-central-1 (Frankfurt): Primary region for Dutch financial institutions
  • eu-west-1 (Ireland): Common DR region
  • eu-west-3 (Paris): Alternative EU region
  • eu-north-1 (Stockholm): Nordic alternative
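
A region allow-list like the one above can also be checked in plain code, for example when reviewing a resource inventory export. This is a minimal sketch: `region_of` and `outside_allowed_regions` are hypothetical helpers, and the ARNs are made-up examples.

```python
ALLOWED_REGIONS = frozenset({"eu-central-1", "eu-west-1", "eu-west-3", "eu-north-1"})


def region_of(arn: str) -> str:
    """Return the region field of an ARN (empty for global resources such as IAM roles)."""
    parts = arn.split(":", 5)
    if len(parts) < 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {arn!r}")
    return parts[3]


def outside_allowed_regions(arns):
    """List the ARNs whose region is set and is not on the allow-list."""
    return [arn for arn in arns if region_of(arn) not in ALLOWED_REGIONS | {""}]


inventory = [
    "arn:aws:rds:eu-central-1:123456789012:db:ledger",
    "arn:aws:rds:us-east-1:123456789012:db:analytics",
    "arn:aws:iam::123456789012:role/auditor",
]
print(outside_allowed_regions(inventory))
# ['arn:aws:rds:us-east-1:123456789012:db:analytics']
```

Global resources carry an empty region field, so they are skipped here rather than reported as violations.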

DNB-Compliant AWS Architecture Patterns

1. Data Residency Enforcement Architecture

// lib/data-residency-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as kms from 'aws-cdk-lib/aws-kms';
import * as config from 'aws-cdk-lib/aws-config';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';

export class DataResidencyStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, {
      ...props,
      env: {
        account: process.env.CDK_DEFAULT_ACCOUNT,
        region: 'eu-central-1', // Enforce EU region
      },
    });

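    // Region-restriction policy document. An SCP cannot be attached from a
    // workload stack: pass scpPolicy.toJSON() to AWS Organizations in the
    // management account (for example as the content of a policy of type
    // SERVICE_CONTROL_POLICY).
    const scpPolicy = new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          sid: 'DenyNonEURegions',
          effect: iam.Effect.DENY,
          // Exempt global services, whose endpoints are not in an EU region
          // (extend this list to match the services you use)
          notActions: ['iam:*', 'organizations:*', 'route53:*', 'cloudfront:*', 'sts:*', 'support:*'],
          resources: ['*'],
          conditions: {
            StringNotEquals: {
              'aws:RequestedRegion': [
                'eu-central-1',
                'eu-west-1',
                'eu-west-3',
                'eu-north-1',
              ],
            },
          },
        }),
        new iam.PolicyStatement({
          sid: 'DenyS3CrossRegionReplication',
          effect: iam.Effect.DENY,
          actions: ['s3:PutReplicationConfiguration'],
          resources: ['*'],
          conditions: {
            // Verify this condition key against the Amazon S3 service
            // authorization reference before relying on it
            StringNotLike: {
              's3:x-amz-replication-destination-region': 'eu-*',
            },
          },
        }),
      ],
    });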

    // KMS key for encryption with EU-only access
    const financialDataKey = new kms.Key(this, 'FinancialDataKey', {
      description: 'KMS key for financial data encryption - EU only',
      enableKeyRotation: true,
      removalPolicy: cdk.RemovalPolicy.RETAIN,
      policy: new iam.PolicyDocument({
        statements: [
          new iam.PolicyStatement({
            sid: 'Enable IAM User Permissions',
            effect: iam.Effect.ALLOW,
            principals: [new iam.AccountRootPrincipal()],
            actions: ['kms:*'],
            resources: ['*'],
          }),
          new iam.PolicyStatement({
            sid: 'Deny non-EU access',
            effect: iam.Effect.DENY,
            principals: [new iam.AnyPrincipal()],
            actions: ['kms:Decrypt', 'kms:DescribeKey'],
            resources: ['*'],
            conditions: {
              StringNotEquals: {
                'aws:RequestedRegion': [
                  'eu-central-1',
                  'eu-west-1',
                  'eu-west-3',
                  'eu-north-1',
                ],
              },
            },
          }),
        ],
      }),
    });

    // S3 bucket for financial data with strict controls
    const financialDataBucket = new s3.Bucket(this, 'FinancialDataBucket', {
      bucketName: `financial-data-${this.account}-eu-central-1`,
      encryption: s3.BucketEncryption.KMS,
      encryptionKey: financialDataKey,
      versioned: true,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      enforceSSL: true,
      objectOwnership: s3.ObjectOwnership.BUCKET_OWNER_ENFORCED,
      lifecycleRules: [
        {
          id: 'TransitionToInfrequentAccess',
          transitions: [
            {
              storageClass: s3.StorageClass.INTELLIGENT_TIERING,
              transitionAfter: cdk.Duration.days(90),
            },
          ],
        },
        {
          id: 'RetainFor7Years',
          // MiFID II requires 5-year retention, we use 7 for safety margin
          expiration: cdk.Duration.days(2555),
        },
      ],
      removalPolicy: cdk.RemovalPolicy.RETAIN,
    });

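    // AWS Config managed rule. S3_BUCKET_REPLICATION_ENABLED checks that
    // replication is configured, not where a bucket is located; bucket
    // location is enforced by the region restriction above.
    const s3ReplicationRule = new config.ManagedRule(this, 'S3BucketReplicationRule', {
      configRuleName: 's3-bucket-replication-enabled',
      identifier: config.ManagedRuleIdentifiers.S3_BUCKET_REPLICATION_ENABLED,
      description: 'Checks that S3 buckets have replication configured',
    });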

    // Custom Config rule for RDS location
    const rdsLocationRule = new config.CustomRule(this, 'RDSLocationRule', {
      configRuleName: 'rds-instance-in-eu-regions',
      lambdaFunction: this.createLocationCheckLambda(),
      periodic: true,
      maximumExecutionFrequency: config.MaximumExecutionFrequency.TWELVE_HOURS,
    });

    // Bucket policy to prevent cross-region access
    financialDataBucket.addToResourcePolicy(
      new iam.PolicyStatement({
        sid: 'DenyNonEUAccess',
        effect: iam.Effect.DENY,
        principals: [new iam.AnyPrincipal()],
        actions: ['s3:GetObject', 's3:PutObject'],
        resources: [financialDataBucket.arnForObjects('*')],
        conditions: {
          StringNotEquals: {
            'aws:RequestedRegion': ['eu-central-1', 'eu-west-1'],
          },
        },
      })
    );

    // CloudFormation stack policies are not CDK constructs or template
    // resources. Apply one after deployment with
    // `aws cloudformation set-stack-policy`; a stack policy restricts updates
    // per resource and does not evaluate the request region.

    // Outputs
    new cdk.CfnOutput(this, 'FinancialDataBucketName', {
      value: financialDataBucket.bucketName,
      description: 'S3 bucket for financial data (EU-only)',
    });

    new cdk.CfnOutput(this, 'FinancialDataKMSKeyId', {
      value: financialDataKey.keyId,
      description: 'KMS key for financial data encryption',
    });
  }

  private createLocationCheckLambda(): cdk.aws_lambda.Function {
    return new cdk.aws_lambda.Function(this, 'LocationCheckFunction', {
      runtime: cdk.aws_lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: cdk.aws_lambda.Code.fromInline(`
import boto3
import json

ALLOWED_REGIONS = ['eu-central-1', 'eu-west-1', 'eu-west-3', 'eu-north-1']

def handler(event, context):
    """Report whether the region this rule runs in is an allowed EU region"""
    config_client = boto3.client('config')

    # AWS Config delivers the notification details as a JSON string
    invoking_event = json.loads(event['invokingEvent'])
    current_region = context.invoked_function_arn.split(':')[3]

    compliance = 'COMPLIANT' if current_region in ALLOWED_REGIONS else 'NON_COMPLIANT'

    # A periodic rule evaluates the account as a whole; RDS instances in this
    # region share the result
    evaluation = {
        'ComplianceResourceType': 'AWS::::Account',
        'ComplianceResourceId': event['accountId'],
        'ComplianceType': compliance,
        'OrderingTimestamp': invoking_event['notificationCreationTime']
    }

    config_client.put_evaluations(
        Evaluations=[evaluation],
        ResultToken=event['resultToken']
    )

    return {'compliance': compliance}
      `),
      timeout: cdk.Duration.seconds(30),
    });
  }
}
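
Because a service control policy is attached through AWS Organizations rather than from a workload stack, it can help to generate and inspect the policy JSON separately. The sketch below builds such a document in plain Python; `build_region_deny_scp` is a hypothetical helper and the list of exempted global services is an assumption to adapt, not an authoritative list.

```python
import json

EU_REGIONS = ["eu-central-1", "eu-west-1", "eu-west-3", "eu-north-1"]

# Assumption: services with global endpoints that must remain reachable.
GLOBAL_SERVICE_ACTIONS = ["iam:*", "organizations:*", "route53:*", "cloudfront:*", "sts:*", "support:*"]

SCP_MAX_CHARACTERS = 5120  # documented size limit for a service control policy


def build_region_deny_scp(regions, exempt_actions):
    """Build a deny-by-region policy document that exempts the given actions."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DenyNonEURegions",
                "Effect": "Deny",
                "NotAction": sorted(exempt_actions),
                "Resource": "*",
                "Condition": {
                    "StringNotEquals": {"aws:RequestedRegion": list(regions)}
                },
            }
        ],
    }


scp = build_region_deny_scp(EU_REGIONS, GLOBAL_SERVICE_ACTIONS)
document = json.dumps(scp, indent=2)
if len(document) > SCP_MAX_CHARACTERS:
    raise ValueError("policy exceeds the SCP size limit")
print(document)
```

Using `NotAction` instead of `Action: *` keeps global services usable while every regional call outside the allow-list is denied.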

2. PSD2-Compliant API Gateway Architecture

// lib/psd2-api-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as cognito from 'aws-cdk-lib/aws-cognito';
import * as waf from 'aws-cdk-lib/aws-wafv2';
import * as acm from 'aws-cdk-lib/aws-certificatemanager';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Construct } from 'constructs';

export class PSD2APIStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Cognito User Pool for Strong Customer Authentication (SCA)
    const userPool = new cognito.UserPool(this, 'PSD2UserPool', {
      userPoolName: 'psd2-customer-authentication',
      // PSD2 requires MFA
      mfa: cognito.Mfa.REQUIRED,
      mfaSecondFactor: {
        sms: true,
        otp: true, // Time-based OTP for authenticator apps
      },
      // Password policy aligned with PSD2 security requirements
      passwordPolicy: {
        minLength: 12,
        requireLowercase: true,
        requireUppercase: true,
        requireDigits: true,
        requireSymbols: true,
        tempPasswordValidity: cdk.Duration.days(1),
      },
      // Account recovery
      accountRecovery: cognito.AccountRecovery.EMAIL_AND_PHONE_WITHOUT_MFA,
      // Advanced security features
      advancedSecurityMode: cognito.AdvancedSecurityMode.ENFORCED,
      // Device tracking for suspicious activity
      deviceTracking: {
        challengeRequiredOnNewDevice: true,
        deviceOnlyRememberedOnUserPrompt: true,
      },
      // User attributes
      standardAttributes: {
        email: {
          required: true,
          mutable: false,
        },
        phoneNumber: {
          required: true,
          mutable: true,
        },
      },
      customAttributes: {
        'customerId': new cognito.StringAttribute({
          minLen: 1,
          maxLen: 50,
          mutable: false,
        }),
        'accountType': new cognito.StringAttribute({
          minLen: 1,
          maxLen: 20,
          mutable: true,
        }),
      },
    });

    // App client for web applications
    const webAppClient = userPool.addClient('WebAppClient', {
      authFlows: {
        userPassword: true,
        userSrp: true,
        custom: true,
      },
      oAuth: {
        flows: {
          authorizationCodeGrant: true,
        },
        scopes: [
          cognito.OAuthScope.OPENID,
          cognito.OAuthScope.PROFILE,
          cognito.OAuthScope.EMAIL,
        ],
      },
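      // Short-lived tokens. The PSD2 RTS caps inactivity after authentication
      // at 5 minutes, so also enforce an idle timeout in the application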
      accessTokenValidity: cdk.Duration.minutes(60),
      idTokenValidity: cdk.Duration.minutes(60),
      refreshTokenValidity: cdk.Duration.days(1),
    });

    // Secret used to derive dynamic-linking codes (never hard-code it)
    const dynamicLinkSecret = new cdk.aws_secretsmanager.Secret(this, 'DynamicLinkSecret');

    // Lambda authorizer for API Gateway with dynamic linking
    const authorizerFunction = new lambda.Function(this, 'PSD2Authorizer', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromInline(`
import os
import boto3
import hashlib
import hmac
from datetime import datetime, timezone

secrets_client = boto3.client('secretsmanager')
cognito = boto3.client('cognito-idp')

def handler(event, context):
    """
    PSD2-compliant authorizer with dynamic linking
    Validates transaction-specific authentication codes
    """
    method_arn = event['methodArn']

    # Headers are only passed to REQUEST-type authorizers; keys are
    # lower-cased because clients may send them in any case
    headers = {k.lower(): v for k, v in (event.get('headers') or {}).items()}
    token = event.get('authorizationToken') or headers.get('authorization', '')
    token = token.removeprefix('Bearer ').strip()

    # Extract transaction details from headers
    transaction_amount = headers.get('x-transaction-amount')
    transaction_currency = headers.get('x-transaction-currency')
    payee_account = headers.get('x-payee-account')
    dynamic_code = headers.get('x-dynamic-auth-code')

    # API Gateway maps only the exact message 'Unauthorized' to a 401
    if not all([token, transaction_amount, transaction_currency, payee_account, dynamic_code]):
        raise Exception('Unauthorized')

    # Verify the code is transaction-specific
    expected_code = generate_dynamic_code(
        transaction_amount,
        transaction_currency,
        payee_account,
        token
    )

    # Constant-time comparison avoids leaking the code through timing
    if not hmac.compare_digest(dynamic_code.encode(), expected_code.encode()):
        print('Authorization failed: invalid dynamic authentication code')
        raise Exception('Unauthorized')

    # Verify token with Cognito
    try:
        response = cognito.get_user(AccessToken=token)
        user_id = response['Username']

        # Generate IAM policy
        policy = generate_policy(user_id, 'Allow', method_arn)

        # Add user context
        policy['context'] = {
            'userId': user_id,
            'transactionAmount': transaction_amount,
            'transactionCurrency': transaction_currency,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

        return policy

    except Exception as e:
        print(f"Authorization failed: {str(e)}")
        raise Exception('Unauthorized')

def generate_dynamic_code(amount, currency, payee, token):
    """Generate transaction-specific authentication code"""
    # The HMAC key is read from Secrets Manager instead of being hard-coded
    key = secrets_client.get_secret_value(
        SecretId=os.environ['DYNAMIC_LINK_SECRET_ARN']
    )['SecretString']
    message = f"{amount}:{currency}:{payee}:{token}"
    # Use HMAC-SHA256 for secure code generation
    return hmac.new(key.encode(), message.encode(), hashlib.sha256).hexdigest()[:8]

def generate_policy(principal_id, effect, resource):
    """Generate IAM policy for API Gateway"""
    return {
        'principalId': principal_id,
        'policyDocument': {
            'Version': '2012-10-17',
            'Statement': [{
                'Action': 'execute-api:Invoke',
                'Effect': effect,
                'Resource': resource
            }]
        }
    }
      `),
      timeout: cdk.Duration.seconds(10),
      environment: {
        USER_POOL_ID: userPool.userPoolId,
        DYNAMIC_LINK_SECRET_ARN: dynamicLinkSecret.secretArn,
      },
    });
    dynamicLinkSecret.grantRead(authorizerFunction);

    // Payment initiation Lambda
    const paymentFunction = new lambda.Function(this, 'PaymentFunction', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromInline(`
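import json
import boto3
import uuid
from datetime import datetime
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
# Assumes this table already exists and the function role may write to it
table = dynamodb.Table('PaymentTransactions')

def handler(event, context):
    """Process PSD2 payment initiation"""

    # Extract user context from authorizer
    user_id = event['requestContext']['authorizer']['userId']

    # Parse payment request; DynamoDB rejects Python floats, so read
    # fractional amounts as Decimal
    body = json.loads(event['body'], parse_float=Decimal)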

    payment_id = str(uuid.uuid4())
    timestamp = datetime.utcnow().isoformat()

    # Store transaction (MiFID II compliance)
    transaction = {
        'paymentId': payment_id,
        'userId': user_id,
        'amount': body['amount'],
        'currency': body['currency'],
        'payeeAccount': body['payeeAccount'],
        'payerAccount': body['payerAccount'],
        'timestamp': timestamp,
        'status': 'INITIATED',
        'authMethod': 'SCA_COMPLIANT',
        'ipAddress': event['requestContext']['identity']['sourceIp'],
        'userAgent': event['headers'].get('User-Agent', 'unknown')
    }

    table.put_item(Item=transaction)

    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'X-Payment-Id': payment_id,
        },
        'body': json.dumps({
            'paymentId': payment_id,
            'status': 'INITIATED',
            'timestamp': timestamp
        })
    }
      `),
      timeout: cdk.Duration.seconds(30),
    });

    // CloudWatch Logs with 7-year retention (covers the 5-year MiFID II minimum)
    const apiLogs = new logs.LogGroup(this, 'PSD2APILogs', {
      logGroupName: '/aws/apigateway/psd2-api',
      retention: logs.RetentionDays.SEVEN_YEARS, // 2557 days
      removalPolicy: cdk.RemovalPolicy.RETAIN,
    });

    // API Gateway with PSD2 requirements
    const api = new apigateway.RestApi(this, 'PSD2API', {
      restApiName: 'PSD2 Payment Services API',
      description: 'PSD2-compliant payment initiation API',
      deployOptions: {
        stageName: 'prod',
        tracingEnabled: true, // X-Ray tracing for audit
        accessLogDestination: new apigateway.LogGroupLogDestination(apiLogs),
        accessLogFormat: apigateway.AccessLogFormat.jsonWithStandardFields({
          caller: true,
          httpMethod: true,
          ip: true,
          protocol: true,
          requestTime: true,
          resourcePath: true,
          responseLength: true,
          status: true,
          user: true,
        }),
        throttlingRateLimit: 1000,
        throttlingBurstLimit: 2000,
      },
      // API Gateway endpoints accept HTTPS only; this statement makes that
      // explicit. The minimum TLS version is not a resource-policy condition
      // (s3:TlsVersion applies to Amazon S3 only): set it on a custom domain
      // name with securityPolicy: apigateway.SecurityPolicy.TLS_1_2.
      policy: new iam.PolicyDocument({
        statements: [
          new iam.PolicyStatement({
            effect: iam.Effect.DENY,
            principals: [new iam.AnyPrincipal()],
            actions: ['execute-api:Invoke'],
            resources: ['execute-api:/*'],
            conditions: {
              Bool: {
                'aws:SecureTransport': 'false',
              },
            },
          }),
        ],
      }),
    });

    // Request validator for input validation
    const requestValidator = new apigateway.RequestValidator(
      this,
      'RequestValidator',
      {
        restApi: api,
        requestValidatorName: 'payment-request-validator',
        validateRequestBody: true,
        validateRequestParameters: true,
      }
    );

    // Payment initiation endpoint
    const payments = api.root.addResource('payments');
    const paymentMethod = payments.addMethod(
      'POST',
      new apigateway.LambdaIntegration(paymentFunction),
      {
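        // REQUEST authorizer: the Lambda needs the transaction headers, which
        // TOKEN authorizers do not receive, and must read the token from the
        // Authorization header. Caching is disabled so every
        // transaction-specific code is checked.
        authorizer: new apigateway.RequestAuthorizer(this, 'PSD2RequestAuthorizer', {
          handler: authorizerFunction,
          identitySources: [apigateway.IdentitySource.header('Authorization')],
          resultsCacheTtl: cdk.Duration.seconds(0),
        }),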
        requestValidator,
        requestModels: {
          'application/json': new apigateway.Model(this, 'PaymentModel', {
            restApi: api,
            contentType: 'application/json',
            modelName: 'PaymentRequest',
            schema: {
              type: apigateway.JsonSchemaType.OBJECT,
              required: ['amount', 'currency', 'payeeAccount', 'payerAccount'],
              properties: {
                amount: {
                  type: apigateway.JsonSchemaType.NUMBER,
                  minimum: 0.01,
                },
                currency: {
                  type: apigateway.JsonSchemaType.STRING,
                  enum: ['EUR', 'USD', 'GBP'],
                },
                payeeAccount: {
                  type: apigateway.JsonSchemaType.STRING,
                  pattern: '^[A-Z]{2}[0-9]{2}[A-Z0-9]+$', // IBAN format
                },
                payerAccount: {
                  type: apigateway.JsonSchemaType.STRING,
                  pattern: '^[A-Z]{2}[0-9]{2}[A-Z0-9]+$',
                },
                description: {
                  type: apigateway.JsonSchemaType.STRING,
                  maxLength: 140,
                },
              },
            },
          }),
        },
      }
    );

    // WAF for API protection
    const webAcl = new waf.CfnWebACL(this, 'PSD2APIWAF', {
      scope: 'REGIONAL',
      defaultAction: { allow: {} },
      rules: [
        {
          name: 'RateLimitRule',
          priority: 1,
          statement: {
            rateBasedStatement: {
              limit: 2000,
              aggregateKeyType: 'IP',
            },
          },
          action: { block: {} },
          visibilityConfig: {
            sampledRequestsEnabled: true,
            cloudWatchMetricsEnabled: true,
            metricName: 'RateLimitRule',
          },
        },
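        {
          name: 'GeoBlockingRule',
          priority: 2,
          statement: {
            // Block requests that do not originate from the listed countries.
            // 'EU' is not an ISO 3166-1 alpha-2 country code, so countries are
            // listed explicitly (illustrative subset; extend to the markets
            // you serve).
            notStatement: {
              statement: {
                geoMatchStatement: {
                  countryCodes: ['NL', 'BE', 'DE', 'FR', 'LU'],
                },
              },
            },
          },
          action: { block: {} },
          visibilityConfig: {
            sampledRequestsEnabled: true,
            cloudWatchMetricsEnabled: true,
            metricName: 'GeoBlocking',
          },
        },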
      ],
      visibilityConfig: {
        sampledRequestsEnabled: true,
        cloudWatchMetricsEnabled: true,
        metricName: 'PSD2APIWAF',
      },
    });

    // Associate WAF with API Gateway
    new waf.CfnWebACLAssociation(this, 'WAFAssociation', {
      resourceArn: api.deploymentStage.stageArn,
      webAclArn: webAcl.attrArn,
    });

    // Outputs
    new cdk.CfnOutput(this, 'APIEndpoint', {
      value: api.url,
      description: 'PSD2 API endpoint (TLS 1.2+ enforced)',
    });

    new cdk.CfnOutput(this, 'UserPoolId', {
      value: userPool.userPoolId,
      description: 'Cognito User Pool for SCA',
    });
  }
}
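
The `pattern` in the request model only checks the shape of an IBAN. A fuller validation also verifies the ISO 13616 mod-97 checksum; the sketch below shows one way to do that in the payment function before the transaction is stored. `is_valid_iban` is a hypothetical helper, and country-specific length rules are deliberately left out.

```python
import re

# Same shape check as the API Gateway request model.
IBAN_SHAPE = re.compile(r"^[A-Z]{2}[0-9]{2}[A-Z0-9]+$")


def is_valid_iban(iban: str) -> bool:
    """Check the shape, overall length bounds, and mod-97 checksum of an IBAN."""
    candidate = iban.replace(" ", "").upper()
    if not IBAN_SHAPE.match(candidate) or not 15 <= len(candidate) <= 34:
        return False
    # Move the country code and check digits to the end.
    rearranged = candidate[4:] + candidate[:4]
    # Letters map to 10..35; digits stay as they are.
    digits = "".join(str(int(ch, 36)) for ch in rearranged)
    return int(digits) % 97 == 1


print(is_valid_iban("NL91 ABNA 0417 1643 00"))  # True
print(is_valid_iban("NL92ABNA0417164300"))      # False (wrong check digits)
```

Rejecting a mistyped account number at this stage avoids recording a payment that can never be executed.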

3. Audit Logging and Compliance Architecture

# scripts/dnb_compliance_audit.py
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List
import csv

class DNBComplianceAuditor:
    """DNB compliance audit and reporting for financial institutions"""

    def __init__(self, region: str = 'eu-central-1'):
        self.cloudtrail = boto3.client('cloudtrail', region_name=region)
        self.config = boto3.client('config', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)
        self.guardduty = boto3.client('guardduty', region_name=region)
        self.securityhub = boto3.client('securityhub', region_name=region)
        self.region = region

    def generate_dnb_audit_report(
        self,
        start_date: datetime,
        end_date: datetime,
        output_file: str = 'dnb_audit_report.json'
    ) -> Dict:
        """
        Generate comprehensive DNB audit report

        Covers:
        - Data access patterns
        - Configuration changes
        - Security findings
        - Compliance status
        """
        print(f"Generating DNB audit report from {start_date} to {end_date}")

        report = {
            'report_metadata': {
                'generated_at': datetime.utcnow().isoformat(),
                'period_start': start_date.isoformat(),
                'period_end': end_date.isoformat(),
                'region': self.region,
                'auditor': 'DNB Compliance System'
            },
            'data_access': self.audit_data_access(start_date, end_date),
            'configuration_changes': self.audit_config_changes(start_date, end_date),
            'security_findings': self.audit_security_findings(),
            'compliance_status': self.check_compliance_status(),
            'encryption_status': self.audit_encryption_status(),
            'data_residency': self.verify_data_residency(),
        }

        # Save report
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)

        print(f"Audit report saved to {output_file}")
        return report

    def audit_data_access(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Audit all data access events"""

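        # Query CloudTrail event history. lookup_events returns management
        # events from the last 90 days, at most 50 per call, so results are
        # paginated. S3 object-level calls such as GetObject are data events
        # and are not returned here: log them to a trail and query the
        # delivered logs (for example with Athena or CloudTrail Lake).
        paginator = self.cloudtrail.get_paginator('lookup_events')
        pages = paginator.paginate(
            LookupAttributes=[
                {
                    'AttributeKey': 'EventName',
                    'AttributeValue': 'GetObject'
                },
            ],
            StartTime=start_date,
            EndTime=end_date,
            PaginationConfig={'MaxItems': 1000}
        )
        events = [event for page in pages for event in page.get('Events', [])]

        access_events = []
        for event in events: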
            event_data = json.loads(event['CloudTrailEvent'])

            # Extract relevant information
            access_events.append({
                'timestamp': event['EventTime'].isoformat(),
                'user': event.get('Username', 'unknown'),
                'resource': event_data.get('requestParameters', {}).get('bucketName'),
                'object_key': event_data.get('requestParameters', {}).get('key'),
                'source_ip': event_data.get('sourceIPAddress'),
                'user_agent': event_data.get('userAgent'),
                'mfa_used': event_data.get('additionalEventData', {}).get('MFAUsed', 'false')
            })

        return {
            'total_access_events': len(access_events),
            'events': access_events,
            'mfa_compliance_rate': self._calculate_mfa_rate(access_events)
        }

    def audit_config_changes(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Audit infrastructure configuration changes"""

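        # get_resource_config_history requires a resourceId, so list the
        # recorded resources first and fetch the timeline of each one
        resource_type = 'AWS::EC2::Instance'
        resources = self.config.list_discovered_resources(
            resourceType=resource_type
        )

        config_changes = []
        for resource in resources.get('resourceIdentifiers', []):
            response = self.config.get_resource_config_history(
                resourceType=resource_type,
                resourceId=resource['resourceId'],
                laterTime=end_date,
                earlierTime=start_date,
                limit=100
            )

            for item in response.get('configurationItems', []):
                config_changes.append({
                    'timestamp': item['configurationItemCaptureTime'].isoformat(),
                    'resource_type': item['resourceType'],
                    'resource_id': item['resourceId'],
                    'configuration_state': item['configurationItemStatus'],
                    'changes': item.get('configurationItemDiff', {})
                })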

        return {
            'total_changes': len(config_changes),
            'changes': config_changes
        }

    def audit_security_findings(self) -> Dict:
        """Audit security findings from GuardDuty and Security Hub"""

        findings = {
            'guardduty': [],
            'security_hub': []
        }

        # Get GuardDuty findings
        try:
            detectors = self.guardduty.list_detectors()
            for detector_id in detectors.get('DetectorIds', []):
                finding_response = self.guardduty.list_findings(
                    DetectorId=detector_id,
                    FindingCriteria={
                        'Criterion': {
                            'severity': {
                                'Gte': 4  # Medium and above
                            }
                        }
                    }
                )

                if finding_response.get('FindingIds'):
                    finding_details = self.guardduty.get_findings(
                        DetectorId=detector_id,
                        FindingIds=finding_response['FindingIds'][:50]
                    )

                    for finding in finding_details.get('Findings', []):
                        findings['guardduty'].append({
                            'id': finding['Id'],
                            'type': finding['Type'],
                            'severity': finding['Severity'],
                            'title': finding['Title'],
                            'description': finding['Description'],
                            'resource': finding.get('Resource', {})
                        })
        except Exception as e:
            print(f"GuardDuty error: {str(e)}")

        # Get Security Hub findings
        try:
            hub_findings = self.securityhub.get_findings(
                Filters={
                    'ComplianceStatus': [
                        {'Value': 'FAILED', 'Comparison': 'EQUALS'}
                    ],
                    'RecordState': [
                        {'Value': 'ACTIVE', 'Comparison': 'EQUALS'}
                    ]
                },
                MaxResults=100
            )

            for finding in hub_findings.get('Findings', []):
                findings['security_hub'].append({
                    'id': finding['Id'],
                    'title': finding['Title'],
                    'severity': finding['Severity']['Label'],
                    'compliance_status': finding.get('Compliance', {}).get('Status'),
                    'resource': finding.get('Resources', [{}])[0]
                })
        except Exception as e:
            print(f"Security Hub error: {str(e)}")

        return findings

    def check_compliance_status(self) -> Dict:
        """Check AWS Config compliance rules status"""

        compliance_summary = {
            'compliant_rules': 0,
            'non_compliant_rules': 0,
            'rules_detail': []
        }

        # Get all Config rules
        rules = self.config.describe_config_rules()

        for rule in rules.get('ConfigRules', []):
            rule_name = rule['ConfigRuleName']

            # Get compliance status
            compliance = self.config.describe_compliance_by_config_rule(
                ConfigRuleNames=[rule_name]
            )

            for compliance_item in compliance.get('ComplianceByConfigRules', []):
                status = compliance_item['Compliance']['ComplianceType']

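                # INSUFFICIENT_DATA and NOT_APPLICABLE are not failures;
                # they are recorded in rules_detail only
                if status == 'COMPLIANT':
                    compliance_summary['compliant_rules'] += 1
                elif status == 'NON_COMPLIANT':
                    compliance_summary['non_compliant_rules'] += 1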

                compliance_summary['rules_detail'].append({
                    'rule_name': rule_name,
                    'compliance_status': status,
                    'description': rule.get('Description', '')
                })

        return compliance_summary

    def audit_encryption_status(self) -> Dict:
        """Verify encryption status of all data stores"""

        encryption_audit = {
            's3_buckets': [],
            'rds_instances': [],
            'ebs_volumes': []
        }

        # Check S3 bucket encryption
        buckets = self.s3.list_buckets()
        for bucket in buckets.get('Buckets', []):
            bucket_name = bucket['Name']

            try:
                encryption = self.s3.get_bucket_encryption(Bucket=bucket_name)
                rules = encryption['ServerSideEncryptionConfiguration']['Rules']
                encryption_audit['s3_buckets'].append({
                    'bucket': bucket_name,
                    'encrypted': True,
                    'algorithm': rules[0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
                })
            except self.s3.exceptions.ClientError as e:
                # This error code is not a modeled exception class on the S3
                # client, so match on the code in the ClientError response
                if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError':
                    raise
                encryption_audit['s3_buckets'].append({
                    'bucket': bucket_name,
                    'encrypted': False,
                    'algorithm': None
                })

        # Check RDS encryption (paginated so every instance is covered)
        rds = boto3.client('rds', region_name=self.region)
        db_pages = rds.get_paginator('describe_db_instances').paginate()

        for page in db_pages:
            for db in page.get('DBInstances', []):
                encryption_audit['rds_instances'].append({
                    'db_instance': db['DBInstanceIdentifier'],
                    'encrypted': db['StorageEncrypted'],
                    'kms_key': db.get('KmsKeyId') if db['StorageEncrypted'] else None
                })

        # Check EBS encryption (paginated so every volume is covered)
        ec2 = boto3.client('ec2', region_name=self.region)
        volume_pages = ec2.get_paginator('describe_volumes').paginate()

        for page in volume_pages:
            for volume in page.get('Volumes', []):
                encryption_audit['ebs_volumes'].append({
                    'volume_id': volume['VolumeId'],
                    'encrypted': volume['Encrypted'],
                    'kms_key': volume.get('KmsKeyId') if volume['Encrypted'] else None
                })

        return encryption_audit

    def verify_data_residency(self) -> Dict:
        """Verify that EC2 instances run only in the allowed EU regions.

        Only EC2 instances are counted; other resource types are not checked.
        """

        allowed_regions = ['eu-central-1', 'eu-west-1', 'eu-west-3', 'eu-north-1']
        residency_status = {
            'compliant': True,
            'violations': [],
            'resources_by_region': {}
        }

        ec2 = boto3.client('ec2', region_name=self.region)

        # Check all regions for resources
        for region in ec2.describe_regions()['Regions']:
            region_name = region['RegionName']

            # Count EC2 instances in each region (paginated)
            regional_ec2 = boto3.client('ec2', region_name=region_name)
            paginator = regional_ec2.get_paginator('describe_instances')

            instance_count = sum(
                len(reservation['Instances'])
                for page in paginator.paginate()
                for reservation in page['Reservations']
            )

            if instance_count > 0:
                residency_status['resources_by_region'][region_name] = instance_count

                if region_name not in allowed_regions:
                    residency_status['compliant'] = False
                    residency_status['violations'].append({
                        'region': region_name,
                        'resource_count': instance_count,
                        'violation': 'EC2 instances outside the allowed EU regions'
                    })

        return residency_status

    def _calculate_mfa_rate(self, access_events: List[Dict]) -> str:
        """Calculate MFA usage rate"""
        if not access_events:
            # Same two-decimal format as the computed rate below
            return "0.00%"

        mfa_events = sum(1 for e in access_events if e.get('mfa_used') == 'true')
        rate = (mfa_events / len(access_events)) * 100
        return f"{rate:.2f}%"

# Example usage
if __name__ == '__main__':
    auditor = DNBComplianceAuditor(region='eu-central-1')

    # Generate audit report for last 30 days
    end_date = datetime.utcnow()
    start_date = end_date - timedelta(days=30)

    report = auditor.generate_dnb_audit_report(
        start_date=start_date,
        end_date=end_date,
        output_file='dnb_compliance_report.json'
    )

    print("\n=== DNB Compliance Summary ===")
    print(f"Total data access events: {report['data_access']['total_access_events']}")
    print(f"MFA compliance rate: {report['data_access']['mfa_compliance_rate']}")
    print(f"Configuration changes: {report['configuration_changes']['total_changes']}")
    print(f"Data residency compliant: {report['data_residency']['compliant']}")
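
The dictionary returned by audit_encryption_status() lists every data store individually, which is useful for evidence but hard to read at a glance. A minimal sketch of condensing it into coverage figures follows. The function name summarize_encryption and the sample data are hypothetical; only the dictionary shape is taken from the auditor code above.

```python
from typing import Dict, List


def summarize_encryption(audit: Dict[str, List[dict]]) -> Dict[str, dict]:
    """Compute per-store encryption coverage from an encryption audit dict."""
    summary = {}
    for store, items in audit.items():
        total = len(items)
        encrypted = sum(1 for item in items if item.get('encrypted'))
        summary[store] = {
            'total': total,
            'encrypted': encrypted,
            'unencrypted': total - encrypted,
            # Report None rather than a misleading percentage when nothing was found
            'coverage_pct': round(encrypted / total * 100, 2) if total else None,
        }
    return summary


# Made-up sample in the shape audit_encryption_status() returns
sample_audit = {
    's3_buckets': [
        {'bucket': 'example-logs', 'encrypted': True, 'algorithm': 'aws:kms'},
        {'bucket': 'example-tmp', 'encrypted': False, 'algorithm': None},
    ],
    'rds_instances': [
        {'db_instance': 'example-db', 'encrypted': True, 'kms_key': 'example-key'},
    ],
    'ebs_volumes': [],
}

encryption_summary = summarize_encryption(sample_audit)
for store, stats in encryption_summary.items():
    print(store, stats)
```

In a real run, the sample dictionary would be replaced by the result of auditor.audit_encryption_status().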

Best Practices for Dutch Financial Institutions

1. Data Governance

  • Implement data classification and tagging
  • Enforce EU-only data residency with service control policies (SCPs)
  • Encrypt all data with AWS KMS keys created in EU regions
  • Maintain detailed data lineage documentation
  • Conduct regular data protection impact assessments (DPIAs)
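
As an illustration of the SCP-based residency control, the sketch below builds a policy document that denies requests outside a list of allowed regions, using the aws:RequestedRegion condition key. The region list mirrors the allowed_regions used in the auditor code. The exempted global services are an assumption and need to be adjusted to the services your organization actually uses.

```python
import json
from typing import Dict, Iterable

# Regions taken from the allowed_regions list in verify_data_residency()
ALLOWED_REGIONS = ['eu-central-1', 'eu-west-1', 'eu-west-3', 'eu-north-1']

# Assumption: global services that must stay reachable from any region
EXEMPT_ACTIONS = ['iam:*', 'organizations:*', 'sts:*', 'support:*',
                  'route53:*', 'cloudfront:*']


def build_region_scp(allowed_regions: Iterable[str],
                     exempt_actions: Iterable[str]) -> Dict:
    """Build an SCP document that denies requests outside the allowed regions."""
    return {
        'Version': '2012-10-17',
        'Statement': [{
            'Sid': 'DenyOutsideAllowedRegions',
            'Effect': 'Deny',
            # NotAction exempts the listed global services from the deny
            'NotAction': sorted(exempt_actions),
            'Resource': '*',
            'Condition': {
                'StringNotEquals': {
                    'aws:RequestedRegion': sorted(allowed_regions)
                }
            }
        }]
    }


scp = build_region_scp(ALLOWED_REGIONS, EXEMPT_ACTIONS)
scp_json = json.dumps(scp, indent=2)
print(scp_json)
```

The resulting JSON could be registered through AWS Organizations as a service control policy and attached to an organizational unit. Trying it on a sandbox OU first is advisable, because a deny that is too broad can lock teams out of services they depend on.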

2. Security Controls

  • Implement a defense-in-depth architecture
  • Aggregate security findings centrally in AWS Security Hub
  • Enable GuardDuty for threat detection
  • Enforce MFA for all privileged access
  • Run regular penetration tests (DORA mandates resilience testing, including threat-led penetration testing for designated entities)
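
GuardDuty reports severity as a number while Security Hub reports a label, so a combined view needs one scale. The sketch below counts findings from both services under shared labels, using the findings dictionary shape produced by the auditor code. The numeric band boundaries are an assumption based on GuardDuty's published severity ranges and should be checked against current AWS documentation. The function names are hypothetical.

```python
from typing import Dict, List

# Assumption: lower bounds of GuardDuty's numeric severity bands
GUARDDUTY_BANDS = [(9.0, 'CRITICAL'), (7.0, 'HIGH'), (4.0, 'MEDIUM'), (0.0, 'LOW')]


def guardduty_label(severity: float) -> str:
    """Map a numeric GuardDuty severity to a label."""
    for lower_bound, label in GUARDDUTY_BANDS:
        if severity >= lower_bound:
            return label
    return 'LOW'


def count_by_severity(findings: Dict[str, List[dict]]) -> Dict[str, int]:
    """Count findings from both services under one set of severity labels."""
    counts: Dict[str, int] = {}
    for finding in findings.get('guardduty', []):
        label = guardduty_label(float(finding['severity']))
        counts[label] = counts.get(label, 0) + 1
    for finding in findings.get('security_hub', []):
        # Security Hub already reports a label such as 'HIGH'
        label = finding.get('severity') or 'UNKNOWN'
        counts[label] = counts.get(label, 0) + 1
    return counts


# Made-up sample in the shape the auditor's findings dictionary uses
sample_findings = {
    'guardduty': [{'severity': 8.0}, {'severity': 2.0}],
    'security_hub': [{'severity': 'HIGH'}, {'severity': 'CRITICAL'}],
}
severity_counts = count_by_severity(sample_findings)
print(severity_counts)
```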

3. Audit and Compliance

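  • Enable CloudTrail in all regions and store the logs in EU-based S3 buckets
  • Implement Config rules for continuous compliance
  • Retain audit logs for 7 years (MiFID II requires five years, extendable to seven at the supervisor's request)
  • Report on compliance to DNB regularly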
  • Document all third-party cloud services
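
The CloudTrail items above can be checked automatically. The sketch below evaluates trail entries in the shape returned by CloudTrail's describe_trails call, reading the IsMultiRegionTrail, LogFileValidationEnabled, KmsKeyId, and HomeRegion fields. The function name, issue texts, and sample trails are hypothetical. The check does not confirm where the S3 bucket itself is located, which would need a separate lookup.

```python
from typing import Dict, List

ALLOWED_REGIONS = ['eu-central-1', 'eu-west-1', 'eu-west-3', 'eu-north-1']


def check_trail(trail: Dict, allowed_regions: List[str]) -> List[str]:
    """Return a list of issues for one trail entry from describe_trails."""
    issues = []
    if not trail.get('IsMultiRegionTrail'):
        issues.append('trail does not cover all regions')
    if not trail.get('LogFileValidationEnabled'):
        issues.append('log file validation is disabled')
    if not trail.get('KmsKeyId'):
        issues.append('no KMS key is configured for the logs')
    if trail.get('HomeRegion') not in allowed_regions:
        issues.append('home region is outside the allowed EU regions')
    return issues


# Made-up sample entries; real ones come from describe_trails()['trailList']
sample_trails = [
    {
        'Name': 'org-trail',
        'IsMultiRegionTrail': True,
        'LogFileValidationEnabled': True,
        'KmsKeyId': 'arn:aws:kms:eu-central-1:111122223333:key/example',
        'HomeRegion': 'eu-central-1',
    },
    {
        'Name': 'legacy-trail',
        'IsMultiRegionTrail': False,
        'LogFileValidationEnabled': False,
        'HomeRegion': 'us-east-1',
    },
]

trail_report = {t['Name']: check_trail(t, ALLOWED_REGIONS) for t in sample_trails}
for name, issues in trail_report.items():
    print(name, issues or 'OK')
```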

4. Business Continuity

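  • Deploy across multiple Availability Zones for high availability
  • Keep cross-region disaster recovery (DR) within EU regions
  • Test DR regularly and document the results
  • Align recovery time and recovery point objectives (RTO/RPO) with DNB expectations
  • Maintain a documented cloud exit strategy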
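
One way to keep these continuity requirements verifiable is to record each workload's DR setup as data and validate it. The sketch below is illustrative only: the DrPlan fields, the thresholds, and the sample values are all hypothetical, and actual RTO/RPO targets must come from your own business impact analysis.

```python
from dataclasses import dataclass
from typing import List

ALLOWED_REGIONS = ['eu-central-1', 'eu-west-1', 'eu-west-3', 'eu-north-1']


@dataclass
class DrPlan:
    """Hypothetical description of one workload's DR setup."""
    workload: str
    primary_region: str
    dr_region: str
    rpo_target_seconds: int
    measured_replication_lag_seconds: int
    rto_target_seconds: int
    last_test_recovery_seconds: int


def check_dr_plan(plan: DrPlan, allowed_regions: List[str]) -> List[str]:
    """Return a list of issues found in a DR plan."""
    issues = []
    if plan.primary_region == plan.dr_region:
        issues.append('DR region equals the primary region')
    # A set avoids reporting the same region twice
    for region in sorted({plan.primary_region, plan.dr_region}):
        if region not in allowed_regions:
            issues.append(f'{region} is outside the allowed EU regions')
    if plan.measured_replication_lag_seconds > plan.rpo_target_seconds:
        issues.append('replication lag exceeds the RPO target')
    if plan.last_test_recovery_seconds > plan.rto_target_seconds:
        issues.append('last DR test exceeded the RTO target')
    return issues


good_plan = DrPlan('payments-api', 'eu-central-1', 'eu-west-1', 300, 60, 3600, 1800)
bad_plan = DrPlan('ledger', 'eu-central-1', 'us-east-1', 300, 900, 3600, 1800)

good_issues = check_dr_plan(good_plan, ALLOWED_REGIONS)
bad_issues = check_dr_plan(bad_plan, ALLOWED_REGIONS)
print(good_issues)
print(bad_issues)
```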

5. Operational Excellence

  • Infrastructure-as-Code for all deployments
  • Automated compliance checking in CI/CD
  • Train staff regularly on the financial regulations that apply to cloud use
  • Maintain incident response playbooks
  • Change management aligned with DNB requirements
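
Automated compliance checking in CI/CD can start small. The sketch below scans an already parsed CloudFormation template (a plain dictionary) for data stores that lack explicit encryption settings. It assumes literal boolean property values and does not resolve intrinsic functions or parameters. The function name and the sample template are hypothetical.

```python
from typing import Dict, List


def find_unencrypted_resources(template: Dict) -> List[str]:
    """Return logical IDs of data stores without explicit encryption settings."""
    violations = []
    for logical_id, resource in template.get('Resources', {}).items():
        rtype = resource.get('Type')
        props = resource.get('Properties', {})
        if rtype == 'AWS::S3::Bucket' and 'BucketEncryption' not in props:
            violations.append(logical_id)
        elif rtype == 'AWS::RDS::DBInstance' and props.get('StorageEncrypted') is not True:
            violations.append(logical_id)
        elif rtype == 'AWS::EC2::Volume' and props.get('Encrypted') is not True:
            violations.append(logical_id)
    return violations


# Made-up template fragment for illustration
sample_template = {
    'Resources': {
        'AuditBucket': {
            'Type': 'AWS::S3::Bucket',
            'Properties': {'BucketEncryption': {
                'ServerSideEncryptionConfiguration': [
                    {'ServerSideEncryptionByDefault': {'SSEAlgorithm': 'aws:kms'}}
                ]
            }},
        },
        'TmpBucket': {'Type': 'AWS::S3::Bucket', 'Properties': {}},
        'LedgerDb': {
            'Type': 'AWS::RDS::DBInstance',
            'Properties': {'StorageEncrypted': False},
        },
        'DataVolume': {
            'Type': 'AWS::EC2::Volume',
            'Properties': {'Encrypted': True},
        },
    }
}

template_violations = find_unencrypted_resources(sample_template)
print(template_violations)
```

A pipeline step could fail the build whenever the returned list is not empty, so unencrypted data stores never reach a deployed environment.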

Conclusion

Building compliant AWS architectures for the Dutch financial sector requires a deep understanding of DNB regulations, PSD2 requirements, and newer frameworks such as DORA. With proper data residency controls, strong authentication, comprehensive audit logging, and robust security controls in place, financial institutions can use the AWS cloud while meeting their regulatory obligations.

Key Takeaways:

  • Enforce EU data residency with AWS Organizations SCPs
  • Implement PSD2-compliant authentication with Cognito MFA
  • Maintain comprehensive audit trails with CloudTrail and Config
  • Use Infrastructure-as-Code for compliance automation
  • Test and document DR procedures regularly
  • Engage with DNB on cloud outsourcing notifications

Ready to build DNB-compliant AWS architectures? Forrict specializes in financial services cloud implementations, helping Dutch banks and payment providers navigate regulatory requirements while modernizing their infrastructure.

Forrict Team

AWS expert and consultant at Forrict, specializing in cloud architecture and AWS best practices for Dutch businesses.
