sifts_state
PAY_PER_REQUEST
(on-demand).hash_key
(pk), range_key
(sk).resource "aws_dynamodb_table" "sifts_state" {
name = "sifts_state"
billing_mode = "PAY_PER_REQUEST"
hash_key = "pk"
range_key = "sk"
deletion_protection_enabled = true
point_in_time_recovery {
enabled = true
}
server_side_encryption {
enabled = true
}
}
resource "aws_lambda_event_source_mapping" "integrates_streams" {
for_each = local.triggers
event_source_arn = aws_dynamodb_table.integrates_vms.stream_arn
function_name = aws_lambda_function.integrates_streams[each.key].arn
batch_size = each.value.batch_size
bisect_batch_on_function_error = true
maximum_retry_attempts = -1
starting_position = "TRIM_HORIZON"
}
/aws/lambda/integrates_streams_*
.ConsumedReadCapacityUnits
/ ConsumedWriteCapacityUnits
ReadThrottleEvents
/ WriteThrottleEvents
ThrottledRequests
(at API level)SystemErrors
/ UserErrors
SuccessfulRequestLatency
IteratorAge
- Age of last processed event (critical for detecting lag)Errors
- Failed executionsDuration
- Processing timeThrottles
- Lambda concurrency issues/aws/lambda/integrates_streams_*
.fields @timestamp, @message
| filter @message like /ProvisionedThroughputExceededException|Throttling|TransactionCanceledException/
| sort @timestamp desc
| limit 100
fields @timestamp, @message
| filter @message like /ConditionalCheckFailedException/
| sort @timestamp desc
| limit 100
fields @timestamp, @message
| filter @message like /ClientError|ReadTimeout|ConnectTimeout|Retry/
| sort @timestamp desc
| limit 100
fields @timestamp, @message
| filter @message like /BulkIndexError|bisect|retry|validation|UnprocessedItems/
| sort @timestamp desc
| limit 100
runs/vpc/infra/alarms.tf
. resource "aws_cloudwatch_metric_alarm" "dynamodb_write_capacity_alarm" {
for_each = toset(var.dynamodb_table_names)
metric_name = "ConsumedWriteCapacityUnits"
namespace = "AWS/DynamoDB"
period = 300
statistic = "Sum"
threshold = var.write_capacity_threshold
alarm_actions = [aws_sns_topic.central_alarms.arn]
}
var.dynamodb_table_names
variable.