いくつものDBからデータをかき集めるには

こんにちは、メディアサービス開発部サービス分析課の佐藤です。推しTRPGはD&D5版です。今年4月に入社し、事業横断データ基盤を構築しています。
今回はブックウォーカー社内でデータ基盤を構築するにあたって身につけたAWS関連の知識、主にRDSのexport snapshot to S3、Glue Job、AWS Serverless Application Model (以下SAMと略します)周りについて書いていきます。

背景

株式会社ブックウォーカーは社名になっている電子書籍ストア以外にも様々なサービスを運営しています。 また、Webサービス以外にも社内用システムも含め、多種多様なデータが社内に存在しています。

自分が入社する4月時点ではこれらのデータに対する集計・分析業務は各々の事業部署で独立して行われ、データの在処はもちろん分析作業も分断されていました。 いわゆるデータのサイロ化が進んでいる状態です。その状態を整理するためには、社内のデータを統合し、集計や分析といった作業を共通化させ、データ基盤を構築する必要がありました。

そこでまず各サービス毎に散らばったAWSアカウントにあるDBのデータをかき集めることから着手しました。

幸いにも大体のサービスはAWSのRDSを利用しており、DBからのデータ収集は統一的な手法で行えそうだという予測がありました。 この時点でのDBデータの収集方法の候補は下記の3つです。

  1. 定期的なデータ抽出
  2. 外部接続(例:Redshift Federated Query, BigQuery Omni)
  3. Change Data Capture(例:AWS DMS、GCP Datastream)

これら手法それぞれについてメリット・デメリットを考えていきます。

手法1. 定期的なデータ抽出

  • データの鮮度は更新頻度次第
  • コストを計算しやすい
  • 他部署へのIAMロールの作成依頼とその検証が必要
  • システム構築が手間

手法2. 外部接続

  • 常に最新データを扱える
  • コストはクエリ次第
  • 他部署へのネットワーク設定依頼とその検証が必要
  • 負荷によっては専用のReadインスタンスの検討が必要

手法3. Change Data Capture

  • 最新データを扱える
  • コストは対象データの更新量次第
  • 他部署へのネットワーク設定とその検証が必要
  • 負荷によっては専用のReadインスタンスの検討が必要
  • 管理運用の手間が不安

事前にこういった点をそれぞれ列挙して比較しましたが、最終的には下記の観点からスナップショットを利用した定期的なデータ抽出を採用することになりました。

  • 他部署への依頼がIAM Role作成のみと最も手間が少ない
  • 今後もしも基盤の構成を変えるとしても応用が効く
  • データ基盤の他の部分でも触れるのでparquetの扱いに慣れておきたい

大雑把な流れとしては、定期的にRDSのスナップショットからparquetをS3バケットへ取り出し、必要であれば加工を施して蓄積するという工程です。

システム概略図(加工が不要な場合はGlueをスキップ)

システム構成の詳細

Export Snapshot to S3

RDSにはスナップショットのデータを利用してparquet形式でS3へエクスポートする機能があります。

docs.aws.amazon.com

この機能はMySQL、PostgreSQLどちらもサポートしており、Auroraクラスタでも非AuroraのDBでも扱えます。

また、スナップショットからのエクスポート処理はバックグラウンドで実行されるためRDS側のパフォーマンスへは影響しません。

今回は別AWSアカウントにあるRDSにおいて日次バックアップされている、自動スナップショットをS3バケットへエクスポートしていきます。 このエクスポートはAWSアカウントをまたぐ、クロスアカウントな処理となります。

エクスポート処理に必要なAWSリソースは主に下記の3種類です。

  • 抽出先のS3バケット
  • クロスアカウント暗号化用のKMSキー
  • エクスポート元と宛先両方のIAM Role

これらを用意しておけばAWS CLIやAWS SDKなど好きな手法でStartExportTaskを起動できるようになります。 今回は後述の加工処理も行う必要があり、まとめてワークフローとして扱うためLambdaからAWS SDK経由で呼び出します。

スナップショットのS3エクスポートに関する大まかな処理の流れは下図の通りです。 詳しくはAWSのドキュメントをご参照ください。

RDSスナップショットのクロスアカウントエクスポート

実際にエクスポートをしてみたところ、1~2時間程度かかりましたが現状のデータ基盤は日次処理が中心のため良しとします。

AWS Glue Job

スナップショットからのエクスポートでは最小単位がテーブルとなり、無加工でそのままparquetとして抽出されます。 データに対する要望によっては「個人情報を含むテーブルからIDのみを取り出したい」といったケースも考えられます。

ここではS3にエクスポートされたparquetをGlueのジョブを使ってすぐに加工し、個人情報カラムを削ぎ落としていきます。

今どきはETL(抽出→加工→取り込み)ではなくELT(抽出→取り込み→加工)が主流という向きもありますが、取り扱いに注意が必要なデータは扱うスコープをできる限り削りたいため今回はETL方式を採用します。

GlueはSparkのスクリプトでETLジョブを書くことができます。 今回始めてSparkに触れるため、AWS Glue Studioを使って試しにGUIでプロトタイプを作り、できあがったコードを確認しながら手を加えることでETLジョブを作っていきました。

GUIで組んだパーツを元にコード修正

Glueでparquet形式を扱うにはGlueContextからcreate_dynamic_framewrite_dynamic_frameを使って読み書きしていくことになります。

docs.aws.amazon.com

ここで、そのまま素直にformat="parquet"を指定し、useGlueParquetWriterを利用するとparquetの中のメタデータ形式が加工前と同じ形式に揃いませんでした。 スナップショットからエクスポートしたparquetは日付型がINT64 L:TIMESTAMP(MICROS,true)で、圧縮方式はgzipです。 対してGlueの加工後では日付型がINT96で圧縮方式がsnappyになってしまいました。

少し調べたところ下記の設定を追加すればそれぞれの形式を指定することができました。 こうすることでRDSのスナップショットからエクスポートしたparquetと形式を揃えることができます。

# Glue ETL job冒頭の部分で

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
conf = SparkConf()
conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")  # TIMESTAMPの型指定
conf.set("spark.sql.parquet.int96AsTimestamp", "false") # INT96を使わない
conf.set("spark.sql.parquet.compression.codec", "gzip") # 圧縮はgzipを指定
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

加工後には加工元のデータは不要になるため、すぐにpurge_s3_pathで後掃除をしてしまいます。

Step FunctionsとEvent Bridge

上記のRDSスナップショットのエクスポート用のLambdaとGlueのETLジョブをひとまとまりのワークフローとし、定期的に実行するようにします。 GlueをPythonで書いたため、全体を通して使用言語はPythonへ統一することにしました。

ワークフローの作成にはStep Functionsを利用しました。こちらも初めての利用であったため、Glueジョブのときと同じくStep Functions Workflow Studioを利用し、GUIで色々試しながら組んでいきます。

ただしここで注意することとして、このGUIで組んだワークフローとその要素のAWSリソースは本番用に利用しない事をお勧めします。理由については後述するSAMの段で説明します。

Step Functionsで作ったワークフロー内でLambdaとGlue要素を作り、手動でワークフロー全体を実行することで最初から最後まで通した処理を確認していきます。

Step Functionsのワークフロー

このワークフローをEvent Bridgeのスケジュールルールを用いて定期実行することで、毎日RDSからデータを取り出してS3バケットへparquetを蓄積できるようになります。 サービス毎に異なるルールを用意し、ワークフローへ渡す入力を切り替えることで対象とするAWSアカウント、RDSクラスターなどを切り替えることができます。

SAMとCI/CD

ここまで説明したスナップショットのエクスポート(を指令するLambda)、データ加工のためのGlueジョブ、スケジュール実行をするEventBridgeルール、全体をまとめるStep Functions、これらをすべてSAMで一括管理できるようにまとめます。

ここで今まで作ったものはすべて捨て、まったく同じものをSAMで構築できるように新しいリソースを作り直すことをオススメします。 自分はうっかりSAM化前の状態で全体を稼働させてしまい、大量のリソースを一つ一つCloud Formationのスタックへ aws cloudformation create-change-set でインポートしていく羽目になりました。 こうなると一筋縄ではいかず、インポート時に予期せぬエラーが出て泥臭く対応することになります。

特に--resources-to-importで指定するJSONについては調べるのが大変で、EventBridgeのルールをインポートする際の指定は何が正しかったのか未だによく理解していないままです。 仕様をご存知の方はコメントをぜひお願いします。

[
    {
        "ResourceType": "AWS::Lambda::Function",
        "LogicalResourceId": "SampleLambdaName",
        "ResourceIdentifier": {
            "FunctionName": "sample-lambda-name" // Lambdaは"FunctionName"に名前を指定する
        }
    },
    {
        "ResourceType": "AWS::StepFunctions::StateMachine",
        "LogicalResourceId": "SampleStateMachineName",
        "ResourceIdentifier": {
            "Arn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:stateMachine:state-machine-name" // StepFunctionsは"Arn"にARNを指定する
        }
    },
    {
        "ResourceType": "AWS::Events::Rule",
        "LogicalResourceId": "SampleEventBridgeTriggerRuleName",
        "ResourceIdentifier": {
            "Arn": "sample_event_bridge_trigger_rule_name" // EventBridgeは"Arn"にARNを指定してはいけない、ARNではエラーになる
        }
    }
]

ところでSAMがサポートしているAWSのリソースにはAWS::Serverless::Function、AWS::StepFunctions::StateMachine、AWS::Events::Ruleは含まれているものの、Glueジョブは含まれていません。

docs.aws.amazon.com

SAM非対応のリソースでは扱えないのかと思いきや、SAMのテンプレートYAMLは結局はCloud FormationのYAMLへ変換されるため普通にCloud Formationの記法をそのままSAMのYAMLへ書いてしまえば問題ありません。

AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: >
  rds-snapshot-import-batch
  Workflow batch to export RDS snapshot to S3
Resources:
  # Step Functiosn
  ExportRdsSnapshotWorkflow:
    Type: AWS::Serverless::StateMachine
    DeletionPolicy: Retain
    Properties:
      Name: export-rds-snapshot-workflow
      DefinitionUri: statemachine/export-rds-snapshot-workflow.asl.json
      Role: arn:aws:iam::XXXXXXXXXXXX:role/service-role/sample-stepfunctions-role

  # Lambda、SAMによる記法
  ExportRdsSnapshotToS3:
    Type: AWS::Serverless::Function
    DeletionPolicy: Retain
    Properties:
      FunctionName: export-rds-snapshot-to-s3
      CodeUri: functions/export-rds-snapshot-to-s3/
      Handler: lambda_function.lambda_handler
      Runtime: python3.9
      Architectures:
        - arm64
      MemorySize: 128
      Timeout: 30
      Role: arn:aws:iam::XXXXXXXXXXXX:role/service-role/sample-lambda-role

  # (エクスポートジョブの進捗確認Lambdaは省略)

  # Glue Job、ここに一緒に並べてしまってOK
  SampleTransformRdsParquet:
    Type: AWS::Glue::Job
    Properties:
      Name: SampleTransformRdsParquet
      Description: Transform parquet of SAMPLE table
      GlueVersion: '3.0'
      MaxCapacity: 5
      Command:
        Name: glueetl
        PythonVersion: '3'
        ScriptLocation: glue/sample_transform_rds_parquet/app.py
      Role: arn:aws:iam::XXXXXXXXXXXX:role/sample-glue-role
      DefaultArguments:
        '--enable-metrics': true
        '--enable-continuous-cloudwatch-log': true
        '--enable-spark-ui': true
        '--job-bookmark-option': 'job-bookmark-disable'
        '--enable-job-insights': true
        '--class': 'GlueApp'

こうしてSAMによるデプロイが可能になったため、GitHub ActionsでCI/CDを設定します。 CI/CDを実行するIAMロールはそれぞれ向けにOpenID Connectを利用して用意しました。

docs.aws.amazon.com

CIでは pysen run lintsam buildcloud formation validate をし、CDでは sam buildsam deploy をしています。

最終成果物とシステム運用

上記の一通りの処理をまとめてSAMで管理することにより、データ基盤へRDSのデータを収集する処理を自動化することができました。

もし新たなAWSアカウントにあるRDSからデータを抽出する必要がでたときでも、下記の手続きで対象RDSの追加ができます。

  1. 抽出元のAWSアカウントにてIAM Roleを作ってもらう
  2. S3バケット、KMS、抽出先IAMのポリシーに作成してもらったIAM Roleの追記
  3. SAMのテンプレファイルにEventBridgeトリガーの追記
  4. (加工が必要であれば)加工処理に合わせたGlueスクリプトの追加

別リソースともあわせた管理の都合上ポリシー編集はTerraformで行い、こちらもCI/CDを行っています。

このため大体30行くらいの変更PR 1つを作り、マージすればあとは自動で翌日には新たなRDSデータが抽出できています。 テーブル追加の場合はもっと簡単で、抽出対象を指定している設定ファイルにテーブル名を1行足せば済みます。

データ収集バッチの運用フロー

課題

ここまで長々と説明してきたRDSのスナップショット抽出バッチですが、課題はいくつもあります。

parquetのデータ型

抽出結果がparquet形式であるため、元データがDATETIME型でもTIMESTAMP型でもparquetでは同じ型になります。

事前に調べたところ社内で扱うDBの中でTIMESTAMP型をほとんど使わない事がわかっていたため、とりあえず全ての時刻型をDATETIME型として扱っています。

またparquetのメタデータはGlueジョブでは自由に指定できますが、スナップショットからのエクスポートでは指定ができません。

例えばPostgreSQLのスナップショットからエクスポートしたparquetは大半のデータ型がSTRING型扱いになります。 これは特に日付・時刻型カラムでは致命的で、元のカラムが文字列型なのか日付・時刻型だったのか判別できません。

さらにスナップショットのエクスポート時には圧縮形式を選ぶこともできないため、すべてgzip圧縮になります。

データの更新頻度

冒頭にも書いたとおり、データ抽出の方式ではデータの鮮度が課題になります。 現在はシステムスナップショットを扱っているためデータの更新頻度が1日1回に限定されています。

スナップショットをとる処理を追加し、頻度をあげてもよいのですがエクスポート段階で処理時間に1~2時間かかってしまいます。

この問題を解決するためであれば、DBへの外部接続やChange Data Captureを検討したほうが良いでしょう。

ーーという内容で記事を執筆している間にAWSからAuroraの新機能が発表されていました。

aws.amazon.com

こちらの機能を利用すればスナップショットをとる手間を要せず、クラスターに負荷をかけることもなく、parquetをS3へエクスポートできるようになります。 非AuroraなDBについては従来通りスナップショットを扱うとして、Auroraについてはこちらへ移行するだけで更新頻度向上が見込めます。

最後に

まだまだ課題もありますが、このようにして複数のAWSアカウントにあるRDSの中のデータを一箇所へ集めるシステムが構築できました。 改善すべき項目は多々ありますし、アプリケーションログなど他の種類のデータも集めていかなければなりません。

ブックウォーカーではデータ基盤を整え、誰もがデータ分析を行えるデータの民主化を推進しています。 電子書籍ストア、マンガ連載サービス、読書SNSなど出版分野のデータを分析してみたい方はぜひブックウォーカーの採用情報ページからご応募ください。 我々の構築したデータ基盤を使い倒してくれる方を大募集中です。