DataFrames PySpark & Pandas : très similaires à l'usage, mais un fonctionnement interne très différent
L’erreur classique est de lire un gros volume de données avec Spark, le convertir en Pandas parce qu’on est plus à l’aise pour débugger et de continuer toutes nos transformations avec Pandas, parce que… bah c’est pareil non ?
Bah non. Et la sentence sera sans équivoque : Out Of Memory (OOM), Java Heap space. Mais qu’est-ce que cela signifie réellement ? Reprenons depuis le début et prenons le temps de bien détailler les différences entre Pandas et PySpark.
Qu’est-ce que la Java Heap ?
Lorsqu’une application Java est lancée, il faut allouer des ressources pour exécuter le code mais aussi pour tout ce qui est gestionnaire de thread et compilateur JIT. La mémoire allouée pour exécuter le code est appelée Java Heap, et pour le reste on parle de Non-Heap.
Je code en python, pourquoi j’ai une erreur Java ?
Spark étant développé en Scala, il utilise une JVM pour fonctionner. Lorsque vous utilisez l’API Python de Spark, votre SparkContext est créé dans un process Python mais Spark crée également un process JVM qui s’appelle Py4J. Ce process fait le lien entre votre SparkContext Python et les Spark Workers qui font partie du monde Java en convertissant les instructions PySpark en leur équivalent Scala Spark.
Pour les UDFs, c’est l’inverse. Mais nous y reviendrons une prochaine fois dans un article dédié.
Maintenant que l’on sait que même en PySpark nous utilisons des JVMs, on comprend mieux pourquoi on se retrouve avec des erreurs Java en Python.
La cause la plus fréquente de l’OOM Java Heap Space
D’après mon expérience, la cause la plus fréquente d’une erreur OOM Java Heap space dans une application Spark est un collect() appelé sur un Dataset un peu trop gros. C’est cette cause qui nous intéressera.
La méthode collect() sert à récupérer le contenu d’un DataFrame dans une Liste. C’est à dire que l’intégralité de votre jeu de données sera téléchargée et chargée en mémoire sur votre driver Spark. Si celui-ci fait 200Go, il vous faudra 200Go de mémoire vive pour que cela fonctionne. Sinon c’est l’OOM Java Heap space.
En introduction, je vous parlais de convertir un DataFrame Spark en Pandas. Étant donné qu’en Pandas nous manipulons également des DataFrames avec une API très similaire, nous avons tendance à penser que cela fonctionne de la même façon que Spark. Dans la réalité, la méthode toPandas() fait la même chose que la méthode collect() et l’objet Pandas DataFrame fonctionne comme un super dictionnaire Python.
Donc, si jamais le DataFrame que vous souhaitez convertir en Pandas fait 200Go et que vous n’avez pas 200Go de mémoire vive sur votre driver, c’est encore l’OOM.
La différence entre Spark et Pandas
Alors que Pandas est une simple bibliothèque de structure de données Python, Spark est une application distribuée qui va utiliser plusieurs workers pour traiter la donnée. Pour répartir cette donnée sur l’ensemble des workers, Spark va la découper en plusieurs morceaux en suivant 3 règles.
Fichiers de moins de 4mo
On ne le répétera jamais assez, Spark est conçu pour traiter des gros volumes de données (à partir de plusieurs dizaine de Go). Mais il peut arriver que lors de l’écriture d’une source de données partitionnée, certaines partitions soient très petites. Lorsque Spark doit lire des fichiers de moins de 4mo, il les regroupe pour former une seule partition en mémoire afin d’optimiser le traitement. Cette valeur est configurable, ce paramètre s’appelle spark.sql.files.openCostInBytes.
Fichiers de 4mo à 128mo
Pour tous les fichiers entre 4mo et 128mo, Spark créera une partition par fichier et par tâche qu’un worker peut traiter en parallèle.
Fichiers de plus de 128mo
Pour tous les fichiers de plus de 128mo, Spark découpera le fichier en blocs de 128mo. Cette valeur est configurable via le paramètre spark.sql.files.maxPartitionBytes. Vous l’aurez remarqué, cette valeur correspond à la taille des blocs par défaut dans HDFS. Cela a été fait exprès pour choisir par défaut d’optimiser les performances de Spark sur HDFS.
Traitement des partitions dans Spark
Maintenant que l’on sait comment Spark découpe la donnée en partitions, il faut comprendre comment il les traite. Pour cela, il faut comprendre la notion de tâche.
Dans Spark, un job est composé de stages. Les stages sont des successions de transformations sur la donnée qui commencent et terminent sur un shuffle.
Exemple :
Ici, nous avons 3 stages :
- De la lecture de notre fichier parquet à la méthode groupyBy() qui prépare un shuffle ;
- De la méthode count() qui crée un shuffle jusqu’au join() ;
- Du join() qui crée un shuffle jusqu’à l’écriture de notre fichier.
Si nous prenons notre premier stage, il contient 4 transformations :
- Une lecture de fichier ;
- Un filtre ;
- Une projection (select()) ;
- Un groupBy().
Ces 4 transformations mises bout à bout forment une tâche et cette tâche sera assignée à chaque partition de notre jeu de données. Pour accélérer le traitement, Spark distribue cette tâche sur chaque worker. En fonction des ressources disponible sur un worker et de sa configuration, celui-ci peut traiter plusieurs tâches en parallèle. Par défaut, un worker utilise 1 CPU par tâche. La mémoire vive est partagée entre toutes les tâches.
Spark et Pandas : ce qu’il faut retenir
Si on résume, dans un job Spark la donnée n’est pas chargée entièrement par les workers. Elle est découpée en partitions de taille variable et traitée morceau par morceau, parallélisée sur un grand nombre de machines. Cela permet de traiter indifféremment des fichiers de 1Go ou de 100To sans provoquer d’OOM à cause de la Java Heap.
Dans un job Pandas, qui au final n’est qu’un simple process Python, la donnée doit être entièrement chargée en mémoire sur une seule machine pour être traitée, ce qui crée des limitations sur les ressources de la machine en question.
Cela ne signifie pas pour autant que nous ne pouvons plus profiter de PyPlot via Pandas. Il va juste falloir être un peu malin et choisir le bon outil au bon moment. Si vous souhaitez afficher des graphiques via PyPlot, il vous faudra par exemple faire quelques agrégations en amont qui réduiront la taille de votre données.
Pour tout ce qui concerne les transformations à appliquer à la donnée, en passant par Spark tout ira très bien.
Après avoir appliqué vos agrégations et obtenu un DataFrame assez petit pour être chargée en mémoire par le driver, vous pouvez faire votre conversion en Pandas. Cette étape doit être la dernière avant d’appliquer votre Dataviz.