今日もプログラミング

IT技術とかプログラミングのこととか特にJavaを中心に書いていきます

EmbulkのCSV parserのスキーマをDBメタ情報により定義する

DBメタ情報からCSVの列を定義したい

EmbulkCSV parserを使うときは、ymlファイルに列の定義を書く必要がある。

Embulkにはguessという便利な機能があって、CSVファイルから列の定義を出力することもできる。

だけどCSVファイルをDBにロードする場合は、ロード先のDBのメタ情報から動的に列を定義できないかな?と考えたのだ。

  • 項目が追加されたりした場合でもymlを修正しなくてよい
  • テーブルがたくさんある場合でもymlをコピーしてテーブル名を直すだけでよい
  • guessがだまされる特殊なケース(一見YYYYMMDDで日付型っぽいけど実は文字列型で'99999999'のような値がまれに出現する列とか)でも大丈夫

のように、あると便利そうだ。

 

どうやって実現するか?

CSVパーサを一から書くのは大変なので、まずはCsvParserPluginを継承する。

transactionメソッドをオーバーライドすれば、列の定義(Schemaオブジェクト)を差し替えることができそうだ。

 

どうやってDBからSchemaオブジェクトを作るか?

これも一から書くのは大変過ぎるので、embulk-input-jdbcembulk-output-jdbcをうまいこと使おう。

どちらもDBから列の情報を取得しているが、embulk-input-jdbcのAbstractJdbcInputPluginクラスにSchemaを返してくれるメソッドがあった。

    private Schema setupTask(JdbcInputConnection con, PluginTask task) throws SQLException

privateだけど…。リフレクションで呼ぶか…。

DBに出力するんだから、embulk-output-jdbcの方が適切な気もするが、残念ながら適当なメソッドが無かった。

DB→CSV→別のDB のような使用法を考えれば、embulk-input-jdbcでもいいのかな。

と言うか、input側とoutput側で似たようなことをやってるので、一箇所にまとめられればよいのだが。

 

プラグインから別のプラグインを呼ぶ

    ExecSession session = Exec.session();
    InputPlugin input = session.newPlugin(InputPlugin.class, new PluginType(type));

のようにやると、別のプラグインインスタンス化できるようだ。

今回は呼びたいメソッドがprotectedとかprivateだったので、とりあえずリフレクションで呼ぶようにしてしまった。

embulk-input-jdbcが修正されたら動かなくなってしまうかも。。

 

embulk-parser-jdbc-schema-csv

という訳で、embulk-parser-jdbc-schema-csvを作ってみた。

ちょっと名前が長いけど…。

ともかく、使い方はこんな感じ。

in:
  type: file
  path_prefix: 'data/test.csv'
  parser:
    type: jdbc-schema-csv
    delimiter: ","
    header_line: false
    schema:
      type: mysql
      host: localhost
      database: embulk_test
      user: embulk_user
      password: embulk_pass
      table: input_test
out:
  type: mysql
  host: localhost
  database: embulk_test
  user: embulk_user
  password: embulk_pass
  table: input_test
  mode: insert

columnsの代わりにschemaを指定する。

すると、embulk-parser-jdbc-schema-csvがDBのメタ情報からCSV列を定義してくれる。

対応するinputプラグイン(この場合はembulk-input-mysql)もインストールされている必要がある。

 

でもこれだとschemaとoutでかなり定義が重複してしまう。

エイリアスを使うと、すっきりする。

in:
  type: file
  path_prefix: 'data/test.csv'
  parser:
    type: jdbc-schema-csv
    delimiter: ","
    header_line: false
    schema: &OUT
      type: mysql
      host: localhost
      database: embulk_test
      user: embulk_user
      password: embulk_pass
      table: input_test
      mode:  insert
out: *OUT

うん、いい感じだ。

 

Embulk 0.6.5
embulk-input-jdbc 0.4.0
embulk-output-jdbc 0.2.3
embulk-parser-jdbc-schema-csv 0.0.1