PySparkでGlueジョブを作ろう

このハンズオンでは、AWS Glueを使ってS3上のCSVデータをPySparkで変換し、Parquet形式で出力するETL処理について実際にハンズオン形式で手を動かしながら体験します。

  • S3へのサンプルデータのアップロード
  • Glueジョブの作成とPySparkスクリプトの実行
  • 複数CSVファイルの読み込みと結合
  • 個人情報のマスキング処理
  • Spark SQLによるデータ操作
  • Parquet形式での出力と確認

1. 事前準備

このハンズオンでは、以下のツールやアカウントが必要です。まだ準備できていない場合は、リンク先の手順に沿って準備をお願いします。

💡 ポイント
AWS Glueジョブの実行にはDPU(Data Processing Unit)に応じた課金が発生します。最小構成(2 DPU)で1回のジョブ実行にかかる費用は、実行時間にもよりますが数十円〜数百円程度です。ハンズオン終了後は必ずリソースを削除してください。

2. ハンズオンの概要

前回の講座では、Docker環境でPySparkの基本操作を学びました。このハンズオンでは、前回学んだPySparkのコードを実際にAWS Glue上で実行し、S3に格納されたCSVデータを変換・加工してParquet形式で出力するETLパイプラインを構築します。

今回取り扱うシナリオは以下のとおりです。ある企業の売上データが、東京支店と大阪支店から別々のCSVファイルとして提出されます。これらのデータを結合し、個人情報のマスキングを行ったうえで、分析しやすいParquet形式に変換します。

このハンズオンで構築するETL処理の流れを示します。

flowchart LR
    subgraph Extract
        A[S3<br>東京支店CSV]
        B[S3<br>大阪支店CSV]
    end

    subgraph Transform
        C[結合・マスキング<br>集計・変換]
    end

    subgraph Load
        D[S3<br>Parquet形式]
    end

    A --> C
    B --> C
    C --> D

3. サンプルデータの準備

3.1 S3バケットの作成

まずは、データを格納するためのS3バケットを作成します。

S3のダッシュボードから、バケットを作成をクリックします。

下記の内容を設定してください。

設定項目 設定の基準
バケット名 glue-etl-handson-{自分の名前}-{日付}(例: glue-etl-handson-yamada-20250120) バケット名はグローバルで一意である必要があるため、名前と日付を含める
AWSリージョン ap-northeast-1(東京) Glueジョブと同じリージョンにするため
その他の設定 デフォルトのまま ハンズオン用途のためデフォルト設定で問題ない

入力が完了したら「バケットを作成」をクリックします。

無事にS3バケットが作成されれば、ここまでの操作は完了です。

3.2 フォルダの作成

作成したバケットをクリックして開きます。バケット内にデータの用途ごとにフォルダを作成します。「フォルダの作成」をクリックし、以下の2つのフォルダを順番に作成してください。

フォルダ名 用途
input/ サンプルCSVデータの格納先
output/ Glueジョブの出力先

最終的に、以下のようなフォルダ構成になっていれば完了です。

3.3 サンプルデータのアップロード

続いて、Glueジョブで処理するサンプルデータを作成し、S3にアップロードします。以下の2つのCSVファイルをローカルに作成してください。

東京支店の売上データ

tokyo_sales.csv は東京支店の売上データ(5件)です。以下の内容でファイルを作成してください。

order_id,customer_name,email,phone,product_name,category,quantity,unit_price,order_date
T001,田中太郎,tanaka@example.com,090-1234-5678,ノートPC,電子機器,1,120000,2025-01-05
T002,鈴木花子,suzuki@example.com,080-2345-6789,プログラミング入門,書籍,2,3000,2025-01-06
T003,山田一郎,yamada@example.com,070-3456-7890,マウス,電子機器,3,2500,2025-01-07
T004,佐藤美咲,sato@example.com,090-4567-8901,AWS入門,書籍,1,3500,2025-01-08
T005,高橋健太,takahashi@example.com,080-5678-9012,キーボード,電子機器,2,8000,2025-01-09

大阪支店の売上データ

osaka_sales.csv は大阪支店の売上データ(5件)です。以下の内容でファイルを作成してください。

order_id,customer_name,email,phone,product_name,category,quantity,unit_price,order_date
O001,中村優子,nakamura@example.com,090-6789-0123,モニター,電子機器,1,35000,2025-01-05
O002,小林誠,kobayashi@example.com,080-7890-1234,データ分析入門,書籍,3,2800,2025-01-06
O003,加藤恵,kato@example.com,070-8901-2345,Tシャツ,衣類,5,1500,2025-01-07
O004,吉田翔太,yoshida@example.com,090-9012-3456,パーカー,衣類,2,4000,2025-01-08
O005,渡辺愛,watanabe@example.com,080-0123-4567,USBメモリ,電子機器,10,1200,2025-01-09

S3へのアップロード

2つのファイルを作成したら、S3コンソールでバケットの input/ フォルダを開き、「アップロード」をクリックします。

tokyo_sales.csvosaka_sales.csv の両方のファイルを選択し、「アップロード」をクリックしてください。

アップロードが完了したら、input/ フォルダに2つのCSVファイルが表示されていることを確認してください。

4. IAMロールの作成

続いて、Glueに割り当てるためのIAMロールを作成します。GlueはETLジョブの実行時に、割り当てられたIAMロールの権限を使ってAWSリソースを操作します。今回はGlueがS3からデータを読み取り、加工したデータをS3に書き込む必要があるため、これらのアクセスを許可するIAMロールを作成します。

IAMのダッシュボードを開き、左側のメニューから「ロール」を選択し、「ロールを作成」をクリックします。

下記の内容を設定してください。

設定項目 設定の基準
信頼されたエンティティタイプ AWSのサービス Glueサービスにロールを引き受けさせるため
ユースケース Glue Glueジョブ用のロール

入力が完了したら「次へ」をクリックします。

許可ポリシーの設定画面が表示されます。検索ボックスを使って以下の2つのポリシーを検索し、チェックを入れてください。

ポリシー名 用途
AmazonS3FullAccess S3バケットの読み書きに必要
AWSGlueServiceRole Glueジョブの実行に必要

2つのポリシーにチェックを入れたら「次へ」をクリックします。

ロール名に glue-etl-handson-role と入力し、「ロールを作成」をクリックします。

IAMロールが作成されれば、ここまでの操作は完了です。

💡 ポイント
本番環境では AmazonS3FullAccess のような広い権限ではなく、必要なバケットとアクションのみを許可するカスタムポリシーを使用してください。このハンズオンでは設定を簡略化するためにフルアクセスを使用しています。

5. Glueジョブの作成

5.1 PySparkスクリプトの作成

Glueジョブで実行するPySparkスクリプトを作成します。以下の内容をローカルに etl_job.py として保存してください。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import col, lit, concat, sum, count, when, regexp_replace, sha2, to_date, year, month

# ===== 初期化 =====
args = getResolvedOptions(sys.argv, ["JOB_NAME", "INPUT_PATH", "OUTPUT_PATH"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

input_path = args["INPUT_PATH"]
output_path = args["OUTPUT_PATH"]

# ===== Extract(抽出) =====
# 東京支店と大阪支店のCSVファイルを個別に読み込み
tokyo_df = spark.read.csv(f"{input_path}/tokyo_sales.csv", header=True, inferSchema=True)
osaka_df = spark.read.csv(f"{input_path}/osaka_sales.csv", header=True, inferSchema=True)

# 支店名カラムを追加
tokyo_df = tokyo_df.withColumn("branch", lit("東京"))
osaka_df = osaka_df.withColumn("branch", lit("大阪"))

# 2つのDataFrameを結合
df = tokyo_df.union(osaka_df)

print(f"=== 結合後のデータ件数: {df.count()} 件 ===")
df.show()

# ===== Transform(変換) =====

# --- 個人情報のマスキング ---
# メールアドレス: SHA-256でハッシュ化
df = df.withColumn("email_hash", sha2(col("email"), 256))

# 電話番号: 下4桁以外をマスク(例: ***-****-9012)
df = df.withColumn("phone_masked",
    regexp_replace(col("phone"), r"^\d{3}-\d{4}", "***-****")
)

# 元のemail・phoneカラムを削除
df = df.drop("email", "phone")

print("=== マスキング後のデータ ===")
df.select("order_id", "customer_name", "email_hash", "phone_masked").show(truncate=False)

# --- データ型の変換 ---
# 日付型への変換と年・月の抽出
df = df.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
df = df.withColumn("year", year(col("order_date")))
df = df.withColumn("month", month(col("order_date")))

# 注文金額の算出
df = df.withColumn("total_price", col("quantity") * col("unit_price"))

# --- Spark SQLによる集計 ---
# DataFrameを一時ビューとして登録
df.createOrReplaceTempView("sales")

# SQLで支店別・カテゴリ別の売上集計
summary_df = spark.sql("""
    SELECT
        branch,
        category,
        COUNT(*) AS order_count,
        SUM(total_price) AS total_sales,
        AVG(unit_price) AS avg_unit_price
    FROM sales
    GROUP BY branch, category
    ORDER BY branch, total_sales DESC
""")

print("=== 支店別・カテゴリ別の売上集計(Spark SQL) ===")
summary_df.show()

# ===== Load(出力) =====
# 明細データをParquet形式で出力(支店でパーティション分割)
df.write.partitionBy("branch").parquet(
    f"{output_path}/detail/",
    mode="overwrite"
)

# 集計データをParquet形式で出力
summary_df.write.parquet(
    f"{output_path}/summary/",
    mode="overwrite"
)

print("=== ETL処理が完了しました ===")

5.2 スクリプトの解説

コードを解説します。

args = getResolvedOptions(sys.argv, ["JOB_NAME", "INPUT_PATH", "OUTPUT_PATH"])

getResolvedOptions は、Glueジョブの実行時にパラメータを受け取るための関数です。INPUT_PATHOUTPUT_PATH をパラメータとして受け取ることで、入出力先のS3パスをジョブ実行時に柔軟に変更できます。

tokyo_df = tokyo_df.withColumn("branch", lit("東京"))
osaka_df = osaka_df.withColumn("branch", lit("大阪"))
df = tokyo_df.union(osaka_df)

lit() はリテラル値(固定値)のカラムを追加する関数です。各支店のDataFrameに支店名カラムを追加してから、union() で2つのDataFrameを縦方向に結合しています。

df = df.withColumn("email_hash", sha2(col("email"), 256))
df = df.withColumn("phone_masked",
    regexp_replace(col("phone"), r"^\d{3}-\d{4}", "***-****")
)
df = df.drop("email", "phone")

個人情報のマスキング処理です。メールアドレスは sha2() でSHA-256ハッシュに変換し、元のアドレスを復元できないようにしています。電話番号は regexp_replace() で先頭部分をアスタリスクに置換し、下4桁のみ残しています。処理後、元のカラムは drop() で削除しています。

📝 個人情報のマスキングとは
マスキングとは、個人情報や機密データを分析に支障のない形に加工する処理のことです。データ分析基盤にデータを格納する際、氏名やメールアドレス、電話番号などの個人情報をそのまま保存すると情報漏洩のリスクがあります。ハッシュ化や部分的な置換を行うことで、個人を特定できないようにしつつ、集計や分析には利用できるデータに変換します。
df.createOrReplaceTempView("sales")
summary_df = spark.sql("""
    SELECT branch, category, COUNT(*) AS order_count, ...
    FROM sales
    GROUP BY branch, category
""")

Spark SQLを使った集計処理です。createOrReplaceTempView() でDataFrameをSQLのテーブルとして登録すると、spark.sql() で標準的なSQLクエリを実行できます。前回の講座で学んだ groupBy().agg() と同じ結果が得られますが、SQLに慣れている方にとってはこちらのほうが直感的に書けます。

df.write.partitionBy("branch").parquet(f"{output_path}/detail/", mode="overwrite")

partitionBy("branch") を指定すると、branch=東京/branch=大阪/ のようにフォルダが分割されて出力されます。分析時に特定の支店のデータだけを読み込む場合、不要なデータを読み飛ばせるためパフォーマンスが向上します。

5.3 Glueジョブの作成

AWSマネジメントコンソールの検索ボックスで「AWS Glue」と入力し、Glueのコンソールを開きます。

左メニューの「ETLジョブ」をクリックし、「Script editor」を選択します。

下記の内容を設定してください。

設定項目 設定の基準
エンジン Spark PySparkスクリプトを実行するため
オプション Upload and edit an existing script(既存のスクリプトをアップロード) 作成済みのスクリプトを使用するため
ファイルのアップロード etl_job.py 先ほど作成したスクリプト

入力が完了したら「Create script」をクリックします。

スクリプトエディタが開き、アップロードしたスクリプトの内容が表示されます。

ジョブの詳細設定

続いて、ジョブの詳細を設定します。画面上部の「Job details」タブをクリックしてください。

下記の内容を設定してください。

設定項目 設定の基準
Name glue-etl-handson ジョブを識別するため
IAM Role glue-etl-handson-role 先ほど作成したIAMロール
Glue version Glue 5.0 最新バージョンを使用するため
Language Python 3 PySparkスクリプトのため
Worker type G.1X 最小構成
Requested number of workers 2 最小構成(コストを抑えるため)

ジョブパラメータの設定

同じ「Job details」タブの下部にある「Advanced properties」を展開し、「Job parameters」セクションを表示します。

「Add new parameter」をクリックして、以下の2つのパラメータを追加します。キーと値をそれぞれ入力してください。

キー
--INPUT_PATH s3://{バケット名}/input
--OUTPUT_PATH s3://{バケット名}/output

バケット名は先ほど作成したバケット名に置き換えてください。キーの先頭に --(ハイフン2つ)が必要です。

📝 パラメータのキーにはハイフン2つが必要です
Glueのジョブパラメータでは、キーの先頭に --(ハイフン2つ)を付ける必要があります。INPUT_PATH ではなく --INPUT_PATH と入力してください。ハイフンが不足しているとジョブ実行時にエラーになります。

すべての設定が完了したら、画面上部の「Save」をクリックしてジョブを保存します。

6. Glueジョブの実行

6.1 ジョブの実行

ジョブが保存されたら、「Run」をクリックしてジョブを実行します。

画面上部の「Runs」タブをクリックすると、実行状況を確認できます。ステータスがRunningとなり、ジョブが実行中であることがわかります。

ジョブの実行には2〜5分程度かかります。ステータスが「Succeeded」になれば、ジョブの実行は成功です。

⚠️ ジョブの実行に失敗する場合
(1)IAMロールに AmazonS3FullAccessAWSGlueServiceRole が付与されているか確認してください。
(2)Job parametersの --INPUT_PATH--OUTPUT_PATH のS3パスが正しいか確認してください。キーの先頭に -- が付いているかも確認してください。
(3)S3の input/ フォルダに2つのCSVファイルがアップロードされているか確認してください。

6.2 実行ログの確認

ジョブが成功したら、実行ログを確認します。「Runs」タブで実行IDをクリックし、「Output logs」のリンクからCloudWatch Logsを開きます。

スクリプト内の print() で出力したメッセージがログに記録されています。以下の3つのポイントを確認してください。

  • 結合後のデータ件数が10件であること(東京5件 + 大阪5件)
  • マスキング後のデータで、メールアドレスがハッシュ値に、電話番号が***-****-XXXXの形式に変換されていること
  • 支店別・カテゴリ別の集計結果が出力されていること

これらが確認できれば、ETL処理は正しく動作しています。

6.3 出力データの確認

続いて、S3に出力されたデータを確認します。S3コンソールでバケットの output/ フォルダを開きます。

output/detail/ フォルダには、支店ごとにパーティション分割されたParquetファイルが格納されています。

partitionBy("branch") の指定により、支店ごとにフォルダが分割されていることがわかります。detail/ フォルダの中に branch=東京/branch=大阪/ の2つのフォルダが作成されていれば、パーティション分割は正しく動作しています。

summary/ フォルダには、支店別・カテゴリ別の集計データがParquet形式で格納されています。

💡 ポイント
Parquetファイルはバイナリ形式のため、S3からダウンロードしてもテキストエディタでは中身を確認できません。データの内容を確認したい場合は、AthenaでSQLクエリを実行するか、S3の「クエリでプレビュー」機能を使用してください。

7. RDSからの読み込み(参考)

ここまでのハンズオンではS3のCSVファイルをデータソースとして使用しましたが、実務ではRDS(リレーショナルデータベース)からデータを取得するケースも多くあります。RDSからの読み込みにはJDBC接続を使用します。

以下は、RDS(MySQL)からデータを読み込むコードの例です。実際にはVPCの設定やGlue接続(Connection)の作成が別途必要になりますが、PySpark部分のコードは以下のようになります。

orders_df = spark.read.format("jdbc").options(
    url="jdbc:mysql://my-rds-endpoint:3306/mydb",
    dbtable="orders",
    user="admin",
    password="password",
    driver="com.mysql.cj.jdbc.Driver"
).load()

読み込んだ後は、S3から読み込んだ場合と同じDataFrame操作が使えます。

orders_df.filter(col("status") == "完了").show()
orders_df.groupBy("category").agg(sum("amount").alias("total")).show()
💡 ポイント
RDSからの読み込みを行う場合、Glueジョブの設定で接続(Connection)を作成し、VPC・サブネット・セキュリティグループを指定する必要があります。また、パスワードはコードに直接記述せず、AWS Secrets Managerで管理することを推奨します。「データ分析基盤を構築しよう」のハンズオンでGlueからRDSに接続した際の設定が参考になります。

8. 不要リソースの削除

ハンズオンが完了したら、不要なコストが発生しないようにリソースを削除します。以下の順序で削除してください。

8.1 Glueジョブの削除

Glueコンソールの「ETLジョブ」を開きます。glue-etl-handson を選択し、「アクション」→「ジョブの削除」をクリックします。

確認ダイアログが表示されたら「削除」をクリックしてください。ジョブが一覧から消えれば削除は完了です。

8.2 IAMロールの削除

IAMコンソールの左メニューから「ロール」を選択します。検索ボックスに glue-etl-handson-role と入力し、該当のロールを選択して「削除」をクリックします。

確認のためロール名の入力を求められるので、glue-etl-handson-role と入力して「削除」をクリックしてください。

8.3 S3バケットの削除

S3コンソールでバケットを選択します。

まず「空にする」をクリックして、バケット内のすべてのオブジェクトを削除します。確認のため 永久に削除 と入力して「空にする」をクリックしてください。

バケットが空になったら、「削除」をクリックしてバケット自体を削除します。確認のためバケット名を入力して「バケットを削除」をクリックしてください。

バケットが一覧から消えれば、すべてのリソースの削除は完了です。

9. まとめ

このハンズオンでは、AWS Glueを使ったS3上のCSVデータのETL処理を体験しました。

  • S3上の複数のCSVファイル(東京支店・大阪支店)を読み込み、union()結合した
  • メールアドレスのハッシュ化と電話番号のマスキングにより、個人情報の保護を行った
  • createOrReplaceTempView()spark.sql() を使ったSpark SQLで集計クエリを実行した
  • 処理結果を partitionBy() で支店ごとにフォルダ分割し、Parquet形式でS3に出力した
  • 前回の講座で学んだPySparkの操作(withColumnfiltergroupBy など)がGlueジョブでそのまま活用できることを確認した

10. (参考)Terraformで構築する場合

このハンズオンで構築した環境は、Terraformを使ってコードで管理することもできます。以下に、S3バケット・IAMロール・GlueジョブのすべてのリソースをTerraformで構築するサンプルコードを示します。

注意: サンプルCSVデータの作成とS3へのアップロードは、Terraformの範囲外のため手動で行う必要があります。

10.1 Terraformコード

以下のコードをmain.tfとして保存してください。

# ==============================================================
# Provider(プロバイダ設定)
# Terraformが使用するAWSプロバイダのバージョンとリージョンを指定
# ==============================================================
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 6.0"
    }
  }
}

provider "aws" {
  region = "ap-northeast-1" # 東京リージョン
}

# ==============================================================
# Variables(入力変数)
# terraform.tfvars または実行時に値を指定する
# ==============================================================

# S3バケット名の末尾に付ける識別子(バケット名はグローバルで一意にする必要があるため)
variable "bucket_suffix" {
  type        = string
  description = "S3 bucket suffix (e.g. yamada-20250120)"
}

# ==============================================================
# S3(データ格納先)
# サンプルCSVの入力先とGlueジョブの出力先
# ==============================================================

# S3バケットの作成
resource "aws_s3_bucket" "glue_data" {
  bucket        = "glue-etl-handson-${var.bucket_suffix}"
  force_destroy = true # ハンズオン用: バケット内にオブジェクトがあっても削除可能にする
  tags          = { Name = "glue-etl-handson-${var.bucket_suffix}" }
}

# inputフォルダの作成(サンプルCSVのアップロード先)
resource "aws_s3_object" "input_folder" {
  bucket = aws_s3_bucket.glue_data.id
  key    = "input/"
}

# outputフォルダの作成(Glueジョブの出力先)
resource "aws_s3_object" "output_folder" {
  bucket = aws_s3_bucket.glue_data.id
  key    = "output/"
}

# scriptsフォルダの作成(PySparkスクリプトの格納先)
resource "aws_s3_object" "scripts_folder" {
  bucket = aws_s3_bucket.glue_data.id
  key    = "scripts/"
}

# PySparkスクリプトをS3にアップロード
resource "aws_s3_object" "etl_script" {
  bucket = aws_s3_bucket.glue_data.id
  key    = "scripts/etl_job.py"
  content = <<-PYSPARK
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import col, lit, concat, sum, count, when, regexp_replace, sha2, to_date, year, month

# ===== 初期化 =====
args = getResolvedOptions(sys.argv, ["JOB_NAME", "INPUT_PATH", "OUTPUT_PATH"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

input_path = args["INPUT_PATH"]
output_path = args["OUTPUT_PATH"]

# ===== Extract(抽出) =====
tokyo_df = spark.read.csv(f"{input_path}/tokyo_sales.csv", header=True, inferSchema=True)
osaka_df = spark.read.csv(f"{input_path}/osaka_sales.csv", header=True, inferSchema=True)

tokyo_df = tokyo_df.withColumn("branch", lit("東京"))
osaka_df = osaka_df.withColumn("branch", lit("大阪"))

df = tokyo_df.union(osaka_df)

print(f"=== 結合後のデータ件数: {df.count()} 件 ===")
df.show()

# ===== Transform(変換) =====
df = df.withColumn("email_hash", sha2(col("email"), 256))
df = df.withColumn("phone_masked",
    regexp_replace(col("phone"), r"^\d{3}-\d{4}", "***-****")
)
df = df.drop("email", "phone")

df = df.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
df = df.withColumn("year", year(col("order_date")))
df = df.withColumn("month", month(col("order_date")))
df = df.withColumn("total_price", col("quantity") * col("unit_price"))

df.createOrReplaceTempView("sales")

summary_df = spark.sql("""
    SELECT
        branch,
        category,
        COUNT(*) AS order_count,
        SUM(total_price) AS total_sales,
        AVG(unit_price) AS avg_unit_price
    FROM sales
    GROUP BY branch, category
    ORDER BY branch, total_sales DESC
""")

print("=== 支店別・カテゴリ別の売上集計(Spark SQL) ===")
summary_df.show()

# ===== Load(出力) =====
df.write.partitionBy("branch").parquet(
    f"{output_path}/detail/",
    mode="overwrite"
)

summary_df.write.parquet(
    f"{output_path}/summary/",
    mode="overwrite"
)

print("=== ETL処理が完了しました ===")
  PYSPARK
}

# ==============================================================
# IAM(権限管理)
# GlueジョブがS3やCloudWatch Logsにアクセスするための権限
# ==============================================================

# Glue用IAMロール
resource "aws_iam_role" "glue" {
  name = "glue-etl-handson-role"

  # Glueサービスがこのロールを引き受けることを許可
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = { Service = "glue.amazonaws.com" }
    }]
  })

  tags = { Name = "glue-etl-handson-role" }
}

# AWSGlueServiceRole(Glueジョブの実行に必要なAWS管理ポリシー)をアタッチ
resource "aws_iam_role_policy_attachment" "glue_service" {
  role       = aws_iam_role.glue.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
}

# AmazonS3FullAccess(S3バケットの読み書きに必要)をアタッチ
resource "aws_iam_role_policy_attachment" "glue_s3" {
  role       = aws_iam_role.glue.name
  policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
}

# ==============================================================
# Glue ETL Job(ETLジョブ)
# S3上のCSVデータを変換しParquet形式で出力するジョブ
# ==============================================================
resource "aws_glue_job" "etl" {
  name     = "glue-etl-handson"
  role_arn = aws_iam_role.glue.arn

  command {
    name            = "glueetl"
    script_location = "s3://${aws_s3_bucket.glue_data.id}/scripts/etl_job.py"
    python_version  = "3"
  }

  # スクリプトに渡すパラメータ
  default_arguments = {
    "--job-language" = "python"
    "--INPUT_PATH"   = "s3://${aws_s3_bucket.glue_data.id}/input"
    "--OUTPUT_PATH"  = "s3://${aws_s3_bucket.glue_data.id}/output"
    "--TempDir"      = "s3://${aws_s3_bucket.glue_data.id}/temp/"
  }

  glue_version      = "5.0"  # Glue 5.0(ハンズオン本編と同じバージョン)
  worker_type       = "G.1X" # ワーカータイプ(G.1X = 4 vCPU, 16 GB メモリ)
  number_of_workers = 2      # ワーカー数(小規模データのため最小構成)
}

# ==============================================================
# Outputs(出力値)
# terraform apply後にコンソールに表示される情報
# ==============================================================
output "s3_bucket_name" {
  description = "データ格納先のS3バケット名"
  value       = aws_s3_bucket.glue_data.id
}

output "glue_job_name" {
  description = "GlueジョブのETLジョブ名"
  value       = aws_glue_job.etl.name
}

10.2 Glueリソースの解説

Terraformコードのうち、このハンズオンの主題であるGlue関連のリソースについて解説します。

リソース Terraformリソース名 説明
S3バケット aws_s3_bucket / aws_s3_object Glueの入出力先となるS3バケットを作成。aws_s3_objectinput/output/scripts/フォルダを作成
ETLスクリプト aws_s3_object(etl_script) PySparkで記述されたETLスクリプトをS3にアップロード。ハンズオン本編と同じスクリプトをインラインで定義
IAMロール aws_iam_role / aws_iam_role_policy_attachment Glueジョブの実行に使用するロール。AWSGlueServiceRoleAmazonS3FullAccessの2つのポリシーをアタッチ
Glue ETLジョブ aws_glue_job Glueジョブの定義。スクリプトの場所、IAMロール、Glueバージョン、ワーカータイプ・数を指定。default_argumentsでスクリプトに渡すパラメータ(入出力先S3パス)を設定

10.3 実行方法

Terraform実行

terraform.tfvarsを作成し、変数を設定します。

bucket_suffix = "yamada-20250120"  # ご自身の名前と日付に置き換えてください

まず、Terraformの初期化を行います。プロバイダのダウンロードなどが実行されます。

terraform init

次に、作成されるリソースの実行計画を確認します。どのようなリソースが作成されるかを事前に確認できます。

terraform plan

問題がなければ、実際にリソースを作成します。確認プロンプトが表示されるので、yesと入力してください。

terraform apply

実行が完了すると、S3バケット名とGlueジョブ名が出力されます。

サンプルデータのアップロード

Terraformでリソースを作成した後、ハンズオン本編と同じサンプルCSVファイル(tokyo_sales.csvosaka_sales.csv)を作成し、S3のinput/フォルダにアップロードしてください。

以下のコマンドでAWS CLIを使ってアップロードできます。

aws s3 cp tokyo_sales.csv s3://glue-etl-handson-<bucket_suffix>/input/
aws s3 cp osaka_sales.csv s3://glue-etl-handson-<bucket_suffix>/input/

Glue ETLジョブを実行

Glue ETLジョブはTerraformでリソースを作成しただけでは自動実行されません。AWSマネジメントコンソールから手動で実行します。

AWSマネジメントコンソールで「AWS Glue」を開き、左メニューから「ETL jobs」を選択します。ジョブ一覧からglue-etl-handsonを選択し、画面右上の「Run」ボタンをクリックしてジョブを実行します。「Runs」タブでジョブの実行状況を確認でき、ステータスが「Succeeded」になれば完了です。

リソースの削除

環境を削除する場合は、以下のコマンドを実行します。

terraform destroy

force_destroy = trueを設定しているため、S3バケット内にオブジェクトがあっても削除されます。