🚜
dbtからDataprocでpythonモデルを実行する
2024-12-22
約3713字
株式会社ヤードでデータエンジニアをしている @toitech です。
今回は、dbt(data build tool)からDataprocを使ってPythonモデルを実行する方法を紹介します。
dbtは、データ変換プロセスを管理・自動化し、主にSQLを使ってモジュール化・テスト可能なデータモデルを構築するツールです。
SQLのみで開発できるため、アナリティクスエンジニアを中心に多くのデータ基盤で活用されています。
いっぽうで、通常のSQLやUDF(ユーザー定義関数)では困難あるいは非効率なデータ処理や分析を実現する場合、Pythonで記述したいケースも少なからずあります。
などが、典型的なユースケースかと思います。
dbtからPythonモデルを利用する場合、以前は dbt-fal と呼ばれるアプローチがありましたが、プロジェクトが終了してしまいました。
それの代替手段として、Dataprocを使うことが推奨されています。
今回は、dbtからDataproc上でpythonモデルを実行する方法を紹介します。
Dataprocは、Google Cloudが提供する高速で簡単なマネージドApache SparkとHadoopサービスです。データ処理や分析、機械学習をクラウドで効率的に実行できるように設計されています。
Dataprocを実行するサービスアカウントに、以下の権限を加えます。
インスタンスタイプのOSについては、Dataprocのインスタンスのリリースノートを見ると、Debian / Ubuntu / RockyのOSが使えることがわかります。
今回は、2.2.39-debian12を使います。
# サービスアカウントの作成
resource "google_service_account" "dataproc_sa" {
account_id = "dataproc-service-account"
display_name = "Dataproc Service Account"
}
# サービスアカウントに必要なIAMロールを付与
resource "google_project_iam_member" "dataproc_roles" {
for_each = toset([
"roles/dataproc.worker",
"roles/storage.objectAdmin",
"roles/monitoring.metricWriter",
"roles/logging.logWriter",
"roles/bigquery.dataEditor",
"roles/bigquery.jobUser",
"roles/bigquery.user"
])
project = "your-project-id"
member = "serviceAccount:${google_service_account.dataproc_sa.email}"
role = each.value
}
resource "google_dataproc_cluster" "example_cluster" {
name = "example-dataproc-cluster"
region = "asia-northeast1"
cluster_config {
master_config {
num_instances = 1
machine_type = "n1-standard-4"
disk_config {
boot_disk_type = "pd-ssd"
boot_disk_size_gb = 100
}
}
worker_config {
num_instances = 2
machine_type = "n1-standard-4"
disk_config {
boot_disk_type = "pd-ssd"
boot_disk_size_gb = 100
}
}
software_config {
image_version = "2.2.39-debian12"
}
initialization_action {
executable_file = "gs://your-bucket/connectors.sh"
}
gce_cluster_config {
network = "default"
subnetwork = "default"
service_account = google_service_account.dataproc_sa.email
}
}
}
GoogleCloudDataprocのレポジトリにある、connectors.shをコピーし、末尾に自分が使いたいPythonパッケージの設定を記述するのが良いでしょう。今回は、後述するpythonコードでhash化を行うので、hashidsをインストールします。
この connectors.sh
ファイルを your-bucket
のバケット以下に配置します。
#!/bin/bash
# 略 上記のconnectors.shの記述をコピー
apt-get update || true
apt-get install -y python3-pip
pip3 install hashids
これらの設定をterraform applyします。
profiles.yml
を以下のように設定します。
my_dbt_project:
target: dev
outputs:
dev:
type: bigquery
location: asia-northeast1
method: oauth
project: project-test
dataset: test
gcs_bucket: your-bucket
dataproc_region: asia-northeast1
staging層でtest.test_tableのidカラム(int)をhashed_idに変換する処理は、以下のmodels/staging/hashed_test.py
のように書きます。
以下の例では、dbt.source("test", "test_table)
の部分は、PySparkのDataframe型になっているのでPandasに変換しています。
from hashids import Hashids
def hash_id(x):
# 何かしらの処理、ここでxをハッシュ化する例を示します
hashids = Hashids()
return hashids.encode(x)
def model(dbt, session):
dbt.config(
submission_method="cluster",
dataproc_cluster_name="example-dataproc-cluster",
)
df = dbt.source("test", "test_table").toPandas()
df["hashed_id"] = df["id"].apply(lambda x: hash_id(x))
return df
以下のように実行すると、無事にhashed_testテーブルが作成されます。
dbt run --select hashed_test --target dev
本記事では、dbtでpythonモデルを実行するためのDataprocの構築方法と簡単な処理のpythonの処理スクリプトを紹介しました。
大規模データ処理はもちろん、SQLでは表現しにくいロジックをPythonで記述できる用になると思います。
©︎ 2025 - Yard