← Retourner à la liste des articles
Image blog
Auteur

Par Maxime Jumelle

CTO & Co-Founder

Publié le 7 févr. 2022

Catégorie Data Engineering

Spark Streaming : Bien démarrer

Spark est bien connu pour sa puissance concernant la parallélisation de calculs très volumineux. Outre son interaction facile en Python, il dispose également d'une API haut-niveau avec la définitions d'objets tels que les DataFrames, très puissant pour manipuler des tableaux de données.

Mais il y a également une composante très intéressante à explorer sous Spark : c'est Spark Streaming. Cette dernière, comme son nom l'indique, va nous permettre de manipuler des données non seulement en temps-réel, mais aussi en grande quantités. Et tout ça, c'est une des autres forces de Spark, qui le propulse comme un acteur incontournable de l'event processing.

Vue d'ensemble de Spark Streaming

Spark Streaming est une extension des fonctionnalités principales de Spark appliquée au traitement de données en temps réel. L'avantage de ce système est de pouvoir étendre l'utilisation de MapReduce sur des données réceptionnés par notre programme en provenance de diverses sources de données.

  • Des systèmes de diffusions en temps réel, comme Apache Kafka ou Google Pub/Sub.
  • Des fichiers alimentés en continu sur du HDFS ou du stockage de fichiers plats (Cloud Storage, AWS S3).
  • Des API tierces (réseaux sociaux, tracking utilisateur, etc).

De même, les sorties peuvent être très vastes en fonction du cas d'usage.

  • Des fichiers sur un système de stockage HDFS.
  • Des bases de données qui ingèrent des données toutes les secondes.
  • Des outils de Data Visualization et des reportings en temps réel.

En d'autres termes, avec Spark Streaming, nous n'allons plus construire des pipelines ETL qui traitent les données par batches (par paquets) mais des pipelines en temps réel qui vont directement traiter une données dès qu'elle arrive dans le pipeline.

Spark Streaming ne traite pas les données une par une, mais réalise des micro-batches.

Ce qu'il faut bien comprendre, c'est que Spark Streaming ne procède pas à de la diffusion de données. Autrement dit, Spark Streaming n'ira jamais traiter une donnée de manière individuelle. Il va en réalité effectuer des découpes temporelles à intervalle de temps régulier (toutes les secondes, toutes les 100 millisecondes) et considérer l'ensemble des données réceptionnées dans chaque intervalle de temps. Si par exemple, nous fixons la taille des fenêtres à une seconde, alors Spark Streaming traitera l'ensemble des données survenues au cours de la seconde qui caractérise la fenêtre temporelle.

Et automatiquement, il est probable qu'il y ait plus d'une observation/donnée par seconde, et dans ce cas, Spark Streaming récupérera l'ensemble de ces données. On appelle alors ce procédé du traitement par micro-batch, c'est-à-dire par petits paquets que l'on récupère d'une source qui diffuse des données en continu.


À lire aussi : découvrez notre formation Data Engineer


Cela est utile dans plusieurs situations, si par exemple on souhaite connaître le nombre d'événements à la seconde. Dans le même temps, cela évite aussi une saturation de la bande passante où le script Spark irait requêter de très nombreuses fois vers une source de données.

❓ Comment faire si l'on souhaite traiter chaque donnée de manière indivuelle ?

Dans ce cas, ce n'est plus Spark Streaming qui est utile mais des systèmes d'event streaming comme Apache Kafka (avec l'API Consumer). Ce qu'il faut garder à l'esprit, c'est que l'event processing a pour rôle de traiter plusieurs données qui arrivent en continu comme une source de données, et non en tant que telle.

Discretized Streams (DStreams)

Tout comme nous retrouvons les DataFrames pour Spark SQL, Spark Streaming dispose lui aussi d'une abstraction pour gérer ces micro-batches : ce sont les DStreams. Un DStream est une collection de plusieurs RDD, chaque RDD correspondant aux données réceptionnées à chaque intervalle de temps. Ainsi, le n-ème RDD contiendra toutes les données réceptionnées entre le temps n−1 et le temps n (car Spark récolte tous les événements en un RDD à la fin de chaque intervalle).

Ainsi, chaque opération effectuée sur un DStream est répliqué sur chaque RDD de ce dernier. Nous définissons les opérations à effectuer sur le DStream (un comptage par exemple), et ces opérations seront ensuite effectués sur chaque RDD du DStream, au fur et à mesure qu'ils sont construits.

Prenons par exemple le cas suivant où nous avons du texte qui arrive en continu.

Dans le DStream, chaque RDD contient tous les mots réceptionnés (depuis une API ou dans un fichier) sur une durée d'une seconde. L'objectif ici est de dénombrer le nombre d'occurrences de chaque mot.

Dans un premier temps, nous réalisons un flatMap pour d'une part, séparer chaque mot de la phrase et d'autre part, attribuer la valeur 1 puisqu'initialement, nous parcourons chaque mot une fois.

Et ensuite, nous appliquons un reduceByKey pour regrouper toutes les clés identiques (ici les mots) et ajouter leurs valeurs respectives. Nous obtenons à la fin un DStream, toujours formé par des RDD, mais qui est la résultante des opérations que nous avons défini.

Avec cette stratégie, nous pouvons voir par seconde quelles sont les mots les plus fréquents.

Comptage de mots

Essayons de mettre en place notre comptage de mots avec Spark Streaming. Rappelons-nous que pour effectuer ce calcul, nous devons appliquer l'opération flatMap.

Commençons par charger Spark.

import os
import time
import findspark

# Spécifie le chemin où est stocké Spark
os.environ["SPARK_HOME"] = "/opt/spark-3.0.1-bin-hadoop2.7"

findspark.init()  # Trouve les exécutables dans le dossier SPARK_HOME

from pyspark import SparkContext

sc = SparkContext(master="local[*]")  # Crée un SparkContext local

Pour pouvoir utiliser les objets et les fonctionnalités de streaming, nous allons instancier un StreamingContext. Deux arguments doivent être renseignés.

  • Le SparkContext sur lequel le StreamingContext va se baser.
  • La taille de la fenêtre temporelle (par défaut exprimée en secondes).

À lire aussi : découvrez notre formation Data Engineer


from pyspark.streaming import StreamingContext

# Céation d'un contexte SparkStreaming avec un intervalle d'une seconde
ssc = StreamingContext(sc, 1)

Nous définissons ici des micro-batch d'une seconde. Nous pouvons également spécifier une durée plus petite, exprimée en millisecondes à partir de l'objet Duration de Spark.

En revanche, une particularité importante est qu'un StreamingContext a une durée de vie définie par le démarrage (start) et par le stop (qui peut être manuel via la fonction stop ou automatiquement à la fin du programme avec awaitTermination). Dans tous les cas, le SparkContext est détruit lorsque le StreamingContext est stoppé. Nous reviendrons dessus très bientôt.

Nous allons ensuite pouvoir créer un DStream à partir d'une source de données. Nativement, il existe plusieurs sources possibles.

  • Les sources basiques, depuis un système de fichier ou à partir d'une connexion TCP.
  • Les sources avancées, depuis Apache Kafka, AWS Kinesis ou encore des API tierces.

Ces sources avancées nécessitent des dépendances supplémentaires, tout comme l'utilisation de BigQuery ou de bases de données avec Spark nécessitait d'utiliser des fichiers JAR contenant les dépendances manquantes.

Dans notre exemple, nous allons artificiellement créer un serveur plus loin qui va générer des phrases sur le port 9999 de notre environnement. Nous instancions donc un socketTextStream sur ce même port.

lines = ssc.socketTextStream("localhost", 9999)

Et enfin, nous traitons le DStream lines comme un RDD (bien qu'il n'y ait pas exactement les mêmes fonctions) en enchaînant les opérations.

# On réucpère les mots en séparant par les espaces
words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x: len(x) > 0)
# On calcule les couples (mot, 1)
pairs = words.map(lambda word: (word, 1))
# On ajoute les valeurs pour les mots identiques
counts = pairs.reduceByKey(lambda x, y: x + y)

counts.pprint()

Avec la fonction start_streaming, nous démarrons le StreamingContext, attendons 5 secondes et le stoppons.

La fonction start démarre le StreamingContext dans un autre thread : cela n'est donc pas bloquant pour la suite de l'exécution de notre programme.

En supposant qu'un autre programme envoie régulièrement des mots (au format binaire) sur le port 9999 en local, nous obtiendrons la sortie suivante.

---
Time: 2022-02-07 08:04:27
---
('cat', 8)
('dog', 7)
('horse', 6)

---
Time: 2022-02-07 08:04:28
---
('dog', 5)
('horse', 4)

---
Time: 2022-02-07 08:04:29
---
('cat', 7)
('horse', 3)

---
Time: 2022-02-07 08:04:30
---
('cat', 3)
('horse', 7)
('dog', 5)

---
Time: 2022-02-07 08:04:31
---
('cat', 2)
('dog', 11)
('horse', 6)

Parfait ! Cela fonctionne comme prévu.

Pour bien comprendre, essayons de décomposer chaque transformation pour chaque RDD. Pour cela, nous prenons qu'une seule seconde d'attente pour n'obtenir d'un seul RDD.

def inspect_transforms():
    sc = SparkContext(master="local[*]")  # Crée un SparkContext local
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream("127.0.0.1", 9999)
    words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x: len(x) > 0)
    pairs = words.map(lambda word: (word, 1))
    counts = pairs.reduceByKey(lambda x, y: x + y)

    lines.pprint()
    pairs.pprint()
    counts.pprint()

    ssc.start()
    time.sleep(1)  # On laisse qu'une seconde cette fois-ci
    ssc.stop()

inspect_transforms()
---
Time: 2022-02-07 08:04:57
---
cat dog cat dog dog horse horse cat dog
dog cat

---
Time: 2022-02-07 08:04:57
---
('cat', 1)
('dog', 1)
('cat', 1)
('dog', 1)
('dog', 1)
('horse', 1)
('horse', 1)
('cat', 1)
('dog', 1)
('dog', 1)
...

---
Time: 2022-02-07 08:04:57
---
('cat', 4)
('dog', 5)
('horse', 2)

Nous voyons premièrement la ou les phrases telles qu'elles ont été récupérées depuis la connexion TCP. Ensuite, le flatMap va découper chaque mot pour créer les couples clé/valeur, et enfin le reduceByKey réalise l'agrégation finale sur les clés (ici les mots). Il ne faut bien sûr pas hésiter à relancer la cellule au-dessus pour s'en convaincre.

Vous souhaitez vous former au Data Engineering ?

Articles similaires

Blog

7 févr. 2024

Data Engineering

Pendant de nombreuses années, le rôle des Data Engineers était de récupérer des données issues de différentes sources, systèmes de stockage et applications tierces et de les centraliser dans un Data Warehouse, dans le but de pouvoir obtenir une vision complète et organisée des données disponibles.

Maxime Jumelle

Maxime Jumelle

CTO & Co-Founder

Lire l'article

Blog

4 déc. 2023

Data Engineering

Pour de nombreuses entreprises, la mise en place et la maintenant de pipelines de données est une étape cruciale pour avoir à disposition d'une vue d'ensemble nette de toutes les données à disposition. Un des challenges quotidien pour les Data Analysts et Data Engineers consiste à s'assurer que ces pipelines de données puissent répondre aux besoins de toutes les équipes d'une entreprise.

Maxime Jumelle

Maxime Jumelle

CTO & Co-Founder

Lire l'article

Blog

14 nov. 2023

Data Engineering

Pour améliorer les opérations commerciales et maintenir la compétitivité, il est essentiel de gérer efficacement les données en entreprise. Cependant, la diversité des sources de données, leur complexité croissante et la façon dont elles sont stockées peuvent rapidement devenir un problème important.

Maxime Jumelle

Maxime Jumelle

CTO & Co-Founder

Lire l'article