embulk-output-multiを作った
前職の同僚の @mtsmfm さんがつぶやいていたので、勢いで作ってみました。
embulk で 1 つの input から複数の output に繋ぐみたいなことはできるのだろうか
— Fumiaki MATSUSHIMA (@mtsmfm) 2019年2月25日
使い方
2019/03/11時点の最新版は 0.4.0
です
outputs
に複数のoutputの設定をリスト形式で記述するだけです、簡単ですね。
in: type: ... out: type: multi outputs: - type: stdout - type: file path_prefix: out_file_ file_ext: csv formatter: type: csv - type: s3 ...
注意
エラーハンドリングについて
- 複数プラグインのどれかが
transaction
(orresume
) でエラーになった場合は、安全側に倒してその後のすべてのプラグインのopen
は実行されずに終わります。 - すべてのプラグインが
open
まで到達した場合、outputの処理が順次実行されていきますが、もし途中で一つでも失敗したアウトプットがあった場合、そのタスクを失敗とみなして例外を投げます。
ConfigDiffに関して
現在は各プラグインのConfigDiffを <plugin_type>_<index_in_outputs>
のタグを付けてMapで保存しています。例えば、上記の例で言うと最初のstdoutはstdout_0
という名前が付きます。つまり一回ConfigDiffを出力したあとにconfig.ymlを書き換えて順番を変えたり違うプラグインに書き換えたりすると、Diffがマージされない、もしくは違うDiffがマージされてしまう可能性があります。ConfigDiffを使う場合は、一回実行した outputs
の順番は変えない方が良いでしょう。
内部実装的な苦労話
通常のEmbulkのOutput pluginの実行フローは以下のようになっています.
transaction
メソッドが呼ばれる。このメソッド内で下準備 (認証やパラメータの検証) をする。- OKなら渡されてきた
OutputPlugin.Control
オブジェクトのrun
メソッド (以下コールバック
) を呼びExecutor側に処理を移譲する - Execotorがタスク分割などを実施し、タスクごとに
open
メソッドが呼ばれる。このメソッドで返却したTransactionalPageOutput
が送られてきたデータを処理していく。- タスクの処理が終わると
commit
メソッドが呼ばれタスクごとのTaskReport
を作成し、返す - 失敗したタスクは
abort
が呼ばれる
- タスクの処理が終わると
- 全部のタスクが終わったあと
cleanup
メソッドが実行され、後処理を行う - transactionメソッドは、コールバックから返却されたTask Reportsを元に
ConfigDiff
を作成し、返す
上記の流れを、multiプラグインのトランザクション上で複数のプラグインに対してエミュレートする必要があります。そのため以下のような処理の流れを作りました。フローがとても複雑でマルチスレッドを駆使する必要があったので、難しかったですけど楽しかったですw
- マルチスレッドで各pluginのtransactionを実施しつつ、ダミーのコールバックを渡す
- すべてのプラグインのtransactionの検証が終わったあと (= ダミーのコールバックが呼ばれたあと) に元のコールバックを呼ぶ
- ダミーは、オリジナルのコールバックが終わるまでは各プラグインのtransactionをブロックする必要がある
- 元のコールバックが呼ばれると、multiプラグインのopenが呼ばれる。ここで各プラグインのTransactionalPageOutputを作成し、それぞれのPageOutputに処理(add, finish, closeなど)を移譲する。
- 元のcleanupが実行される、保持しておいた個々のTaskReportを復元して、各プラグインのcleanupを実行する
- すべてのタスク終わったあとに。ダミーのコールバックのブロックを解除する。TaskReport内に個々プラグインのTaskReportが保持されているので、復元して個々のプラグインに渡す。
- 返したTaskReportを元に個々のプラグインのtransactionがConfigDiffを返す。それを集めてmultiプラグインのConfigDiffとして返す