こんにちは、メディアサービス開発部サービス分析課の佐藤です。ブックウォーカー社で全社横断のデータ基盤を構築しています。
前回SlackからGitHub Actionsを実行する記事を投稿しましたが、今回はそのGitHub Actionsを使ってデプロイしていたBigQueryのリモート関数の利用ケースについて説明していきます。
- 背景
- 「外部のAPIから得たデータをBigQueryへ投入するやり方」の検討
- Terraformでデプロイ用のServiceAccountを作成
- BigQueryに外部接続とデータセット作成
- GitHub Actionsにデプロイ用ワークフローを作る
- 全体の流れをまとめて提供する
- 終わりに
背景
現在、BigQueryのデータを加工する集計バッチについてはスケジュールクエリを各自に好き勝手に作成してもらう運用にしています。
スケジュールクエリ登録に制限やレビューを設けず好きにやって貰う代わりに、個人スキャンサイズに割当上限をかけたり、大規模なログを扱う分割テーブルはパーティショニングフィルタを必須としています。
ただ、さすがにそろそろスケジュールクエリでバッチを作り続けるのには限界を感じてきているので、これは将来dataformに移行する予定です。
dataformへの移行を前提とした時、同時にデータリネージも積極的に行っていこうと検討しています。
GCPのDataplexによるBigQueryのデータに対するデータリネージ機能はつい先日GAした機能で、SQLの実行履歴を元にしてそのSQLの入力や出力を可視化ネットワーク上で追いかけることができるようになります。
この機能を使い倒すことで集計バッチにおける一連の流れを追いかけやすくなるだろうと思われます。 一方でデータの取得部分をどうやってこの可視化ネットワークへ取り入れていくかが課題となります。
こういった課題があることは長らく認識していましたが、dataformもリネージもまだプレビューであるため本格的な移行や活用時期は先になるだろうと踏んでいました。
しかし今年に入って「実際にGA4のAPIを利用してGA4レポートの内容をBigQueryに投入したいがどうやったらいいか?」という相談がサービス分析課にやってきました。その依頼元のチームではこれまでGASからGoogleAnalytics DataとBigQuery insertを使っており、バッチもデータ保存先もを自前で管理していました。サービス分析課がデータ基盤を構築していくことで、データの一元化やデータ加工処理の整備が進んでいったため、データ基盤流のやり方に合わせて徐々に移行させたいというのが相談の背景です。
そこで「外部のAPIから得たデータをBigQueryへ投入するやり方」についてサービス分析課なりの推奨手法を検討することとなりました。
「外部のAPIから得たデータをBigQueryへ投入するやり方」の検討
今回の依頼ではGA4のレポートデータでしたが、他にもアプリストアや広告など業務に関連する様々な領域でAPI経由のデータ取り込みが想定されます。こういった「社外のサービスからAPI経由で取り込むデータ」のことをこの記事では「外部APIデータ」と呼ぶことにします。
外部APIデータの取得するためにはJavaScriptやRubyなど何らかのプログラミング言語でコードを書くことになります。コードからAPIに対してGETなりPOSTなりのHTTPリクエストを行って、レスポンスとして直接取得するかレスポンス内のダウンロードURLを介して取得するのが一般的なデータの取得方法だと思います。
このコードをどこで実行しどのようにBigQueryへ投入するか、その流れをどうやってデータリネージ機能に追跡させるかを考えていきます。 まず「外部APIを呼び出すコードの実行場所について」です。投入先がBigQueryなので下記3つを候補として考えていました。
- Google App Script(GAS)
- Cloud Functions
- Cloud Run
管理チームをサービス分析課へ変えてGASをこのまま使い続ける方法も一応検討候補にはありました。とっつきやすさやGoogle関連サービスとの連携させやすさなど利点はあります。しかしその場合「GoogleWorkspace下のリソースになる」「ライブラリ追加などで手間がかかる」「言語が選べない」「claspを使うにしてもローカルでの開発やCI整備が難しい」などのデメリットが多くありました。
そこで簡便さよりもデータ基盤チーム管理下のもと統一的な管理と自動化ができる方法としてCloud Functionsを採用することにしました。Cloud Runも残りの候補にありましたが、実現するにあたり大体同様のフローになりそうだったため、別の言語を使いたい等の要望が出ない限りはCloud Functionsを優先して整備するという方針です。
これで他のデータ基盤と同様にGCPにリソースをまとめることができ、開発環境を整えやすく、実行状況をGCPのMonitoringやLoggingなどで監視しやすくなります。
API呼び出しの実行基盤はCloudFunctionとして、次に「その結果のデータをBigQueryへ格納する方法について」検討します。パッと思いついた候補は下記の3つでした。
- GCP SDKをつかってBigQueryのテーブルにinsertする
- データをGCSに置き、GCP SDKかSQLでテーブルへロードする
- BigQueryのリモート関数としてBigQueryへデータを返す
BigQueryへのデータ投入は下記記事のフローチャートにあるように、ワンタイム処理か、繰り返す処理か、ストリーミングな処理かで大きく別れます。
一度きりの投入であったり、一回で大規模なデータを投入するときはGCS経由のloadが望ましいです。しかし外部APIデータは期間をあけて定期的に取得する想定でした。
かといってストリーミング処理となるほどシビアなパフォーマンスや信頼性は要求されません。Storage Write APIを使うほどではないでしょう。
外部APIデータをBigQueryに投入した後に連続してデータ加工を行うことも考えられるため、dataformに組み込みやすいと更にありがたいです。また、冒頭に書いたとおり、dataformとデータリネージ機能を利用した時に可視化ネットワークへの反映が気になります。
これらを検討しデータ投入をシームレスにSQLと繋げられるようにするため、BigQueryのリモート関数を採用することにしました。
リモート関数としてデータを取り出し、そのままSQLのINSERT文によりデータを保存します。こうすることで外部APIデータの取得をSQLジョブフローへ自然な形で埋め込むことができます。
以上から、外部APIデータを取得するときの手順は「Cloud Functionsに関数をデプロイ」し、「BigQueryのリモート関数経由で呼び出す」ことと決まりました。
次にこの2つの手順を誰がどのように実施するかですが、誰からも扱いやすいデータ基盤を全社に提供しつつ、セルフサービス化を目指しているサービス分析課としては外部APIの各利用者に行ってもらいたいと考えています。
- 外部APIを利用したい各チームのエンジニアがコードを書いてPRを出す
- チームメンバーとサービス分析課がレビューをする
- ApproveをもらったらPRをマージし、Slackからデプロイする
- (スケジュールクエリやdataformなどのSQLからリモート関数を利用する)
この一連のフローを行ってもらうための基盤を準備していきます。大まかな方針としては下記のリモート関数を作成する公式ドキュメントの通りです。この一連の作業をSlackコマンドとGitHub Actionsで自動化していきます。
以降では上記フローを整えるためにやったことを説明していきます。また、この作業においては前回記事にした「SlackからGitHub Actionsを実行する」コマンドを利用していきます。
Terraformでデプロイ用のServiceAccountを作成
GitHub ActionsからCloud Functionsをデプロイするために必要なサービスアカウントをTerraformで用意します。
ワークフロー内でgoogle-github-actions/auth
を使うことでOpenID Connectを利用し、このサービスアカウントとしてCloud Functionsのデプロイを行います。
# Slackから起動するGitHub Actionでリモート関数をデプロイするためのサービスアカウント resource "google_service_account" "remote_function_deployer_service_account" { display_name = "Remote Function Deployer service account" account_id = "remote-function-deployer" } # デプロイ用の権限を持ったロール resource "google_project_iam_custom_role" "bigquery_remote_function_deployer_role" { project = "XXXXX" role_id = "RemoteFunctionDeployer" stage = "GA" # タイミングに合わせて変えていく title = "BigQueryリモート関数作成者" description = "GitHub Actionsでリモート関数を作る" permissions = [ # IAMの「分析された権限を表示」から必要だったもののみ列挙、実際はもっと多いが省略 "bigquery.connections.delegate", "bigquery.jobs.create", "bigquery.routines.create", "bigquery.routines.update", "cloudfunctions.functions.get", "cloudfunctions.functions.update", "cloudfunctions.operations.get", "iam.serviceAccounts.actAs", "bigquery.connections.create", ] } # サービスアカウントにロールを付与 resource "google_project_iam_binding" "bigquery_remote_function_deployer_role_iam_binding" { project = "XXXXX" role = "projects/XXXXX/roles/RemoteFunctionDeployer" members = [ "serviceAccount:remote-function-deployer@XXXXX.iam.gserviceaccount.com", ] } # OIDC用 resource "google_service_account_iam_member" "oidc_principal_can_use_remote_function_deployer" { service_account_id = google_service_account.remote_function_deployer_service_account.name role = "roles/iam.workloadIdentityUser" member = "principalSet://iam.googleapis.com/${google_iam_workload_identity_pool.github_actions.name}/*" }
BigQueryに外部接続とデータセット作成
次にBigQueryで作ったリモート関数とCloud Functionsを繋げるために必要な外部接続を用意していきます。
接続の作成方法は上記ドキュメントにある通り「BigQueryコンソール」「bqコマンド」「Terraform」の3種類あります。
外部接続の管理・運用についてはまだ悩んでいるところがあるのですが、「リモート関数用」として1つ作ります。作成したいチームごとに1接続と細かく用意するか、全体で1つにまとめるかは検討の余地がありそうです。ロケーションはデータ基盤全体の運用に合わせ asia-northeast1
にします。
ドキュメントに従って接続を作成すると「BigLake とリモート関数(クラウド リソース)」という接続タイプの外部接続ができます。
接続の作成後に自動でサービスアカウントが作成されるので、このサービスアカウントにIAMから必要な権限を与えるためロールを付与します。今回はCloud Functionsの第2世代を利用していくため、「Cloud Run起動元」ロールを付与します。もしリモート関数の実行を第1世代で行う場合は「Cloud Functions起動元」を付与します。
リモート関数はユーザー定義関数(UDF)の特殊版であるため、CREATE FUNCTION
でデータセットの中にルーティンとして作成する必要があります。このためBigQuery内部でリモート関数の置き場を一箇所にまとめたデータセットを作るか、利用者に合わせて各データセットに作るか決める必要があります。
今回はセルフサービス化を推し進めていく一環としての作業であるため後者を採用し、既に用意してあったGA4のAPIを利用したいチーム用データセットに作成することとしました。
これでリモート関数を作成するための接続と置き場が用意できました。あとは実際にCloud Functionsへコードをデプロイし、 CREATE FUNCTION
を実行するだけでリモート関数は作成できます。
GitHub Actionsにデプロイ用ワークフローを作る
Cloud Functionsへ第2世代のコードをデプロイし、CREATE FUNCTION
を実行するワークフローを作成します。
前回記事にした「SlackからGitHub Actionsを実行する」コマンドがあるのでこのワークフローはリモート関数用のコードのPRがマージされた後、各チームの開発者からSlackごしに実行されます。実行するコマンド例としては下記のようなもので、コマンド引数それぞれがパースして渡ってくるとします。
/dwhbot deploy remote_function:hoge_func
deploy
とremote_function
の部分でどのワークフローを実行するか決め、hoge_func
で関数を指定します。
このコマンドにより下記のワークフローを実行していきます。
name: "deploy for remote function using Cloud Functions" on: repository_dispatch: types: [remote-function-deploy] env: GCP_WORKLOAD_IDENTITY_PROVIDER: # OIDC に事前に作っておいたもの GCP_SERVICE_PROVIDER: remote-function-deployer@XXXXX.iam.gserviceaccount.com # TFで作ったSA FUNC_NAME: ${{ github.event.client_payload.function_name }} # これからデプロイする関数名の指定 SOURCE_PATH: ${{ github.event.client_payload.source_directory }} # モノレポなので対象ディレクトリを指定する permissions: id-token: write contents: read jobs: deploy: name: "Deploy remote function" runs-on: ubuntu-latest defaults: run: working-directory: ./remote_function steps: # Setup - uses: actions/checkout@v3 - name: Configure GCP Credentials id: get-gcp-token-via-oidc uses: google-github-actions/auth@v1 with: workload_identity_provider: ${{ env.GCP_WORKLOAD_IDENTITY_PROVIDER }} service_account: ${{ env.GCP_SERVICE_PROVIDER }} - name: 'Set up Cloud SDK' uses: 'google-github-actions/setup-gcloud@v1' # Check deploy target - name: 'Set runtime' # 関数のコードからランタイムを特定する id: runtime run: echo "runtime=$(sh inference_runtime.sh ${{ env.SOURCE_PATH }})" >> $GITHUB_OUTPUT - name: 'Set entry point name' # 関数のコードからエントリポイントを取得する id: entry_point run: echo "entry_point=$(sh pick_entrypoint_name.sh ${{ env.SOURCE_PATH }})" >> $GITHUB_OUTPUT # Deploy - name: 'Deploy Cloud Functions' run: >- gcloud functions deploy ${{ env.FUNC_NAME }} --gen2 --region=asia-northeast1 --runtime=${{ steps.runtime.outputs.runtime }} # 自動判定したランタイム --source=./${{ env.SOURCE_PATH }} --entry-point=${{ steps.entry_point.outputs.entry_point }} # 自動判定したエントリポイント --trigger-http - name: 'Get URI of function' # デプロイした関数からエンドポイントのURLを手に入れる id: cloudfunction_uri run: >- echo "endpoint_url=$(gcloud functions describe ${{ env.FUNC_NAME }} --gen2 --region=asia-northeast1 --format='value(serviceConfig.uri)' )" >> $GITHUB_OUTPUT - name: 'Create Remote Function in BigQuery' # エンドポイントURLを書き換えつつ、CREATE FUNCTIONする run: > sed -e "s@ENDPOINT_URL@${{ steps.cloudfunction_uri.outputs.endpoint_url }}@" ./${{ env.SOURCE_PATH }}/create_remote_function.sql | bq query --use_legacy_sql=false --location=asia-northeast1 - name: Notify apply result to Slack if: always() id: slack uses: slackapi/slack-github-action@v1.23.0 with: payload: | # (ワークフローの成否を送る、割愛) env: SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK SLACK_WEBHOOK_URL: ${{ secrets.SLACK_CHANNEL_WEBHOOK_URL }}
Cloud Functionsのデプロイには様々なオプションが必要となりますが、ある程度の内容はSet runtime
とSet entry point name
のstepで自動判定します。
これは開発者が考える労力を減らすために、デプロイ内容についてはある程度あらかじめこちらで決めてしまっておこうという措置です。この自動判定の指定とは別に、リモート関数の開発者が自ら指定できるように設定ファイルを読み取る処理をいずれ用意しようと考えています。
それぞれの自動判定について説明します。
ランタイム判定
inference_runtime.sh
の中身はこのようなシェルスクリプトになっています。
#!/bin/sh #引数にとったディレクトリの中をみて言語ランタイムを返すシェルスクリプト for file in $( ls $1 ); do case "$file" in *.js ) echo "nodejs18" && break ;; *.php ) echo "php81" && break ;; *.py ) echo "python311" && break ;; *.rb ) echo "ruby30" && break ;; esac done
デプロイする関数コードのあるディレクトリにあるファイルの拡張子をみて判定します。とりあえず需要がありそうな4言語のみ想定しています。 ランタイムのバージョンは現時点で最新のものを指定し、何か要望があがるか又はCloud Functionsにおいてサポートが切れそうになったら更新する運用です。
エントリポイント判定
pick_entrypoint_name.sh
の中身はこのようなシェルスクリプトになっています。
#!/bin/sh #引数にとったディレクトリの中をみてエントリポイントを返すシェルスクリプト for file in $( ls $1 ); do case "$file" in # Javascriptエントリポイント: functions.http がある行の文字列 *.js) grep 'functions.http' $1/$file | grep -oP "(?<=['\"]).*?(?=['\"])" && break ;; # PHPエントリポイント: FunctionsFramework::http がある行の最初の文字列 *.php) grep 'FunctionsFramework::http' $1/$file | grep -oP "(?<=['\"]).*?(?=['\"])" |head -n1 && break ;; # Pythonエントリポイント: functions_framework.http がある行の次行にある関数名 *.py) grep -A1 'functions_framework.http' $1/$file | tail -n1 | grep -oP "(?<=def ).*?(?=\()" && break ;; # Rubyエントリポイント: FunctionsFramework.http がある行の文字列部分 *.rb) grep 'FunctionsFramework.http' $1/$file | grep -oP "(?<=['\"]).*?(?=['\"])" && break ;; esac done
エントリポイント名はコードの内容によって異なるので、命名は開発者によって異なります。GCPドキュメントのサンプルコードやGitHubにある各言語のFunctions Frameworkのリポジトリのサンプルを参考にして、エンドポイント名になりそうな部分をgrepで抽出しています。
上記の2つはどの関数においても共通で実行できるため全ワークフロー実行で共通の内容になりますが、CREATE FUNCTION
部分はこれから作成するリモート関数名が必要となるため、下記のようなテンプレートを用意してリモート関数作成者に各自書き換えてもらうことになります。ただし、ENDPOINTのURLだけは実際にCloud Functionsへデプロイしてから判明するため、ワークフローのYAMLへ書いたとおりデプロイ後に拾ってsed
で書き換えてbq
に流します。
-- 関数名と引数と返り値を作成するリモート関数に合わせて書き換える CREATE OR REPLACE FUNCTION `XXXXX.dataset_name.function_name`(arg_value DATE) RETURNS JSON REMOTE WITH CONNECTION `XXXXX.asia-northeast1.remote_function` -- 固定、書き換えない OPTIONS ( endpoint = 'ENDPOINT_URL' -- GitHub Actions内部で自動的に置き換える ) ;
注意点として、リモート関数を呼び出す際の戻り値について気をつける必要があります。公式ドキュメントにある通り 戻り値は固有のデータ型のみサポートしており、ARRAY
やSTRUCT
は扱えません。ですが大抵の外部APIデータは複数の値を持つため、JSON型にしがちだと思いますがBigQueryでJSON型を取り扱うときに用いるJSON関数はどれも戻り値はSTRING型を基本としています。数値や真偽値を扱いたい場合は関数呼び出し側で適宜キャストする必要があります。
リモート関数をデプロイする仕組みとしてはこれで一通り完成です。
全体の流れをまとめて提供する
実際に外部APIデータを利用したい方へ提供する前に、一度自分で一連のフローを試しました。(再掲)
- 外部APIを利用したい各チームのエンジニアがコードを書いてPRを出す
- チームメンバーとサービス分析課がレビューをする
- ApproveをもらったらPRをマージし、Slackからデプロイする
- (スケジュールクエリやdataformなどのSQLからリモート関数を利用する)
この時にCREATE FUNCTION
のテンプレを用意したり、ワークフローのSlack通知を調整したり、ドキュメントを整備します。一通りの流れを確認できた段階で完成です。
終わりに
リモート関数をデータ投入用に用いる試みのため、GitHub Actionsから自動でリモート関数を作成するフローを作りました。あとはこのリモート関数でのデータ投入の真価を発揮できるよう、dataformとデータリネージの活用を進めていくだけです。
このようにブックウォーカー社では様々な場面からでも利用しやすいデータ基盤を求めて、より良い形になるように自分たちで改善しています。こんなデータ基盤をぜひ使ってみたいという方、あるいは一緒に開発してみたいという方をお待ちしています。