The `cloudwatch_log` function serves as a centralized point for sending structured logs to CloudWatch. This utility ensures consistent formatting, proper context inclusion, and asynchronous operation to minimize performance impact:

```python
# integrates/back/integrates/custom_utils/logs.py
TRANSACTIONS_LOGGER: logging.Logger = logging.getLogger("transactional")


def cloudwatch_log(
    request: Request | WebSocket | Any,
    msg: str,
    extra: dict,
    user_email: str = "",
) -> None:
    if user_email:
        TRANSACTIONS_LOGGER.info(
            msg,
            extra={
                "extra": {
                    "environment": FI_ENVIRONMENT,
                    "user_email": user_email,
                    **extra,
                }
            },
        )
    else:
        aio.to_background(cloudwatch_log_async(request, msg, extra))
```

When `user_email` is provided, the function logs synchronously with the specified user context. Otherwise, it delegates to an asynchronous function that extracts the user information from the request context without blocking the main execution thread.

```python
# integrates/back/integrates/custom_utils/logs.py
async def cloudwatch_log_async(
    request: Request | WebSocket | Any,
    msg: str,
    extra: dict,
) -> None:
    try:
        user_data = await sessions_domain.get_jwt_content(request)
    except (ExpiredToken, InvalidAuthorization):
        user_data = {"user_email": "unauthenticated"}
    TRANSACTIONS_LOGGER.info(
        msg,
        extra={
            "extra": {
                "environment": FI_ENVIRONMENT,
                "user_email": user_data["user_email"],
                **extra,
            }
        },
    )
```
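For reference, `aio.to_background` simply schedules the coroutine and returns immediately. A minimal sketch of that fire-and-forget pattern, assuming a running asyncio event loop (the real helper in `integrates` may differ):

```python
# Hypothetical sketch in the spirit of aio.to_background; not the
# actual integrates implementation.
import asyncio
from collections.abc import Coroutine
from typing import Any

# Keep strong references so pending tasks are not garbage-collected
# before they finish (a documented asyncio pitfall).
_BACKGROUND_TASKS: set[asyncio.Task] = set()


def to_background(coroutine: Coroutine[Any, Any, Any]) -> None:
    """Schedule the coroutine on the running loop without awaiting it."""
    task = asyncio.create_task(coroutine)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
```

The important property is that the request handler returns immediately; the JWT parsing and log write happen later on the same event loop.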
We use Watchtower as a handler for Python logging, configured centrally through our logging configuration system. This handler efficiently transmits logs to CloudWatch while providing important features like queuing, batching, and throttling:

```python
# integrates/back/integrates/settings/logging/handlers.py
def get_watchtower_handler(*, stream_name: str) -> dict:
    return {
        "boto3_client": BOTO3_SESSION.client("logs"),
        "class": "watchtower.CloudWatchLogHandler",
        "create_log_group": False,
        "create_log_stream": False,
        "filters": ["production_only"],
        "formatter": "json",
        "level": "INFO",
        "log_group_name": "integrates",
        "log_stream_name": stream_name,
        "use_queues": True,
    }
```
Key aspects of this configuration (a sketch of how it plugs into the logging system follows the list):

- Logs are serialized in a JSON structure for easy querying and parsing.
- The `production_only` filter ensures development logs don't flood our production CloudWatch account.
- The `use_queues` option enables background processing of log events, preventing application blocking.
- A boto3 session with appropriate IAM credentials authenticates against CloudWatch.
- JSON formatters properly handle complex Python objects.
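The dict returned by `get_watchtower_handler` is consumed by Python's standard `logging.config.dictConfig`. The sketch below shows one plausible wiring; the filter class, formatter, and environment lookup are illustrative assumptions, not the real `integrates` settings:

```python
# Illustrative wiring only; the real settings module may declare
# formatters, filters, and loggers differently.
import logging
import logging.config
import os

# Stand-in for the real FI_ENVIRONMENT setting.
FI_ENVIRONMENT = os.environ.get("ENVIRONMENT", "development")


class ProductionOnlyFilter(logging.Filter):
    """Hypothetical filter: drop records outside production."""

    def filter(self, record: logging.LogRecord) -> bool:
        return FI_ENVIRONMENT == "production"


LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "filters": {"production_only": {"()": ProductionOnlyFilter}},
    "formatters": {
        # Assuming python-json-logger; any JSON-emitting formatter works.
        "json": {"()": "pythonjsonlogger.jsonlogger.JsonFormatter"},
    },
    "handlers": {"watchtower": get_watchtower_handler(stream_name="back")},
    "loggers": {
        "transactional": {"handlers": ["watchtower"], "level": "INFO"},
    },
}

logging.config.dictConfig(LOGGING_CONFIG)
```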
At a call site, a mutation resolver passes the request context plus structured details. For example, `add_secret` records a blocked attempt like this:

```python
# integrates/back/integrates/api/mutations/add_secret.py
logs_utils.cloudwatch_log(
    info.context,
    "Blocked attempt to create ROOT type secret for GitRoot",
    extra={
        "group_name": kwargs["group_name"],
        "resource_type": resource_type,
        "resource_id": kwargs["resource_id"],
        "root_type": "GitRoot",
        "user_email": email,
        "log_type": "Security",
    },
)
```
When the secret is created successfully, the same mutation logs the corresponding event:

```python
# integrates/back/integrates/api/mutations/add_secret.py
logs_utils.cloudwatch_log(
    info.context,
    "Added secret",
    extra={
        "group_name": kwargs["group_name"],
        "resource_type": resource_type,
        "resource_id": kwargs["resource_id"],
        "log_type": "Security",
    },
)
```
At the API layer, every executed GraphQL mutation leaves an audit trail:

```python
# integrates/back/integrates/api/__init__.py
logs_utils.cloudwatch_log(
    request,
    "GraphQL mutation executed",
    extra={
        "operation": operation_name,
        "log_type": "Audit",
    },
)
```
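With the JSON formatter in place, each call above lands in CloudWatch as a single JSON document. The exact field names depend on the formatter configuration; an event shaped roughly like this is what the queries below rely on:

```json
{
    "message": "GraphQL mutation executed",
    "level": "INFO",
    "extra": {
        "environment": "production",
        "user_email": "user@example.com",
        "operation": "AddSecret",
        "log_type": "Audit"
    }
}
```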
These structured fields make the logs easy to query with CloudWatch Logs Insights. For example, to review recent security events:

```
fields @timestamp, extra.user_email, message, extra.resource_type
| filter extra.log_type = "Security"
| sort @timestamp desc
| limit 100
```

Or to surface recent errors:

```
fields @timestamp, message, extra.operation
| filter level = "ERROR"
| sort @timestamp desc
| limit 50
```
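The same queries can also run programmatically. A small sketch using boto3's Logs Insights API, assuming credentials with `logs:StartQuery` and `logs:GetQueryResults` permissions:

```python
# Sketch: run a Logs Insights query against the integrates log group.
# Assumes AWS credentials are available in the environment.
import time

import boto3

logs_client = boto3.client("logs")

query_id = logs_client.start_query(
    logGroupName="integrates",
    startTime=int(time.time()) - 3600,  # last hour
    endTime=int(time.time()),
    queryString=(
        'fields @timestamp, extra.user_email, message\n'
        '| filter extra.log_type = "Security"\n'
        '| sort @timestamp desc\n'
        '| limit 100'
    ),
)["queryId"]

# Poll until the query finishes, then print each result row.
while True:
    results = logs_client.get_query_results(queryId=query_id)
    if results["status"] in ("Complete", "Failed", "Cancelled"):
        break
    time.sleep(1)

for row in results.get("results", []):
    print({field["field"]: field["value"] for field in row})
```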