integrates
group as our main container.# integrates/back/integrates/custom_utils/logs.py
TRANSACTIONS_LOGGER: logging.Logger = logging.getLogger("transactional") def cloudwatch_log( request: Request | WebSocket | Any,
msg: str,
extra: dict,
user_email: str = "",
) -> None: if user_email:
TRANSACTIONS_LOGGER.info(
msg,
extra={
"extra": {
"environment": FI_ENVIRONMENT, "user_email": user_email, **extra}
},
)
else:
aio.to_background(cloudwatch_log_async(request, msg, extra))
user_email
is provided, the function logs synchronously with the specified user context. Otherwise, it delegates to an asynchronous function that extracts the user information from the request context without blocking the main execution thread.# integrates/back/integrates/custom_utils/logs.py
async def cloudwatch_log_async(request: Request | WebSocket | Any, msg: str, extra: dict) -> None:
try:
user_data = await sessions_domain.get_jwt_content(request)
except (ExpiredToken, InvalidAuthorization):
user_data = {"user_email": "unauthenticated"}
TRANSACTIONS_LOGGER.info(
msg,
extra={
"extra": {
"environment": FI_ENVIRONMENT,
"user_email": user_data["user_email"],
**extra,
}
},
)
Watchtower
as a handler for Python logging, configured centrally through our logging configuration system. This handler efficiently transmits logs to CloudWatch while providing important features like queuing, batching, and throttling:# integrates/back/integrates/settings/logging/handlers.py
def get_watchtower_handler(*, stream_name: str) -> dict:
return {
"boto3_client": BOTO3_SESSION.client("logs"),
"class": "watchtower.CloudWatchLogHandler",
"create_log_group": False,
"create_log_stream": False,
"filters": ["production_only"],
"formatter": "json",
"level": "INFO",
"log_group_name": "integrates",
"log_stream_name": stream_name,
"use_queues": True,
}
JSON
structure for easy querying and parsing.production_only
filter ensures development logs don't flood our production CloudWatch account.use_queues
option enables background processing of log events, preventing application blocking.boto3
session with appropriate IAM credentials.JSON
formatters that properly handle complex Python objects.# integrates/back/integrates/api/mutations/add_secret.py
logs_utils.cloudwatch_log(
info.context,
"Blocked attept to create ROOT type secret for GitRoot",
extra={
"group_name": kwargs["group_name"],
"resource_type": resource_type,
"resource_id": kwargs["resource_id"],
"root_type": "GitRoot",
"user_email": email,
"log_type": "Security",
},
)
# integrates/back/integrates/api/mutations/add_secret.py
logs_utils.cloudwatch_log( info.context, "Added secret",
extra={ "group_name": kwargs["group_name"], "resource_type": resource_type, "resource_id": kwargs["resource_id"], "log_type": "Security", }, )
# integrates/back/integrates/api/__init__.py logs_utils.cloudwatch_log( request, "GraphQL mutation executed",
extra={ "operation": operation_name,
"log_type": "Audit",
}, )
# Security events by user in the last 24h
fields @timestamp, extra.user_email, message, extra.resource_type
| filter extra.log_type = "Security"
| sort @timestamp desc
| limit 100
#Errors in critical operations fields @timestamp, message, extra.operation | filter level = "ERROR"
| sort @timestamp desc | limit 50