EmbulkでCSVからDBにロードするときのエラーについて調べてみた
以前、Embulkのエラー処理について調べてみるというタイトルで記事を書いたが、あれからずいぶんとバージョンが上がっているので、再度確認してみることにした。
CSVの項目が多過ぎる場合
以下のような警告が出て、スキップされた。
2015-12-11 16:02:29.364 +0900 [WARN] (task-0000): Skipped line 2 (Too many columns): 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999
終了コードは0だった。
ドキュメントを見ると、allow_extra_columns: true を設定すると、余計な項目は無視されて正常な行としてと取り込まれるようだ。
逆に、stop_on_invalid_record: true を設定すると、エラーで終了する。
2015-12-11 16:07:41.280 +0900 [INFO] (cleanup): > 0.02 seconds org.embulk.exec.PartialExecutionException: org.embulk.spi.DataException: Invalid record at line 2: 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999 at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:363) at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:572) ... Error: org.embulk.spi.DataException: Invalid record at line 2: 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999
この場合は、終了コードは1だった。
CSVの項目が少な過ぎる場合
項目が多過ぎる場合とだいたい同じだ。
デフォルトでは、以下のような警告が出て、スキップされる。
2015-12-11 16:10:52.236 +0900 [WARN] (task-0000): Skipped line 2 (Too few columns): 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03
allow_optional_columns: true を設定すると、足りない項目はnullとして取り込まれる。
当然だが、挿入先のDBの列がNOT NULLだとそっちでエラーになるので注意。
項目が多過ぎる場合と同様、stop_on_invalid_record: true を設定すると、エラーで終了する。
(いるかどうかは分からないが…)項目が少な過ぎる場合は警告、多過ぎる場合はエラー、といった設定はできないようだ。
数値列に数値でない値
例えば、CSV Parserでlongと定義されている列に、"X"とか突っ込んでみる。
以前はエラーだったが、警告が出てスキップされるようになっていた。
2015-12-11 16:21:21.736 +0900 [WARN] (task-0000): Skipped line 3 (java.lang.NumberFormatException: For input string: "X"): X,1234567890.12,test9999,TEST9999,2015-12-31,2015-12-31 23:59:59,2015-12-31 23:59:59.999
これについても、stop_on_invalid_record: true を設定すると、エラーで終了する。
DB側のエラーの場合
桁数オーバー、NULL制約違反などである。
DB側でエラーが発生した場合は、Embulkもエラーで終了する。
こちらに関しては、無視したり警告にするようなオプションは今のところ無い。
気になるのは、そのときテーブルの中身がどうなっているのか?、だ。
embulk-output-jdbcには複数のモードがあり、それぞれで挙動が異なる。
insert
このモードでは、複数の一時テーブルにデータをロードし、最後に対象テーブルにUNIONしてINSERTする。
エラー行があった場合は、最後のINSERTが行われないため、対象テーブルは元のままである。
truncate_insert
このモードでは、insertと同様だが、最後に対象テーブルを空にしてからINSERTする。
なお、モードの名前はtruncate_insertだが、発行されるSQLはTRUNCATEではなくDELETEである。
(DBによってはTRUNCATEはロールバックできないからだろう)
insert_direct
このモードでは、対象テーブルに直接INSERTする。
内部的には、1件ずつSQLを発行するのでなく、JDBCのjava.sql.Statement#executeBatchを利用して、ある程度まとめてから発行する(このサイズはbatch_sizeオプションで制御可能)。
Embulkでは大量のデータをロードする前提のため、全件でcommitするのは無理があり、毎回commitしている。
従って、エラー行があった場合は、途中までは対象テーブルにロードされている。
ただし、トランザクションは行単位ではなく複数行が含まれるので、エラー行の直前の行がロードされているとは限らない。
replace
このモードでは、まず一時テーブルにデータをロードする。最後に対象テーブルをDROPしてから、ALTER TABLEにより一時テーブルを対象テーブルに置き換える。
エラー行があった場合は、最後の処理が行われないため、対象テーブルは元のままである。
なお、DROPとALTER TABLEの間でEmbulkが落ちたりした場合、対象テーブルが消えた状態になってしまうので注意が必要である(PostgreSQL等ではDROPがロールバックされるので大丈夫だが)。
merge
このモードでは、まず複数の一時テーブルにデータをロードする。最後に対象テーブルにロードする際、同じキーのレコードが既にあればUPDATE、無ければINSERTする。
エラー行があった場合は、最後の処理が行われないため、対象テーブルは元のままである。
なお、このモードはembulk-output-mysqlとembulk-output-postgresqlで実装されている。
各モードの比較
mergeは他とちょっと異なるし、truncate_insertはinsertとだいたい同じなので、insertとinsert_directとreplaceを比較してみる(insert_directはTRUNCATEしてから実行すれば他と同じ結果になる)。
処理速度について
insert_directとreplaceはほとんど同じだろう(DROP~ALTER TABLEにはそんなに時間が掛からないだろうし)。
insertは、最後にUNIONしてINSERTするところで時間が掛かりそうだ。
ただ、insert_directやreplaceでは1つのテーブル/一時テーブルにロードするのに対し、insertでは複数の一時テーブルにロードする。
その分速くなっているかもしれないな。
いずれ実際に計測してみるか。
データの整合性について
insertでは、対象テーブルの更新が完全にトランザクション内なので、ロード前→ロード後 に直接遷移する。
すなわち、データの不整合が全く無い。
replaceでは、DROP~ALTER TABLEを行うため、ロード前→対象テーブルが無い→ロード後 のように遷移する。
つまり、一時的にデータが無い時間が存在する。
insert_directでは、ロード前→TRUNCATE→ロード中→ロード後 のようになる。
なので、ロードしている間はずっと、中途半端な状態である。
まあ、結局のところ、要件に応じて適切なモードを選ぼう、ということだな。
Embulk | 0.7.10 |
embulk-output-mysql | 0.4.2 |
embulk-output-postgresql | 0.4.2 |
2015/12/17: embulk-output-mysqlもmergeモードに対応していました。
2015/12/18: embulk-output-redshiftはmergeモードが未実装でした。