今日もプログラミング

IT技術とかプログラミングのこととか特にJavaを中心に書いていきます

Embulkのエラー処理について調べてみる

自分はEmbulkを業務システムに組み込むことを考えている。

となると、気になるのはエラー処理。

エラー時にどんな挙動になるかを確認しておきたい。

Embulkのプラグインの組み合わせは多数あるが、とりあえず想定しているのはCSVファイルを読み込んでRDB(例えばOracle)に突っ込む処理なので、この組み合わせで確認してみる。

 

正常終了

一応正常終了時の終了コードも確認しておく。

当然、0だった。

 

CSVファイルのエラー

列が多い行がある場合

こんなログが出力された。

2015-04-08 22:48:46.731 +0900 [WARN] (task-0000): Skipped (line 18): 17,1360,...
org.embulk.standards.CsvTokenizer$TooManyColumnsException: Too many columns
        at org.embulk.standards.CsvTokenizer.nextRecord(CsvTokenizer.java:84)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:146)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:145)
        ...
2015-04-08 22:48:46.760 +0900 [INFO] (task-0000): Loading 100 rows
2015-04-08 22:48:46.783 +0900 [INFO] (task-0000): > 0.02 seconds (loaded 100 row
s in total)
2015-04-08 22:48:46.784 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-08 22:48:46.808 +0900 [INFO] (main): Committed.

列の多い行はスキップされてWARNログが出力され、それ以外の行はコミットされている。

ん?よく見ると100行全部ロードされているな。

DBの中を見ると、列が多い行も入っている。

ソースを見たところでは、定義された列数分は正常な行として読み込まれ、余分な列のみがエラーとなるようだ。

終了コードを確認したところ、0だった。

 

列が少ない行がある場合

こんなログが出力された。

2015-04-08 22:55:15.677 +0900 [WARN] (task-0000): Skipped (line 18): 17,1360,...
org.embulk.standards.CsvTokenizer$TooFewColumnsException: Too few columns
        at org.embulk.standards.CsvTokenizer.nextColumn(CsvTokenizer.java:124)
        at org.embulk.standards.CsvParserPlugin.nextColumn(CsvParserPlugin.java:236)
        at org.embulk.standards.CsvParserPlugin.access$000(CsvParserPlugin.java:26)
        at org.embulk.standards.CsvParserPlugin$1.stringColumn(CsvParserPlugin.java:193)
        at org.embulk.spi.Column.visit(Column.java:57)
        at org.embulk.spi.Schema.visitColumns(Schema.java:48)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:150)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:145)
        ...
2015-04-08 22:55:15.715 +0900 [INFO] (task-0000): Loading 99 rows
2015-04-08 22:55:15.731 +0900 [INFO] (task-0000): > 0.01 seconds (loaded 99 rows
 in total)
2015-04-08 22:55:15.732 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-08 22:55:15.758 +0900 [INFO] (main): Committed.

こちらの場合は、列の少ない行はスキップされ、それ以外の行がコミットされた。

終了コードは、やはり0だった。

 

embulk-input-filesplitを使う場合

ふと気になって、embulk-input-filesplitを使って試してみた。

やはり、ログに出る行番号は、分割位置からの行番号になっていた。

embulk-input-filesplitでファイルを読み込み→CSV Parserでエラー という順序なので、本当の(ファイルの先頭からの)行番号を出すのは無理っぽいな…。

各タスクの開始行番号をログに出すくらいか。

2箇所ログを見なくてはいけないが、一応スキップ位置は特定できる。

 

数値列が数値でない場合

こんなログが出力された。

2015-04-08 22:59:12.985 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-08 22:59:13.008 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "
a1360"
        at org.embulk.exec.LocalExecutor$ProcessState.buildPartialExecuteException(org/embulk/exec/LocalExecutor.java:333)
        at org.embulk.exec.LocalExecutor.doRun(org/embulk/exec/LocalExecutor.java:483)
        ...
        at org.embulk.cli.Main.main(org/embulk/cli/Main.java:13)
Caused by: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "a1360"
        at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:171)
        at org.embulk.spi.Column.visit(Column.java:53)
        at org.embulk.spi.Schema.visitColumns(Schema.java:48)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:150)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:145)
        ...

Error: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "a1360"

この場合は、例外がcatchされず、異常終了した。

残念ながら、ログを見てもエラーの行番号は分からない。

トランザクションはコミットされておらず、DBは0件だった。

未捕捉例外で終了したので、終了コードは1だった。

 

なお、embulk-output-jdbcでは、ある程度レコードがたまったらコミットする。

上は100件しかないので全件でコミットされたが、もっと件数が多い場合はどうなるんだろうか?

10万件で試したところ、エラー行の無いトランザクションまではコミットされ、エラー行を含むトランザクションロールバックされて終了、という結果になった。

DBを見ると、ログ通り、33,390件入っていた。

2015-04-08 23:11:05.483 +0900 [INFO] (task-0000): Loading 16,704 rows
2015-04-08 23:11:06.073 +0900 [INFO] (task-0000): > 0.58 seconds (loaded 16,704rows in total)
2015-04-08 23:11:06.323 +0900 [INFO] (task-0000): Loading 16,686 rows
2015-04-08 23:11:06.773 +0900 [INFO] (task-0000): > 0.45 seconds (loaded 33,390rows in total)
2015-04-08 23:11:06.793 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-08 23:11:06.803 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "
a34750"
        at org.embulk.exec.LocalExecutor$ProcessState.buildPartialExecuteExcepti
        ...

 

DBでのエラー

embulk-output-oracleの場合は、普通にJDBCでINSERTするモード(normal)、ダイレクト・パス・インサートするモード(direct)と、ネイティブライブラリのOCI(Oracle Call Interface)経由でダイレクト・パス・インサートするモード(oci)がある。

それぞれについて確認した。

通常のINSERT(normal)

桁数オーバー

未捕捉例外により異常終了した。

2015-04-09 09:13:48.906 +0900 [INFO] (task-0000): Loading 16,704 rows
2015-04-09 09:13:49.476 +0900 [INFO] (task-0000): > 0.57 seconds (loaded 16,704rows in total)
2015-04-09 09:13:49.716 +0900 [INFO] (task-0000): Loading 16,686 rows
2015-04-09 09:13:50.116 +0900 [INFO] (task-0000): > 0.40 seconds (loaded 33,390rows in total)
2015-04-09 09:13:50.346 +0900 [INFO] (task-0000): Loading 16,686 rows
2015-04-09 09:13:50.396 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-09 09:13:50.416 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: java.sql.BatchUpdateException: ORA-01438: この列に許容される指定精度より大きな値です

        at org.embulk.exec.LocalExecutor$ProcessState.buildPartialExecuteException(org/embulk/exec/LocalExecutor.java:333)
        at org.embulk.exec.LocalExecutor.doRun(org/embulk/exec/LocalExecutor.java:483)
        ...
        at org.embulk.cli.Main.main(org/embulk/cli/Main.java:13)
Caused by: java.lang.RuntimeException: java.sql.BatchUpdateException: ORA-01438: この列に許容される指定精度より大きな値です

        at org.embulk.output.jdbc.AbstractJdbcOutputPlugin$PluginPageOutput.add(AbstractJdbcOutputPlugin.java:584)
        at org.embulk.spi.PageBuilder.doFlush(PageBuilder.java:197)
        at org.embulk.spi.PageBuilder.flush(PageBuilder.java:203)
        at org.embulk.spi.PageBuilder.addRecord(PageBuilder.java:182)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:216)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:145)
        ...

Error: java.lang.RuntimeException: java.sql.BatchUpdateException: ORA-01438: この列に許容される指定精度より大きな値です

DBを見ると、途中まではコミットされていた。

エラーの行番号は出ていない。

たぶん、BatchUpdateException#getUpdateCountsを使えば出せそう。

 

キー重複

あらかじめ1レコード入れてから実行してみた。

桁数オーバーのときと同様、未捕捉例外により異常終了した。

 

ダイレクト・パス・インサート(direct)

normalと同じだった。

 

OCI経由でのダイレクト・パス・インサート(oci)

桁数オーバー

未捕捉例外により異常終了した。

2015-04-09 18:57:34.298 +0900 [INFO] (task-0000): Loading 26,886 rows
2015-04-09 18:57:34.471 +0900 [INFO] (task-0000): > 0.17 seconds (loaded 26,886rows in total)
2015-04-09 18:57:34.876 +0900 [INFO] (task-0000): Loading 26,886 rows
2015-04-09 18:57:34.929 +0900 [ERROR] (task-0000): OCI : OCIDirPathColArrayToStream failed. ORA-01438: value larger than specified precision allowed for this column

2015-04-09 18:57:34.930 +0900 [INFO] (task-0000): OCI : close for [localhost:1521/TESTDB, TEST_USER, EXAMPLE].
2015-04-09 18:57:34.930 +0900 [INFO] (task-0000): OCI : start to rollback.
2015-04-09 18:57:34.936 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-09 18:57:34.967 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: java.sql.SQLException: OCI : OCIDirPathColArrayToStream failed. ORA-01438: value larger than specified precision allowed for this column

DBの件数を見ると、0件だった。

ociモードでは、途中ではコミットせず、最後にコミットまたはロールバックをするようにしているためである。

こちらもエラー行番号は出ていないが、たぶんOCIにエラー位置を取れるAPIはあると思う。

 

キー重複

これも異常終了…、と思いきや、正常終了してしまった。。

DBを見ると、確かに同一キーのレコードが入っている。

SQL> SELECT ID,NUM FROM EXAMPLE WHERE ID=34751;

        ID        NUM
---------- ----------
     34751          1
     34751       7038

ただし、この後にINSERT文を実行したりすると、エラーになってしまう。

INSERT INTO EXAMPLE VALUES(99999999,1,'1','1','1','1','1','1','1','1','1','1')
*
行1でエラーが発生しました。:
ORA-01502:
索引'TEST_USER.SYS_C009854'またはそのパーティションが使用不可の状態です。

Oracleのドキュメントを見ると、以下のような記述があった。

ダイレクト・パス・ロード時にUNIQUE制約が使用可能であっても、その制約に違反する行もロードされます。
(これは、そのような行が拒否される従来型パスとは異なります。)
UNIQUE制約は、ダイレクト・パス・ロードの最後で索引が再作成されるときに検証されます。
違反が検出されると、索引は索引使用禁止状態のままになります。
詳細は、「使用禁止状態(Index Unusable)のままの索引」を参照してください。

なお、SQL*LoaderでもAPPENDでロードすると、同じ現象が発生することがある。

 

ほしいものリスト

調べた結果、ほしくなったものをまとめてみる。

  1. 列の過不足がある場合にスキップするかエラーにするかを選べるようにしたい。
  2. 警告がある場合は終了コードを変えた方がよいかも。
  3. どの行でエラーが発生したか分かるようにしたい。
  4. OCIモードでキー重複が発生した場合に検知したい(この辺見るとできそう)。

1ができれば、2は無くてもいいかな?

大抵はTRUNCATEしてからロードするから、4もあまり必要無いかも。

3はぜひほしいけど、結構めんどいかも。。

がんばってコード書いてみるか…。

 

Embulk 0.5.4
embulk-input-filesplit 0.1.2
embulk-output-oracle 0.2.2