AWS for Financial Sector in the Netherlands: DNB Compliance & Secure Banking Architecture
Build secure, compliant AWS infrastructures for Dutch financial institutions meeting DNB regulations, PSD2 requirements, and data sovereignty mandates
Introduction
The Dutch financial sector faces stringent regulatory requirements from De Nederlandsche Bank (DNB), the European Banking Authority (EBA), and various EU directives. Moving financial workloads to AWS requires careful architectural design to meet compliance obligations while leveraging cloud benefits like scalability, resilience, and innovation.
This comprehensive guide explores how to build AWS architectures that comply with Dutch and European financial regulations, including DNB’s cloud guidelines, PSD2 payment services directive, DORA (Digital Operational Resilience Act), and MiFID II requirements.
What You’ll Learn:
- DNB cloud policy and regulatory requirements for Dutch banks
- PSD2 compliance architecture for payment service providers
- Data residency and sovereignty in EU regions (eu-central-1)
- Financial-grade security controls and encryption
- Audit logging and compliance reporting with CloudTrail
- Real-world architecture patterns for banking applications
- Infrastructure-as-Code examples for compliant deployments
Understanding Dutch Financial Regulations
De Nederlandsche Bank (DNB) Cloud Policy
DNB, the Dutch central bank and prudential supervisor, has established clear guidelines for financial institutions using cloud services:
Key Requirements:
- Risk Assessment: Comprehensive cloud risk assessment before adoption
- Data Location: Clear understanding of data storage locations
- Outsourcing Register: Registration of cloud providers in DNB’s outsourcing register
- Exit Strategy: Documented cloud exit and migration plans
- Audit Rights: Ability to audit cloud infrastructure and providers
- Business Continuity: Robust disaster recovery and business continuity plans
- Security Controls: Appropriate technical and organizational security measures
DNB’s Four Key Principles:
- Controllability: Maintain control over outsourced activities
- Integrity: Ensure data integrity and availability
-
- Continuity: Guarantee business continuity
- Compliance: Meet all regulatory obligations
European Financial Regulations
PSD2 (Payment Services Directive 2)
PSD2 revolutionized European payment services by requiring:
- Strong Customer Authentication (SCA): Multi-factor authentication for payments
- Open Banking APIs: Secure API access for third-party providers (TPPs)
- Dynamic Linking: Transaction-specific security codes
- Secure Communication: TLS 1.2+ for all API communications
DORA (Digital Operational Resilience Act)
Effective from January 2025, DORA mandates:
- ICT Risk Management: Comprehensive IT risk management framework
- Incident Reporting: Mandatory reporting of major ICT incidents to regulators
- Operational Resilience Testing: Regular resilience and penetration testing
- Third-Party Risk: Due diligence on ICT service providers (including cloud)
- Information Sharing: Participation in cyber threat information sharing
MiFID II
For investment firms, MiFID II requires:
- Transaction Reporting: Detailed logging of all transactions
- Data Retention: Minimum 5-year retention of transaction data
- Best Execution: Proof of best execution for client orders
- Clock Synchronization: UTC timestamp accuracy requirements
Data Residency and Sovereignty
EU Data Residency Requirements:
- Financial data must remain within EU/EEA jurisdictions
- Transfer to non-EU countries requires adequate safeguards
- GDPR compliance for personal financial data
- Schrems II implications for US cloud providers
AWS EU Regions:
- eu-central-1 (Frankfurt): Primary region for Dutch financial institutions
- eu-west-1 (Ireland): Common DR region
- eu-west-3 (Paris): Alternative EU region
- eu-north-1 (Stockholm): Nordic alternative
DNB-Compliant AWS Architecture Patterns
1. Data Residency Enforcement Architecture
// lib/data-residency-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as kms from 'aws-cdk-lib/aws-kms';
import * as config from 'aws-cdk-lib/aws-config';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';
export class DataResidencyStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, {
...props,
env: {
account: process.env.CDK_DEFAULT_ACCOUNT,
region: 'eu-central-1', // Enforce EU region
},
});
// Organization-wide SCP to prevent resource creation outside EU
const scpPolicy = new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
sid: 'DenyNonEURegions',
effect: iam.Effect.DENY,
actions: ['*'],
resources: ['*'],
conditions: {
StringNotEquals: {
'aws:RequestedRegion': [
'eu-central-1',
'eu-west-1',
'eu-west-3',
'eu-north-1',
],
},
},
}),
new iam.PolicyStatement({
sid: 'DenyS3CrossRegionReplication',
effect: iam.Effect.DENY,
actions: ['s3:PutReplicationConfiguration'],
resources: ['*'],
conditions: {
StringNotLike: {
's3:x-amz-replication-destination-region': 'eu-*',
},
},
}),
],
});
// KMS key for encryption with EU-only access
const financialDataKey = new kms.Key(this, 'FinancialDataKey', {
description: 'KMS key for financial data encryption - EU only',
enableKeyRotation: true,
removalPolicy: cdk.RemovalPolicy.RETAIN,
policy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
sid: 'Enable IAM User Permissions',
effect: iam.Effect.ALLOW,
principals: [new iam.AccountRootPrincipal()],
actions: ['kms:*'],
resources: ['*'],
}),
new iam.PolicyStatement({
sid: 'Deny non-EU access',
effect: iam.Effect.DENY,
principals: [new iam.AnyPrincipal()],
actions: ['kms:Decrypt', 'kms:DescribeKey'],
resources: ['*'],
conditions: {
StringNotEquals: {
'aws:RequestedRegion': [
'eu-central-1',
'eu-west-1',
'eu-west-3',
'eu-north-1',
],
},
},
}),
],
}),
});
// S3 bucket for financial data with strict controls
const financialDataBucket = new s3.Bucket(this, 'FinancialDataBucket', {
bucketName: `financial-data-${this.account}-eu-central-1`,
encryption: s3.BucketEncryption.KMS,
encryptionKey: financialDataKey,
versioned: true,
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
enforceSSL: true,
objectOwnership: s3.ObjectOwnership.BUCKET_OWNER_ENFORCED,
lifecycleRules: [
{
id: 'TransitionToInfrequentAccess',
transitions: [
{
storageClass: s3.StorageClass.INTELLIGENT_TIERING,
transitionAfter: cdk.Duration.days(90),
},
],
},
{
id: 'RetainFor7Years',
// MiFID II requires 5-year retention, we use 7 for safety margin
expiration: cdk.Duration.days(2555),
},
],
removalPolicy: cdk.RemovalPolicy.RETAIN,
});
// AWS Config rule to monitor data residency
const s3LocationRule = new config.ManagedRule(this, 'S3BucketLocationRule', {
configRuleName: 's3-bucket-in-eu-regions',
identifier: config.ManagedRuleIdentifiers.S3_BUCKET_REPLICATION_ENABLED,
description: 'Ensures S3 buckets are in EU regions only',
});
// Custom Config rule for RDS location
const rdsLocationRule = new config.CustomRule(this, 'RDSLocationRule', {
configRuleName: 'rds-instance-in-eu-regions',
lambdaFunction: this.createLocationCheckLambda(),
periodic: true,
maximumExecutionFrequency: config.MaximumExecutionFrequency.TWELVE_HOURS,
});
// Bucket policy to prevent cross-region access
financialDataBucket.addToResourcePolicy(
new iam.PolicyStatement({
sid: 'DenyNonEUAccess',
effect: iam.Effect.DENY,
principals: [new iam.AnyPrincipal()],
actions: ['s3:GetObject', 's3:PutObject'],
resources: [financialDataBucket.arnForObjects('*')],
conditions: {
StringNotEquals: {
'aws:RequestedRegion': ['eu-central-1', 'eu-west-1'],
},
},
})
);
// CloudFormation stack policy to prevent region changes
const stackPolicy = new cdk.CfnStackPolicy(this, {
statements: [
new iam.PolicyStatement({
effect: iam.Effect.DENY,
principals: [new iam.AnyPrincipal()],
actions: ['Update:*'],
resources: ['*'],
conditions: {
StringNotEquals: {
'aws:RequestedRegion': ['eu-central-1'],
},
},
}),
],
});
// Outputs
new cdk.CfnOutput(this, 'FinancialDataBucketName', {
value: financialDataBucket.bucketName,
description: 'S3 bucket for financial data (EU-only)',
});
new cdk.CfnOutput(this, 'FinancialDataKMSKeyId', {
value: financialDataKey.keyId,
description: 'KMS key for financial data encryption',
});
}
private createLocationCheckLambda(): cdk.aws_lambda.Function {
return new cdk.aws_lambda.Function(this, 'LocationCheckFunction', {
runtime: cdk.aws_lambda.Runtime.PYTHON_3_11,
handler: 'index.handler',
code: cdk.aws_lambda.Code.fromInline(`
import boto3
import json
def handler(event, context):
"""Check if RDS instances are in allowed EU regions"""
rds = boto3.client('rds')
config_client = boto3.client('config')
allowed_regions = ['eu-central-1', 'eu-west-1', 'eu-west-3', 'eu-north-1']
current_region = context.invoked_function_arn.split(':')[3]
compliance = 'COMPLIANT' if current_region in allowed_regions else 'NON_COMPLIANT'
evaluation = {
'ComplianceResourceType': 'AWS::RDS::DBInstance',
'ComplianceResourceId': event.get('configRuleArn', 'unknown'),
'ComplianceType': compliance,
'OrderingTimestamp': event['notificationCreationTime']
}
config_client.put_evaluations(
Evaluations=[evaluation],
ResultToken=event['resultToken']
)
return {'compliance': compliance}
`),
timeout: cdk.Duration.seconds(30),
});
}
}
2. PSD2-Compliant API Gateway Architecture
// lib/psd2-api-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as cognito from 'aws-cdk-lib/aws-cognito';
import * as waf from 'aws-cdk-lib/aws-wafv2';
import * as acm from 'aws-cdk-lib/aws-certificatemanager';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Construct } from 'constructs';
export class PSD2APIStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// Cognito User Pool for Strong Customer Authentication (SCA)
const userPool = new cognito.UserPool(this, 'PSD2UserPool', {
userPoolName: 'psd2-customer-authentication',
// PSD2 requires MFA
mfa: cognito.Mfa.REQUIRED,
mfaSecondFactor: {
sms: true,
otp: true, // Time-based OTP for authenticator apps
},
// Password policy aligned with PSD2 security requirements
passwordPolicy: {
minLength: 12,
requireLowercase: true,
requireUppercase: true,
requireDigits: true,
requireSymbols: true,
tempPasswordValidity: cdk.Duration.days(1),
},
// Account recovery
accountRecovery: cognito.AccountRecovery.EMAIL_AND_PHONE_WITHOUT_MFA,
// Advanced security features
advancedSecurityMode: cognito.AdvancedSecurityMode.ENFORCED,
// Device tracking for suspicious activity
deviceTracking: {
challengeRequiredOnNewDevice: true,
deviceOnlyRememberedOnUserPrompt: true,
},
// User attributes
standardAttributes: {
email: {
required: true,
mutable: false,
},
phoneNumber: {
required: true,
mutable: true,
},
},
customAttributes: {
'customerId': new cognito.StringAttribute({
minLen: 1,
maxLen: 50,
mutable: false,
}),
'accountType': new cognito.StringAttribute({
minLen: 1,
maxLen: 20,
mutable: true,
}),
},
});
// App client for web applications
const webAppClient = userPool.addClient('WebAppClient', {
authFlows: {
userPassword: true,
userSrp: true,
custom: true,
},
oAuth: {
flows: {
authorizationCodeGrant: true,
},
scopes: [
cognito.OAuthScope.OPENID,
cognito.OAuthScope.PROFILE,
cognito.OAuthScope.EMAIL,
],
},
// PSD2 requires session timeout
accessTokenValidity: cdk.Duration.minutes(60),
idTokenValidity: cdk.Duration.minutes(60),
refreshTokenValidity: cdk.Duration.days(1),
});
// Lambda authorizer for API Gateway with dynamic linking
const authorizerFunction = new lambda.Function(this, 'PSD2Authorizer', {
runtime: lambda.Runtime.PYTHON_3_11,
handler: 'index.handler',
code: lambda.Code.fromInline(`
import json
import boto3
import hashlib
import hmac
from datetime import datetime
def handler(event, context):
"""
PSD2-compliant authorizer with dynamic linking
Validates transaction-specific authentication codes
"""
token = event['authorizationToken']
method_arn = event['methodArn']
# Extract transaction details from headers
headers = event.get('headers', {})
transaction_amount = headers.get('X-Transaction-Amount')
transaction_currency = headers.get('X-Transaction-Currency')
payee_account = headers.get('X-Payee-Account')
# Validate dynamic linking (transaction-specific code)
dynamic_code = headers.get('X-Dynamic-Auth-Code')
# Verify the code is transaction-specific
expected_code = generate_dynamic_code(
transaction_amount,
transaction_currency,
payee_account,
token
)
if dynamic_code != expected_code:
raise Exception('Unauthorized: Invalid dynamic authentication code')
# Verify token with Cognito
cognito = boto3.client('cognito-idp')
try:
response = cognito.get_user(AccessToken=token)
user_id = response['Username']
# Generate IAM policy
policy = generate_policy(user_id, 'Allow', method_arn)
# Add user context
policy['context'] = {
'userId': user_id,
'transactionAmount': transaction_amount,
'transactionCurrency': transaction_currency,
'timestamp': datetime.utcnow().isoformat()
}
return policy
except Exception as e:
print(f"Authorization failed: {str(e)}")
raise Exception('Unauthorized')
def generate_dynamic_code(amount, currency, payee, token):
"""Generate transaction-specific authentication code"""
message = f"{amount}:{currency}:{payee}:{token}"
# Use HMAC-SHA256 for secure code generation
code = hmac.new(
'secret-key'.encode(),
message.encode(),
hashlib.sha256
).hexdigest()[:8]
return code
def generate_policy(principal_id, effect, resource):
"""Generate IAM policy for API Gateway"""
return {
'principalId': principal_id,
'policyDocument': {
'Version': '2012-10-17',
'Statement': [{
'Action': 'execute-api:Invoke',
'Effect': effect,
'Resource': resource
}]
}
}
`),
timeout: cdk.Duration.seconds(10),
environment: {
USER_POOL_ID: userPool.userPoolId,
},
});
// Payment initiation Lambda
const paymentFunction = new lambda.Function(this, 'PaymentFunction', {
runtime: lambda.Runtime.PYTHON_3_11,
handler: 'index.handler',
code: lambda.Code.fromInline(`
import json
import boto3
import uuid
from datetime import datetime
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('PaymentTransactions')
def handler(event, context):
"""Process PSD2 payment initiation"""
# Extract user context from authorizer
user_id = event['requestContext']['authorizer']['userId']
# Parse payment request
body = json.loads(event['body'])
payment_id = str(uuid.uuid4())
timestamp = datetime.utcnow().isoformat()
# Store transaction (MiFID II compliance)
transaction = {
'paymentId': payment_id,
'userId': user_id,
'amount': body['amount'],
'currency': body['currency'],
'payeeAccount': body['payeeAccount'],
'payerAccount': body['payerAccount'],
'timestamp': timestamp,
'status': 'INITIATED',
'authMethod': 'SCA_COMPLIANT',
'ipAddress': event['requestContext']['identity']['sourceIp'],
'userAgent': event['headers'].get('User-Agent', 'unknown')
}
table.put_item(Item=transaction)
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'X-Payment-Id': payment_id,
},
'body': json.dumps({
'paymentId': payment_id,
'status': 'INITIATED',
'timestamp': timestamp
})
}
`),
timeout: cdk.Duration.seconds(30),
});
// CloudWatch Logs with 7-year retention (MiFID II)
const apiLogs = new logs.LogGroup(this, 'PSD2APILogs', {
logGroupName: '/aws/apigateway/psd2-api',
retention: logs.RetentionDays.TEN_YEARS, // 3650 days
removalPolicy: cdk.RemovalPolicy.RETAIN,
});
// API Gateway with PSD2 requirements
const api = new apigateway.RestApi(this, 'PSD2API', {
restApiName: 'PSD2 Payment Services API',
description: 'PSD2-compliant payment initiation API',
deployOptions: {
stageName: 'prod',
tracingEnabled: true, // X-Ray tracing for audit
accessLogDestination: new apigateway.LogGroupLogDestination(apiLogs),
accessLogFormat: apigateway.AccessLogFormat.jsonWithStandardFields({
caller: true,
httpMethod: true,
ip: true,
protocol: true,
requestTime: true,
resourcePath: true,
responseLength: true,
status: true,
user: true,
}),
throttlingRateLimit: 1000,
throttlingBurstLimit: 2000,
},
// Enforce TLS 1.2 (PSD2 requirement)
policy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
effect: iam.Effect.DENY,
principals: [new iam.AnyPrincipal()],
actions: ['execute-api:Invoke'],
resources: ['execute-api:/*'],
conditions: {
StringNotEquals: {
'aws:SecureTransport': 'true',
},
},
}),
new iam.PolicyStatement({
effect: iam.Effect.DENY,
principals: [new iam.AnyPrincipal()],
actions: ['execute-api:Invoke'],
resources: ['execute-api:/*'],
conditions: {
NumericLessThan: {
's3:TlsVersion': 1.2,
},
},
}),
],
}),
});
// Request validator for input validation
const requestValidator = new apigateway.RequestValidator(
this,
'RequestValidator',
{
restApi: api,
requestValidatorName: 'payment-request-validator',
validateRequestBody: true,
validateRequestParameters: true,
}
);
// Payment initiation endpoint
const payments = api.root.addResource('payments');
const paymentMethod = payments.addMethod(
'POST',
new apigateway.LambdaIntegration(paymentFunction),
{
authorizer: new apigateway.TokenAuthorizer(this, 'PSD2TokenAuthorizer', {
handler: authorizerFunction,
identitySource: 'method.request.header.Authorization',
}),
requestValidator,
requestModels: {
'application/json': new apigateway.Model(this, 'PaymentModel', {
restApi: api,
contentType: 'application/json',
modelName: 'PaymentRequest',
schema: {
type: apigateway.JsonSchemaType.OBJECT,
required: ['amount', 'currency', 'payeeAccount', 'payerAccount'],
properties: {
amount: {
type: apigateway.JsonSchemaType.NUMBER,
minimum: 0.01,
},
currency: {
type: apigateway.JsonSchemaType.STRING,
enum: ['EUR', 'USD', 'GBP'],
},
payeeAccount: {
type: apigateway.JsonSchemaType.STRING,
pattern: '^[A-Z]{2}[0-9]{2}[A-Z0-9]+$', // IBAN format
},
payerAccount: {
type: apigateway.JsonSchemaType.STRING,
pattern: '^[A-Z]{2}[0-9]{2}[A-Z0-9]+$',
},
description: {
type: apigateway.JsonSchemaType.STRING,
maxLength: 140,
},
},
},
}),
},
}
);
// WAF for API protection
const webAcl = new waf.CfnWebACL(this, 'PSD2APIWAF', {
scope: 'REGIONAL',
defaultAction: { allow: {} },
rules: [
{
name: 'RateLimitRule',
priority: 1,
statement: {
rateBasedStatement: {
limit: 2000,
aggregateKeyType: 'IP',
},
},
action: { block: {} },
visibilityConfig: {
sampledRequestsEnabled: true,
cloudWatchMetricsEnabled: true,
metricName: 'RateLimitRule',
},
},
{
name: 'GeoBlockingRule',
priority: 2,
statement: {
geoMatchStatement: {
countryCodes: ['EU'], // Only allow EU traffic
},
},
action: { allow: {} },
visibilityConfig: {
sampledRequestsEnabled: true,
cloudWatchMetricsEnabled: true,
metricName: 'GeoBlocking',
},
},
],
visibilityConfig: {
sampledRequestsEnabled: true,
cloudWatchMetricsEnabled: true,
metricName: 'PSD2APIWAF',
},
});
// Associate WAF with API Gateway
new waf.CfnWebACLAssociation(this, 'WAFAssociation', {
resourceArn: api.deploymentStage.stageArn,
webAclArn: webAcl.attrArn,
});
// Outputs
new cdk.CfnOutput(this, 'APIEndpoint', {
value: api.url,
description: 'PSD2 API endpoint (TLS 1.2+ enforced)',
});
new cdk.CfnOutput(this, 'UserPoolId', {
value: userPool.userPoolId,
description: 'Cognito User Pool for SCA',
});
}
}
3. Audit Logging and Compliance Architecture
# scripts/dnb_compliance_audit.py
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List
import csv
class DNBComplianceAuditor:
"""DNB compliance audit and reporting for financial institutions"""
def __init__(self, region: str = 'eu-central-1'):
self.cloudtrail = boto3.client('cloudtrail', region_name=region)
self.config = boto3.client('config', region_name=region)
self.s3 = boto3.client('s3', region_name=region)
self.guardduty = boto3.client('guardduty', region_name=region)
self.securityhub = boto3.client('securityhub', region_name=region)
self.region = region
def generate_dnb_audit_report(
self,
start_date: datetime,
end_date: datetime,
output_file: str = 'dnb_audit_report.json'
) -> Dict:
"""
Generate comprehensive DNB audit report
Covers:
- Data access patterns
- Configuration changes
- Security findings
- Compliance status
"""
print(f"Generating DNB audit report from {start_date} to {end_date}")
report = {
'report_metadata': {
'generated_at': datetime.utcnow().isoformat(),
'period_start': start_date.isoformat(),
'period_end': end_date.isoformat(),
'region': self.region,
'auditor': 'DNB Compliance System'
},
'data_access': self.audit_data_access(start_date, end_date),
'configuration_changes': self.audit_config_changes(start_date, end_date),
'security_findings': self.audit_security_findings(),
'compliance_status': self.check_compliance_status(),
'encryption_status': self.audit_encryption_status(),
'data_residency': self.verify_data_residency(),
}
# Save report
with open(output_file, 'w') as f:
json.dump(report, f, indent=2, default=str)
print(f"Audit report saved to {output_file}")
return report
def audit_data_access(
self,
start_date: datetime,
end_date: datetime
) -> Dict:
"""Audit all data access events"""
# Query CloudTrail for S3 data access
response = self.cloudtrail.lookup_events(
LookupAttributes=[
{
'AttributeKey': 'EventName',
'AttributeValue': 'GetObject'
},
],
StartTime=start_date,
EndTime=end_date,
MaxResults=1000
)
access_events = []
for event in response.get('Events', []):
event_data = json.loads(event['CloudTrailEvent'])
# Extract relevant information
access_events.append({
'timestamp': event['EventTime'].isoformat(),
'user': event.get('Username', 'unknown'),
'resource': event_data.get('requestParameters', {}).get('bucketName'),
'object_key': event_data.get('requestParameters', {}).get('key'),
'source_ip': event_data.get('sourceIPAddress'),
'user_agent': event_data.get('userAgent'),
'mfa_used': event_data.get('additionalEventData', {}).get('MFAUsed', 'false')
})
return {
'total_access_events': len(access_events),
'events': access_events,
'mfa_compliance_rate': self._calculate_mfa_rate(access_events)
}
def audit_config_changes(
self,
start_date: datetime,
end_date: datetime
) -> Dict:
"""Audit infrastructure configuration changes"""
# Get configuration timeline
response = self.config.get_resource_config_history(
resourceType='AWS::EC2::Instance',
laterTime=end_date,
earlierTime=start_date,
limit=100
)
config_changes = []
for item in response.get('configurationItems', []):
config_changes.append({
'timestamp': item['configurationItemCaptureTime'].isoformat(),
'resource_type': item['resourceType'],
'resource_id': item['resourceId'],
'configuration_state': item['configurationItemStatus'],
'changes': item.get('configurationItemDiff', {})
})
return {
'total_changes': len(config_changes),
'changes': config_changes
}
def audit_security_findings(self) -> Dict:
"""Audit security findings from GuardDuty and Security Hub"""
findings = {
'guardduty': [],
'security_hub': []
}
# Get GuardDuty findings
try:
detectors = self.guardduty.list_detectors()
for detector_id in detectors.get('DetectorIds', []):
finding_response = self.guardduty.list_findings(
DetectorId=detector_id,
FindingCriteria={
'Criterion': {
'severity': {
'Gte': 4 # Medium and above
}
}
}
)
if finding_response.get('FindingIds'):
finding_details = self.guardduty.get_findings(
DetectorId=detector_id,
FindingIds=finding_response['FindingIds'][:50]
)
for finding in finding_details.get('Findings', []):
findings['guardduty'].append({
'id': finding['Id'],
'type': finding['Type'],
'severity': finding['Severity'],
'title': finding['Title'],
'description': finding['Description'],
'resource': finding.get('Resource', {})
})
except Exception as e:
print(f"GuardDuty error: {str(e)}")
# Get Security Hub findings
try:
hub_findings = self.securityhub.get_findings(
Filters={
'ComplianceStatus': [
{'Value': 'FAILED', 'Comparison': 'EQUALS'}
],
'RecordState': [
{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}
]
},
MaxResults=100
)
for finding in hub_findings.get('Findings', []):
findings['security_hub'].append({
'id': finding['Id'],
'title': finding['Title'],
'severity': finding['Severity']['Label'],
'compliance_status': finding.get('Compliance', {}).get('Status'),
'resource': finding.get('Resources', [{}])[0]
})
except Exception as e:
print(f"Security Hub error: {str(e)}")
return findings
def check_compliance_status(self) -> Dict:
"""Check AWS Config compliance rules status"""
compliance_summary = {
'compliant_rules': 0,
'non_compliant_rules': 0,
'rules_detail': []
}
# Get all Config rules
rules = self.config.describe_config_rules()
for rule in rules.get('ConfigRules', []):
rule_name = rule['ConfigRuleName']
# Get compliance status
compliance = self.config.describe_compliance_by_config_rule(
ConfigRuleNames=[rule_name]
)
for compliance_item in compliance.get('ComplianceByConfigRules', []):
status = compliance_item['Compliance']['ComplianceType']
if status == 'COMPLIANT':
compliance_summary['compliant_rules'] += 1
else:
compliance_summary['non_compliant_rules'] += 1
compliance_summary['rules_detail'].append({
'rule_name': rule_name,
'compliance_status': status,
'description': rule.get('Description', '')
})
return compliance_summary
def audit_encryption_status(self) -> Dict:
"""Verify encryption status of all data stores"""
encryption_audit = {
's3_buckets': [],
'rds_instances': [],
'ebs_volumes': []
}
# Check S3 bucket encryption
buckets = self.s3.list_buckets()
for bucket in buckets.get('Buckets', []):
bucket_name = bucket['Name']
try:
encryption = self.s3.get_bucket_encryption(Bucket=bucket_name)
encryption_audit['s3_buckets'].append({
'bucket': bucket_name,
'encrypted': True,
'algorithm': encryption['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
})
except self.s3.exceptions.ServerSideEncryptionConfigurationNotFoundError:
encryption_audit['s3_buckets'].append({
'bucket': bucket_name,
'encrypted': False,
'algorithm': None
})
# Check RDS encryption
rds = boto3.client('rds', region_name=self.region)
db_instances = rds.describe_db_instances()
for db in db_instances.get('DBInstances', []):
encryption_audit['rds_instances'].append({
'db_instance': db['DBInstanceIdentifier'],
'encrypted': db['StorageEncrypted'],
'kms_key': db.get('KmsKeyId') if db['StorageEncrypted'] else None
})
# Check EBS encryption
ec2 = boto3.client('ec2', region_name=self.region)
volumes = ec2.describe_volumes()
for volume in volumes.get('Volumes', []):
encryption_audit['ebs_volumes'].append({
'volume_id': volume['VolumeId'],
'encrypted': volume['Encrypted'],
'kms_key': volume.get('KmsKeyId') if volume['Encrypted'] else None
})
return encryption_audit
def verify_data_residency(self) -> Dict:
"""Verify all resources are in allowed EU regions"""
allowed_regions = ['eu-central-1', 'eu-west-1', 'eu-west-3', 'eu-north-1']
residency_status = {
'compliant': True,
'violations': [],
'resources_by_region': {}
}
ec2 = boto3.client('ec2', region_name=self.region)
# Check all regions for resources
for region in ec2.describe_regions()['Regions']:
region_name = region['RegionName']
# Count resources in each region
regional_ec2 = boto3.client('ec2', region_name=region_name)
instances = regional_ec2.describe_instances()
instance_count = sum(
len(reservation['Instances'])
for reservation in instances['Reservations']
)
if instance_count > 0:
residency_status['resources_by_region'][region_name] = instance_count
if region_name not in allowed_regions:
residency_status['compliant'] = False
residency_status['violations'].append({
'region': region_name,
'resource_count': instance_count,
'violation': 'Resources outside EU regions'
})
return residency_status
def _calculate_mfa_rate(self, access_events: List[Dict]) -> str:
"""Calculate MFA usage rate"""
if not access_events:
return "0%"
mfa_events = sum(1 for e in access_events if e.get('mfa_used') == 'true')
rate = (mfa_events / len(access_events)) * 100
return f"{rate:.2f}%"
# Example usage
if __name__ == '__main__':
auditor = DNBComplianceAuditor(region='eu-central-1')
# Generate audit report for last 30 days
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=30)
report = auditor.generate_dnb_audit_report(
start_date=start_date,
end_date=end_date,
output_file='dnb_compliance_report.json'
)
print("\n=== DNB Compliance Summary ===")
print(f"Total data access events: {report['data_access']['total_access_events']}")
print(f"MFA compliance rate: {report['data_access']['mfa_compliance_rate']}")
print(f"Configuration changes: {report['configuration_changes']['total_changes']}")
print(f"Data residency compliant: {report['data_residency']['compliant']}")
Best Practices for Dutch Financial Institutions
1. Data Governance
- Implement data classification and tagging
- Enforce EU-only data residency with SCPs
- Use KMS for all encryption with EU-based keys
- Maintain detailed data lineage documentation
- Regular data protection impact assessments (DPIA)
2. Security Controls
- Implement defense in depth architecture
- Use AWS Security Hub for centralized security
- Enable GuardDuty for threat detection
- Enforce MFA for all privileged access
- Regular penetration testing (DORA requirement)
3. Audit and Compliance
- Enable CloudTrail in all regions with EU storage
- Implement Config rules for continuous compliance
- Maintain 7-year audit logs (MiFID II)
- Regular compliance reporting to DNB
- Document all third-party cloud services
4. Business Continuity
- Multi-AZ deployment for high availability
- Cross-region DR in EU regions only
- Regular DR testing and documentation
- RTO/RPO aligned with DNB expectations
- Documented cloud exit strategy
5. Operational Excellence
- Infrastructure-as-Code for all deployments
- Automated compliance checking in CI/CD
- Regular training on cloud financial regulations
- Incident response playbooks
- Change management aligned with DNB requirements
Conclusion
Building compliant AWS architectures for the Dutch financial sector requires deep understanding of DNB regulations, PSD2 requirements, and emerging frameworks like DORA. By implementing proper data residency controls, strong authentication mechanisms, comprehensive audit logging, and robust security controls, financial institutions can leverage AWS cloud benefits while meeting regulatory obligations.
Key Takeaways:
- Enforce EU data residency with AWS Organizations SCPs
- Implement PSD2-compliant authentication with Cognito MFA
- Maintain comprehensive audit trails with CloudTrail and Config
- Use Infrastructure-as-Code for compliance automation
- Regular testing and documentation of DR procedures
- Engage with DNB on cloud outsourcing notifications
Ready to build DNB-compliant AWS architectures? Forrict specializes in financial services cloud implementations, helping Dutch banks and payment providers navigate regulatory requirements while modernizing their infrastructure.
Resources
- DNB Cloud Computing Guidelines
- PSD2 Technical Standards (EBA)
- DORA - Digital Operational Resilience Act
- AWS Financial Services Compliance
- AWS GDPR Center
Forrict Team
AWS expert and consultant at Forrict, specializing in cloud architecture and AWS best practices for Dutch businesses.