かまたま日記3

プログラミングメイン、たまに日常

embulk-output-multiを作った

前職の同僚の @mtsmfm さんがつぶやいていたので、勢いで作ってみました。

github.com

使い方

2019/03/11時点の最新版は 0.4.0 です

outputs に複数のoutputの設定をリスト形式で記述するだけです、簡単ですね。

in:
  type: ...
out:
  type: multi
  outputs:
    - type: stdout
    - type: file
      path_prefix: out_file_
      file_ext: csv
      formatter:
        type: csv
    - type: s3
      ...

注意

エラーハンドリングについて

  • 複数プラグインのどれかが transaction (or resume) でエラーになった場合は、安全側に倒してその後のすべてのプラグインopen は実行されずに終わります。
  • すべてのプラグインopen まで到達した場合、outputの処理が順次実行されていきますが、もし途中で一つでも失敗したアウトプットがあった場合、そのタスクを失敗とみなして例外を投げます。

ConfigDiffに関して

現在は各プラグインのConfigDiffを <plugin_type>_<index_in_outputs> のタグを付けてMapで保存しています。例えば、上記の例で言うと最初のstdoutはstdout_0という名前が付きます。つまり一回ConfigDiffを出力したあとにconfig.ymlを書き換えて順番を変えたり違うプラグインに書き換えたりすると、Diffがマージされない、もしくは違うDiffがマージされてしまう可能性があります。ConfigDiffを使う場合は、一回実行した outputs の順番は変えない方が良いでしょう。

内部実装的な苦労話

通常のEmbulkのOutput pluginの実行フローは以下のようになっています.

  • transaction メソッドが呼ばれる。このメソッド内で下準備 (認証やパラメータの検証) をする。
  • OKなら渡されてきた OutputPlugin.Control オブジェクトの run メソッド (以下 コールバック) を呼びExecutor側に処理を移譲する
  • Execotorがタスク分割などを実施し、タスクごとに open メソッドが呼ばれる。このメソッドで返却した TransactionalPageOutput が送られてきたデータを処理していく。
    • タスクの処理が終わると commit メソッドが呼ばれタスクごとの TaskReport を作成し、返す
    • 失敗したタスクは abort が呼ばれる
  • 全部のタスクが終わったあと cleanup メソッドが実行され、後処理を行う
  • transactionメソッドは、コールバックから返却されたTask Reportsを元に ConfigDiff を作成し、返す

上記の流れを、multiプラグイントランザクション上で複数のプラグインに対してエミュレートする必要があります。そのため以下のような処理の流れを作りました。フローがとても複雑でマルチスレッドを駆使する必要があったので、難しかったですけど楽しかったですw

  • マルチスレッドで各pluginのtransactionを実施しつつ、ダミーのコールバックを渡す
  • すべてのプラグインのtransactionの検証が終わったあと (= ダミーのコールバックが呼ばれたあと) に元のコールバックを呼ぶ
    • ダミーは、オリジナルのコールバックが終わるまでは各プラグインのtransactionをブロックする必要がある
  • 元のコールバックが呼ばれると、multiプラグインのopenが呼ばれる。ここで各プラグインのTransactionalPageOutputを作成し、それぞれのPageOutputに処理(add, finish, closeなど)を移譲する。
    • ただし Page オブジェクトは使い回せないので、各プラグインごとにコピーする
    • どれかのプラグインが途中で失敗してた場合はmultiのタスク自体は失敗とみなして例外を投げる、その後、全プラグインのabortが呼ばれる
  • 元のcleanupが実行される、保持しておいた個々のTaskReportを復元して、各プラグインのcleanupを実行する
  • すべてのタスク終わったあとに。ダミーのコールバックのブロックを解除する。TaskReport内に個々プラグインのTaskReportが保持されているので、復元して個々のプラグインに渡す。
  • 返したTaskReportを元に個々のプラグインのtransactionがConfigDiffを返す。それを集めてmultiプラグインのConfigDiffとして返す