EmbulkのCSV parserのスキーマをDBメタ情報により定義する
DBメタ情報からCSVの列を定義したい
EmbulkでCSV parserを使うときは、ymlファイルに列の定義を書く必要がある。
Embulkにはguessという便利な機能があって、CSVファイルから列の定義を出力することもできる。
だけどCSVファイルをDBにロードする場合は、ロード先のDBのメタ情報から動的に列を定義できないかな?と考えたのだ。
- 項目が追加されたりした場合でもymlを修正しなくてよい
- テーブルがたくさんある場合でもymlをコピーしてテーブル名を直すだけでよい
- guessがだまされる特殊なケース(一見YYYYMMDDで日付型っぽいけど実は文字列型で'99999999'のような値がまれに出現する列とか)でも大丈夫
のように、あると便利そうだ。
どうやって実現するか?
CSVパーサを一から書くのは大変なので、まずはCsvParserPluginを継承する。
transactionメソッドをオーバーライドすれば、列の定義(Schemaオブジェクト)を差し替えることができそうだ。
どうやってDBからSchemaオブジェクトを作るか?
これも一から書くのは大変過ぎるので、embulk-input-jdbcかembulk-output-jdbcをうまいこと使おう。
どちらもDBから列の情報を取得しているが、embulk-input-jdbcのAbstractJdbcInputPluginクラスにSchemaを返してくれるメソッドがあった。
private Schema setupTask(JdbcInputConnection con, PluginTask task) throws SQLException
privateだけど…。リフレクションで呼ぶか…。
DBに出力するんだから、embulk-output-jdbcの方が適切な気もするが、残念ながら適当なメソッドが無かった。
DB→CSV→別のDB のような使用法を考えれば、embulk-input-jdbcでもいいのかな。
と言うか、input側とoutput側で似たようなことをやってるので、一箇所にまとめられればよいのだが。
プラグインから別のプラグインを呼ぶ
ExecSession session = Exec.session(); InputPlugin input = session.newPlugin(InputPlugin.class, new PluginType(type));
のようにやると、別のプラグインをインスタンス化できるようだ。
今回は呼びたいメソッドがprotectedとかprivateだったので、とりあえずリフレクションで呼ぶようにしてしまった。
embulk-input-jdbcが修正されたら動かなくなってしまうかも。。
embulk-parser-jdbc-schema-csv
という訳で、embulk-parser-jdbc-schema-csvを作ってみた。
ちょっと名前が長いけど…。
ともかく、使い方はこんな感じ。
in: type: file path_prefix: 'data/test.csv' parser: type: jdbc-schema-csv delimiter: "," header_line: false schema: type: mysql host: localhost database: embulk_test user: embulk_user password: embulk_pass table: input_test out: type: mysql host: localhost database: embulk_test user: embulk_user password: embulk_pass table: input_test mode: insert
columnsの代わりにschemaを指定する。
すると、embulk-parser-jdbc-schema-csvがDBのメタ情報からCSV列を定義してくれる。
対応するinputプラグイン(この場合はembulk-input-mysql)もインストールされている必要がある。
でもこれだとschemaとoutでかなり定義が重複してしまう。
エイリアスを使うと、すっきりする。
in: type: file path_prefix: 'data/test.csv' parser: type: jdbc-schema-csv delimiter: "," header_line: false schema: &OUT type: mysql host: localhost database: embulk_test user: embulk_user password: embulk_pass table: input_test mode: insert out: *OUT
うん、いい感じだ。
Embulk | 0.6.5 |
embulk-input-jdbc | 0.4.0 |
embulk-output-jdbc | 0.2.3 |
embulk-parser-jdbc-schema-csv | 0.0.1 |