Spark講座
この講座では、大規模データの分散処理フレームワークであるApache Sparkと、そのPythonインターフェースであるPySparkについて、実際にコードを実行しながら学びます。
AWSのETLサービスであるGlueは、内部的にApache Sparkを利用しています。本講座ではSparkの仕組みとPySparkによるデータ処理の方法を理解することで、Glueジョブの中で効率的なETL処理を実装できるようになることを目指します。
- Apache Sparkの概要とアーキテクチャ
- Docker環境でのPySpark実行環境の構築
- DataFrameの操作(select、filter、groupBy、joinなど)
- Spark SQLによるデータ操作
- CSVファイルの読み込みと書き出し
- データ変換の実践(型変換、null処理、重複除去)
1. 事前準備
この講座のハンズオンでは、以下のツールが必要です。まだ準備できていない場合は、リンク先の手順に沿って準備をお願いします。
| 💡 ポイント |
|---|
PySparkの実行にはJavaが必要ですが、本講座ではDockerを使うことで、Javaのインストールなしで環境を構築します。PySpark・Java・PythonがすべてプリインストールされたDockerイメージを利用するため、ローカル環境を汚すことなく、docker run コマンド一つで学習環境が整います。 |
2. Sparkの概要
2.1 Apache Sparkとは
Apache Sparkは、大規模データを高速に分散処理するためのオープンソースフレームワークです。従来のHadoop MapReduceがディスクベースの処理であったのに対し、Sparkはインメモリ処理(データをメモリ上に保持して処理する方式)を採用しており、大幅に高速な処理が可能です。
| 📝 Hadoop MapReduceとは |
|---|
| Hadoop MapReduceは、Sparkが登場する以前から使われていた大規模データの分散処理フレームワークです。処理の途中結果をディスク(ハードディスク)に書き出しながら進めるため、安定性は高い一方で、ディスクの読み書きがボトルネックとなり処理速度に限界がありました。Sparkはこの課題をインメモリ処理で解決し、Hadoop MapReduceに比べて最大100倍高速に処理できるとされています。 |
Sparkは複数のプログラミング言語に対応しており、Python(PySpark)、Scala、Java、Rで利用できます。本講座では、最も広く使われており、AWS Glueでも採用されているPySpark(PythonからSparkを操作するインターフェース)を中心に解説します。
2.2 Sparkのアーキテクチャ
SparkはDriver、Cluster Manager、Executorの3つのコンポーネントで構成されています。以下の図は、各コンポーネントの関係を示しています。
flowchart TB
A[Driver<br>実行計画の作成] -->|タスクの分散| B[Cluster Manager<br>リソースの割り当て]
B --> C[Executor 1<br>タスク実行]
B --> D[Executor 2<br>タスク実行]
B --> E[Executor 3<br>タスク実行]
それぞれのコンポーネントについて説明します。
ドライバ(Driver)
Driverは、Sparkアプリケーション全体の司令塔です。ユーザが書いたPySparkのコードを受け取り、どのような順序でどのようにデータを処理するかという実行計画を作成します。実行計画ができたら、処理を小さなタスクに分割し、Cluster Managerを通じて複数のExecutorに配布します。すべてのタスクの進捗管理や、最終結果の取りまとめもDriverが行います。
クラスタマネージャ(Cluster Manager)
Cluster Managerは、クラスタ全体のリソース管理を担当します。Driverからタスクの実行要求を受けると、各ExecutorにどれだけのメモリやCPUを割り当てるかを決定し、タスクを適切なExecutorに振り分けます。Executorの起動・停止もCluster Managerが管理します。
エグゼキュータ(Executor)
Executorは、実際のデータ処理を行うワーカプロセスです。Driverから割り当てられたタスクを受け取り、データの読み込み、フィルタリング、集計などの処理を実行します。各Executorはそれぞれ独立してメモリを持ち、担当するデータをインメモリで保持しながら処理を行います。複数のExecutorが並列に動くことで、大規模なデータでも高速に処理できる仕組みです。
Sparkアプリケーションが実行されると、Driverが処理全体の実行計画を作成し、Cluster Managerを通じて複数のExecutorにタスクを分散します。この一連の流れはSparkが自動的に行うため、ユーザはPySparkのコードを書くことに集中できます。
| 💡 ポイント |
|---|
| AWS Glueを使う場合、Cluster Managerの管理はGlueが自動で行うため、ユーザがクラスタの構築や管理を意識する必要はありません。Glueジョブで指定するDPU(Data Processing Unit)の数に応じて、Executorの数やメモリが自動的に割り当てられます。 |
2.3 なぜSparkを学ぶのか
「データ分析基盤を構築しよう」のハンズオンでは、AWS GlueのGUI(ビジュアルエディタ)を使ってETLジョブを作成しました。GUIでは、データソースの指定や出力先の設定をマウス操作だけで行えるため、Sparkを意識する必要はありませんでした。
しかし、実務ではGUIだけでは対応しきれないケースが出てきます。たとえば、複数のデータソースを結合して集計したり、条件に応じてデータを加工したり、パフォーマンスを考慮した出力形式を指定したりする場合には、PySparkでコードを直接書く必要があります。AWS GlueのETLジョブは内部的にApache Sparkで動作しているため、PySparkを学ぶことがGlueを使いこなすための鍵になります。
Sparkの仕組みとPySparkの操作方法を理解しておくことで、以下のメリットがあります。
- GUIでは実現できない複雑なデータ変換や集計処理をコードで柔軟に実装できるようになる
- ジョブの実行が遅い場合に、原因の特定とパフォーマンスの改善ができるようになる
- Sparkの知識はAWS Glueだけでなく、Amazon EMRやDatabricksなど他のプラットフォームでも活用できる
2.4 RDDとDataFrame
Sparkには主に2つのデータ構造があります。それぞれの特徴を理解したうえで、適切なデータ構造を選択することが重要です。
RDD(Resilient Distributed Dataset)
RDDは、Sparkの最も基本的なデータ構造です。名前の「Resilient(回復力のある)Distributed(分散された)Dataset(データの集合)」が示すとおり、データを複数のノードに分散して保持し、一部のノードに障害が発生しても自動的に復旧できる仕組みを持っています。
RDDは型の制約がない柔軟なデータ構造で、map、filter、reduce などの関数型APIを使って操作します。自由度が高い反面、Sparkの内部オプティマイザが効きにくく、パフォーマンスの最適化は開発者自身が行う必要があります。
DataFrame
DataFrameは、RDDの上に構築されたより高レベルなデータ構造です。データベースのテーブルのように、列名とデータ型を持つ構造化されたデータを扱います。select、filter、groupBy などのSQLライクなAPIで操作でき、Pythonのpandasライブラリに馴染みのある方にとっても直感的に使えます。
DataFrameの大きな利点は、Sparkの内部オプティマイザであるCatalystが実行計画を自動的に最適化してくれる点です。開発者が手動でチューニングしなくても、高いパフォーマンスが得られます。
RDDとDataFrameの比較
以下の表に、RDDとDataFrameの違いをまとめます。
| 比較項目 | RDD | DataFrame |
|---|---|---|
| データ構造 | 型なしの分散コレクション | 列名とデータ型を持つ構造化データ |
| 操作方法 | map、filter、reduceなどの関数型API | select、filter、groupByなどのSQLライクなAPI |
| パフォーマンス | 手動での最適化が必要 | Catalystオプティマイザによる自動最適化 |
| 主な用途 | 非構造化データ、細かい制御が必要な場合 | 構造化データの分析、ETL処理 |
| 💡 ポイント |
|---|
| 本講座では、実務で最も使われるDataFrame APIを中心に解説します。ETL処理やデータ分析のほとんどのユースケースはDataFrame APIで対応できます。RDDは現在でも内部的に使われていますが、通常の開発で直接操作する機会はほとんどありません。 |
3. PySpark実行環境の構築
PySparkをローカルで実行するにはJavaのインストールが必要ですが、Docker環境を使えばJavaのセットアップなしですぐに始められます。ここでは、PySpark入りのDockerコンテナを起動して、対話シェルでコードを実行する環境を構築します。
3.1 サンプルデータの準備
まず、この講座で使用するサンプルデータを用意します。任意の場所にpyspark-handsonフォルダを作成し、その中にCSVファイルを作成してください。
pyspark-handson/ ← このフォルダを作成
├── sales.csv ← このファイルを作成
└── products.csv ← このファイルを作成
sales.csv に以下の内容を記述して保存してください。注文データ(10件)のサンプルです。
order_id,customer_name,product_id,quantity,unit_price,order_date,region
ORD001,田中太郎,P001,1,120000,2025-01-05,東京
ORD002,鈴木花子,P002,2,3000,2025-01-05,大阪
ORD003,山田一郎,P003,3,2500,2025-01-06,東京
ORD004,佐藤美咲,P004,5,1500,2025-01-06,福岡
ORD005,高橋健太,P005,1,8000,2025-01-07,東京
ORD006,田中太郎,P002,1,3500,2025-01-07,東京
ORD007,鈴木花子,P004,2,4000,2025-01-08,大阪
ORD008,山田一郎,P001,1,35000,2025-01-08,東京
ORD009,佐藤美咲,P002,3,2800,2025-01-09,福岡
ORD010,高橋健太,P003,10,1200,2025-01-09,東京
products.csv に以下の内容を記述して保存してください。商品マスタ(5件)のサンプルです。
product_id,product_name,category
P001,ノートPC,電子機器
P002,プログラミング入門,書籍
P003,マウス,電子機器
P004,Tシャツ,衣類
P005,キーボード,電子機器
3.2 Dockerコンテナの起動
このハンズオンでは、Jupyterプロジェクトが提供するpyspark-notebookというDockerイメージを使用します。このイメージにはPython、Java、Apache Spark、PySparkがすべてプリインストールされているため、個別のセットアップなしでPySparkを実行できます。
| 📝 pyspark-notebookとは |
|---|
| pyspark-notebookは、Jupyter Docker Stacksプロジェクトが提供する公式Dockerイメージの一つです。本来はJupyter Notebook上でPySparkを実行するためのイメージですが、今回はNotebookは使わず、コンテナ内のbashシェルからPySparkの対話シェルを直接操作します。 |
コマンドプロンプト(Windows)またはターミナル(Mac)を開き、pyspark-handsonフォルダに移動します。
cd pyspark-handson
以下のコマンドでDockerコンテナを起動し、コンテナ内のbashにログインします。
docker run -it -v $(pwd):/work -w /work quay.io/jupyter/pyspark-notebook bash
初回実行時はローカルにイメージが存在しないため、レジストリ(quay.io)から自動的にダウンロードされます。イメージサイズが大きいため、ダウンロードに数分かかる場合があります。2回目以降はローカルのイメージが使われるため、すぐに起動します。
以下のようなプロンプトが表示されれば、コンテナ内にログインできています。
jovyan@xxxxxxxxxxxx:/work$
コマンドの各オプションの意味は以下のとおりです。
| オプション | 説明 |
|---|---|
-it |
コンテナ内で対話的に操作できるようにする |
-v $(pwd):/work |
現在のフォルダ(pyspark-handson)をコンテナ内の/workにマウントする |
-w /work |
コンテナ内の作業ディレクトリを/workに設定する |
quay.io/jupyter/pyspark-notebook |
PySpark がプリインストールされたDockerイメージ |
bash |
コンテナ内でbashシェルを起動する |
サンプルデータがマウントされていることを確認します。
ls
以下のように sales.csv と products.csv が表示されれば、ローカルのファイルがコンテナ内から参照できる状態です。
products.csv sales.csv
| ⚠️ Windowsのコマンドプロンプトを使用している場合 |
|---|
$(pwd) はMac/Linuxのコマンドです。Windowsのコマンドプロンプトでは $(pwd) の代わりに %cd% を、PowerShellでは ${PWD} を使用してください。 |
3.3 PySparkシェルの起動
コンテナ内で以下のコマンドを実行し、PySparkの対話シェルを起動します。
pyspark
以下のようなSparkのロゴとプロンプトが表示されれば、PySparkの準備は完了です。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version x.x.x
/_/
Using Python version x.x.x
SparkSession available as 'spark'.
>>>
SparkSession available as 'spark'. と表示されているとおり、spark という変数にSparkSessionが自動的に設定されています。以降のコードでは、この spark をそのまま使用します。
| 📝 SparkSessionとは |
|---|
SparkSessionは、Sparkアプリケーションのエントリポイント(入り口)です。データの読み込みやDataFrameの作成など、すべてのSpark操作はSparkSessionを通じて行います。PySparkシェルでは自動的に作成されますが、AWS Glueでは GlueContext 経由で取得します。 |
4. CSVファイルの読み込み
4.1 サンプルデータの読み込み
まず、先ほど作成したsales.csvを読み込んでみましょう。PySparkシェルで以下のコードを実行してください。
df = spark.read.csv("sales.csv", header=True, inferSchema=True)
header=True は1行目をカラム名として使用するオプション、inferSchema=True はデータ型を自動推定するオプションです。
読み込んだデータの内容を確認します。
df.show()
以下のような実行結果が表示されます。
+--------+-------------+----------+--------+----------+----------+------+
|order_id|customer_name|product_id|quantity|unit_price|order_date|region|
+--------+-------------+----------+--------+----------+----------+------+
| ORD001| 田中太郎| P001| 1| 120000|2025-01-05| 東京|
| ORD002| 鈴木花子| P002| 2| 3000|2025-01-05| 大阪|
| ORD003| 山田一郎| P003| 3| 2500|2025-01-06| 東京|
| ORD004| 佐藤美咲| P004| 5| 1500|2025-01-06| 福岡|
| ORD005| 高橋健太| P005| 1| 8000|2025-01-07| 東京|
| ORD006| 田中太郎| P002| 1| 3500|2025-01-07| 東京|
| ORD007| 鈴木花子| P004| 2| 4000|2025-01-08| 大阪|
| ORD008| 山田一郎| P001| 1| 35000|2025-01-08| 東京|
| ORD009| 佐藤美咲| P002| 3| 2800|2025-01-09| 福岡|
| ORD010| 高橋健太| P003| 10| 1200|2025-01-09| 東京|
+--------+-------------+----------+--------+----------+----------+------+
4.2 データの確認
読み込んだDataFrameの構造や内容を確認するためのメソッドを紹介します。
カラム名とデータ型の確認(printSchema)
DataFrameのスキーマ(カラム名とデータ型の構造)を確認するには printSchema() を使用します。
df.printSchema()
以下のような実行結果が表示されます。
root
|-- order_id: string (nullable = true)
|-- customer_name: string (nullable = true)
|-- product_id: string (nullable = true)
|-- quantity: integer (nullable = true)
|-- unit_price: integer (nullable = true)
|-- order_date: string (nullable = true)
|-- region: string (nullable = true)
inferSchema=True により、quantity と unit_price が自動的に integer 型として推定されていることがわかります。文字列のカラムは string 型になっています。
その他にも、データの件数や基本統計量を確認するメソッドがあります。それぞれ実行してみましょう。
行数の取得(count)
count() でDataFrameの行数を取得できます。
df.count()
以下のような実行結果が表示されます。
10
CSVファイルのヘッダー行を除いた10件のデータが読み込まれていることがわかります。
カラム名の一覧(columns)
columns でDataFrameのカラム名をリストとして取得できます。
df.columns
以下のような実行結果が表示されます。
['order_id', 'customer_name', 'product_id', 'quantity', 'unit_price', 'order_date', 'region']
基本統計量の確認(describe)
describe() で数値カラムの基本統計量(件数、平均、標準偏差、最小値、最大値)を確認できます。
df.describe().show()
以下のような実行結果が表示されます。
+-------+--------+-------------+----------+--------+------------------+----------+------+
|summary|order_id|customer_name|product_id|quantity| unit_price|order_date|region|
+-------+--------+-------------+----------+--------+------------------+----------+------+
| count| 10| 10| 10| 10| 10| 10| 10|
| mean| null| null| null| 2.9| 20150.0| null| null|
| stddev| null| null| null| 2.77| 37528.8| null| null|
| min| ORD001| 佐藤美咲| P001| 1| 1200|2025-01-05| 大阪|
| max| ORD010| 高橋健太| P005| 10| 120000|2025-01-09| 福岡|
+-------+--------+-------------+----------+--------+------------------+----------+------+
数値カラム(quantity、unit_price)については平均や標準偏差が計算されていますが、文字列カラムについてはnullと表示されます。min・max は文字列カラムでも辞書順で算出されます。
| 📝 inferSchemaの注意点 |
|---|
inferSchema=True はデータ全体を一度読み込んで型を推定するため、大規模なデータでは処理時間が増加します。本番環境では schema オプションで明示的にスキーマを指定することを推奨します。 |
5. DataFrameの操作
ここからは、読み込んだ売上データ(df)を使って、DataFrameの基本的な操作を学びます。各コードはPySparkシェルでそのまま実行できます。
5.1 カラムの選択(select)
特定のカラムだけを取り出す場合は select() を使用します。
df.select("カラム名1", "カラム名2").show()
実際に試してみましょう。customer_name と unit_price カラムだけを取り出します。
df.select("customer_name", "unit_price").show()
以下のような実行結果が表示されます。
+-------------+----------+
|customer_name|unit_price|
+-------------+----------+
| 田中太郎| 120000|
| 鈴木花子| 3000|
| 山田一郎| 2500|
| 佐藤美咲| 1500|
| 高橋健太| 8000|
| 田中太郎| 3500|
| 鈴木花子| 4000|
| 山田一郎| 35000|
| 佐藤美咲| 2800|
| 高橋健太| 1200|
+-------------+----------+
5.2 フィルタリング(filter / where)
条件に合う行だけを抽出する場合は filter() または where() を使用します。どちらも同じ動作をします。
df.filter(col("カラム名") 条件式).show()
実際に試してみましょう。まず col 関数をインポートします。col() はカラムを指定するための関数で、以降のセクションでも頻繁に使用します。
from pyspark.sql.functions import col
filterによる抽出
単価が5,000円以上の注文を抽出します。
df.filter(col("unit_price") >= 5000).show()
以下のような実行結果が表示されます。
+--------+-------------+----------+--------+----------+----------+------+
|order_id|customer_name|product_id|quantity|unit_price|order_date|region|
+--------+-------------+----------+--------+----------+----------+------+
| ORD001| 田中太郎| P001| 1| 120000|2025-01-05| 東京|
| ORD005| 高橋健太| P005| 1| 8000|2025-01-07| 東京|
| ORD008| 山田一郎| P001| 1| 35000|2025-01-08| 東京|
+--------+-------------+----------+--------+----------+----------+------+
whereによる抽出
where は filter のエイリアス(別名)で、同じ動作をします。東京の注文を抽出してみましょう。
df.where(col("region") == "東京").show()
以下のような実行結果が表示されます。
+--------+-------------+----------+--------+----------+----------+------+
|order_id|customer_name|product_id|quantity|unit_price|order_date|region|
+--------+-------------+----------+--------+----------+----------+------+
| ORD001| 田中太郎| P001| 1| 120000|2025-01-05| 東京|
| ORD003| 山田一郎| P003| 3| 2500|2025-01-06| 東京|
| ORD005| 高橋健太| P005| 1| 8000|2025-01-07| 東京|
| ORD006| 田中太郎| P002| 1| 3500|2025-01-07| 東京|
| ORD008| 山田一郎| P001| 1| 35000|2025-01-08| 東京|
| ORD010| 高橋健太| P003| 10| 1200|2025-01-09| 東京|
+--------+-------------+----------+--------+----------+----------+------+
5.3 カラムの追加・変換(withColumn)
既存のDataFrameに新しいカラムを追加したり、既存カラムの値を変換する場合は withColumn() を使用します。
df.withColumn("新しいカラム名", 値や式)
実際に試してみましょう。注文金額(数量 × 単価)を計算するカラムを追加します。
df.withColumn("total_price", col("quantity") * col("unit_price")).show()
以下のような実行結果が表示されます。total_price カラムが追加されていることがわかります。
+--------+-------------+----------+--------+----------+----------+------+-----------+
|order_id|customer_name|product_id|quantity|unit_price|order_date|region|total_price|
+--------+-------------+----------+--------+----------+----------+------+-----------+
| ORD001| 田中太郎| P001| 1| 120000|2025-01-05| 東京| 120000|
| ORD002| 鈴木花子| P002| 2| 3000|2025-01-05| 大阪| 6000|
| ORD003| 山田一郎| P003| 3| 2500|2025-01-06| 東京| 7500|
| ORD004| 佐藤美咲| P004| 5| 1500|2025-01-06| 福岡| 7500|
| ORD005| 高橋健太| P005| 1| 8000|2025-01-07| 東京| 8000|
| ORD006| 田中太郎| P002| 1| 3500|2025-01-07| 東京| 3500|
| ORD007| 鈴木花子| P004| 2| 4000|2025-01-08| 大阪| 8000|
| ORD008| 山田一郎| P001| 1| 35000|2025-01-08| 東京| 35000|
| ORD009| 佐藤美咲| P002| 3| 2800|2025-01-09| 福岡| 8400|
| ORD010| 高橋健太| P003| 10| 1200|2025-01-09| 東京| 12000|
+--------+-------------+----------+--------+----------+----------+------+-----------+
なお、カラム名を変更する場合は withColumnRenamed() を使用します。region カラムを area にリネームしてみましょう。
df.withColumnRenamed("region", "area").show()
以下のような実行結果が表示されます。region カラムが area に変わっていることがわかります。
+--------+-------------+----------+--------+----------+----------+----+
|order_id|customer_name|product_id|quantity|unit_price|order_date|area|
+--------+-------------+----------+--------+----------+----------+----+
| ORD001| 田中太郎| P001| 1| 120000|2025-01-05|東京|
| ORD002| 鈴木花子| P002| 2| 3000|2025-01-05|大阪|
| ORD003| 山田一郎| P003| 3| 2500|2025-01-06|東京|
| ORD004| 佐藤美咲| P004| 5| 1500|2025-01-06|福岡|
| ORD005| 高橋健太| P005| 1| 8000|2025-01-07|東京|
| ORD006| 田中太郎| P002| 1| 3500|2025-01-07|東京|
| ORD007| 鈴木花子| P004| 2| 4000|2025-01-08|大阪|
| ORD008| 山田一郎| P001| 1| 35000|2025-01-08|東京|
| ORD009| 佐藤美咲| P002| 3| 2800|2025-01-09|福岡|
| ORD010| 高橋健太| P003| 10| 1200|2025-01-09|東京|
+--------+-------------+----------+--------+----------+----------+----+
5.4 集計(groupBy / agg)
データをグループ化して集計する場合は groupBy() と agg() を組み合わせます。
df.groupBy("グループ化するカラム名").agg(
集計関数("カラム名").alias("別名")
)
実際に試してみましょう。まず集計に必要な関数をインポートします。
from pyspark.sql.functions import sum, avg, count
まず、注文金額カラムを追加したDataFrameを作成します。
df_with_total = df.withColumn("total_price", col("quantity") * col("unit_price"))
このDataFrameを使って、地域ごとの注文数・合計金額・平均単価を集計します。
df_with_total.groupBy("region").agg(
count("*").alias("order_count"),
sum("total_price").alias("total_sales"),
avg("unit_price").alias("avg_unit_price")
).show()
以下のような実行結果が表示されます。
+------+-----------+-----------+------------------+
|region|order_count|total_sales| avg_unit_price|
+------+-----------+-----------+------------------+
| 大阪| 2| 14000| 3500.0|
| 東京| 6| 186000|28366.666666666668|
| 福岡| 2| 15900| 2150.0|
+------+-----------+-----------+------------------+
東京が6件で合計186,000円と最も多いことがわかります。
5.5 並べ替え(orderBy)
データを特定のカラムで並べ替える場合は orderBy() を使用します。昇順はそのまま、降順は .desc() を付けます。
df.orderBy(col("カラム名").desc()).show()
実際に試してみましょう。単価の降順で並べ替えます。
df.orderBy(col("unit_price").desc()).show()
以下のような実行結果が表示されます。
+--------+-------------+----------+--------+----------+----------+------+
|order_id|customer_name|product_id|quantity|unit_price|order_date|region|
+--------+-------------+----------+--------+----------+----------+------+
| ORD001| 田中太郎| P001| 1| 120000|2025-01-05| 東京|
| ORD008| 山田一郎| P001| 1| 35000|2025-01-08| 東京|
| ORD005| 高橋健太| P005| 1| 8000|2025-01-07| 東京|
| ORD007| 鈴木花子| P004| 2| 4000|2025-01-08| 大阪|
| ORD006| 田中太郎| P002| 1| 3500|2025-01-07| 東京|
| ORD002| 鈴木花子| P002| 2| 3000|2025-01-05| 大阪|
| ORD009| 佐藤美咲| P002| 3| 2800|2025-01-09| 福岡|
| ORD003| 山田一郎| P003| 3| 2500|2025-01-06| 東京|
| ORD004| 佐藤美咲| P004| 5| 1500|2025-01-06| 福岡|
| ORD010| 高橋健太| P003| 10| 1200|2025-01-09| 東京|
+--------+-------------+----------+--------+----------+----------+------+
5.6 結合(join)
2つのDataFrameを共通のキーで結合する場合は join() を使用します。
df.join(結合先のDataFrame, on="結合キーのカラム名", how="結合の種類")
実際に試してみましょう。売上データに商品マスタ(products.csv)を結合して、商品名やカテゴリを付加します。
products_df = spark.read.csv("products.csv", header=True, inferSchema=True)
内容を確認します。
products_df.show()
以下のように商品マスタが読み込まれます。
+----------+------------------+--------+
|product_id| product_name|category|
+----------+------------------+--------+
| P001| ノートPC|電子機器|
| P002| プログラミング入門| 書籍|
| P003| マウス|電子機器|
| P004| Tシャツ| 衣類|
| P005| キーボード|電子機器|
+----------+------------------+--------+
この商品マスタを売上データと product_id で結合します。
joined_df = df.join(products_df, on="product_id", how="left")
結合結果を確認します。
joined_df.show()
以下のような実行結果が表示されます。売上データに product_name と category が追加されていることがわかります。
+----------+--------+-------------+--------+----------+----------+------+------------------+--------+
|product_id|order_id|customer_name|quantity|unit_price|order_date|region| product_name|category|
+----------+--------+-------------+--------+----------+----------+------+------------------+--------+
| P001| ORD001| 田中太郎| 1| 120000|2025-01-05| 東京| ノートPC|電子機器|
| P001| ORD008| 山田一郎| 1| 35000|2025-01-08| 東京| ノートPC|電子機器|
| P002| ORD002| 鈴木花子| 2| 3000|2025-01-05| 大阪| プログラミング入門| 書籍|
| P002| ORD006| 田中太郎| 1| 3500|2025-01-07| 東京| プログラミング入門| 書籍|
| P002| ORD009| 佐藤美咲| 3| 2800|2025-01-09| 福岡| プログラミング入門| 書籍|
| ....| ......| ........| ....| .....|..........| ....| ........| ......|
+----------+--------+-------------+--------+----------+----------+------+------------------+--------+
結合の種類には以下のようなものがあります。SQL講座で学んだ JOIN と対応付けると理解しやすいです。
| PySparkの指定 | 対応するSQL | 説明 |
|---|---|---|
| inner | INNER JOIN | 両方のDataFrameに存在するキーのみを返す |
| left | LEFT JOIN | 左側のDataFrameの全行を保持し、右側に一致がない場合はnullとなる |
| right | RIGHT JOIN | 右側のDataFrameの全行を保持し、左側に一致がない場合はnullとなる |
| outer | FULL OUTER JOIN | 両方のDataFrameの全行を保持し、一致がない側はnullとなる |
5.7 条件分岐(when / otherwise)
条件に応じて値を設定する場合は when() と otherwise() を使用します。SQLの CASE WHEN に相当する処理です。
df = df.withColumn("カラム名",
when(条件式1, 値1)
.when(条件式2, 値2)
.otherwise(デフォルト値)
)
実際に試してみましょう。まず when 関数をインポートします。
from pyspark.sql.functions import when
注文金額に応じてランクを設定します。
df_ranked = df.withColumn("total_price", col("quantity") * col("unit_price")) \
.withColumn("rank",
when(col("total_price") >= 50000, "プレミアム")
.when(col("total_price") >= 10000, "スタンダード")
.otherwise("ライト")
)
ランク列が追加されたか確認します。
df_ranked.select("order_id", "customer_name", "total_price", "rank").show()
以下のような実行結果が表示されます。
+--------+-------------+-----------+------------+
|order_id|customer_name|total_price| rank|
+--------+-------------+-----------+------------+
| ORD001| 田中太郎| 120000| プレミアム|
| ORD002| 鈴木花子| 6000| ライト|
| ORD003| 山田一郎| 7500| ライト|
| ORD004| 佐藤美咲| 7500| ライト|
| ORD005| 高橋健太| 8000| ライト|
| ORD006| 田中太郎| 3500| ライト|
| ORD007| 鈴木花子| 8000| ライト|
| ORD008| 山田一郎| 35000|スタンダード|
| ORD009| 佐藤美咲| 8400| ライト|
| ORD010| 高橋健太| 12000|スタンダード|
+--------+-------------+-----------+------------+
| 💡 ポイント |
|---|
DataFrameの操作は遅延評価(Lazy Evaluation)で実行されます。select や filter などの変換処理は、show() や count()、write などのアクションが呼ばれるまで実際には実行されません。これにより、Sparkは複数の変換処理をまとめて最適化してから実行できます。 |
5.8 Spark SQL
ここまで、select()、filter()、groupBy() などのDataFrame APIを使ってデータを操作してきました。PySparkにはもう一つの操作方法として、Spark SQLがあります。Spark SQLを使うと、DataFrameに対してSQLクエリを直接実行できます。SQLに慣れている方にとっては、DataFrame APIよりも直感的にデータ操作を記述できる場面があります。
Spark SQLはSQLの構文で記述しますが、内部的にはDataFrame APIと同じくSparkのCatalystオプティマイザによって最適化されます。つまり、Sparkのインメモリ分散処理や遅延評価といった恩恵はSpark SQLでもまったく同じように受けられます。通常のデータベースに対してSQLを実行するのとは異なり、大規模なデータに対しても高速に処理できるのがSpark SQLの強みです。
Spark SQLを使うには、まず createOrReplaceTempView() でDataFrameをSQLの一時テーブルとして登録し、その後 spark.sql() でSQLクエリを実行します。書式は以下のとおりです。
df.createOrReplaceTempView("テーブル名")
spark.sql("SELECT文").show()
実際に試してみましょう。先ほど結合した joined_df を一時テーブルとして登録します。
joined_df.createOrReplaceTempView("sales")
登録した sales テーブルに対して、SQLクエリを実行してみましょう。まず、全件を取得します。
spark.sql("SELECT * FROM sales LIMIT 5").show()
以下のような実行結果が表示されます。SQLの SELECT * FROM でDataFrameの内容をテーブルのように参照できていることがわかります。
+----------+--------+-------------+--------+----------+----------+------+------------------+--------+
|product_id|order_id|customer_name|quantity|unit_price|order_date|region| product_name|category|
+----------+--------+-------------+--------+----------+----------+------+------------------+--------+
| P001| ORD001| 田中太郎| 1| 120000|2025-01-05| 東京| ノートPC|電子機器|
| P001| ORD008| 山田一郎| 1| 35000|2025-01-08| 東京| ノートPC|電子機器|
| P002| ORD002| 鈴木花子| 2| 3000|2025-01-05| 大阪| プログラミング入門| 書籍|
| P002| ORD006| 田中太郎| 1| 3500|2025-01-07| 東京| プログラミング入門| 書籍|
| P002| ORD009| 佐藤美咲| 3| 2800|2025-01-09| 福岡| プログラミング入門| 書籍|
+----------+--------+-------------+--------+----------+----------+------+------------------+--------+
WHERE句による絞り込み
SQLの WHERE 句でデータを絞り込むことができます。東京の注文だけを抽出してみましょう。
spark.sql("SELECT order_id, customer_name, region FROM sales WHERE region = '東京'").show()
以下のような実行結果が表示されます。
+--------+-------------+------+
|order_id|customer_name|region|
+--------+-------------+------+
| ORD001| 田中太郎| 東京|
| ORD008| 山田一郎| 東京|
| ORD006| 田中太郎| 東京|
| ORD003| 山田一郎| 東京|
| ORD005| 高橋健太| 東京|
| ORD010| 高橋健太| 東京|
+--------+-------------+------+
これは、先ほど学んだ df.where(col("region") == "東京") と同じ結果です。
GROUP BYによる集計
SQLの GROUP BY で集計クエリも実行できます。カテゴリ別の注文数と売上合計を集計してみましょう。
spark.sql("""
SELECT
category,
COUNT(*) AS order_count,
SUM(quantity * unit_price) AS total_sales
FROM sales
GROUP BY category
ORDER BY total_sales DESC
""").show()
以下のような実行結果が表示されます。
+--------+-----------+-----------+
|category|order_count|total_sales|
+--------+-----------+-----------+
|電子機器| 5| 175500|
| 衣類| 2| 15500|
| 書籍| 3| 15300|
+--------+-----------+-----------+
これは、先ほど学んだ groupBy().agg() と同じ結果です。DataFrame APIとSpark SQLは用途に応じて使い分けることができます。
DataFrame APIとSpark SQLの使い分け
以下の表に、DataFrame APIとSpark SQLの違いをまとめます。
| 比較項目 | DataFrame API | Spark SQL |
|---|---|---|
| 記述方法 | メソッドチェーン(df.filter().groupBy().agg()) |
SQL文(SELECT ... FROM ... WHERE ...) |
| 適した場面 | 変換処理を段階的に組み立てる場合 | 複雑な集計や結合をSQLで直接書きたい場合 |
| 型チェック | コード実行前にある程度検出できる | SQLの構文エラーは実行時まで検出できない |
| 可読性 | Pythonに慣れている方にとって読みやすい | SQLに慣れている方にとって読みやすい |
| 💡 ポイント |
|---|
| 実務では、DataFrame APIとSpark SQLを混在させて使うことも一般的です。たとえば、DataFrame APIでデータの読み込みと前処理を行い、複雑な集計はSpark SQLで記述するといった使い分けがよく見られます。次の講座「PySparkでGlueジョブを作ろう」でも、この組み合わせパターンを使用します。 |
6. データの書き出し
ここまで、CSVファイルからデータを読み込み、フィルタリングや集計、結合といったさまざまな操作を行ってきました。しかし、処理した結果がメモリ上にしか存在しなければ、PySparkシェルを終了した時点で消えてしまいます。処理結果を永続的に残すには、ファイルとして書き出す必要があります。
PySparkでは df.write を使ってDataFrameの内容をファイルに書き出すことができます。
df.write.csv("出力先のフォルダパス", header=True, mode="書き出しモード")
write.csv() の各引数の意味は以下のとおりです。
- 第1引数: 出力先のフォルダパス。指定したフォルダが自動的に作成される
header:Trueにすると1行目にカラム名を出力するmode: 出力先に既存のデータがある場合の動作を指定する
実際に試してみましょう。書き出しを試すために、先ほどの結合データを使ってカテゴリ別の売上集計を作成します。
category_sales = joined_df.withColumn("total_price", col("quantity") * col("unit_price")) \
.groupBy("category").agg(
count("*").alias("order_count"),
sum("total_price").alias("total_sales")
)
作成した集計結果をCSVファイルとして書き出します。今回は出力先を output/category_sales、ヘッダーあり、既存データがあれば上書きする設定で実行します。
category_sales.write.csv("output/category_sales", header=True, mode="overwrite")
書き出しが完了したら、コンテナ内で出力を確認してみましょう。一度PySparkシェルを抜けて確認します。
exit()
まず、出力先フォルダの中身を確認します。
ls output/category_sales/
part-00000-xxxxx.csv のようなファイルが表示されます。Sparkは並列処理のため、複数のファイルに分割して出力します。
続いて、出力されたCSVファイルの中身を確認します。
cat output/category_sales/*.csv
以下のように、カテゴリ別の集計結果が書き出されていることが確認できます。
category,order_count,total_sales
電子機器,5,175500
書籍,3,15300
衣類,2,15500
再びPySparkシェルに戻ります。
pyspark
| 📝 書き出し時のファイル分割について |
|---|
Sparkは並列処理の仕組み上、データをパーティション(分割単位)ごとに個別のファイルとして出力します。出力ファイル数を制御したい場合は coalesce() を使って df.coalesce(1).write.csv(...) のように指定することで、1つのファイルにまとめて出力できます。ただし、大規模データの場合はメモリ不足になる可能性があるため、適切なファイル数を選択してください。 |
書き出し時のモードは以下のとおりです。
| モード | 説明 |
|---|---|
| overwrite | 既存データを上書きする |
| append | 既存データに追加する |
| ignore | 既存データがある場合は何もしない |
| error(デフォルト) | 既存データがある場合はエラーを発生させる |
7. データ変換の実践
ここまで学んだDataFrameの基本操作だけでも多くのことができますが、実際のデータは「日付が文字列のまま」「値が欠損している」「同じ顧客が複数回登場する」など、そのままでは分析に使えない状態であることがほとんどです。こうした「汚れたデータ」をきれいに整える処理こそがETLの「Transform(変換)」にあたり、データ分析の品質を左右する重要な工程です。
ここでは、実務のETL処理でよく使われるデータ変換のパターンを紹介します。引き続きPySparkシェルで実行してください。
7.1 データ型の変換
CSVファイルから読み込んだデータは、日付や金額のカラムが文字列型のまま取り込まれることがあります。文字列型のままでは、日付の比較や年・月の抽出、数値の計算などが正しく動作しません。データ型を適切に変換することで、これらの操作が可能になります。
日付型への変換(to_date)
文字列を日付型に変換するには to_date() を使用します。書式は以下のとおりです。
df.withColumn("カラム名", to_date(col("元のカラム名"), "日付フォーマット"))
第2引数の日付フォーマットには、元の文字列の形式を指定します。たとえば "2025-01-05" のような形式であれば "yyyy-MM-dd" と指定します。
日付型に変換したカラムからは、year() や month() で年・月を抽出できます。
df.withColumn("カラム名", year(col("日付カラム名")))
実際に試してみましょう。まず日付関連の関数をインポートし、売上データを読み込み直します。
from pyspark.sql.functions import col, to_date, year, month, dayofmonth
df = spark.read.csv("sales.csv", header=True, inferSchema=True)
文字列型の order_date を日付型に変換します。
df = df.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
変換した日付カラムから年を抽出します。
df = df.withColumn("year", year(col("order_date")))
同様に月を抽出します。
df = df.withColumn("month", month(col("order_date")))
日も抽出します。dayofmonth() を使用します。
df = df.withColumn("day", dayofmonth(col("order_date")))
結果を確認します。
df.select("order_id", "order_date", "year", "month", "day").show()
以下のような実行結果が表示されます。
+--------+----------+----+-----+---+
|order_id|order_date|year|month|day|
+--------+----------+----+-----+---+
| ORD001|2025-01-05|2025| 1| 5|
| ORD002|2025-01-05|2025| 1| 5|
| ORD003|2025-01-06|2025| 1| 6|
| ORD004|2025-01-06|2025| 1| 6|
| ORD005|2025-01-07|2025| 1| 7|
| ORD006|2025-01-07|2025| 1| 7|
| ORD007|2025-01-08|2025| 1| 8|
| ORD008|2025-01-08|2025| 1| 8|
| ORD009|2025-01-09|2025| 1| 9|
| ORD010|2025-01-09|2025| 1| 9|
+--------+----------+----+-----+---+
order_date が文字列から日付型に変換され、そこから year、month、day が正しく抽出されていることがわかります。変換に失敗した場合は order_date が null になるため、すべての行に日付が表示されていれば変換は成功です。
汎用的な型変換(cast)
日付以外のデータ型を変換するには cast() を使用します。書式は以下のとおりです。
df.withColumn("カラム名", col("カラム名").cast("変換先の型名"))
cast() には "int"(整数)、"double"(小数)、"string"(文字列)などの型名を文字列で指定します。たとえば、inferSchema で文字列型として読み込まれてしまった金額カラムを整数型に変換する場合は、以下のように記述します。
実際に試してみましょう。cast() の動作を確認するために、unit_price を一度文字列型に変換してから、整数型に戻してみます。まず文字列型に変換します。
df_str = df.withColumn("unit_price", col("unit_price").cast("string"))
printSchema() で型を確認します。
df_str.printSchema()
以下のように unit_price が string 型になっていることがわかります。
root
|-- order_id: string (nullable = true)
|-- customer_name: string (nullable = true)
|-- product_id: string (nullable = true)
|-- quantity: integer (nullable = true)
|-- unit_price: string (nullable = true)
|-- order_date: date (nullable = true)
|-- region: string (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
この文字列型の unit_price を整数型に戻します。
df_int = df_str.withColumn("unit_price", col("unit_price").cast("int"))
再度 printSchema() で確認します。
df_int.printSchema()
以下のように unit_price が integer 型に戻っていれば成功です。
root
|-- order_id: string (nullable = true)
|-- customer_name: string (nullable = true)
|-- product_id: string (nullable = true)
|-- quantity: integer (nullable = true)
|-- unit_price: integer (nullable = true)
|-- order_date: date (nullable = true)
|-- region: string (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
今回のサンプルデータでは inferSchema=True により数値カラムは自動的に整数型として推定されていますが、CSVの内容や設定によっては文字列型のまま取り込まれることがあるため、cast() による明示的な型変換は実務で頻繁に使用します。
7.2 null値の処理
実務のデータでは、値が入っていないレコード(null)が含まれていることがよくあります。nullが残ったまま集計すると、結果が不正確になったりエラーの原因になったりするため、事前に処理しておく必要があります。
null値を処理する方法は大きく2つあります。dropna() でnullを含む行を除去する方法と、fillna() でnullを別の値に置換する方法です。
nullを含む行の除去(dropna)
dropna() を使うと、nullを含む行をDataFrameから除去できます。書式は以下のとおりです。
df.dropna()
引数なしで実行すると、いずれかのカラムにnullが含まれる行がすべて除去されます。subset を指定すると、特定のカラムだけを対象にできます。
df.dropna(subset=["カラム名1", "カラム名2"])
今回のサンプルデータにはnullが含まれていないため、dropna() を実行しても結果は変わりません。ここでは書き方を確認しておきましょう。
nullを含む行をすべて除去する場合は以下のように記述します。
df_clean = df.dropna()
特定のカラムにnullがある行だけを除去することもできます。以下の例では order_id または unit_price がnullの行だけが除去されます。
df_clean = df.dropna(subset=["order_id", "unit_price"])
参考として、もし以下のようにnullを含むデータがあった場合の動作を示します。
+--------+-------------+----------+
|order_id|customer_name|unit_price|
+--------+-------------+----------+
| ORD001| 田中太郎| 120000|
| ORD002| null| 3000|
| ORD003| 山田一郎| null|
+--------+-------------+----------+
この状態で df.dropna() を実行すると、nullを含むORD002とORD003が除去され、ORD001の1件だけが残ります。
+--------+-------------+----------+
|order_id|customer_name|unit_price|
+--------+-------------+----------+
| ORD001| 田中太郎| 120000|
+--------+-------------+----------+
df.dropna(subset=["customer_name"]) とすれば、customer_name がnullのORD002だけが除去され、ORD001とORD003の2件が残ります。
+--------+-------------+----------+
|order_id|customer_name|unit_price|
+--------+-------------+----------+
| ORD001| 田中太郎| 120000|
| ORD003| 山田一郎| null|
+--------+-------------+----------+
nullの値の置換(fillna)
データを削除するのではなく、nullを別の値で埋めたい場合は fillna() を使用します。書式は以下のとおりです。辞書形式でカラムごとに置換する値を指定できます。
df.fillna({"カラム名1": 置換する値1, "カラム名2": 置換する値2})
こちらもサンプルデータにはnullがないため結果は変わりませんが、書き方を確認しておきましょう。以下の例では、unit_price がnullの場合は 0 に、region がnullの場合は "不明" に置換します。
df_filled = df.fillna({"unit_price": 0, "region": "不明"})
先ほどのnullを含むデータに対して df.fillna({"customer_name": "不明", "unit_price": 0}) を実行すると、以下のようにnullが指定した値に置き換わります。
+--------+-------------+----------+
|order_id|customer_name|unit_price|
+--------+-------------+----------+
| ORD001| 田中太郎| 120000|
| ORD002| 不明| 3000|
| ORD003| 山田一郎| 0|
+--------+-------------+----------+
dropna() と違いデータの件数が減らないため、集計結果への影響を最小限に抑えられます。
| 💡 ポイント |
|---|
dropna() はデータの件数が減るため、分析結果に影響を与える場合があります。nullの件数が少なければ除去しても問題ありませんが、nullが多い場合は fillna() で適切なデフォルト値に置換するほうが望ましいケースもあります。どちらを使うかはデータの性質と分析の目的に応じて判断してください。 |
7.3 重複の除去
複数のデータソースを結合した際や、同じデータが二重に登録されている場合など、重複するレコードを除去する必要があることがあります。
重複する行を除去する場合は dropDuplicates() を使用します。書式は以下のとおりです。引数に指定したカラムの値が重複する行が除去され、最初に出現した行だけが残ります。
df.dropDuplicates(["重複を判定するカラム名"])
実際に試してみましょう。customer_name の値が重複する行を除去し、各顧客の最初の注文だけを残します。
df.dropDuplicates(["customer_name"]).select("order_id", "customer_name").show()
以下のような実行結果が表示されます。元のデータでは田中太郎(ORD001, ORD006)、鈴木花子(ORD002, ORD007)、山田一郎(ORD003, ORD008)、佐藤美咲(ORD004, ORD009)、高橋健太(ORD005, ORD010)がそれぞれ2件ずつありましたが、各顧客から1行ずつだけ残り、5件に絞り込まれています。
+--------+-------------+
|order_id|customer_name|
+--------+-------------+
| ORD001| 田中太郎|
| ORD002| 鈴木花子|
| ORD003| 山田一郎|
| ORD004| 佐藤美咲|
| ORD005| 高橋健太|
+--------+-------------+
8. 環境のクリーンアップ
講座が完了したら、PySparkシェルとDockerコンテナを終了します。
PySparkシェルを終了します。
exit()
コンテナのbashを終了します。
exit
不要であれば、Dockerイメージを削除できます。
docker rmi quay.io/jupyter/pyspark-notebook
9. まとめ
この講座では、Apache SparkとPySparkによるデータ処理の基本について学びました。
- Apache Sparkはインメモリの分散処理フレームワークで、Driver・Executor・Cluster Managerの3つのコンポーネントで構成される
- Docker環境を使えば、JavaのインストールなしでPySparkの実行環境をすぐに構築できる
- DataFrameはSparkの主要なデータ構造で、
select、filter、groupBy、joinなどのSQLライクな操作でデータを処理できる - CSVファイルの読み込み・書き出しや、
withColumnによるカラム追加、when/otherwiseによる条件分岐など、実務で必要なデータ変換パターンを実行した - Spark SQLを使うと、DataFrameに対してSQLクエリを直接実行でき、DataFrame APIと使い分けることができる
- 遅延評価により、Sparkはアクション実行時に最適化された実行計画で処理を行う
- 本講座で学んだPySparkのコードは、AWS GlueのETLジョブでそのまま活用できる。次の講座「PySparkでGlueジョブを作ろう」では、実際にGlueジョブを作成してS3上のデータに対するETL処理を体験する