かまたま日記3

プログラミングメイン、たまに日常

embulk-executor-remoteserver 0.4.0 リリース

github.com

このバージョンより、Embulk clientとserver間でTLSでの接続ができるようになりました。

設定方法 (クライアント)

まず、use_tls オプションをtrueに設定してください。サーバ側が(クライアントにとって)既知のCA証明書でサインされた証明書を使っていれば、これだけでOKです*1

exec:
  type: remoteserver
  hosts: ...
  use_tls: true

そうでない場合は、CA証明書をca_cert_pathに追加してください

exec:
  type: remoteserver
  hosts: ...
  use_tls: true
  ca_cert_path: path/to/ca.cert.pem

クライアント認証が必要な場合、クライアント証明書と秘密鍵がセットになったPKCS12ファイルのパスとパスワードをcert_p12_fileで指定してください。

exec:
  type: remoteserver
  hosts: ...
  use_tls: true
  cert_p12_file:
    path: path/to/cert/client.p12
    password: xxxxx

設定方法 (サーバ)

EmbulkサーバをTLSの終端にする場合*2、以下の環境変数を設定してサーバを起動してください

  • USE_TLS=true: TLS接続を有効にする
  • REQUIRE_TLS_CLIENT_AUTH=true: クライアント認証を有効にする
  • CERT_P12_PATH, CERT_P12_PASSWORD: サーバ証明書秘密鍵のペアのPKCS12ファイルパスとパスワード
  • CA_CERT_PATH: CA証明書のパス。クライアント証明書が(サーバにとって)未知のCA証明書でサインされてる場合に必要

例えばdocker-composeで設定する場合以下のようになるかと思います

version: '3'
services:
  server:
    image: kamatama41/embulk-executor-remoteserver
    ports:
      - "30001:30001"
    volumes:
      - ./certs:/root/certs
    environment:
      USE_TLS: "true"
      REQUIRE_TLS_CLIENT_AUTH: "true"
      CERT_P12_PATH: /root/certs/embulk-server.local.p12
      CERT_P12_PASSWORD: xxxxx
      CA_CERT_PATH: /root/certs/ca.cert.pem

*1:たぶん、未確認

*2:前段にAWS NLBやNGINXを置いてそこを終端にするなども多分できると思います

Gradleで動的にプラグインを適用する

モチベーション

とあるプロジェクトでgradle-release pluginを使っているのですが、DockerでJarをビルドするときにこのプラグインを設定していると .git ディレクトリをコピーしないと(プロジェクトがGitリポジトリじゃないと)最初のbuild.gradleの検証で失敗してしまうのですが、無駄なレイヤーが増えるのでDockerコンテナ上に.gitはあまりコピーしたくない。なので、releaseタスクを実行するときのみ上記のプラグインを有効にしたい。

方法

こちらの記事 を参考にさせてもらいました。プラグインを指定するときにapply falseをつけてallprojects ブロックの中で指定したプロパティがあるかをを使って遅延applyします。

before

plugins {
    id "net.researchgate.release" version "2.8.0"
}

release {
    git { requireBranch = 'master' }
}

after

plugins {
    id "net.researchgate.release" version "2.8.0" apply false
}

allprojects {
    if (properties.get("enableReleasePlugin") == "true") {
        apply plugin: "net.researchgate.release"
        release {
            git { requireBranch = 'master' }
        }
    }
}

これで、実行時に -PenableReleasePlugin=true をつけたときのみプラグインが適用されます。

$ ./gradlew release -PenableReleasePlugin=true

CicleCIでDockerイメージを再利用する in 2019

CicleCIでDockerイメージを再利用する - かまたま日記3

こちらの記事の最新版です。 現在CircleCIはバージョン2で、Docker Layer Cachingという機能がありますが、残念ながら追加のフィーが必要です。というわけで、会社とかで使っててフィーを払える方はそちらを使うとして、個人のOSS活動などで払うのが厳しい方用に普通にCircleCI 2.0のファイルのキャッシュ機能を使った方法を解説します*1

前回に比べて以下の点が変わっています

  • GoのようにMulti stage buildを使って多段ビルドを行う前提 (最初のビルドのステージはas buider でbuilderという名前がついてます)
  • Docker image を完全に再利用はしないで中間イメージを再利用する (docker build は毎回やる)
  • docker save 後のtarファイルを更にgzに圧縮してリストアの時間短縮を図る

注意として、自分の環境では、数百メガのキャッシュをload cacheとsave cacheするので2~3分かかるのでキャッシュを使うことによって、それ以上短縮出来なければ、総時間は変わらないか悪くなる可能性もあります。

1. キャッシュ用のディレクトリとキャッシュキーを決める

ここではディレクトリは /home/circleci/docker-cache、キャッシュキーは毎回変えたいので {{ .Branch }}-{{ .Revision }} を使います

- restore_cache:
    keys:
      - v1-docker-{{ .Branch }}-{{ .Revision }}
      - v1-docker-{{ .Branch }}-
      - v1-docker-

- save_cache:
    paths:
      - "/home/circleci/docker-cache"
    key: v1-docker-{{ .Branch }}-{{ .Revision }}

2. イメージをビルドする

今回は builder のイメージを取っておきたいので、builderの部分は別にビルドしてbuilerタグを付けています。

IMAGE_NAME=my_app
CACHE_FROM="--cache-from ${IMAGE_NAME}:builder --cache-from ${IMAGE_NAME}"
docker build --pull ${CACHE_FROM} --target builder -t ${IMAGE_NAME}:builder .
docker build --pull ${CACHE_FROM} -t ${IMAGE_NAME} .

3. イメージを保存する

docker save で保存するイメージを docker history コマンドで取得します。 builder を取ってるのがミソです。

mkdir -p /home/circleci/docker-cache
image_ids=$(docker history -q my_app:builder | grep -v '<missing>')
docker save ${image_ids} | gzip > ${DOCKER_CACHE_FILE}

4. イメージを展開する

Gzip化しているのでgunzipコマンドで解答して docker load に渡します

if [ -f /home/circleci/docker-cache/image.tar.gz ]; then
  gunzip -c /home/circleci/docker-cache/image.tar.gz | docker load
fi

全体

キャッシュファイルのパスを変数に入れて、以下のような感じになります。関係ない部分*2は略してます。

version: 2
jobs:
  build:
    environment:
      DOCKER_CACHE_FILE: /home/circleci/docker-cache/image.tar.gz
    steps:
      - checkout
      - restore_cache:
          keys:
            - v1-docker-{{ .Branch }}-{{ .Revision }}
            - v1-docker-{{ .Branch }}-
            - v1-docker-
      - run: |
          name: Load Docker cache
          command: |
            if [ -f ${DOCKER_CACHE_FILE} ]; then
              gunzip -c ${DOCKER_CACHE_FILE} | docker load
            fi
      - run: |
          name: Docker build
          command: |
            IMAGE_NAME=my_app
            CACHE_FROM="--cache-from ${IMAGE_NAME}:builder --cache-from ${IMAGE_NAME}"
            docker build --pull ${CACHE_FROM} --target builder -t ${IMAGE_NAME}:builder .
            docker build --pull ${CACHE_FROM} -t ${IMAGE_NAME} .
      - run:
          name: Save Docker cache
          command: |
            mkdir -p $(dirname ${DOCKER_CACHE_FILE})
            image_ids=$(docker history -q my_app:${t} | grep -v '<missing>')
            docker save ${image_ids} | gzip > ${DOCKER_CACHE_FILE}
      - save_cache:
          paths:
            - ${DOCKER_CACHE_FILE}
          key: v1-docker-{{ .Branch }}-{{ .Revision }}

*1:基本となる考え方は別に他のCIでも使えると思います

*2:たとえばdockerのインストールとかsetup_remote_dockerやテストの実行、デプロイなど

embulk-executor-remoteserverを作った #Embulk

Embulkのexecutor pluginの仕組みとネットワークプログラミングを学びたかったので、勉強がてらこのようなプラグインを作ってみました。

github.com

できること

  • Embulkのタスクの実行を別に立てた専用のサーバ(以下Embulkサーバと呼びます)上で実行できる
  • 複数サーバにタスクの処理を分散させる
  • サーバとの接続が切れたときに再接続する

できないこと (TODOs)

使い方

2019/04/06時点の最新は 0.2.1 です

Embulkサーバを起動する

基本的にはDockerコンテナ上で動かすことを想定していて、DockerHubにイメージをホストしているので、それをrunすれば起動できます。クライアント(plugin)と同じバージョンのイメージを使うことをおすすめしますが、多少違ってもたぶん動きます。コンテナ側は 30001 番のポートを開くので*2それを公開します。

$ docker run -p 30001:30001 -it --rm kamatama41/embulk-executor-remoteserver     
14:55:48.805 [main] INFO  c.g.kamatama41.nsocket.SocketServer - Starting server..

Embulkを起動する

exectype: remoteserver を指定して hosts に起動中のEmbulkサーバのホストを登録します。

exec:
  type: remoteserver
  hosts:
    - localhost:30001

この状態でEmbulk runをすると、サーバに接続に行き、タスクの実行を依頼します。 もっと詳しいチュートリアルは、こちらのexample を参照してください。

中身の話

このプラグインは、指定されたEmbulkサーバとTCPコネクションを張り、そのコネクションを通じてクライアントとサーバが相互にデータのやり取りを行います。例えばクライアントからはEmbulkEmbedを構築するのに必要なタスクの情報やシステムコンフィグ、OSSプラグインを実行に必要なgemをZIPファイルにまとめて送ったりしています。逆にサーバからはクライアントに各タスクの進捗状況を送っており、クライアント側は終了の通知が来るまで待受をします。

いわゆるWebSocket的な双方向通信をしているのですが、この仕組みを自作しました。

github.com

まあnettyとか使ってWebSocketにしても良かったのですが、あんまり依存ライブラリを増やしたくなかったのとネットワークプログラミングの勉強も兼ねてスクラッチで作ってみたかったという次第です。nsocketNIOで作ったノンブロッキングsocketサーバとクライアント の略です。大変だったけど結構いい勉強になりました。

実用性があるかは微妙なところですが、よかったら使ってみて感想をいただけると喜びます。

*1:現状Maven pluginは(たぶん)無いので、あんまり実害はないかもしれません

*2:変えることもできます

gradle-embulk-plugin v0.5.0 リリース

Release 0.5.0 · kamatama41/gradle-embulk-plugin · GitHub

embulk_* のタスクを実行時の config.yml output.yml のオーバーライドを build.gradle 内ではなくプロパティを渡すようにしました。

以前

embulk {
  configYaml = "myconfig.yml"
  outputYaml = "myoutput.yml"
}

今回以降

$ ./gradlew embulk_guess -PconfigYaml=myconfig.yml -PoutputYaml=myoutput.yml

プラグインの詳しい使い方は以下の記事を参照ください。

kamatama41.hatenablog.com

Lambdaオブジェクトの型パラメータを取るのは難しい

TL;DR

  • ラムダオブジェクトの型パラメータを取得するスマートな方法は今の所見つかっていない
  • もし基盤プログラムでそういうことをしたい場合は、ラムダを禁止して、匿名クラスを使う
  • いい方法があったら教えてください

本文

Javaで基盤プログラム的なのを作るとき、ジェネリクスの型パラメータを取得したいことがあります。普通のクラスや匿名クラスの場合は以下のようなリフレクションのコードで取得することができます。

import java.lang.reflect.ParameterizedType;
import java.util.function.Consumer;

public class FooTest {
    public static void main(String[] args) {
        System.out.println(getGenericTypeParam(new Foo()));
        System.out.println(getGenericTypeParam(new Bar()));
        System.out.println(getGenericTypeParam(Baz));
    }

    private static Class<?> getGenericTypeParam(Consumer consumer) {
        ParameterizedType type = (ParameterizedType) consumer.getClass().getGenericInterfaces()[0];
        return (Class) type.getActualTypeArguments()[0];
    }

    private static class Foo implements Consumer<String> {
        @Override
        public void accept(String s) {}
    }

    private static class Bar implements Consumer<Integer> {
        @Override
        public void accept(Integer s) {}
    }

    private static Consumer<Void> Baz = new Consumer<Void>() {
        @Override
        public void accept(Void aVoid) {
        }
    };
}

実行結果

class java.lang.String
class java.lang.Integer
class java.lang.Void

しかし、これがラムダになると、getGenericInterfaces メソッドの結果が ParameterizedType ではなく単なる java.lang.Object のクラス型になり、ClassCastExceptionが発生してしまいます。

getGenericTypeParam((Consumer<Byte>) (b -> {}));

結果

Exception in thread "main" java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType

つまり、ラムダ式で生成された関数オブジェクトからは型パラメータの情報が消えているのです。これをどうにかして取得したいといろいろ模索していたのですが、結局ダメでした。一番惜しかったのはこちらのGistのやり方です。

ラムダ式が定義されているクラスで getDeclaredMethods を使ってメソッド一覧を見ると、そのクラス内で定義されたラムダ式に対応したSyntheticメソッドが生成されています。

import java.lang.reflect.Method;

public class FooTest {
    public static void main(String[] args) {
        Runnable task1 = () -> task2();
        System.out.println(task1.getClass().getName());
        task1.run();

        System.out.println();
        for (Method method : FooTest.class.getDeclaredMethods()) {
            System.out.println(method);
        }
    }

    private static void task2() {
        Runnable task2 = () -> {};
        System.out.println(task2.getClass().getName());
    }
}

上記のプログラムを実行すると以下のような結果が出力されます。

FooTest$$Lambda$1/664223387
FooTest$$Lambda$2/666641942

public static void FooTest.main(java.lang.String[])
private static void FooTest.task2()
private static void FooTest.lambda$task2$1()
private static void FooTest.lambda$main$0()

ラムダ式で生成されたオブジェクトとSyntheticメソッドにはどちらも名前に $1 的なインデックスがついており、それぞれのインデックスが1対1で対応していそうです。そこで、先のGistに習って以下の getGenericTypeParamSmart を追加します。

private static Class<?> getGenericTypeParamSmart(Consumer consumer) {
    String functionClassName = consumer.getClass().getName();
    int lambdaMarkerIndex = functionClassName.indexOf("$$Lambda$");
    if (lambdaMarkerIndex == -1) { // Not a lambda
        return getGenericTypeParam(consumer);
    }

    String declaringClassName = functionClassName.substring(0, lambdaMarkerIndex);
    int lambdaIndex = Integer.parseInt(functionClassName.substring(lambdaMarkerIndex + 9, functionClassName.lastIndexOf('/')));

    Class<?> declaringClass;
    try {
        declaringClass = Class.forName(declaringClassName);
    } catch (ClassNotFoundException e) {
        throw new IllegalStateException("Unable to find lambda's parent class " + declaringClassName);
    }

    for (Method method : declaringClass.getDeclaredMethods()) {
        if (method.isSynthetic()
                && method.getName().startsWith("lambda$")
                && method.getName().endsWith("$" + (lambdaIndex - 1))
                && Modifier.isStatic(method.getModifiers())) {
            return method.getParameterTypes()[0];
        }
    }
    throw new IllegalStateException("Unable to find lambda's implementation method");
}

その上で、以下のコードを実行するとちゃんと型パラメータが取れてそうです

public static void main(String[] args) {
    System.out.println(getGenericTypeParamSmart(new Foo()));
    System.out.println(getGenericTypeParamSmart(new Bar()));
    System.out.println(getGenericTypeParamSmart(Baz));
    System.out.println(getGenericTypeParamSmart((Consumer<Byte>) (b -> {})));
    System.out.println(getGenericTypeParamSmart((Consumer<Long>) (l -> {})));
}
class java.lang.String
class java.lang.Integer
class java.lang.Void
class java.lang.Byte
class java.lang.Long

ただし、Gistにもコメントしましたが、ラムダ内でラムダを生成した場合、例えば、以下のパターンでは失敗します。

public static void main(String[] args) {
    Runnable task = () -> {
        System.out.println(getGenericTypeParamSmart(new Foo()));
        System.out.println(getGenericTypeParamSmart(new Bar()));
        System.out.println(getGenericTypeParamSmart(Baz));
        System.out.println(getGenericTypeParamSmart((Consumer<Byte>) (b -> {})));
        System.out.println(getGenericTypeParamSmart((Consumer<Long>) (b -> {})));
    };
    task.run();
}
class java.lang.String
class java.lang.Integer
class java.lang.Void
class java.lang.Long
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0

なぜでしょうか? ラムダオブジェクトとFooTestクラスに定義されているSyntheticメソッドを比較してみましょう

public static void main(String[] args) {
    Runnable task = () -> {
        Consumer<Byte> byteConsumer = b -> {};
        Consumer<Long> longConsumer = l -> {};
        System.out.println("byteConsumer: " + byteConsumer.getClass().getName());
        System.out.println("longConsumer: " + longConsumer.getClass().getName());
    };
    System.out.println("task: " + task.getClass().getName());
    task.run();

    System.out.println();
    for (Method method : FooTest.class.getDeclaredMethods()) {
        if (method.isSynthetic()) {
            System.out.println(method.toString());
        }
    }
}

これの実行結果は以下のようになります

task: FooTest$$Lambda$1/664223387
byteConsumer: FooTest$$Lambda$2/1349393271
longConsumer: FooTest$$Lambda$3/159413332

private static void FooTest.lambda$main$2()
private static void FooTest.lambda$null$1(java.lang.Long)
private static void FooTest.lambda$null$0(java.lang.Byte)

つまり:

  • task オブジェクト FooTest$$Lambda$1 に対応するSyntheticメソッドは lambda$main$2
  • byteConsumer オブジェクト FooTest$$Lambda$2 に対応するSyntheticメソッドは lambda$null$0
  • longConsumer オブジェクト FooTest$$Lambda$3 に対応するSyntheticメソッドは lambda$null$1

に、なるわけです。番号の対応がずれてるので、間違ったメソッドを検索してしまっていたわけです。そういう訳で、この方法は使えませんでした。

そのあといろいろ調べてみましたが、型パラメータをちゃんと取得する方法は見つかりませんでした。というわけで、こういうプログラムを書きたいときは今の所はラムダを禁止したほうが良さそうです。

private static Class<?> getGenericTypeParam(Consumer consumer) {
    String functionClassName = consumer.getClass().getName();
    if (functionClassName.contains("$$Lambda$")) {
        throw new UnsupportedOperationException("Lambda is not supported");
    }
    ParameterizedType type = (ParameterizedType) consumer.getClass().getGenericInterfaces()[0];
    return (Class) type.getActualTypeArguments()[0];
}

最終的なコードは以下のような感じになります。

import java.lang.reflect.ParameterizedType;
import java.util.function.Consumer;

public class FooTest {
    public static void main(String[] args) {
        System.out.println(getGenericTypeParam(new Foo()));
        System.out.println(getGenericTypeParam(new Bar()));
        System.out.println(getGenericTypeParam(Baz));
        try {
            System.out.println(getGenericTypeParam((Consumer<Byte>) (b -> {})));
        } catch (UnsupportedOperationException e) {}
    }

    private static Class<?> getGenericTypeParam(Consumer consumer) {
        String functionClassName = consumer.getClass().getName();
    if (functionClassName.contains("$$Lambda$")) {
        throw new UnsupportedOperationException("Lambda is not supported");
    }
        ParameterizedType type = (ParameterizedType) consumer.getClass().getGenericInterfaces()[0];
        return (Class) type.getActualTypeArguments()[0];
    }

    private static class Foo implements Consumer<String> {
        @Override
        public void accept(String s) {}
    }

    private static class Bar implements Consumer<Integer> {
        @Override
        public void accept(Integer s) {}
    }

    private static Consumer<Void> Baz = new Consumer<Void>() {
        @Override
        public void accept(Void aVoid) {}
    };
}

embulk-output-multiを作った

前職の同僚の @mtsmfm さんがつぶやいていたので、勢いで作ってみました。

github.com

使い方

2019/03/11時点の最新版は 0.4.0 です

outputs に複数のoutputの設定をリスト形式で記述するだけです、簡単ですね。

in:
  type: ...
out:
  type: multi
  outputs:
    - type: stdout
    - type: file
      path_prefix: out_file_
      file_ext: csv
      formatter:
        type: csv
    - type: s3
      ...

注意

エラーハンドリングについて

  • 複数プラグインのどれかが transaction (or resume) でエラーになった場合は、安全側に倒してその後のすべてのプラグインopen は実行されずに終わります。
  • すべてのプラグインopen まで到達した場合、outputの処理が順次実行されていきますが、もし途中で一つでも失敗したアウトプットがあった場合、そのタスクを失敗とみなして例外を投げます。

ConfigDiffに関して

現在は各プラグインのConfigDiffを <plugin_type>_<index_in_outputs> のタグを付けてMapで保存しています。例えば、上記の例で言うと最初のstdoutはstdout_0という名前が付きます。つまり一回ConfigDiffを出力したあとにconfig.ymlを書き換えて順番を変えたり違うプラグインに書き換えたりすると、Diffがマージされない、もしくは違うDiffがマージされてしまう可能性があります。ConfigDiffを使う場合は、一回実行した outputs の順番は変えない方が良いでしょう。

内部実装的な苦労話

通常のEmbulkのOutput pluginの実行フローは以下のようになっています.

  • transaction メソッドが呼ばれる。このメソッド内で下準備 (認証やパラメータの検証) をする。
  • OKなら渡されてきた OutputPlugin.Control オブジェクトの run メソッド (以下 コールバック) を呼びExecutor側に処理を移譲する
  • Execotorがタスク分割などを実施し、タスクごとに open メソッドが呼ばれる。このメソッドで返却した TransactionalPageOutput が送られてきたデータを処理していく。
    • タスクの処理が終わると commit メソッドが呼ばれタスクごとの TaskReport を作成し、返す
    • 失敗したタスクは abort が呼ばれる
  • 全部のタスクが終わったあと cleanup メソッドが実行され、後処理を行う
  • transactionメソッドは、コールバックから返却されたTask Reportsを元に ConfigDiff を作成し、返す

上記の流れを、multiプラグイントランザクション上で複数のプラグインに対してエミュレートする必要があります。そのため以下のような処理の流れを作りました。フローがとても複雑でマルチスレッドを駆使する必要があったので、難しかったですけど楽しかったですw

  • マルチスレッドで各pluginのtransactionを実施しつつ、ダミーのコールバックを渡す
  • すべてのプラグインのtransactionの検証が終わったあと (= ダミーのコールバックが呼ばれたあと) に元のコールバックを呼ぶ
    • ダミーは、オリジナルのコールバックが終わるまでは各プラグインのtransactionをブロックする必要がある
  • 元のコールバックが呼ばれると、multiプラグインのopenが呼ばれる。ここで各プラグインのTransactionalPageOutputを作成し、それぞれのPageOutputに処理(add, finish, closeなど)を移譲する。
    • ただし Page オブジェクトは使い回せないので、各プラグインごとにコピーする
    • どれかのプラグインが途中で失敗してた場合はmultiのタスク自体は失敗とみなして例外を投げる、その後、全プラグインのabortが呼ばれる
  • 元のcleanupが実行される、保持しておいた個々のTaskReportを復元して、各プラグインのcleanupを実行する
  • すべてのタスク終わったあとに。ダミーのコールバックのブロックを解除する。TaskReport内に個々プラグインのTaskReportが保持されているので、復元して個々のプラグインに渡す。
  • 返したTaskReportを元に個々のプラグインのtransactionがConfigDiffを返す。それを集めてmultiプラグインのConfigDiffとして返す