読者です 読者をやめる 読者になる 読者になる

techium

このブログは何かに追われないと頑張れない人たちが週一更新をノルマに技術情報を発信するブログです。もし何か調査して欲しい内容がありましたら、@kobashinG or @muchiki0226 までいただけますと気が向いたら調査するかもしれません。

ワークフローエンジンDigDagのプラグイン機能を試してみる

DigDag Slack

ワークフローエンジンのDigdagについて記事を書いてきましたが、v0.8.3からプラグイン機能が入ったというアナウンスがありましたので早速サンプルを動かしてみました。
ドキュメントやトラブル等があったのでそれらの対策等を紹介いたします。
また、Slackにメッセージを投稿するプラグインもサンプルで作ってみたのでやり方を紹介します。

プラグインプロジェクトのビルド

DigDagのプラグイン機能のサンプルはこちらにおいてあります。
(サンプルというかテスト用でしょうけど…)

こちらの部分を抜き出したフォルダ構成は次のようになります。

plugin
├── build
│   ├── publications
│   │   └── mavenJava
│   │       └── pom-default.xml
│   └── repo
│       └── io
│           └── digdag
│               └── plugin
│                   └── digdag-plugin-example
│                       ├── 0.1.0
│                       │   ├── digdag-plugin-example-0.1.0.jar
│                       │   ├── digdag-plugin-example-0.1.0.jar.md5
│                       │   ├── digdag-plugin-example-0.1.0.jar.sha1
│                       │   ├── digdag-plugin-example-0.1.0.pom
│                       │   ├── digdag-plugin-example-0.1.0.pom.md5
│                       │   └── digdag-plugin-example-0.1.0.pom.sha1
│                       ├── maven-metadata.xml
│                       ├── maven-metadata.xml.md5
│                       └── maven-metadata.xml.sha1
├── build.gradle
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
└── src
    └── main
        ├── java
        │   └── io
        │       └── digdag
        │           └── plugin
        │               └── example
        │                   ├── ExampleOperatorFactory.java
        │                   └── ExamplePlugin.java
        └── resources
            └── META-INF
                └── services
                    └── io.digdag.spi.Plugin

このプロジェクトフォルダ内の「digdag-plugin-example」がDigDagのプラグインのプロジェクトになっています。
中のプロジェクトはmaven-publishを利用してプロジェクトをビルドしてビルド生成物を作成します。
早速、digdag-plugin-exampleをビルドしてみると下記のようなエラーになります。

$ ./gradlew publish
:generatePomFileForMavenJavaPublication
:compileJava

FAILURE: Build failed with an exception.

* What went wrong:
Could not resolve all dependencies for configuration ':provided'.
> Could not resolve io.digdag:digdag-spi:0.8.3.
  Required by:
      io.digdag.plugin:digdag-plugin-example:0.1.0
   > Could not resolve io.digdag:digdag-spi:0.8.3.
      > Could not get resource 's3://digdag-beta-release/maven/io/digdag/digdag-spi/0.8.3/digdag-spi-0.8.3.pom'.
         > The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: A915771EEF478CC7)
> Could not resolve io.digdag:digdag-plugin-utils:0.8.3.
  Required by:
      io.digdag.plugin:digdag-plugin-example:0.1.0
   > Could not resolve io.digdag:digdag-plugin-utils:0.8.3.
      > Could not get resource 's3://digdag-beta-release/maven/io/digdag/digdag-plugin-utils/0.8.3/digdag-plugin-utils-0.8.3.pom'.
         > The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: C3D5C4A0D8D6F3FD)

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED

Total time: 13.331 secs

これはrepositoriesのmavenでAWSのS3を指定しているために発生しています。
これはS3へのアクセスができる人はそのまま使用することができるかもしれませんが、アクセス出来ない人はbuild.gradleでrepositoriesのmavenの指定を削除する必要があります。
削除した状態でビルドすると次のようなエラーが発生します。

$ ./gradlew publish
:generatePomFileForMavenJavaPublication
:compileJava

FAILURE: Build failed with an exception.

* What went wrong:
Could not resolve all dependencies for configuration ':provided'.
> Could not find io.digdag:digdag-spi:0.8.2.
  Searched in the following locations:
      file:/Users/muchiki0226/.m2/repository/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.pom
      file:/Users/muchiki0226/.m2/repository/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.jar
      https://repo1.maven.org/maven2/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.pom
      https://repo1.maven.org/maven2/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.jar
      https://jcenter.bintray.com/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.pom
      https://jcenter.bintray.com/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.jar
  Required by:
      io.digdag.plugin:digdag-plugin-example:0.1.0
> Could not find io.digdag:digdag-plugin-utils:0.8.2.
  Searched in the following locations:
      file:/Users/muchiki0226/.m2/repository/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.pom
      file:/Users/muchiki0226/.m2/repository/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.jar
      https://repo1.maven.org/maven2/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.pom
      https://repo1.maven.org/maven2/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.jar
      https://jcenter.bintray.com/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.pom
      https://jcenter.bintray.com/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.jar
  Required by:
      io.digdag.plugin:digdag-plugin-example:0.1.0

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED

Total time: 12.078 secs

DigDagプロジェクト内のモジュールの読み込みが行われていないため、特定のモジュールが見つからないためにこのエラーが問題が発生しています。
そこでDigDag自体のプジェクトをダウンロードしてきてビルドします。

$ git clone git@github.com:treasure-data/digdag.git
$ cd digdag
$ ./gradlew build

ビルドできたら次の2ファイルをコピーします。

  • ./digdag-spi/build/libs/digdag-spi-0.8.4-SNAPSHOT.jar
  • ./digdag-plugin-utils/build/libs/digdag-plugin-utils-0.8.4-SNAPSHOT.jar

この2ファイルをdigdag-plugin-exampleの中にlibsというフォルダを作成し、その中にコピーします。
これをビルド時に一緒にビルドすることでモジュールの問題は解決されますが、これをビルドしても必要なライブラリが含まれていないためエラーになります。
これらのエラーをすべて解消したbuild.gradleは次のようになります。

apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'maven-publish'

group = 'io.digdag.plugin'
version = '0.1.0'

def digdagVersion = '0.8.2'

repositories {
    mavenLocal()
    mavenCentral()
    jcenter()
//    maven {
//        url 's3://digdag-beta-release/maven'
//        credentials(AwsCredentials) {
//            accessKey "${System.env.AWS_ACCESS_KEY_ID}"
//            secretKey "${System.env.AWS_SECRET_ACCESS_KEY}"
//        }
//    }
}

//configurations {
//    provided
//}

dependencies {
    compile files('./lib/digdag-plugin-utils-0.8.4-SNAPSHOT.jar')
    compile files('./lib/digdag-spi-0.8.4-SNAPSHOT.jar')
    compile files('./lib/digdag-client-0.8.4-SNAPSHOT.jar')
    compile 'javax.inject:javax.inject:1'
    compile group: 'com.google.guava', name: 'guava', version: 'r05'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.0.1'
    //provided 'io.digdag:digdag-spi:' + digdagVersion
    //provided 'io.digdag:digdag-plugin-utils:' + digdagVersion  // this should be 'compile' once digdag 0.8.2 is released to a built-in repository
}
//sourceSets {
//    main {
//        compileClasspath += configurations.provided
//        test.compileClasspath += configurations.provided
//        test.runtimeClasspath += configurations.provided
//    }
//}

publishing {
    publications {
        mavenJava(MavenPublication) {
            from components.java
        }
    }
    repositories {
        maven {
            url "$buildDir/repo"
        }
    }
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.encoding = 'UTF-8'
compileTestJava.options.encoding = 'UTF-8'

javadoc {
    options {
        locale = 'en_US'
        encoding = 'UTF-8'
    }
}

追加でguavaとjacksonを入れ、これでビルドすると./digdag-plugin-example/build/repo/io/digdag/plugin/digdag-plugin-example/0.1.0内にdigdag-plugin-example-0.1.0.jarが生成されます。
これで準備が整いました。

プラグインの実行

このフォルダ内のplugin.digを見てみると次のようになっています。

_export:
  plugin:
    repositories:
      - file://${repository_path}
    dependencies:
      - io.digdag.plugin:digdag-plugin-example:0.1.0

+step1:
  example>: template.txt
  message: yes
  path: example.out

exportにpluginという項目を記載し、プラグインのリポジトリのパスとdependenciesでプラグインの指定を行っています。
dependenciesは「groupId:artifactId:version」の構成で記入を行います。
groupId、artifactId、versionはプラグインのbuild.gradleに記載されている内容になります。

これで次のコマンドを実行するとタスクが実行されます。

$ digdag r plugin.dig -p repository_path=/<digdag-plugin-exampleをおいているパス>/digdag-plugin-example/build/repo                                    ⏎
2016-07-03 14:31:59 +0900: Digdag v0.8.3
2016-07-03 14:32:00 +0900 [WARN] (main): Using a new session time 2016-07-03T00:00:00+00:00.
2016-07-03 14:32:00 +0900 [INFO] (main): Using session .digdag/status/20160703T000000+0000.
2016-07-03 14:32:00 +0900 [INFO] (main): Starting a new session project id=1 workflow name=plugin session_time=2016-07-03T00:00:00+00:00
2016-07-03 14:32:00 +0900 [INFO] (0019@+plugin+step1): example>: template.txt
Success. Task state is saved at .digdag/status/20160703T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

上記のように表示されたら、example.outが作成され中に下記のような文字列書き込まれています。
[example.out]

Worked? yes

これはtemplete.txtが下記のようになっており、messageの部分がプラグインによってstep1に記載されているmessageの部分に差し替えられていることがわかります。
[templete.txt]

Worked? ${message}

Slackに投稿するプラグイン

これでプラグインの環境は整いました。

今回は下記のページのIncoming WebHooksを使ってSlackへ文字列を投稿します。

qiita.com

まずdigファイルは次のようにします。
[plugin.dig]

_export:
  plugin:
    repositories:
      - file://${repository_path}
    dependencies:
      - io.digdag.plugin:digdag-plugin-example:0.1.0

+step1:
  slack>: message.txt
  webhook: https://hooks.slack.com/services/XXXXXXX/XXXX/XXXXXX
  channel: general
  username: webhookbot
  icon_emoji: ghost

slackコマンドでmessage.txtを読み込んで、webhook、channel、username、icon_emojiのそれぞれの情報を実行時に読み込んでSlackへ投稿するようにします。

プラグ新プロジェクト内のOperatorFactoryをimplementsしたSlackOperatorFactoryを次のように作成しました。

[SlackOperatorFactory.java]

public class SlackOperatorFactory
        implements OperatorFactory
    {
        private final TemplateEngine templateEngine;

    public SlackOperatorFactory(TemplateEngine templateEngine)
    {
        this.templateEngine = templateEngine;
    }

    public String getType()
    {
        return "slack";
    }

    @Override
    public Operator newTaskExecutor(Path workspacePath, TaskRequest request)
    {
        return new ExampleOperator(workspacePath, request);
    }

    private class ExampleOperator
            extends BaseOperator
    {
        public ExampleOperator(Path workspacePath, TaskRequest request)
        {
            super(workspacePath, request);
        }

        @Override
        public TaskResult runTask()
        {
            Config params = request.getConfig().mergeDefault(
                    request.getConfig().getNestedOrGetEmpty("slack"));

            String message = templateEngine.templateCommand(workspacePath, params, null, UTF_8);
            String url = params.get("webhook", String.class);
            String channel = "#" + params.get("channel", String.class);
            String username = params.get("username", String.class);
            String icon_emoji = ":" + params.get("icon_emoji", String.class) + ":";

            System.out.println(url);
            System.out.println(message);
            System.out.println(channel);
            System.out.println(username);
            System.out.println(icon_emoji);

            Gson gson = new Gson();
            String payload = gson.toJson(new SlackInfo(channel,username,message,icon_emoji));
            System.out.println(payload);

            try {
                HttpResponse<String> result = Unirest.post(url)
                    .header("Content-Type", "application/x-www-form-urlencoded")
                        .field("payload", payload).asString();
            } catch (UnirestException e) {
                e.printStackTrace();
            }
            return TaskResult.empty(request);
        }
    }
}

ここでの特徴はgetTypeでプラグインの「slack>」の>の前の部分を指定します。 次にタスクが実行された際に呼び出されるrunTaskを実装します。
runTask内でConfigクラスのparamsでタスク実行時に付与している情報を収集しUnirest.postでSlackへポストしています。

これ以外に次のファイルを作成しました。 基本的には名前を適切に変更したのみです。

[SlackPlugin.java]

public class SlackPlugin
        implements Plugin
{
    @Override
    public <T> Class<? extends T> getServiceProvider(Class<T> type)
    {
        if (type == OperatorProvider.class) {
            return SlackOperatorProvider.class.asSubclass(type);
        }
        else {
            return null;
        }
    }

    public static class SlackOperatorProvider
            implements OperatorProvider
    {
        @Inject
        protected TemplateEngine templateEngine;

        @Override
        public List<OperatorFactory> get()
        {
            return Arrays.asList(
                    new SlackOperatorFactory(templateEngine));
        }
    }
}

[SlackInfo.java]

public class SlackInfo {
    private String channel;

    private String username;

    private String text;

    private String icon_emoji;

    public SlackInfo(String channel, String username, String text, String icon_emoji) {
        this.channel = channel;
        this.username = username;
        this.text = text;
        this.icon_emoji = icon_emoji;
    }
}

[resources/META-INF/services/io.digdag.spi.Plugin]

io.digdag.plugin.example.SlackPlugin

これで完成しました。 最後にmessage.txtに「digidagから投稿したよ」という文字列を書いていた場合は実行すると次の様になります。

[DigDagの実行]

$ digdag r plugin.dig -p repository_path=/<digdag-plugin-exampleをおいているパス>/digdag-plugin-example/build/repo
2016-07-03 21:05:19 +0900: Digdag v0.8.3
2016-07-03 21:05:20 +0900 [WARN] (main): Using a new session time 2016-07-03T00:00:00+00:00.
2016-07-03 21:05:20 +0900 [INFO] (main): Using session .digdag/status/20160703T000000+0000.
2016-07-03 21:05:20 +0900 [INFO] (main): Starting a new session project id=1 workflow name=plugin session_time=2016-07-03T00:00:00+00:00
2016-07-03 21:05:20 +0900 [INFO] (0019@+plugin+step1): slack>: message.txt
https://hooks.slack.com/services/XXXXXX/XXXXXXX/XXXXXXXX
digidagから投稿したよ
#general[f:id:muchiki0226:20160703210833p:plain]
webhookbot
:ghost:
{"channel":"#general","username":"webhookbot","text":"digidagから投稿したよ","icon_emoji":":ghost:"}
Success. Task state is saved at .digdag/status/20160703T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

[Slackの結果]

f:id:muchiki0226:20160703210833p:plain

実際の動くコードは下記を参照してください。

github.com