AWS S3に設置されたCSVをGCP BigQueryで分析した時にやったこと

今年の夏は長いなと思っていたらこのところ急に涼しくなったりで、冷房をつけるか悩む日々です。
寒暖差がありますが、皆様は体調いかがでしょうか?

さて今回は、お客様のシステムデータを分析するにあたって実施したAWSとGCPの連携の話です。今回の分析では、お客様側のシステムがAWSで動作しており、S3に定期的にデータをCSV出力されていました。今回これをBigQueryに取り込んで分析する必要があり、取り込みのためにGCPのいくつかの機能を使いましたのでご紹介します。

※GLASSではお客様のシステムデータやサイトアクセスデータ、広告データなどを統合して、可視化・分析するサービスを行っております。ご興味がある方は以下をご確認ください。

ダッシュボード

処理の全体像

AWS S3からGCP BigQueryのデータ転送はいくつか方法がありますが、今回はCloudStorage経由での方法になります。

今回の前提として、あるマスタの週次差分データがCSVファイルとしてAWS S3に出力されます。今回はこのデータを以下の手順でCloudStorageを経由して最終的にBigQueryに取り込んでいます。

  • CSVファイルを定期的にAWS S3からGCP CloudStorageに転送する
  • CloudStorageに転送されたことをトリガにしてBigQueryに該当データをテーブルとして格納
  • 格納された差分データはBigQueryのスケジュールされたクエリを使って累積用テーブルに追加

なお、CSVデータはBigQueryが対応しているUTF-8にしておく必要があります。

【BigQueryガイド データの読み込みの概要】
https://cloud.google.com/bigquery/docs/loading-data?hl=ja

AWS S3からGCP CloudStorageへの転送

AWSからGCPへのデータの転送にはStorageTransferを使いました

※以下作業の前提としてAWS S3の該当バケットに対してアクセス可能なアカウントを作成し、その認証情報が用意されているものとします。

StorageTransferで「転送ジョブを作成」を選択します。

最初の参照元と宛先の指定でAmazon S3Google Cloud Storageを選択します。スケジュールモードは用途に合ったものを選んでください。

次にAWSのバケットを指定して、用意した認証情報(アクセスキーIDとシークレットアクセスキー)を指定します。

転送先のGCPのバケットを指定します。

今回は週1の定期的な分析ということだったので、転送は週1回のスケジュールで実施する設定としました。(キャプチャはバッチを選んだ場合。イベントドリブンの場合は画面が異なります)

最後の画面で上書き条件など細かなオプションを設定したら完了です。

CloudStorageからBigQueryへの転送

CloudStorageからBigQueryへの転送にはCloudRunを利用しました。CloudRunでPythonコードを使って、CSVデータを読み込み、BQにAPI経由で新規テーブルを作りました

記述したPythonコード(Python 3.12)は以下です。以下例ではitems.csvというファイルをCloudStorageからBigQueryに取り込みます。

from google.cloud import bigquery
import json

def load_data_to_bq(event, context):
    # BigQueryの環境に合わせて<project_id>を変更して下さい
    client = bigquery.Client(project="<project_id>") 
    BUCKET_NAME = event['bucket']
    NAME = event['name']
    NAME_WITHOUT_EXT = NAME.replace(".csv", "")

    hash = {
        "items": {
            "schema": [
                {"name": "item_code", "type": "STRING"},
                {"name": "item_name", "type": "STRING"},
                {"name": "category", "type": "STRING"},             
            ]
        },
        # ほかにも設置するファイルがあれば記述
    }

    try:
        target = hash[NAME_WITHOUT_EXT]
    except KeyError:
        return

    target_schema = []
    for col in target["schema"]:
        target_schema.append(bigquery.SchemaField(col["name"], col["type"]))

    # BigQueryの環境に合わせて<project_id>.<dataset_name>を変更して下さい
    table_id = f"<project_id>.<dataset_name>.{NAME_WITHOUT_EXT}_diff"

    # 既存のテーブルを削除
    try:
        client.delete_table(table_id, not_found_ok=True)
    except Exception as e:
        print(f"Error deleting table {table_id}: {e}")    

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1, #先頭行がヘッダの場合は1
        autodetect=False,
        schema=target_schema,
    )

    uri = f"gs://{BUCKET_NAME}/{NAME}"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )

    load_job.result() 

    print(f"Loaded data from {uri} to {table_id}")

コードの通り、CloudRunでPythonを使った場合は、スキーマ(カラム名とデータ型)も指定できるので柔軟性が高いです。例えばCSVのカラム名が日本語の場合でもこのコード上で英語名に変更できますし、商品コードなどのデータ型も必要に応じて数値型か文字列型か調整できます。

最初はBigQuery側でCloudStorageのファイルを直接参照する方法を試したのですが、この場合は細かなスキーマの指定はできず、自動判定になってしまいました。
例えば、空のCSVが渡された場合はデータからデータ型を推測できないため全てのカラムが文字列型になり、フィールド名もstring_field_0のようなフィールド名になってしまいました。
また、空では無くデータが入っている場合も、カラム名に日本語が使われていると’_’に置き換えられてしまいました。

話をCloudRunに戻します。CloudRunは定期実行など実行タイミングを選ぶことができます。定期実行以外にもCloudStorageにファイルが置かれたことをトリガにすることができるので、今回はこれを使いました。

トリガーのタイプにはCloudStorageを、イベントタイプにはgoogle.cloud.storage.object.v1.finalizedを指定します。

CloudStorageへのファイル設置をトリガにした場合、Pythonコードのevent[‘name’]、event[‘bucket’]にはトリガとなったファイルとバケットの情報が入りますので、コード中からこれを使ってファイル名などを参照できます。

BigQueryでスケジュールされたクエリを使って差分データを蓄積

ここまでで無事、差分データのCSVファイルをBigQueryに転送できました。今回はこの定期的に取得できる差分データを蓄積しておきたかったので、スケジュールされたクエリを使って以下のように蓄積用のテーブルにUPSERTすることにしました。

MERGE INTO `<project_id>.<dataset_name>.items` AS master
USING `<project_id>.<dataset_name>.items_diff` AS diff
ON master.item_code = diff.item_code
WHEN MATCHED THEN
  UPDATE SET
    master.item_code = diff.item_code,
    master.item_name = diff.item_name,
    master.category = diff.category ,
WHEN NOT MATCHED THEN
  INSERT (
    item_code, item_name, category
  )
  VALUES (
    diff.item_code, diff.item_name, diff.category
  );

これで、BigQuery内でサイトアクセスのデータなど統合してと様々な分析が可能になりました。

まとめ

今回はAWS S3に置かれたCSVのマスタ差分ファイルをGCPのBigQueryで分析できるようにするまでの連携についてお話ししました。

GLASSでは今回のようなお客様のシステムデータを、Google Analyticsや各種広告データなどのマーケティングデータと統合し、マーケティング成果を可視化・分析するサービスを行っています。

ご興味がございましたら是非以下からお問い合わせください。

文末ダッシュボード

GLASSで一緒に働いてみませんか?