こんにちは、メディアサービス開発部サービス分析課の佐藤です。ブックウォーカー社で全社横断のデータ基盤を構築しています。
以前投稿した記事「ブックウォーカー社のデータ基盤について(2023秋)」にも書いた通り、現在ブックウォーカー社におけるデータ基盤はいくつかの課題を抱えています。
その一つとして、FluentBitからPub/Subを経由してBigQueryへログを送る「ストリーミングデータ投入システムの構築」がありました。 今回の記事ではそのストリーミングデータ投入に関して、開発の都合上で必要となったLuaJITの自動テスト環境を整えるまでの試行錯誤を書きました。
発端
前回の記事を書いた当初はPub/SubのカスタムスキーマとBigQuery Subscriptionを利用したuse topic schema
を採用する計画でした。しかし、今年2024年1月22日にuse table schema
という機能がPub/Subからリリースされました。
リリースノート | Pub/Sub Documentation | Google Cloud
use topic schema
オプションでは、AvroまたはProtocolBufferの2種類どちらかで設定したカスタムスキーマに対してログを送ると、BigQueryのテーブルでそのデータ型に対応したフィールドに格納することができます。しかし、このオプションはBigQueryにおけるテーブルスキーマとPub/Subにおけるカスタムスキーマという2つのスキーマの二重管理と、その間の変換処理がネックでした。
昨年はこのuse topic schema
を利用したストリーミングデータ投入にチャレンジしていましたが、どうしてもデータ型変換の実装を進められず、断念することになりました。
どうしようかと途方に暮れていたその真っ只中で上記のuse table schema
リリースがあり、即座に検証を行いました。
use table schema
を使ったサブスクリプションではJSON形式の文字列をメッセージとしてPub/Subトピックに投入しておけば、そのJSONに対応したBigQueryテーブルにログをPushすることができます。
ただし、JSON形式の文字列ということでデータの表現はJSONに準拠することとなります。JSONが基本型としてサポートしているのはstrings, numbers, booleans, nullの4種類のみであり、それ以外のデータ型は適切に変換する必要があります。
データ基盤において、上記4種類以外に頻出するデータ型といえば日付・時刻型ではないでしょうか。その他のデータ型については一旦は文字列として格納してから後に変換処理をすることでも対応できます。しかし、データのパーティショニングの都合もあるため日付型だけでもデフォルトでサポートしておきたいところです。
use table schema
オプションを利用した場合、DATE・DATETIME・TIME・TIMESTAMP型はそれぞれ変換処理を行った整数型としてJSON文字列の内部に格納する必要があります。
BigQuery のデータ型 | 整数表現 |
---|---|
DATE | Unix エポックである1970 年 1 月 1 日からの日数 |
DATETIME | CivilTimeEncoder を使用して常用時として表された日時(マイクロ秒単位) |
TIME | CivilTimeEncoder を使用して常用時として表された時間(マイクロ秒単位) |
TIMESTAMP | Unix エポックである1970 年 1 月 1 日 00:00:00 UTC からのマイクロ秒数 |
DATE型やTIMESTAMP型については単位(日数・マイクロ秒数)に気をつける必要があるものの、基本的にはUnixエポック時間からの引き算ですみます。 しかし、DATETIME型やTIME型で扱っているCiviTimeEncoderについては話が別です。これは究極的にはGoogleが公開しているSQLパーサー・アナライザーであるZetaSQLの内部実装を理解する必要があります。
ZetaSQLのリポジトリにあるJavaのコードを読めば分かりますが、CivilTimeEncoderは内部でビット演算を行って任意の時刻と整数を互いに変換する処理が実装されていました。 このため、FluentBitからPub/Subを経由してBigQueryへログを送るためにビット演算が必要となります。
Lua filter in FluentBit
上記の背景があり、FluentBit内でDATETIME型・DATE型を元にしたビット演算をする必要が出てきました。それと、JSONは文字列としてPub/Subへ送る必要があります。 これらの処理はFluentBitにおいてLua filterを使うことで対応できます。
Lua filterはFluentBit内部で扱うログレコードを受け取るLuaの関数を定義しておくことで、ログに対して任意の処理を実行できます。
Luaのコードとしてビット演算の処理を書いたり、JSONの文字列化の処理を書けばuse table schema
に対応したログをPub/Subトピックへ送ることができます。
Lua filterを利用する際にはFluentBitのconfファイルに下記のようにFILTER部分を追加するだけで済みます。
[FILTER] Name lua script fluentbit_pubsub.lua call transform_log
ちなみにビット演算の処理とJSON文字列化処理は実際には別々のコードで実装されており、ここではfluentbit_pubusb.lua
で両方のコードを呼び出しています。
Lua filterを2つにわけず一つにまとめたのはデータ型の精度落ちを防ぐためです。
ビット演算処理部分でで64ビット整数を利用しているのですが、一度Lua filter外に出ると精度落ちが発生してしまいます。
Luaコードの中で64ビット整数型を維持したままで取り回すことで数値の精度を落とすことなく扱うことができます。
上記の経緯からビット処理やJSONの文字列化など、Luaのコードで複雑な処理を行う必要が出てきました。 このLuaコードを開発するにあたり、テストを用意したいというのは自然な流れではないでしょうか。 もともとFluentBitに関わる処理としては、Dockerイメージのビルドと設定ファイル読み込みのdry-runまではCIで用意されていました。 そこで設定ファイルの確認とLuaコードのテストまでをまとめてCIで実行しようと考えました。
前置きが長くなってしまいましたが、この「FluentBitのCIへLuaのテストを追加しよう!」というのが当記事の本題です。
FluentBitにおけるLua処理系
まず、今回は都合によりFluentBit v1.9.10を前提として話を進めていきます。記事執筆にあたって最新のv3.0も軽く確認しましたが、事情はそう大きく変わらないと思っています。 FluentBit v1.9.10ではLua filterにLuaJIT 2.1.0-beta3を利用しています。
LuaJITというのはLua処理系の一つです。名前の通りJITを利用して高速化が図られ、LuaJIT 2.1はLua5.1互換となっています。
Luaにビット演算用の記法が導入されるのは5.3以降です。LuaJITでは代わりに特殊な拡張機能としてBitOpという拡張モジュールが追加されています。このBitOpsモジュールを利用すれば、ビット演算を使った処理を自然な形で実装できます。
また、BitOpにおいて多倍長整数が必要になった場合はLua標準のnumber
ではなくint64_t
という型が使えますが、これもやはりLuaJIT拡張機能のffiという拡張モジュールを利用します。
このBitOpを使えば例えばCivilTimeEncoderにあるビット演算が必要なJavaコードをLuaでもできるだけ再現して書けるようになります。
下記のJavaコードは筆者がCivilTimeEncoderから必要になる処理を抽出して関数1つにまとめたものです。
private static final int MICRO_LENGTH = 20; private static final int SECOND_SHIFT = 0; private static final int MINUTE_SHIFT = 6; private static final int HOUR_SHIFT = 12; private static final int DAY_SHIFT = 17; private static final int MONTH_SHIFT = 22; private static final int YEAR_SHIFT = 26; // datetime型 → long型 変換処理をまとめたもの public static long encodeDatetimeMicros(LocalDateTime datetime) { // encodePacked32TimeSeconds 相当の処理 LocalTime time = dateTime.toLocalTime(); long bitFieldTimeSeconds = 0x0L; bitFieldTimeSeconds |= time.getHourOfDay() << HOUR_SHIFT; bitFieldTimeSeconds |= time.getMinuteOfHour() << MINUTE_SHIFT; bitFieldTimeSeconds |= time.getSecondOfMinute() << SECOND_SHIFT; // encodePacked64DatetimeSeconds 相当の処理 long bitFieldDatetimeSeconds = 0x0L; bitFieldDatetimeSeconds |= (long) dateTime.getYear() << YEAR_SHIFT; bitFieldDatetimeSeconds |= (long) dateTime.getMonthOfYear() << MONTH_SHIFT; bitFieldDatetimeSeconds |= (long) dateTime.getDayOfMonth() << DAY_SHIFT; bitFieldDatetimeSeconds |= (long) bitFieldTimeSeconds; // encodePacked64DatetimeMicros 相当の処理 return (bitFieldDatetimeSeconds << MICRO_LENGTH); }
下記のLuaコードはJavaコードをLuaJITで再現したものです。
local bit = require("bit") local bor = bit.bor local lshift = bit.lshift local ffi = require("ffi") local uint64_t = ffi.typeof("uint64_t") local MICRO_LENGTH = 20 local SECOND_SHIFT = 0 local MINUTE_SHIFT = 6 local HOUR_SHIFT = 12 local DAY_SHIFT = 17 local MONTH_SHIFT = 22 local YEAR_SHIFT = 26 -- datetime は os.date('*t', timestamp) 等で得るtable型 -- timestamp は FluentBitから渡るナノ秒精度の浮動小数点を表すnumber型 function encode(datatime, timestamp) bit_field = 0x0ull bit_field = bor(bit_field, lshift(uint64_t(datetime.sec) , SECOND_SHIFT)) bit_field = bor(bit_field, lshift(uint64_t(datetime.min) , MINUTE_SHIFT)) bit_field = bor(bit_field, lshift(uint64_t(datetime.hour) , HOUR_SHIFT )) bit_field = bor(bit_field, lshift(uint64_t(datetime.day) , DAY_SHIFT )) bit_field = bor(bit_field, lshift(uint64_t(datetime.month), MONTH_SHIFT )) bit_field = bor(bit_field, lshift(uint64_t(datetime.year) , YEAR_SHIFT )) -- マイクロ秒を取得するため、小数第一位から第六位までを整数として得る micro = math.floor((timestamp - math.floor(timestamp)) * 10^6) bit_field = bor(lshift(bit_field, MICRO_LENGTH), uint64_t(micro)) return bit_field
コード上では再現できているように見えますが、実際のところどうでしょうか? やはり処理結果が望んだ形で得られるのかどうか、テストが必要そうです。
Luaテストフレームワーク、busted
Luaでテストをどうやって実行するか調べたところ、bustedというユニットテスト用のフレームワークを見つけました。
これを使えばCivilTimeEncoderでのビット演算処理をテストすることができそうです。
単純にbustedを利用するだけであれば、luarocksというLua向けパッケージマネージャーを使って簡単にインストールができます。
この調子でbustedを使ってencode
関数をテストすればよさそうです。CivilTimeEncoderに基づいて事前に計算しておいた数値をテストケースに書き出してみました。
describe("CivilTimeEncode", function() it("should encode a civil time as minimum", function() local test_input = { year = 1, month = 1, day = 1, hour = 0, min = 0, sec = 0, microsecond = 0} local datetime = test_input local timestamp = test_input.microsecond / 10^6 local encoded = civil_time.encode(datetime, timestamp) assert.is_equal(encoded, 74904229642240ULL) end) it("should encode a civil time as maximum", function() local test_input = { year = 9999, month = 12, day = 31, hour = 23, min = 59, sec = 59, microsecond = 999999 } local datetime = test_input local timestamp = test_input.microsecond / 10^6 local encoded = civil_time.encode(datetime, timestamp) assert.is_equal(encoded, 703674213004689983ULL) end) end)
しかし、上記のコードをテストしようとすると下記のエラーが発生しました。
$ busted test.lua ✱ 0 successes / 0 failures / 1 error / 0 pending : 0.003779 seconds Error → test.lua @ 1 suite test.lua test.lua:1: error loading module 'civil_time' from file './civil_time.lua': ./civil_time.lua:18: malformed number near '0x0u'
どうやらLuaJIT拡張を含んだコードをbustedが解釈できていないようです。 というのも、素直にluarocksを用意してbustedをインストールするとLuaJITではなくLua 5.4向けのテスト環境となるためです。
今回FluentBitのLua filter向けコードをテストするにあたって、LuaJITの自動テストを用意する方法を探しました。 しかし、インターネットですぐに見つかる情報はどれもLua向けのテストについてばかりでした。 そこでビット演算を行うBitOpsや多倍長整数を取り扱うffiを利用したLuaJITのコードをテストする方法を検証することとなりました。
ローカル環境での動作検証
最終的にはGitHub ActionsのCIで自動テストすることを目標としますが、一旦は手元のMacbookPro(M1)で動作確認を行います。 上記エラーに出くわした流れは、大雑把には下記のコマンドを順に実行していったものです。
$ brew install luajit luarocks
$ luarocks install busted
$ busted test.lua
まず最初に、bustedから利用するLuaを指定できないかを調べました。
ドキュメントを見るとコマンドオプションとして--lua=LUA
があり、busted実行時のLuaインタプリタのパスを指定できるようです。
また、bustedにはconfigファイルを指定するオプションがあり、標準では.busted
というファイルを記述することで自動的に設定が読み込まれます。
ここでbusted実行に使用するオプションをこのファイルに書き、LuaインタプリタをLuaJITに指定できないかと試しました。
return { default = { root = '.', lua = 'luajit' } }
しかし、結果は変わらず0x0ull
を解釈できないというエラーのままでした。コマンドオプションから直接--lua
を指定しても同様です。
GitHubで検索する限りではこの.busted
にLuaJITを指定しているリポジトリもあるようなのでなにかを間違えてるだけかもしれません。
bustedからLuaインタプリタを変更する方法については結局わからないままだったため、どなたか教えてもらえると嬉しいです。
次に、bustedをインストールするluarocksをLuaJIT版にできないかを検討しました。 一旦MacからluarocksとLuaJIT以外のすべてのLuaをアンインストールし、LuaJITだけが残った状態で再度インストールしてみました。 しかし、luarocksをインストールする時点で最新のLuaも同時にインストールされるようです。 仕方がないのでluarocksをインストールした後にluarocksで利用するLuaインタプリタを変更できないかを調べることにしました。 luarocksコマンドを無引数・無オプションで実行してみるとオプションとサブコマンドだけでなく、設定状況も確認することができます。
$ luarocks Usage: luarocks [-h] [--version] [--dev] [--server <server>] [--only-server <server>] [--only-sources <url>] [--namespace <namespace>] [--lua-dir <prefix>] [--lua-version <ver>] [--tree <tree>] [--local] [--global] [--no-project] [--force-lock] [--verbose] [--timeout <seconds>] [<command>] ... LuaRocks 3.11.0, the Lua package manager /opt/homebrew/bin/luarocks - LuaRocks main command-line interface Options: -h, --help Show this help message and exit. # (オプション一覧は省略) --timeout <seconds> Timeout on network operations, in seconds. 0 means no timeout (wait forever). Default is 30. Commands: help Show help for commands. # (サブコマンド一覧は省略) write_rockspec Write a template for a rockspec file. Variables: Variables from the "variables" table of the configuration file can be overridden with VAR=VALUE assignments. Configuration: Lua: Version : 5.4 LUA : /opt/homebrew/opt/lua/bin/lua5.4 (ok) LUA_INCDIR : /opt/homebrew/opt/lua/include/lua5.4 (ok) LUA_LIBDIR : /opt/homebrew/opt/lua/lib (ok) Configuration files: System : /opt/homebrew/etc/luarocks/config-5.4.lua (ok) User : /Users/sato-i/.luarocks/config-5.4.lua (not found) Rocks trees in use: /Users/sato-i/.luarocks ("user") /opt/homebrew ("system")
この時点ではluarocksがまだ最新のLua 5.4を向いています。 LuaJITはLua5.1互換なので、まずこれをLua5.1を利用するように変更します。
$ luarocks config lua_version 5.1
次にluarocksの設定ファイルを書き換えます。上記のコマンド出力にあるConfiguration files
です。
このファイルに書いた指定により、Configuration:
以下の Lua:
を書き換えることができます。
既存のLua 5.4向け設定があるパスの指定にならって~/.luarocks/config-5.1.lua
を作成し、brewでインストールしたLuaJITのパスを調べて記述していきます。
variables = { LUA = "/opt/homebrew/bin/luajit"; LUA_INCDIR = "/opt/homebrew/include/luajit-2.1"; LUA_LIBDIR = "/opt/homebrew/lib/lua"; }
この状態で再度luarocksコマンドを実行すれば、設定が書き換わっていることを確認できます。
$ luarocks # (前半部分を省略) Variables: Variables from the "variables" table of the configuration file can be overridden with VAR=VALUE assignments. Configuration: Lua: Version : 5.1 LUA : /opt/homebrew/bin/luajit (ok) LUA_INCDIR : /opt/homebrew/include/luajit-2.1 (ok) LUA_LIBDIR : /opt/homebrew/lib (ok) Configuration files: System : /opt/homebrew/etc/luarocks/config-5.1.lua (not found) User : /Users/sato-i/.luarocks/config-5.1.lua (ok) Rocks trees in use: /Users/sato-i/.luarocks ("user")
この状態で再度bustedをインストールし直してみます。
すると、LuaJIT拡張を使ったテストコードをbustedで動かすことができました。
念の為、ただ単にLua 5.1が動いているのではなく、正しくLuaJITが動いているということをテストコードから確認してみます。
Luaコードの中から、実行中の環境について調べるには _VERSION
という変数と、 jit
という変数について確かめれば良いようです。
describe("Tet", function() it("test", function() print(_VERSION) if jit then print("This is LuaJIT") else print("This is not LuaJIT") end assert.is_equal(17, 0x11) end) end)
$ busted test.lua Lua 5.1 This is LuaJIT ● 1 success / 0 failures / 0 errors / 0 pending : 0.004673 seconds
これでLuaJIT版のluarocksからインストールしたbustedを使えば、LuaJIT拡張を含んだテストを実行することができるということが確かめられました。
ちなみに、今回は自動テストのための検証ということで手元のPCのluarocks環境を実際に変えてしまいました。 Luaを扱う人によってはLuaバージョンごとに異なるluarocksを用意したい人もいると思います。 そういった場合はluajit-rocksを利用してそれぞれのluarocksをビルドしておくことで対応できるようです。 https://github.com/torch/luajit-rocks
LuaJIT拡張を使ったコードをテストする方法がわかったところで、このテストをCI環境で自動化していくことについて考えていきます。
自動テスト
ここまでの流れをGitHub Actions上で再現し、LuaJit版bustedをCIで実行できればゴールです。 しかし、ビルド済みパッケージのインストールならまだしも、Actionの中で毎回ビルドするのは避けたいところです。 busted公式からもGitHub Actionsが提供されていますが、これもやはりLua5.4向けが使われているため今回は使えません。 GitHub ActionsでLuaJIT対応bustedを利用するためには自前で環境を用意する必要がありそうです。 幸いGitHub Actionsはジョブ定義にコンテナを指定すれば、そのコンテナ内部でジョブを実行することができます。
このコンテナ環境としてLuaJIT版bustedがインストール済みのコンテナイメージをあらかじめ用意しておけば、あとはworkflowのyamlにrun busted test.lua
と書くだけで済みます。
下記のDockefileを使うことでluajit対応bustedを扱えるコンテナイメージをビルドできます。
FROM nickblah/luajit:2.1.0-beta3-luarocks-jammy RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - && \ apt-get update && \ apt-get install -y dumb-init gcc libc-dev nodejs git RUN luarocks install busted RUN busted --version ENTRYPOINT ["busted", "--lua=luajit-2.1.0-beta3","--verbose", "--output=gtest"]
GitHub Actionsから利用するために、このコンテナイメージをghcr.ioへプッシュしておきます。 特に非公開にすべき内容はなく、イメージ利用時の認証が不要になるため公開範囲はpublicとしました。
bustedインストールにあたってdumb-init
, gcc
, libc-dev
を事前にインストールする必要がある点はbusted公式Dockerfileを参考にしました。
ベースイメージの選択やgitとNode.jsインストールについてはGitHub Actions上での取り回しの都合です。
そのまま actions/checkout
を動かせるようになりますし、もし何か追加でパッケージが必要になったときもalpineより書き換えやすいと思っています。
上記Dockefileでビルドしたイメージを利用することで、GitHub Actionsで下記の通り簡単にCIを設定できました。
name: "[CI] LuaJIT Test sample" on: pull_request: permissions: id-token: write contents: read pull-requests: write jobs: lua: name: "Lua Test" runs-on: ubuntu-latest container: image: ghcr.io/ragi256/luajit-busted:latest steps: - uses: actions/checkout@v4 - name: Run Busted run: busted test.lua --output=TAP
これで、今後FluentBitのLua filterに使うためのLuaコードを修正したさいには自動テストが実行され、安心して品質の高い開発をすることができるようになります。
まとめ
FluentBitでLua filter用のLuaJITコードを実装した際に、LuaJIT版luarocksでbustedをインストールした環境を用意することで自動テストできるという話でした。 開発当時、LuaJITのテストに関する知見を見つけられなかったためこの記事を執筆しました。この記事がLuaJITを扱う誰かの手助けになれば幸いです。
ブックウォーカーではデータ基盤を整え、誰もがデータ分析を行えるデータの民主化を推進しています。 電子書籍ストア、マンガ連載サービス、読書SNSなど出版分野のデータを扱うデータ基盤を一緒に作りませんか?
出版文化からイノベーションを生み出すためにも、我々と共にデータ基盤を作り上げてくれるデータエンジニアを募集しています。ぜひブックウォーカーの採用情報ページからご応募ください。