📝
BigQueryでパーティションテーブルの特定のパーティションにデータを挿入する
2024-12-19
約4083字
imofessional
この記事ではタイトルの通り、BigQuery(BQ)で特定のパーティションにデータを挿入したいケースでどのようにすればいいかを書いていきたいと思います!特段難しいことはしてないのですが、ひと工夫必要なので、メモ的に記事にしてみました!📝
日付ごとなどでパーティショニングされている既存のテーブルの特定のパーティションに、他のテーブルからデータを取り込む時を想定しています!
取り込む上では、カラム名の一致やデータ型の一致がマストになります!!ですので、いくつか方法がありますが、カラム名とデータ型を一致させつつ取り込みます!今回は取り込み元のテーブルはすでにBQ上に存在しているが、データ型やカラム名が一致していないケースを考えます。このケースではSQLでカラム名のRenameとデータ型のキャストを行うのが良さそうです。before_tableというテーブルに取り込み元のデータが入っている時には以下のようなクエリになります。クエリの中ではカラムのRenameとデータ型の変換を行なっています↓
SELECT
before_column1_name AS after_column1_name,
CAST(before_column2_name AS String) AS after_column2_name,
before_column3_name AS after_column3_name
FROM your_project.your_dataset.before_table
WHERE 挿入したいデータの条件(日付を指定する条件句がくる)
your_project.your_dataset.target_tableの12/1のpartitionにデータを挿入していきます
一旦csvに変換してからbq_loadを使ってデータを挿入するアプローチを取ります
import os
import subprocess
from datetime import datetime, timedelta
from google.cloud import bigquery
client = bigquery.Client()
query = """
SELECT
before_column1_name AS after_column1_name,
CAST(before_column2_name AS String) AS after_column2_name,
before_column3_name AS after_column3_name
FROM your_project.your_dataset.before_table
WHERE 挿入したいデータの条件(日付を指定する条件句がくる)
"""
temp_file = "temp_data_20241201.csv"
query_job = client.query(query)
results = query_job.result()
with open(temp_file, 'w') as f:
f.write(','.join([field.name for field in results.schema]) + '\n')
for row in results:
processed_row = [
str(value) if value is not None else ""
for value in row
]
f.write(','.join(processed_row) + '\n')
bq_command = [
'bq', 'load',
'--source_format=CSV',
'--skip_leading_rows=1',
'--schema', 'target_tableのjson_shemaファイルを適当な場所に保存して、その場所を指定',"your_project.your_dataset.target_table$20241201",
temp_file,
]
subprocess.run(bq_command, check=True)
このコードのテーブル名や日付を変更して実行すればデータの挿入が可能です!そしてここまできたら特定期間分丸ごと挿入できるようにコードを拡張していきます
# insert_partition_data.pyの中身
import os
import subprocess
from datetime import datetime, timedelta
from google.cloud import bigquery
def insert_partition_data():
client = bigquery.Client()
start_date = datetime(2023,10,2) #ここで期間を指定
end_date = datetime(2024,12,1) #ここで期間を指定
current_date = start_date
while current_date < end_date:
partitiondate = current_date.strftime("%Y-%m-%d")
partition_suffix = current_date.strftime("%Y%m%d")
target_table = f"your_project:your_dataset.before_table${partition_suffix}"
query = """
SELECT
before_column1_name AS after_column1_name,
CAST(before_column2_name AS String) AS after_column2_name,
before_column3_name AS after_column3_name
FROM your_project.your_dataset.before_table
WHERE 挿入したいデータの条件(日付を指定する条件句がくる)
"""
temp_file = f"temp_data_{partition_suffix}.csv"
query_job = client.query(query)
results = query_job.result()
with open(temp_file, 'w') as f:
f.write(','.join([field.name for field in results.schema]) + '\n')
for row in results:
processed_row = [
str(value) if value is not None else ""
for value in row
]
f.write(','.join(processed_row) + '\n')
bq_command = [
'bq', 'load',
'--source_format=CSV',
'--skip_leading_rows=1',
'--schema', 'target_tableのjson_shemaファイルを適当な場所に保存して、その場所を指定',
target_table,
temp_file,
]
try:
subprocess.run(bq_command, check=True)
except subprocess.CalledProcessError as e:
# エラー内容をログ出力または表示
print(f"BigQueryコマンドの実行中にエラーが発生しました: {e}")
raise RuntimeError("bqコマンドの実行に失敗しました") from e
print(f"Data for {_PARTITIONDATE} inserted successfully into {target_table}")
os.remove(temp_file)
current_date += timedelta(days=1)
if __name__ == "__main__":
insert_partition_data()
あとは作成したscriptを実行すれば指定した期間分のデータを挿入できます!
partition分割テーブルの特定partitionにデータを挿入したいケースはデータの再投入やローカルのデータを後からテーブルに挿入したいときなどに起こると思います!初めての時は意外と手こずる気がしたのでこうして記録を残してみました!
©︎ 2025 - Yard