
Mon premier événement sur l'IA : Build with Gemini
Retour sur ma première journée de conférences autour de l'IA au Grand Rex à Paris
Sommaire
•
Qu'est ce que le format de fichier Delta Lake ?
•
Les bénéfices d'utiliser Delta Lake
•
Installation et configuration d'Apache Spark pour utiliser Delta Lake
•
Enregistrement de la table en delta
•
Mise à jour de la table
•
Revenir à une version précédente
•
Conclusion : optimiser vos coûts de stockage tout en ayant le principe ACID des bases de données
•
Références
Initié par les créateurs du moteur Apache Spark, et également de la solution SaaS Databricks, ce format est une surcouche au format parquet. Il apporte le concept ACID (Atomicité, Cohérence, Isolation et Durabilité) sur les fichiers parquet dans du stockage de type objet (tel que Google Cloud Storage, AWS S3). Ansi, nous pouvons bénéficier d'un stockage à très bas coût et les bénéfices d'une table dans une base de données (en particulier la notion ACID).
Comme vu précédemment, il y a la notion de transaction ACID, à cela s'ajoute les avantages suivants :
Le format Delta Lake se veut être les fondations d'une architecture de type Lakehouse. L'industrie de la data évolue vers cette architecture afin de réduire drastriquement les coûts, et cela permet également de réduire la barrière entre les différents utilisateurs. Avec l'avènement de l'intelligence artificielle, les équipes Data Scientiest ont besoin d'accéder à de la données fraîche.
Reprenons le code de notre précédent article Démarrer avec Apache Spark étape par étape.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DateType
spark = SparkSession.builder.appName("Bike calculation").getOrCreate()
source_file = "source/244400404_comptages-velo-nantes-metropole.csv"
df = spark.read.format("csv").option("delimiter", ";").option("header", True).load(source_file)
df_clean = (
df.select(
col("Numéro de boucle").alias("loop_number"),
col("Libellé").alias("label"),
col("Total").cast(IntegerType()).alias("total"),
col("Date formatée").cast(DateType()).alias("date"),
col("Vacances").alias("holiday_name"),
)
.where(col("Probabilité de présence d'anomalies").isNull())
)
df_clean.write.format("parquet").partitionBy("date").save("datalake/count-bike-nantes.parquet")
Ainsi que le contenu du fichier source/244400404_comptages-velo-nantes-metropole.csv
Numéro de boucle;Libellé;Total;Probabilité de présence d'anomalies;Jour de la semaine;Boucle de comptage;Date formatée;Vacances 0674;Pont Haudaudine vers Sud;657;;2;0674 - Pont Haudaudine vers Sud;2021-03-16;Hors Vacances 0674;Pont Haudaudine vers Sud;689;;4;0674 - Pont Haudaudine vers Sud;2021-03-18;Hors Vacances 0674;Pont Haudaudine vers Sud;589;;5;0674 - Pont Haudaudine vers Sud;2021-03-26;Hors Vacances
Nous allons donner à la session Spark la configuration nécessaire. D'une part, nous allons lui donner les dépendances, et d'autre part la configuration.
Télécharger les jars dans un dossier jars/
Ajoutons les jars dans un premier temps à la session Spark.
spark = ( SparkSession .builder .appName("Bike calculation") .config("spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar") .getOrCreate() )
Ensuite, ajoutons la configuration pour pouvoir utiliser le format Delta Lake.
spark = ( SparkSession .builder .appName("Bike calculation") .config("spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate() )
Votre session Spark est prête pour utiliser le format Delta Lake.
Lors de l'écriture de la table dans le dossier datalake/
, il faut changer le format de fichier pour delta.
df_clean.write.format("delta").partitionBy("date").save("datalake/count-bike-nantes")
Voilà, votre table est maintenant enregistrée au format delta.
Note
Il y a eu une mise à jour de la source de données. Il faut donc les intégrer. Pour cela, nous allons utiliser la fonction merge()
de la lib Python Delta Lake.
Cette fonction va automatiquement faire la mise à jour de la table en fonction des conditions. Si la ligne est nouvelle dans la source, alors elle sera ajoutée. Si elle existe déjà et qu'elle a changé, alors la ligne dans la table des destinations elle sera mise à jour.
Voyons en détail son utilisation.
Pour cela, installer le package Python delta-spark
.
Attention à prendre la version correspondante. Se référer à la matrice de compatibilité https://docs.delta.io/latest/releases.html#compatibility-with-apache-spark.
Dans notre cas, nous avons besoin de la version 3.2.0 car nous utilisons Spark 3.2.0.
pip install delta-spark==3.2.0
Note
Nous effectuons toujours notre calcul avec notre nouveau fichier source. Ensuite, nous avons besoin de lire notre table de destination. Généralement, cette table est qualifiée de Gold (Or) car c'est une table avec des données agrégées et à forte valeur.
Voici le nouveau fichier à ingérer avec les nouvelles données. Il y a une mise à jour et une nouvelle ligne.
Numéro de boucle;Libellé;Total;Probabilité de présence d'anomalies;Jour de la semaine;Boucle de comptage;Date formatée;Vacances 0674;Pont Haudaudine vers Sud;1890;;5;0674 - Pont Haudaudine vers Sud;2021-03-26;Hors Vacances 0674;Pont Haudaudine vers Sud;689;;4;0674 - Pont Haudaudine vers Sud;2021-03-27;Hors Vacances
Mettons à jour la variable source_file
pour lire le nouveau fichier. Ajoutons le code pour lire la table Gold en Delta Lake.
source_file = "source/nouveau_comptages-velo-nantes-metropole.csv" (...) # Lecture de la table Gold from delta import DeltaTable delta_table = DeltaTable.forPath(spark, "datalake/count-bike-nantes") # Commenter cette dernière ligne # df_clean.write.format("delta").partitionBy("date").save("datalake/count-bike-nantes")
Appliquons la fonction merge()
pour fusionner les deux DataFrame.
(
delta_table
.alias("gold_table")
.merge(
df_clean.alias("fresh_data"),
condition="fresh_data.loop_number = gold_table.loop_number and fresh_data.date = gold_table.date"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
La fonction merge()
prend en entrée un DataFrame avec lequel faire la comparaison. Ensuite, nous avons une condition de correspondance entre les deux DataFrame. Avec fresh_data.loop_number = gold_table.loop_number and fresh_data.date = gold_table.date
, la comparaison entre les deux DataFrame est effectué sur la colonne date
et loop_number
. Il faut que la condition de correspondance soit discriminante afin de ne faire ressortir qu'une ligne : c'est le cas pour les deux colonnes que nous avons sélectionné.
Ensuite, des conditions de merge sont appliqués :
whenMatchedUpdateAll()
, s'il existe une correspondance entre les deux DataFrame sur ces clefs, alors la ligne dans le DataFrame de destination (la gold) est mise à jour.whenNotMatchedInsertAll()
, s'il n'existe pas de correspondance entre les deux DataFrame sur ces clefs, alors la ligne dans le DataFrame de destination (la gold) est ajoutée.Enfin, la fonction execute()
va appliquer les modifications.
Avant l'exécution, nous avons ces données :
+-----------+--------------------+-----+----------+-------------+ |loop_number| label|total| date| holiday_name| +-----------+--------------------+-----+----------+-------------+ | 0674|Pont Haudaudine v...| 689|2021-03-18|Hors Vacances| | 0674|Pont Haudaudine v...| 589|2021-03-26|Hors Vacances| | 0674|Pont Haudaudine v...| 657|2021-03-16|Hors Vacances| +-----------+--------------------+-----+----------+-------------+
Après l'exécution, la table est à jour :
+-----------+--------------------+-----+----------+-------------+ |loop_number| label|total| date| holiday_name| +-----------+--------------------+-----+----------+-------------+ | 0674|Pont Haudaudine v...| 689|2021-03-18|Hors Vacances| | 0674|Pont Haudaudine v...| 657|2021-03-16|Hors Vacances| | 0674|Pont Haudaudine v...| 1890|2021-03-26|Hors Vacances| | 0674|Pont Haudaudine v...| 689|2021-03-27|Hors Vacances| +-----------+--------------------+-----+----------+-------------+
En quelques lignes, votre table sera facilement mise à jour. En fonction de votre besoin métier, ajustez les conditons de merge.
En plus de respecter les principes ACID, le format Delta Lake a d'autres fonctionnalités tel que le retour en arrière.
Ainsi, si je souhaite annuler une opération, je peux revenir à une version précédente de la table.
Dans un nouveau script, rollback.py
, ajouter le code suivant qui permet de lister les différentes version de la table.
from delta import DeltaTable from pyspark.sql import SparkSession spark = ( SparkSession.builder.appName("Bike calculation") .config( "spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar" ) .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config( "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog", ) .getOrCreate() ) delta_table = DeltaTable.forPath(spark, "datalake/count-bike-nantes") delta_table.history().show()
Nous avons bien deux versions.
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+ |version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|userMetadata| engineInfo| +-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+ | 1|2025-01-30 14:43:...| NULL| NULL| MERGE|{predicate -> ["(...|NULL| NULL| NULL| 0| Serializable| false|{numTargetRowsCop...| NULL|Apache-Spark/3.5....| | 0|2025-01-30 14:42:...| NULL| NULL| WRITE|{mode -> ErrorIfE...|NULL| NULL| NULL| NULL| Serializable| true|{numFiles -> 3, n...| NULL|Apache-Spark/3.5....| +-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
Pour revenir à la version zéro, j'applique la fonction restoreToVersion()
avec le numéro de version souhaité.
delta_table.restoreToVersion(0)
En consultant de nouveau l'historique de la table, il y a une entrée de type RESTORE. Ainsi, si je lis de nouveau la table, je serais bien à la version avec les données du début de l'article.
Historique de la table :
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+ |version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|userMetadata| engineInfo| +-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+ | 2|2025-01-30 14:44:...| NULL| NULL| RESTORE|{version -> 0, ti...|NULL| NULL| NULL| 1| Serializable| false|{numRestoredFiles...| NULL|Apache-Spark/3.5....| | 1|2025-01-30 14:43:...| NULL| NULL| MERGE|{predicate -> ["(...|NULL| NULL| NULL| 0| Serializable| false|{numTargetRowsCop...| NULL|Apache-Spark/3.5....| | 0|2025-01-30 14:42:...| NULL| NULL| WRITE|{mode -> ErrorIfE...|NULL| NULL| NULL| NULL| Serializable| true|{numFiles -> 3, n...| NULL|Apache-Spark/3.5....| +-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
Contenu de la table :
+-----------+--------------------+-----+----------+-------------+ |loop_number| label|total| date| holiday_name| +-----------+--------------------+-----+----------+-------------+ | 0674|Pont Haudaudine v...| 657|2021-03-16|Hors Vacances| | 0674|Pont Haudaudine v...| 589|2021-03-26|Hors Vacances| | 0674|Pont Haudaudine v...| 689|2021-03-18|Hors Vacances| +-----------+--------------------+-----+----------+-------------+
La gestion des versions dans Delta Lake permet de facilement retourner en arrière en cas d'erreur.
A travers cet article, nous avons découvert un format de fichier qui permet de stocker des données comme dans une base de données traditionnelle. Elle facilite la manipulation et la mise à jour des tables grâce à la fonction de merge()
. Les fonctionnalités d'historique et de retour en arrière offre une sécurité pour supplémentaire en cas d'erreur de manipulation. Il est alors aisé de faire un retour en arrière.
Alors n'hésitez plus un instant et utilisez Delta Lake.
Code complet
# main.py from pyspark.sql import SparkSession from pyspark.sql.functions import col from pyspark.sql.types import IntegerType, DateType spark = ( SparkSession.builder.appName("Bike calculation") .config( "spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar" ) .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config( "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog", ) .getOrCreate() ) source_file = "source/nouveau_comptages-velo-nantes-metropole.csv" df = ( spark.read.format("csv") .option("delimiter", ";") .option("header", True) .load(source_file) ) df_clean = df.select( col("Numéro de boucle").alias("loop_number"), col("Libellé").alias("label"), col("Total").cast(IntegerType()).alias("total"), col("Date formatée").cast(DateType()).alias("date"), col("Vacances").alias("holiday_name"), ).where(col("Probabilité de présence d'anomalies").isNull()) from delta import DeltaTable delta_table = DeltaTable.forPath(spark, "datalake/count-bike-nantes") ( delta_table.alias("gold_table") .merge( df_clean.alias("fresh_data"), condition="fresh_data.loop_number = gold_table.loop_number and fresh_data.date = gold_table.date", ) .whenMatchedUpdateAll() .whenNotMatchedInsertAll() .execute() ) # df_clean.write.format("delta").partitionBy("date").save("datalake/count-bike-nantes")
# rollback.py from delta import DeltaTable from pyspark.sql import SparkSession spark = ( SparkSession.builder.appName("Bike calculation") .config( "spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar" ) .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config( "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog", ) .getOrCreate() ) delta_table = DeltaTable.forPath(spark, "datalake/count-bike-nantes") delta_table.history().show() delta_table.restoreToVersion(0)
Auteur(s)
Thierry T.
Super Data Boy
Vous souhaitez en savoir plus sur le sujet ?
Organisons un échange !
Notre équipe d'experts répond à toutes vos questions.
Nous contacterDécouvrez nos autres contenus dans le même thème
Retour sur ma première journée de conférences autour de l'IA au Grand Rex à Paris
Il arrive qu'une fonction ou action ne puisse pas être réalisée à un instant donné. Cela peut être dû à plusieurs facteurs qui ne sont pas maîtrisés. Il est alors possible d'effectuer une nouvelle tentative plus tard. Dans cet article, voyons comment le faire.
Le formatage du code est une source de querelle entre les membres d'une équipe. Résolvons-le une bonne fois pour toute avec le formateur de code Black.