Par Maxime Jumelle
CTO & Co-Founder
Publié le 7 févr. 2022
Catégorie Data Engineering
Spark est bien connu pour sa puissance concernant la parallélisation de calculs très volumineux. Outre son interaction facile en Python, il dispose également d'une API haut-niveau avec la définitions d'objets tels que les DataFrames, très puissant pour manipuler des tableaux de données.
Mais il y a également une composante très intéressante à explorer sous Spark : c'est Spark Streaming. Cette dernière, comme son nom l'indique, va nous permettre de manipuler des données non seulement en temps-réel, mais aussi en grande quantités. Et tout ça, c'est une des autres forces de Spark, qui le propulse comme un acteur incontournable de l'event processing.
Spark Streaming est une extension des fonctionnalités principales de Spark appliquée au traitement de données en temps réel. L'avantage de ce système est de pouvoir étendre l'utilisation de MapReduce sur des données réceptionnés par notre programme en provenance de diverses sources de données.
De même, les sorties peuvent être très vastes en fonction du cas d'usage.
En d'autres termes, avec Spark Streaming, nous n'allons plus construire des pipelines ETL qui traitent les données par batches (par paquets) mais des pipelines en temps réel qui vont directement traiter une données dès qu'elle arrive dans le pipeline.
Spark Streaming ne traite pas les données une par une, mais réalise des micro-batches.
Ce qu'il faut bien comprendre, c'est que Spark Streaming ne procède pas à de la diffusion de données. Autrement dit, Spark Streaming n'ira jamais traiter une donnée de manière individuelle. Il va en réalité effectuer des découpes temporelles à intervalle de temps régulier (toutes les secondes, toutes les 100 millisecondes) et considérer l'ensemble des données réceptionnées dans chaque intervalle de temps. Si par exemple, nous fixons la taille des fenêtres à une seconde, alors Spark Streaming traitera l'ensemble des données survenues au cours de la seconde qui caractérise la fenêtre temporelle.
Et automatiquement, il est probable qu'il y ait plus d'une observation/donnée par seconde, et dans ce cas, Spark Streaming récupérera l'ensemble de ces données. On appelle alors ce procédé du traitement par micro-batch, c'est-à-dire par petits paquets que l'on récupère d'une source qui diffuse des données en continu.
À lire aussi : découvrez notre formation Data Engineer
Cela est utile dans plusieurs situations, si par exemple on souhaite connaître le nombre d'événements à la seconde. Dans le même temps, cela évite aussi une saturation de la bande passante où le script Spark irait requêter de très nombreuses fois vers une source de données.
❓ Comment faire si l'on souhaite traiter chaque donnée de manière indivuelle ?
Dans ce cas, ce n'est plus Spark Streaming qui est utile mais des systèmes d'event streaming comme Apache Kafka (avec l'API Consumer). Ce qu'il faut garder à l'esprit, c'est que l'event processing a pour rôle de traiter plusieurs données qui arrivent en continu comme une source de données, et non en tant que telle.
Tout comme nous retrouvons les DataFrames pour Spark SQL, Spark Streaming dispose lui aussi d'une abstraction pour gérer ces micro-batches : ce sont les DStreams. Un DStream est une collection de plusieurs RDD, chaque RDD correspondant aux données réceptionnées à chaque intervalle de temps. Ainsi, le n-ème RDD contiendra toutes les données réceptionnées entre le temps n−1 et le temps n (car Spark récolte tous les événements en un RDD à la fin de chaque intervalle).
Ainsi, chaque opération effectuée sur un DStream est répliqué sur chaque RDD de ce dernier. Nous définissons les opérations à effectuer sur le DStream (un comptage par exemple), et ces opérations seront ensuite effectués sur chaque RDD du DStream, au fur et à mesure qu'ils sont construits.
Prenons par exemple le cas suivant où nous avons du texte qui arrive en continu.
Dans le DStream, chaque RDD contient tous les mots réceptionnés (depuis une API ou dans un fichier) sur une durée d'une seconde. L'objectif ici est de dénombrer le nombre d'occurrences de chaque mot.
Dans un premier temps, nous réalisons un flatMap
pour d'une part, séparer chaque mot de la phrase et d'autre part, attribuer la valeur 1 puisqu'initialement, nous parcourons chaque mot une fois.
Et ensuite, nous appliquons un reduceByKey
pour regrouper toutes les clés identiques (ici les mots) et ajouter leurs valeurs respectives. Nous obtenons à la fin un DStream, toujours formé par des RDD, mais qui est la résultante des opérations que nous avons défini.
Avec cette stratégie, nous pouvons voir par seconde quelles sont les mots les plus fréquents.
Essayons de mettre en place notre comptage de mots avec Spark Streaming. Rappelons-nous que pour effectuer ce calcul, nous devons appliquer l'opération flatMap
.
Commençons par charger Spark.
import os import time import findspark # Spécifie le chemin où est stocké Spark os.environ["SPARK_HOME"] = "/opt/spark-3.0.1-bin-hadoop2.7" findspark.init() # Trouve les exécutables dans le dossier SPARK_HOME from pyspark import SparkContext sc = SparkContext(master="local[*]") # Crée un SparkContext local
Pour pouvoir utiliser les objets et les fonctionnalités de streaming, nous allons instancier un StreamingContext
. Deux arguments doivent être renseignés.
SparkContext
sur lequel le StreamingContext
va se baser.À lire aussi : découvrez notre formation Data Engineer
from pyspark.streaming import StreamingContext # Céation d'un contexte SparkStreaming avec un intervalle d'une seconde ssc = StreamingContext(sc, 1)
Nous définissons ici des micro-batch d'une seconde. Nous pouvons également spécifier une durée plus petite, exprimée en millisecondes à partir de l'objet Duration
de Spark.
En revanche, une particularité importante est qu'un StreamingContext
a une durée de vie définie par le démarrage (start
) et par le stop (qui peut être manuel via la fonction stop
ou automatiquement à la fin du programme avec awaitTermination
). Dans tous les cas, le SparkContext
est détruit lorsque le StreamingContext
est stoppé. Nous reviendrons dessus très bientôt.
Nous allons ensuite pouvoir créer un DStream à partir d'une source de données. Nativement, il existe plusieurs sources possibles.
Ces sources avancées nécessitent des dépendances supplémentaires, tout comme l'utilisation de BigQuery ou de bases de données avec Spark nécessitait d'utiliser des fichiers JAR contenant les dépendances manquantes.
Dans notre exemple, nous allons artificiellement créer un serveur plus loin qui va générer des phrases sur le port 9999 de notre environnement. Nous instancions donc un socketTextStream
sur ce même port.
lines = ssc.socketTextStream("localhost", 9999)
Et enfin, nous traitons le DStream lines
comme un RDD (bien qu'il n'y ait pas exactement les mêmes fonctions) en enchaînant les opérations.
# On réucpère les mots en séparant par les espaces words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x: len(x) > 0) # On calcule les couples (mot, 1) pairs = words.map(lambda word: (word, 1)) # On ajoute les valeurs pour les mots identiques counts = pairs.reduceByKey(lambda x, y: x + y) counts.pprint()
Avec la fonction start_streaming
, nous démarrons le StreamingContext
, attendons 5 secondes et le stoppons.
La fonction
start
démarre leStreamingContext
dans un autre thread : cela n'est donc pas bloquant pour la suite de l'exécution de notre programme.
En supposant qu'un autre programme envoie régulièrement des mots (au format binaire) sur le port 9999 en local, nous obtiendrons la sortie suivante.
--- Time: 2022-02-07 08:04:27 --- ('cat', 8) ('dog', 7) ('horse', 6) --- Time: 2022-02-07 08:04:28 --- ('dog', 5) ('horse', 4) --- Time: 2022-02-07 08:04:29 --- ('cat', 7) ('horse', 3) --- Time: 2022-02-07 08:04:30 --- ('cat', 3) ('horse', 7) ('dog', 5) --- Time: 2022-02-07 08:04:31 --- ('cat', 2) ('dog', 11) ('horse', 6)
Parfait ! Cela fonctionne comme prévu.
Pour bien comprendre, essayons de décomposer chaque transformation pour chaque RDD. Pour cela, nous prenons qu'une seule seconde d'attente pour n'obtenir d'un seul RDD.
def inspect_transforms(): sc = SparkContext(master="local[*]") # Crée un SparkContext local ssc = StreamingContext(sc, 1) lines = ssc.socketTextStream("127.0.0.1", 9999) words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x: len(x) > 0) pairs = words.map(lambda word: (word, 1)) counts = pairs.reduceByKey(lambda x, y: x + y) lines.pprint() pairs.pprint() counts.pprint() ssc.start() time.sleep(1) # On laisse qu'une seconde cette fois-ci ssc.stop() inspect_transforms()
--- Time: 2022-02-07 08:04:57 --- cat dog cat dog dog horse horse cat dog dog cat --- Time: 2022-02-07 08:04:57 --- ('cat', 1) ('dog', 1) ('cat', 1) ('dog', 1) ('dog', 1) ('horse', 1) ('horse', 1) ('cat', 1) ('dog', 1) ('dog', 1) ... --- Time: 2022-02-07 08:04:57 --- ('cat', 4) ('dog', 5) ('horse', 2)
Nous voyons premièrement la ou les phrases telles qu'elles ont été récupérées depuis la connexion TCP. Ensuite, le flatMap
va découper chaque mot pour créer les couples clé/valeur, et enfin le reduceByKey
réalise l'agrégation finale sur les clés (ici les mots). Il ne faut bien sûr pas hésiter à relancer la cellule au-dessus pour s'en convaincre.
Vous souhaitez vous former au Data Engineering ?
Articles similaires
7 févr. 2024
Pendant de nombreuses années, le rôle des Data Engineers était de récupérer des données issues de différentes sources, systèmes de stockage et applications tierces et de les centraliser dans un Data Warehouse, dans le but de pouvoir obtenir une vision complète et organisée des données disponibles.
Maxime Jumelle
CTO & Co-Founder
Lire l'article
4 déc. 2023
Pour de nombreuses entreprises, la mise en place et la maintenant de pipelines de données est une étape cruciale pour avoir à disposition d'une vue d'ensemble nette de toutes les données à disposition. Un des challenges quotidien pour les Data Analysts et Data Engineers consiste à s'assurer que ces pipelines de données puissent répondre aux besoins de toutes les équipes d'une entreprise.
Maxime Jumelle
CTO & Co-Founder
Lire l'article
14 nov. 2023
Pour améliorer les opérations commerciales et maintenir la compétitivité, il est essentiel de gérer efficacement les données en entreprise. Cependant, la diversité des sources de données, leur complexité croissante et la façon dont elles sont stockées peuvent rapidement devenir un problème important.
Maxime Jumelle
CTO & Co-Founder
Lire l'article
60 rue François 1er
75008 Paris
Blent est une plateforme 100% en ligne pour se former aux métiers Tech & Data.
Organisme de formation n°11755985075.
Data Engineering
IA Générative
MLOps
Cloud & DevOps
À propos
Gestion des cookies
© 2024 Blent.ai | Tous droits réservés