embulk-output-multiを作った
前職の同僚の @mtsmfm さんがつぶやいていたので、勢いで作ってみました。
embulk で 1 つの input から複数の output に繋ぐみたいなことはできるのだろうか
— Fumiaki MATSUSHIMA (@mtsmfm) 2019年2月25日
使い方
2019/03/11時点の最新版は 0.4.0
です
outputs
に複数のoutputの設定をリスト形式で記述するだけです、簡単ですね。
in: type: ... out: type: multi outputs: - type: stdout - type: file path_prefix: out_file_ file_ext: csv formatter: type: csv - type: s3 ...
注意
エラーハンドリングについて
- 複数プラグインのどれかが
transaction
(orresume
) でエラーになった場合は、安全側に倒してその後のすべてのプラグインのopen
は実行されずに終わります。 - すべてのプラグインが
open
まで到達した場合、outputの処理が順次実行されていきますが、もし途中で一つでも失敗したアウトプットがあった場合、そのタスクを失敗とみなして例外を投げます。
ConfigDiffに関して
現在は各プラグインのConfigDiffを <plugin_type>_<index_in_outputs>
のタグを付けてMapで保存しています。例えば、上記の例で言うと最初のstdoutはstdout_0
という名前が付きます。つまり一回ConfigDiffを出力したあとにconfig.ymlを書き換えて順番を変えたり違うプラグインに書き換えたりすると、Diffがマージされない、もしくは違うDiffがマージされてしまう可能性があります。ConfigDiffを使う場合は、一回実行した outputs
の順番は変えない方が良いでしょう。
内部実装的な苦労話
通常のEmbulkのOutput pluginの実行フローは以下のようになっています.
transaction
メソッドが呼ばれる。このメソッド内で下準備 (認証やパラメータの検証) をする。- OKなら渡されてきた
OutputPlugin.Control
オブジェクトのrun
メソッド (以下コールバック
) を呼びExecutor側に処理を移譲する - Execotorがタスク分割などを実施し、タスクごとに
open
メソッドが呼ばれる。このメソッドで返却したTransactionalPageOutput
が送られてきたデータを処理していく。- タスクの処理が終わると
commit
メソッドが呼ばれタスクごとのTaskReport
を作成し、返す - 失敗したタスクは
abort
が呼ばれる
- タスクの処理が終わると
- 全部のタスクが終わったあと
cleanup
メソッドが実行され、後処理を行う - transactionメソッドは、コールバックから返却されたTask Reportsを元に
ConfigDiff
を作成し、返す
上記の流れを、multiプラグインのトランザクション上で複数のプラグインに対してエミュレートする必要があります。そのため以下のような処理の流れを作りました。フローがとても複雑でマルチスレッドを駆使する必要があったので、難しかったですけど楽しかったですw
- マルチスレッドで各pluginのtransactionを実施しつつ、ダミーのコールバックを渡す
- すべてのプラグインのtransactionの検証が終わったあと (= ダミーのコールバックが呼ばれたあと) に元のコールバックを呼ぶ
- ダミーは、オリジナルのコールバックが終わるまでは各プラグインのtransactionをブロックする必要がある
- 元のコールバックが呼ばれると、multiプラグインのopenが呼ばれる。ここで各プラグインのTransactionalPageOutputを作成し、それぞれのPageOutputに処理(add, finish, closeなど)を移譲する。
- 元のcleanupが実行される、保持しておいた個々のTaskReportを復元して、各プラグインのcleanupを実行する
- すべてのタスク終わったあとに。ダミーのコールバックのブロックを解除する。TaskReport内に個々プラグインのTaskReportが保持されているので、復元して個々のプラグインに渡す。
- 返したTaskReportを元に個々のプラグインのtransactionがConfigDiffを返す。それを集めてmultiプラグインのConfigDiffとして返す
GradleプラグインをGradle community portalにアップロードした
今まで、自作の2つのGradleプラグインはGitHub上のオレオレMavenリポジトリから落とすようにしていたんですが、 buildscript
でそのリポジトリを指定しないと行けなかったり、記述がちょっとだけ面倒だったので、Gradle community portalにアップしてPlugins DSLで書けるようにしました。
アップロードの方法は簡単で、基本的にこちらの手順に従うだけです
- ポータルのアカウントを作る
- APIキーを作成
- Gradle plugin publishing pluginを使って諸々の設定をbuild.gradleに書く (ref)
publishPlugin
タスクを実行
初回のみapprovalが必要でちょっと時間がかかりますが、自分の場合は半日くらいで承認されました。
JUnit 5 入門
そろそろ使ってみるかということで入門してみました。
JUnit Jupiter
こちらにも書かれてますが、JUnit 5は複数のサブプロジェクトからなり、JUnit 5でテストを書いたり拡張機能を書くためのクラスはJUnit Jupiterというプロジェクトにあります。なので、テストを書く場合は org.junit.jupiter
配下の各種ライブラリーをインポートして使うことになります。
Gradle から使う
最低限以下の記述が必要になります。 (Gradle 4.6以上が必要です)
dependencies { testCompile('org.junit.jupiter:junit-jupiter-api:5.3.2') testRuntime('org.junit.jupiter:junit-jupiter-engine:5.3.2') } test { useJUnitPlatform() }
junit-jupiter-api
は JUnit Jupiter のテストを書くのに必要なクラス、アノテーション群があります。testCompile
で指定します。junit-jupiter-engine
は JUnit Jupiter のテストを実行するためのTestEngine
実装です。testRuntime
で指定します。test
タスク内のコンフィグレーションでuseJUnitPlatform
を指定することで、JUnit 5のプラットフォームを使うように宣言します。
テストを書く、実行する
基本的な書き方はJUnit 4までと同じで、テストしたいメソッドに @Test
のアノテーションを追加します。アサーションは org.junit.jupiter.api.Assertions
クラスにJUnit 3まででお馴染みの assertEquals
などの基本的なアサーションメソッドがあるので、それを使います。JUnit 4時代の assertThat
や、AssertJなどの別のアサーションライブラリを使いたい場合は、別途HamcrestやAssertJをインストールして使うことができます。
package com.example.project; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; class SampleTests { @Test void onePlusOneEqualsTwo() { assertEquals(2, 1 + 1); } }
実行はGradleのテストタスクで実行します。
$ ./gradlew test > Task :test com.example.project.SampleTests > onePlusOneEqualsTwo() PASSED BUILD SUCCESSFUL in 1s 3 actionable tasks: 2 executed, 1 up-to-date
ちょっと高度な使い方集
BeforeとかAfterとか
@Before
@After
は @BeforeEach
@AfterEach
に、 @BeforeClass
@AfterClass
は @BeforeAll
, @AfterAll
に置き換えられました。
package com.example.project; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class DBTests { @BeforeAll static void initializeDB() { System.out.println("Initializing Database..."); } @AfterAll static void deleteDB() { System.out.println("Deleting Database..."); } @BeforeEach void insertData() { System.out.println("Inserting test data..."); } @AfterEach void clearData() { System.out.println("Clearing test data..."); } @Test void testWithDB() { System.out.println("Testing..."); } }
Parameterized Test
junit-jupiter-params のライブラリをインストールした上で @ParameterizedTest
をテストメソッドに追加します。パラメタのソースは、簡易的には @CsvSource
でCSV文字列で指定できます。もうちょっと高度にやりたい場合は @ArgumentsSource
アノテーションを使うことで独自の引数のProviderを指定することができます。
dependencies { .... testCompile('org.junit.jupiter:junit-jupiter-params:5.3.2') .... }
package com.example.project; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import static org.junit.jupiter.api.Assertions.assertEquals; class ParameterizedTests { @ParameterizedTest(name = "{0} + {1} = {2}") @CsvSource({ "0, 1, 1", "1, 2, 3", "49, 51, 100", "1, 100, 101" }) void testsForPlus(int first, int second, int expected) { assertEquals(expected, first + second, first + " + " + second + " should equal " + expected); } }
JUnit 4のテストを実行する
junit-vintage-engineを testRuntime
でインストールします。これはJUnit 4以前のテストを実行するためのTestEngine
の実装です。
dependencies { ... testCompile "junit:junit:4.12" testRuntime "org.junit.vintage:junit-vintage-engine:5.3.2" ... }
package com.example.project; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import org.junit.Test; public class JUnit4Tests { @Test public void test() { assertThat(1 + 2, is(3)); } }
拡張機能
JUnit 4の @Rule
のような拡張機能は任意の Extension
classを実装することで実現できます。
詳しくは この辺 参照。
package com.example.project; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.platform.commons.support.AnnotationSupport; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.util.Optional; class ExtensionTests { @Test @MyExtension("FOO") void extensionTest() { System.out.println("Running a test..."); } @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @ExtendWith(MyExtensionImpl.class) private @interface MyExtension { String value(); } private static class MyExtensionImpl implements BeforeEachCallback { @Override public void beforeEach(ExtensionContext context) { final Optional<MyExtension> annotation = AnnotationSupport.findAnnotation(context.getTestMethod(), MyExtension.class); System.out.println(String.format("Running my extension with %s...", annotation.get().value())); } } }
参考
gradle-embulk-plugin v0.3.0 リリース
Release v0.3.0 · kamatama41/gradle-embulk-plugin · GitHub
gem
, gemPush
のタスクの内容を最新のEmbulkのものに追従しました。詳しい使い方は以下の記事を参照ください。
JavaでRubyのeach_sliceがしたい
each_sliceというのは配列を指定した要素数の配列に分ける処理です。リストの中身をn件ごとに処理するときに便利です。 Javaには同様の処理が(たぶん)標準APIには無いので、こんな感じで行けそうです。
import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; public class Test { public static void main(String[] args) { // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]] System.out.println(slice(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)); } private static <T> List<List<T>> slice(List<T> list, int n) { final int resultSize = (int) Math.ceil((double) list.size() / n); return IntStream.range(0, resultSize) .mapToObj(i -> list.subList(n * i, Math.min(list.size(), n * (i + 1)))) .collect(Collectors.toList()); } }
参考: Is there a way to do the Ruby each_slice in Java 8? - Stack Overflow
アメリカに行ってきた
先週1週間会社の出張でアメリカ(マウンテンビュー)に行ってきました。海外は前職でのフィリピン出張以来2年ぶりです。
飛行機
行きは羽田からサンフランシスコ国際空港(SFO)の直行便で9時間ほど、帰りは後述の理由からSFOから香港国際空港(HKG)経由の成田への便でした、香港での待ち時間含めて22時間くらい。中国は初上陸でしたが、HKGでWi-Fi使う分には普通にTwitterできました。
香港国際空港着、空港内のWi-FiからTwitterはみれる模様。
— かまたま (@kamatama_41) September 1, 2018
気候
滞在した一週間は雨も降らず、サンフランシスコもマウンテンビューも最低10℃最高25℃くらいで朝は肌寒く日中は過ごしやすい、過ごしやすい気候でした。"ベイエリアは湿度が低いので日本ほど暑く感じない" というのをよく聞いていましたが、実感しました。
食事
基本的にアメリカ基準のサイズですべてが大きかったです。会社のランチはEAT Clubというのを使っているのですが、サラダ+主菜
で注文したらサラダが直径25cmくらいのボウルに並々入っていて単体でお腹いっぱいになる量でした。結局食べきれず、持って帰って夜食べていました...w
あと、マウンテンビュー最後の夜にSushitomiという店で寿司を食べましたが、日本のクオリティで普通に美味しかったです。近くのshalalaというラーメン屋も美味しいらしいので、今度MVに行った際は寄ってみたいです。
仕事
1週間と短い間の滞在で時差ボケもありあまり効率的に作業ができたとは言いがたかったですが、直属の上司やプロダクトマネージャー、QAチームなどと初めて直接顔を合わせてコミュニケーションできたので、今後の仕事のやり易さに繋がればなあと思います。
歯の詰め物が取れた
3日目に歯の詰め物が取れてしまいました。治療したかったのですが、HRの方とも相談して日本に帰国後治療することに。IT健保の場合、海外での医療費は後で申請すれば返金してもらえるようなのですが*1、日本の治療費基準で返金されるようなので、日本の3~10倍ほどすると言うUSでの歯科治療費を考えると、申請する手間と割にに合わないと判断しました。
Uber, Airbnb
今回の出張で初めてUberとAirbnbを利用したのですが、なるほどこれは便利だなあという感じでした。特にUberは精算の手間も無くなるし日本でもUberもしくは同様なサービスが流行ってほしいなと思いました。
コンピュータ歴史博物館
マウンテンビューを経つ直前に行ってきました。そろばんから現代のコンピュータまでの計算機の歴史が一同に介しており、とても見ごたえがありました。時間がなく駆け足で見ることになってしまったので、また行ってじっくり見たいです。
US出張の最後に心ばかりの観光を (at @ComputerHistory Museum in Mountain View, CA) https://t.co/zOKJWTZ7Pt pic.twitter.com/y16DSZ97w9
— かまたま (@kamatama_41) 2018年8月31日
余談
出張後に夏休みを取って、家族とアメリカ観光をする予定だったのですが、直前になり子供が熱を出して渡航できなくなってしまうと言うハプニングが。結局諸々の予定をキャンセルし、帰りの航空券も取り直し日本に帰ることになりました。ホテル、ツアーなど直前キャンセルが利かないものも多く、10%ほどしかお金は戻ってきませんでした(泣)