Tracking des accès à la donnée dans AWS

Surveiller les accès à la donnée dans AWS

Au sein d’une Data Platform nous pouvons avoir besoin de surveiller les accès à la donnée en lecture de l’ensemble de nos utilisateurs. AWS a d’ailleurs publié un article nommé Auditing, inspecting, and visualizing Amazon Athena usage and cost qui propose une solution pour le mettre en place.

Dans cet article je vais expliciter comment j’ai implémenté l’architecture décrite par AWS et vous partager mes conseils

Les exemples de code qui seront donnés dans l’article seront des extraits pour éviter la surcharge. Pour récupérer les exemples complets, je vous invite à aller voir notre repository GitHub sur le tracking d’accès à la donnée sur AWS.

Voici ce que nous allons déployer ensemble dans cet article :

Tracking Data Access schema AWS

Quels types d’accès surveiller ?

Dans l’exemple que nous allons implémenter, nous choisirons de surveiller les types d’accès les plus courants sur AWS :

  • Un GetObject sur S3
  • Une requête via Athena

Quels informations collecter ?

Ce qui est intéressant c’est de pouvoir vérifier qui a accédé à de la donnée, quand, et quelle donnée ? Pour cela il est intéressant de récupérer à minima :

  • Le chemin dans S3 du fichier
  • L’identifiant de la personne
  • La date et l’heure à laquelle la requête a été lancé
  • La requête SQL (si c’est Athena)
  • L’action (si c’est S3)

Les événements à collecter

Les événements S3

Sur S3 nous n’avons qu’un événement à collecter, c’est le GetObject. C’est l’événement produit quand on télécharge un fichier, peu importe comment. Directement depuis la console web ou via son terminal en local avec un simple aws s3 cp, les deux produiront un GetObject.

Pour le récupérer nous avons besoin de créer un CloudTrail spécifique. Dans la documentation Terraform de aws_cloudtrail, on trouve un exemple qui s’appelle : Logging All S3 Object Events Except For Two S3 Buckets By Using Advanced Event Selectors.

Cet exemple est intéressant car comme son nom l’indique, il permet de récupérer tous les événements venant de S3 sur des objets sauf pour 2 buckets. C’est un pattern très pratique quand par défaut on veut tout surveiller, sauf exception.

Notre exception sera le bucket S3 qui stockera les données de surveillance d’accès à la donnée, qui ne fait pas partie de la donnée métier de la data platform.

resource "aws_cloudtrail" "s3_data_access" {
  name                          = "s3-trail"
  s3_bucket_name                = aws_s3_bucket.firehose_output.bucket
  is_multi_region_trail         = true
  include_global_service_events = true
  enable_log_file_validation    = true
  enable_logging                = true

  advanced_event_selector {
    name = "Log all S3 objects events except some S3 buckets"

    field_selector {
      field  = "eventCategory"
      equals = ["Data"]
    }

    field_selector {
      field = "resources.ARN"

      not_starts_with = [
        aws_s3_bucket.firehose_output.arn
      ]
    }

    field_selector {
      field  = "resources.type"
      equals = ["AWS::S3::Object"]
    }
  }
}

Les événements Athena

Sur Athena c’est un peu plus compliqué. Par défaut lorsqu’on lance une requête, on peut capter les types d’événements suivant avec EventBridge :

resource "aws_cloudwatch_event_rule" "athena_event_query_execution" {
  name        = "athena_event_query_execution"
  description = "Trigger on Athena StartQueryExecution"
  event_pattern = jsonencode({
    "detail-type": [
      "Athena Query State Change"
    ],
    "source": [
      "aws.athena"
    ],
    "detail": {
      "currentState": [
        "SUCCEEDED",
        "FAILED",
        "CANCELED"
      ]
    }
  })
}

Qui vont ressembler à ça :

{
  "version": "0",
  "id": "ce413979-8b1f-54dd-ff93-85d0426fded1",
  "detail-type": "Athena Query State Change",
  "source": "aws.athena",
  "account": "xxx",
  "time": "2025-06-17T13:19:33Z",
  "region": "eu-west-1",
  "resources":[],
  "detail":
  {
    "currentState": "SUCCEEDED",
    "previousState": "RUNNING",
    "queryExecutionId": "3aa90150-b6f8-4d93-a674-2b794bd509cc",
    "sequenceNumber": "3",
    "statementType": "DML",
    "versionId": "0",
    "workgroupName": "tracking-data-access-wg"
  }
}

Nous avons bien l’heure à laquelle la requête a été lancée, mais c’est tout. Pour avoir les informations sur l’identité de l’utilisateur, il va falloir passer par un CloudTrail encore une fois.

resource "aws_cloudtrail" "athena" {
  name                          = "athena-trail"
  s3_bucket_name                = aws_s3_bucket.firehose_output.bucket
  is_multi_region_trail         = true
  include_global_service_events = true
  enable_log_file_validation    = true
  enable_logging                = true

  event_selector {
    read_write_type           = "All"
    include_management_events = true
  }
}

Capter les événements

CloudTrail nous permet de collecter les événements pour les stocker sur S3. Sauf qu’en l’état, ces événements ne sont pas requêtable simplement via Athena.

Nous avons besoin d’une étape de traitement des événements pour récupérer les informations qui nous intéressent. Pour cela nous pouvons utiliser une Lambda. Mais pour la déclencher il faut capter les événements de CloudTrail et les envoyer vers notre Lambda.

EventBridge est la solution poussé par AWS pour ce cas d’usage. Je trouve également que ce service est idéal dans notre cas car :

  • Le service est gratuit tant qu’on est sur des événements managés par AWS envoyés à un service managé du même compte, c’est notre cas ici
  • Nous n’avons pas besoin d’un temps de réaction très rapide
  • Nous n’avons pas besoin d’une garantie dans l’ordre de traitement des événements

Il nous faut créer 2 règles pour nos 2 types d’événements

resource "aws_cloudwatch_event_rule" "athena_event_user_details" {
  name        = "athena_event_user_details"
  description = "Trigger on Athena StartQueryExecution"
  event_pattern = jsonencode({
    "detail-type": ["AWS API Call via CloudTrail"],
    "source": ["aws.athena"],
    "detail": {
      "eventName": ["StartQueryExecution"]
    }
  })
}

resource "aws_cloudwatch_event_rule" "s3_event_get_object" {
  name        = "s3_event_get_object"
  description = "Trigger on GetObject via CloudTrail"
  event_pattern = jsonencode({
    "source": ["aws.s3"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
      "eventName": ["GetObject"]
      "sourceIPAddress": [{ "anything-but": ["athena.amazonaws.com"] }]
    }
  })
}

Il y a une petite subtilité pour les événements de GetObject sur S3. Lorsque nous lançons une requête avec Athena par exemple, celui-ci va s’occuper à notre place de faire des GetObject sur S3. Si vous avez une source de données composée de millier de fichiers que Athena doit lire, cela va créer 1000 lignes dans notre table de surveillance des accès à la donnée.

Comme nous avons prévu de traquer les requêtes faites avec Athena, il est plus pratique d’ignorer les GetObject de celui-ci pour éviter d’être inondé d’événements redondants. Pour cela il suffit de surveiller le champs sourceIPAddress pour filtrer l’IP de Athena.

Traitement sur les événements

Pour traiter nos événements et conserver que les informations qui nous intéressent, on va utiliser une lambda :

data "archive_file" "lambda_package" {
  type        = "zip"
  source_file = "../tracking-data-access.py"
  output_path = "tracking-data-access.zip"
}

resource "aws_lambda_function" "athena_audit" {
  function_name = "tracking-data-access"
  role          = aws_iam_role.lambda_exec.arn
  handler       = "tracking-data-access.lambda_handler"
  runtime       = "python3.12"
  filename      = data.archive_file.lambda_package.output_path
  source_code_hash = data.archive_file.lambda_package.output_base64sha256
  timeout = 60

  environment {
    variables = {
      FIREHOSE_STREAM_NAME = aws_kinesis_firehose_delivery_stream.write_data_access.name
    }
  }
}

Ce qui va le plus nous intéresser c’est le code de la Lambda, que je vous propose en Python.

Le handler

def lambda_handler(event, context):
    detail_type = event.get("detail-type")
    detail = event.get("detail", {})

    if (
        detail_type == "AWS API Call via CloudTrail"
        and detail.get("eventName") == "StartQueryExecution"
    ):
        output = extract_user_access_details(event)
    elif (
        detail_type == "AWS API Call via CloudTrail"
        and detail.get("eventName") == "GetObject"
    ):
        output = extract_s3_access_details(event)
    else:
        print("Event type not handled by this function.")
        return {"statusCode": 204, "body": json.dumps("Ignored event type.")}

    firehose.put_record(
        DeliveryStreamName=FIREHOSE_STREAM,
        Record={"Data": (json.dumps(output) + "\n").encode("utf-8")},
    )

    return {"statusCode": 200, "body": json.dumps("Event processed successfully")}

Comme nous pouvons recevoir 2 événements différents dans notre Lambda, le handler est un simple if / else qui redirige le parsing en fonction du type d’événement reçu.

Une fois le parsing fait, on l’envoie à Firehose pour qu’il écrive des micro batch en parquet sur S3.

Récupérer les informations d’un GetObject sur s3

Ici pas de trop de surprise, l’événement contient tout ce dont on a besoin, il n’y a qu’à se servir (j’ai retiré des champs pour l’exemple) :

{
    "detail-type": "AWS API Call via CloudTrail",
    "source": "aws.s3",
    "account": "xxx",
    "detail":
    {
        "userIdentity":
        {
            "type": "IAMUser",
            "principalId": "xxx",
            "arn": "arn:aws:iam::xxx:user/franck.cussac",
            "accountId": "xxx",
            "accessKeyId": "xxx",
            "userName": "franck.cussac"
        },
        "eventTime": "2025-06-17T10:00:28Z",
        "eventSource": "s3.amazonaws.com",
        "eventName": "GetObject",
        "awsRegion": "eu-west-1",
        "sourceIPAddress": "xxx",
        "requestParameters":
        {
            "bucketName": "hymaia-datalake",
            "Host": "hymaia-datalake.s3.eu-west-1.amazonaws.com'",
            "key": "data.json"
        },
        "eventCategory": "Data"
    }
}
def extract_s3_access_details(event):
    detail = event.get("detail", {})
    return {
        "event_type": "s3_user_access",
        "event_time": event.get("time", "<not found>"),
        "user_arn": detail.get("userIdentity", {}).get("arn"),
        "file_path": f"s3://{detail.get('requestParameters', {}).get(
            'bucketName')}/{detail.get('requestParameters', {}).get('key', "<not found>")}",
        "action": detail.get("eventName", "<not found>"),
    }

Récupérer les informations d’une requête Athena

Voici l’événement que reçoit notre Lambda en sortie de EventBridge :

{
    "detail-type": "AWS API Call via CloudTrail",
    "source": "aws.athena",
    "account": "xxx",
    "time": "2025-06-17T13:19:31Z",
    "region": "eu-west-1",
    "detail":
    {
        "userIdentity":
        {
            "type": "IAMUser",
            "principalId": "xxx",
            "arn": "arn:aws:iam::xxx:user/franck.cussac",
            "accountId": "xxx",
            "accessKeyId": "xxx",
            "userName": "franck.cussac",
        },
        "eventTime": "2025-06-17T13:19:31Z",
        "eventSource": "athena.amazonaws.com",
        "eventName": "StartQueryExecution",
        "sourceIPAddress": "xxx",
        "requestParameters":
        {
            "queryString": "***OMITTED***",
            "queryExecutionContext":
            {
                "database": "data_access",
                "catalog": "AwsDataCatalog"
            },
            "workGroup": "tracking-data-access-wg",
        },
        "responseElements":
        {
            "queryExecutionId": "3aa90150-b6f8-4d93-a674-2b794bd509cc"
        },
    }
} 

Cette fois-ci il y a tout sauf 1 information : la requête SQL lancée. Contrairement aux xxx que j’ai moi-même entré pour cacher mes informations personnelles, on peut voir que le champs queryString qui contient la requête SQL est caché.

Pour obtenir cette information, il faut requêter Athena via le queryExecutionId :

def extract_user_access_details(event):
    detail = event.get("detail", {})
    query_id = detail.get("responseElements", {}).get("queryExecutionId")

    if not query_id:
        raise ValueError("queryExecutionId missing in event.")

    try:
        response = athena.get_query_execution(QueryExecutionId=query_id)
        q = response["QueryExecution"]

        return {
            "event_type": "athena_user_access",
            "query_execution_id": detail.get("responseElements", {}).get(
                "queryExecutionId", "<not found>"
            ),
            "query": q.get("Query", "<not found>"),
            "user_arn": detail.get("userIdentity", {}).get("arn", "<not found>"),
            "event_time": event.get("time", "<not found>"),
        }
    except Exception as e:
        print(f"[ERROR] Failed to retrieve Athena query execution: {e}")
        raise

Écrire les informations dans s3

Dernière étape, écrire dans s3 ce qu’on a collecté et rendre la donnée accessible dans le Glue Catalog.

Pour cela Firehose est notre ami. Ce service fonctionne comme un accumulateur de données qui va écrire après qu’une de ses 2 conditions soit remplie :

  • Un certain temps s’est écoulé
  • Une certaine quantité de données a été reçue

Nous pouvons également configurer Firehose pour qu’il écrive au format parquet. Pour cela il faut lui donner une table Glue décrivant le schéma de la donnée cible.

Dernier point à ne jamais négliger quand on traite de la donnée : le partitionnement. Dans 5 ans, lorsqu’on aura collecté quelques centaines de Go de données, il sera beaucoup plus confortable et moins cher de pouvoir optimiser nos recherches d’accès en filtrant sur des partitions. La date du jour est l’exemple le plus typique.

resource "aws_kinesis_firehose_delivery_stream" "write_data_access" {
  name        = "write-data-access"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn            = aws_iam_role.firehose_role.arn
    bucket_arn          = aws_s3_bucket.firehose_output.arn
    prefix              = "data-access/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
    error_output_prefix = "errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
    buffering_interval  = 15 # en seconde
    buffering_size      = 64 # en MB

    compression_format = "UNCOMPRESSED"

    data_format_conversion_configuration {
      input_format_configuration {
        deserializer {
          open_x_json_ser_de {}
        }
      }
      output_format_configuration {
        serializer {
          parquet_ser_de {}
        }
      }
      enabled = true
      schema_configuration {
        database_name = aws_glue_catalog_database.data_access.name
        role_arn      = aws_iam_role.firehose_role.arn
        table_name    = aws_glue_catalog_table.data_access.name
      }
    }
  }
}

Attention : On va rencontrer un problème avec Firehose et le partitionnement, c’est qu’il ne gère pas lui même la mise à jour dans le catalogue Glue des nouvelles partitions créées. Cela signifie que dans notre cas, où chaque jour nous créons une nouvelle partition, il va falloir mettre à jour la table.

Pour notre exemple nous nous arrêterons à l’utilisation simple de la commande suivante :

MSCK REPAIR TABLE read_actions

Mais pour une version plus automatisée, on pourrait utiliser les crawlers de Glue. Un prochain article peut-être.

Plus qu’à tester !

Pour tester je vous invite à suivre les instructions du repository GitHub où vous retrouverez le terraform complet ainsi que le code de la lambda.

Aller plus loin

Quelques idées pour aller plus loin :

Nous aurions pu collecter d’autres informations comme :

  • l’adresse IP de l’utilisateur
  • la quantité de données traitée par la requête Athena

Actuellement chaque requête dans la table qui historise les accès à la donnée est historisée dans cette même table, ce qui pourrait être filtré.

Nous pourrions mettre en place une DLQ en cas d’échec de traitement d’un événement par la lambda, pour éviter d’en perdre.

Dans l’article d’AWS sur lequel je me suis basé, il est proposé de créer 1 pipeline d’ingestion par type d’événement afin de bien tout séparer. Cela signifie dans notre cas qu’on devrait créer 2 lambdas, 2 firehoses et 2 tables Glue.

Datadictionary

Poetry Python

Poetry est un outil de gestion de dépendances et de packaging pour Python.

lire plus

Evénements sur le sujet

toutes les événements
No items found.
Télécharger le PDF
10 écueils limitant l’impact de la Data sur les produits et organisations
télécharger
Télécharger le PDF
LLM & IA Generative -
La voie de la raison
télécharger
Télécharger le PDF
Produits d'IA -
Kit de survie pour Product Manager Ambitieux
télécharger

Les prochains événements Hymaïa

avec

Hymaday - AI Adoption At Scale

📅
1/7/2025
👯
100 participants

Forward Data Conference 2025

📅
24/11/2025
👯
700 participants