今日もプログラミング

IT技術とかプログラミングのこととか特にJavaを中心に書いていきます

EmbulkでCSVからDBにロードするときのエラーについて調べてみた

以前、Embulkのエラー処理について調べてみるというタイトルで記事を書いたが、あれからずいぶんとバージョンが上がっているので、再度確認してみることにした。

CSVの項目が多過ぎる場合

以下のような警告が出て、スキップされた。

2015-12-11 16:02:29.364 +0900 [WARN] (task-0000): Skipped line 2 (Too many columns): 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999

終了コードは0だった。

 

ドキュメントを見ると、allow_extra_columns: true を設定すると、余計な項目は無視されて正常な行としてと取り込まれるようだ。

 

逆に、stop_on_invalid_record: true を設定すると、エラーで終了する。

2015-12-11 16:07:41.280 +0900 [INFO] (cleanup): > 0.02 seconds
org.embulk.exec.PartialExecutionException: org.embulk.spi.DataException: Invalid record at line 2: 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999
        at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:363)
        at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:572)
        ...

Error: org.embulk.spi.DataException: Invalid record at line 2: 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999

この場合は、終了コードは1だった。

 

CSVの項目が少な過ぎる場合

項目が多過ぎる場合とだいたい同じだ。

デフォルトでは、以下のような警告が出て、スキップされる。

2015-12-11 16:10:52.236 +0900 [WARN] (task-0000): Skipped line 2 (Too few columns): 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03

 

allow_optional_columns: true を設定すると、足りない項目はnullとして取り込まれる。

当然だが、挿入先のDBの列がNOT NULLだとそっちでエラーになるので注意。

 

項目が多過ぎる場合と同様、stop_on_invalid_record: true を設定すると、エラーで終了する。

(いるかどうかは分からないが…)項目が少な過ぎる場合は警告、多過ぎる場合はエラー、といった設定はできないようだ。

 

数値列に数値でない値

例えば、CSV Parserでlongと定義されている列に、"X"とか突っ込んでみる。

以前はエラーだったが、警告が出てスキップされるようになっていた。

2015-12-11 16:21:21.736 +0900 [WARN] (task-0000): Skipped line 3 (java.lang.NumberFormatException: For input string: "X"): X,1234567890.12,test9999,TEST9999,2015-12-31,2015-12-31 23:59:59,2015-12-31 23:59:59.999

 

これについても、stop_on_invalid_record: true を設定すると、エラーで終了する。

 

DB側のエラーの場合

桁数オーバー、NULL制約違反などである。

DB側でエラーが発生した場合は、Embulkもエラーで終了する。

こちらに関しては、無視したり警告にするようなオプションは今のところ無い。

 

気になるのは、そのときテーブルの中身がどうなっているのか?、だ。

embulk-output-jdbcには複数のモードがあり、それぞれで挙動が異なる。

insert

このモードでは、複数の一時テーブルにデータをロードし、最後に対象テーブルにUNIONしてINSERTする。

エラー行があった場合は、最後のINSERTが行われないため、対象テーブルは元のままである。

 

truncate_insert

このモードでは、insertと同様だが、最後に対象テーブルを空にしてからINSERTする。

なお、モードの名前はtruncate_insertだが、発行されるSQLはTRUNCATEではなくDELETEである。

(DBによってはTRUNCATEはロールバックできないからだろう)

 

insert_direct

このモードでは、対象テーブルに直接INSERTする。

内部的には、1件ずつSQLを発行するのでなく、JDBCjava.sql.Statement#executeBatchを利用して、ある程度まとめてから発行する(このサイズはbatch_sizeオプションで制御可能)。

Embulkでは大量のデータをロードする前提のため、全件でcommitするのは無理があり、毎回commitしている。

従って、エラー行があった場合は、途中までは対象テーブルにロードされている。

ただし、トランザクションは行単位ではなく複数行が含まれるので、エラー行の直前の行がロードされているとは限らない。

 

replace

このモードでは、まず一時テーブルにデータをロードする。最後に対象テーブルをDROPしてから、ALTER TABLEにより一時テーブルを対象テーブルに置き換える。

エラー行があった場合は、最後の処理が行われないため、対象テーブルは元のままである。

なお、DROPとALTER TABLEの間でEmbulkが落ちたりした場合、対象テーブルが消えた状態になってしまうので注意が必要である(PostgreSQL等ではDROPがロールバックされるので大丈夫だが)。

 

merge

このモードでは、まず複数の一時テーブルにデータをロードする。最後に対象テーブルにロードする際、同じキーのレコードが既にあればUPDATE、無ければINSERTする。

エラー行があった場合は、最後の処理が行われないため、対象テーブルは元のままである。

なお、このモードはembulk-output-mysqlとembulk-output-postgresqlで実装されている。

 

各モードの比較

mergeは他とちょっと異なるし、truncate_insertはinsertとだいたい同じなので、insertとinsert_directとreplaceを比較してみる(insert_directはTRUNCATEしてから実行すれば他と同じ結果になる)。

 

処理速度について

insert_directとreplaceはほとんど同じだろう(DROP~ALTER TABLEにはそんなに時間が掛からないだろうし)。

insertは、最後にUNIONしてINSERTするところで時間が掛かりそうだ。

ただ、insert_directやreplaceでは1つのテーブル/一時テーブルにロードするのに対し、insertでは複数の一時テーブルにロードする。

その分速くなっているかもしれないな。

いずれ実際に計測してみるか。

 

データの整合性について

insertでは、対象テーブルの更新が完全にトランザクション内なので、ロード前→ロード後 に直接遷移する。

すなわち、データの不整合が全く無い。

replaceでは、DROP~ALTER TABLEを行うため、ロード前→対象テーブルが無い→ロード後 のように遷移する。

つまり、一時的にデータが無い時間が存在する。

insert_directでは、ロード前→TRUNCATE→ロード中→ロード後 のようになる。

なので、ロードしている間はずっと、中途半端な状態である。

 

まあ、結局のところ、要件に応じて適切なモードを選ぼう、ということだな。

 

Embulk 0.7.10
embulk-output-mysql 0.4.2
embulk-output-postgresql 0.4.2

 

2015/12/17: embulk-output-mysqlもmergeモードに対応していました。

2015/12/18: embulk-output-redshiftはmergeモードが未実装でした。