データベースのタイムゾーン付型について調べてみた
embulk-input-jdbcでこんな問題が上がっていたので、データベースのタイムゾーン付の型について調べてみた。
MySQL (5.6)
ドキュメントを見ると、TIMESTAMP型はタイムゾーンに対応しているらしい。
DB内ではUTC、クライアント側ではtime_zoneシステム変数で指定されたタイムゾーンになるようだ。
実際に試してみる。
mysql> show variables like '%time_zone%'; +------------------+-------+ | Variable_name | Value | +------------------+-------+ | system_time_zone | UTC | | time_zone | UTC | +------------------+-------+ 2 rows in set (0.19 sec) mysql> create table test_tz ( -> id char(2), -> dt timestamp -> ); Query OK, 0 rows affected (0.59 sec) mysql> insert into test_tz values('01', '2016-03-13 14:00:00'); Query OK, 1 row affected (0.23 sec) mysql> select * from test_tz; +------+---------------------+ | id | dt | +------+---------------------+ | 01 | 2016-03-13 14:00:00 | +------+---------------------+ 1 row in set (0.16 sec)
time_zoneシステム変数がUTCなので、日時はUTCとして扱われる。
mysql> set time_zone = '+09:00'; Query OK, 0 rows affected (0.19 sec) mysql> show variables like '%time_zone%'; +------------------+--------+ | Variable_name | Value | +------------------+--------+ | system_time_zone | UTC | | time_zone | +09:00 | +------------------+--------+ 2 rows in set (0.19 sec) mysql> select * from test_tz; +------+---------------------+ | id | dt | +------+---------------------+ | 01 | 2016-03-13 23:00:00 | +------+---------------------+ 1 row in set (0.17 sec)
time_zoneを変えると、日時はそのタイムゾーンで扱われる。
Oracle (12c)
Oracleでは、TIMESTAMP WITH TIME ZONE型とTIMESTAMP WITH LOCAL TIME ZONE型というのがある。
この辺りに詳しい説明があった。
まず、TIMESTAMP WITH TIME ZONE型を試してみる。
SQL> create table test_tz ( 2 id char(2), 3 dt timestamp with time zone 4 ); 表が作成されました。 SQL> insert into test_tz values('01', '2016-03-13 14:00:00'); insert into test_tz values('01', '2016-03-13 14:00:00') * 行1でエラーが発生しました。: ORA-01840: 入力した値の長さが日付書式に対して不足しています
明示的にタイムゾーンを指定しないといけないようだ。
SQL> insert into test_tz values('01', '2016-03-13 14:00:00+09:00'); 1行が作成されました。 SQL> select * from test_tz; ID DT ---- --------------------------------------------------------------------------- 01 16-03-13 14:00:00.000000 +09:00
SQL> select * from test_tz where dt='2016-03-13 05:00:00+00:00'; ID DT ---- --------------------------------------------------------------------------- 01 16-03-13 14:00:00.000000 +09:00
次に、TIMESTAMP WITH LOCAL TIME ZONE型を試してみる。
SQL> create table test_ltz ( 2 id char(2), 3 dt timestamp with local time zone 4 ); 表が作成されました。 SQL> insert into test_ltz values('01', '2016-03-13 14:00:00'); 1行が作成されました。 SQL> select * from test_ltz; ID DT ---- --------------------------------------------------------------------------- 01 16-03-13 14:00:00.000000
こちらではタイムゾーンの指定は必要ない。
selectすると、insertした通りの日時で表示される。
これを、UTC環境のOracleクライアントから参照してみる。
SQL> select * from test_ltz; ID DT -- --------------------------------------------------------------------------- 01 13-MAR-16 05.00.00.000000 AM
そうすると、UTCに変換されて表示された。
つまり、クライアント環境のタイムゾーンで扱われるということだ。
SQL Server (2012)
ドキュメントによると、datetimeoffset型がタイムゾーンを保持できる。
2> create table test_tz ( 3> id char(2), 4> dt datetimeoffset 5> ); 6> go 1> 2> select * from test_tz 3> go id dt ---- ---------------------------------- (0 行処理されました) 1> insert into test_tz values('01', '2016-03-13 14:00:00') 2> go (1 行処理されました) 1> select * from test_tz 2> go id dt ---- ---------------------------------- 01 2016-03-13 14:00:00.0000000 +00:00 (1 行処理されました)
JST環境のSQL Serverだが、デフォルトではUTCになるようだ。
タイムゾーンを明示的に指定することもできる。
1> insert into test_tz values('01', '2016-03-13 14:00:00+09:00') 2> go (1 行処理されました) 1> select * from test_tz 2> go id dt ---- ---------------------------------- 01 2016-03-13 14:00:00.0000000 +00:00 01 2016-03-13 14:00:00.0000000 +09:00 (2 行処理されました)
PostgreSQL (9.4)
ドキュメントによると、DB内ではUTC、クライアント側ではTimeZoneシステムパラメータで指定されたタイムゾーンになるようだ。
実際に試してみる。
postgres=> show timezone; TimeZone ---------- UTC (1 行) postgres=> create table test_tz ( postgres(> id char(2), postgres(> dt timestamp with time zone postgres(> ); CREATE TABLE ^ postgres=> insert into test_tz values('01', '2016-03-13 14:00:00'); INSERT 0 1 postgres=> select * from test_tz; id | dt ----+------------------------ 01 | 2016-03-13 14:00:00+00 (1 行)
たしかに、TimeZoneの値であるUTCで解釈されている。
タイムゾーンを明示的に指定してinsertすることもできる。
postgres=> insert into test_tz values('01', '2016-03-13 14:00:00+09:00'); INSERT 0 1 postgres=> select * from test_tz; id | dt ----+------------------------ 01 | 2016-03-13 14:00:00+00 01 | 2016-03-13 05:00:00+00 (2 行)
TimeZoneパラメータを変えると、そのタイムゾーンで解釈されるようになる。
postgres=> show timezone; TimeZone ---------- Japan (1 行) postgres=> select * from test_tz; id | dt ----+------------------------ 01 | 2016-03-13 23:00:00+09 01 | 2016-03-13 14:00:00+09 (2 行) postgres=> insert into test_tz values('03', '2016-03-13 15:00:00'); INSERT 0 1 postgres=> select * from test_tz; id | dt ----+------------------------ 01 | 2016-03-13 23:00:00+09 01 | 2016-03-13 14:00:00+09 03 | 2016-03-13 15:00:00+09 (3 行)
Redshift
ドキュメントによると、タイムゾーン付のTIMESTAMP型はまだサポートされていないようだ。
まとめ
各DBMSにおいて、INSERTする日時とかSELECTされた日時がどのタイムゾーンになるかをまとめてみた。
DBMS | 型 | タイムゾーン |
---|---|---|
MySQL | TIMESTAMP | time_zoneシステム変数による |
Oracle | TIMESTAMP WITH TIME ZONE | 明示する |
TIMESTAMP WITH LOCAL TIME ZONE | クライアント環境のタイムゾーン | |
SQL Server | DATETIMEOFFSET | UTC |
PostgreSQL | time/timestamp with time zone | TimeZoneシステムパラメータによる |
Redshift | - | - |
SQL ServerのNative Clientを使ってバルクロードしてみる
embulk-output-sqlserverをリリースしました
先日、バルクデータローダEmbulkのプラグインとして、SQL Serverにロードするためのembulk-output-sqlserverをリリースした。
しかし、embulk-output-sqlserverは単純にINSERT文でデータをロードするので、たぶん速くない。
SQL ServerにはBCPという高速なバルクロードユーティリティがある。
その機能を使えるライブラリもあるようなので、それを試してみることにした。
ちなみに、embulk-output-oracleもOracleのバルクロード機能を呼び出して高速化している。
SQL Server Native Client (ODBC)
Microsoftのドキュメントを見ると、SQL Server Native Clientには「OLE DB」と「ODBC」の2種類があるらしい。
だが…、どうもOLE DBの方は非推奨で、サポートされなくなるらしい。
というわけで、ODBCの方を使おう。
サンプルを見てみる
APIドキュメントとかも見てみたが、いまいち使い方がよく分からない。
やはりサンプ見るのが一番!
というわけで、サンプルをダウンロードしてみた。
サンプルをインストールしてみると、C:\Program Files\Microsoft SQL Server\100\Samples\Engine\Data Access\odbc にそれらしいサンプルが展開されていた。
サンプルを作ってみる
で、サンプルを参考にしつつ、自分の環境に合わせて作ったのが以下のコードだ。
#include <tchar.h> #include <locale.h> #include <stdio.h> #include <string.h> #include <windows.h> #include <sql.h> #include <sqlext.h> #include <odbcss.h> BOOLEAN isSQLError(TCHAR* message, SQLRETURN ret) { if (ret != SQL_SUCCESS_WITH_INFO && ret != SQL_SUCCESS) { wprintf(L"%s\r\n", message); return TRUE; } return FALSE; } BOOLEAN isBCPError(TCHAR* message, RETCODE ret) { if (ret != SUCCEED) { wprintf(L"%s\r\n", message); return TRUE; } return FALSE; } void printError(SQLHENV henv, SQLHDBC hdbc) { SQLWCHAR sqlState[6]; SQLINTEGER nativeError; SQLWCHAR errorMessage[256]; SQLSMALLINT errorMessageLen; SQLRETURN ret = SQLError(henv, hdbc, NULL, sqlState, &nativeError, errorMessage, 256, &errorMessageLen); wprintf(L"%s\r\n", errorMessage); } int _tmain(int argc, _TCHAR* argv[]) { _tsetlocale(LC_ALL, _T("")); SQLHENV henv = SQL_NULL_HENV; if (isSQLError(L"SQLAllocHandle(SQL_HANDLE_ENV)", SQLAllocHandle(SQL_HANDLE_ENV, NULL, &henv))) { return 1; } // ODBCバージョンの設定 if (isSQLError(L"SQLSetEnvAttr(SQL_ATTR_ODBC_VERSION)", SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, SQL_IS_INTEGER))) { return 1; } // ODBC接続ハンドル HDBC hdbc = SQL_NULL_HDBC; if (isSQLError(L"SQLAllocHandle(SQL_HANDLE_DBC)", SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc))) { return 1; } // BULK COPYモードの設定 if (isSQLError(L"SQLSetConnectAttr(SQL_COPT_SS_BCP)", SQLSetConnectAttr(hdbc, SQL_COPT_SS_BCP, (SQLPOINTER)SQL_BCP_ON, SQL_IS_INTEGER))) { return 1; } // DBに接続 //SQLDriverConnect( if (isSQLError(L"SQLDriverConnect", SQLDriverConnect(hdbc, NULL, L"Driver={SQL Server Native Client 11.0};Server=localhost,1433\\SQLEXPRESS;UID=user;PWD=password;", SQL_NTS, NULL, SQL_NTS, NULL, SQL_DRIVER_NOPROMPT))) { printError(henv, hdbc); return 1; } // BULK COPY開始 if (isBCPError(L"bcp_init", bcp_init(hdbc, L"TESTDB.dbo.TEST1", NULL, NULL, DB_IN))) { printError(henv, hdbc); return 1; } // 列1を設定する if (isBCPError(L"bcp_bind(1)", bcp_bind(hdbc, (LPCBYTE)"XXX", 0, SQL_VARLEN_DATA, (LPCBYTE)"", 1, SQLCHARACTER, 1))) { printError(henv, hdbc); return 1; } // 列2を設定する if (isBCPError(L"bcp_bind(2)", bcp_bind(hdbc, (LPCBYTE)"YYY", 0, SQL_VARLEN_DATA, (LPCBYTE)"", 1, SQLCHARACTER, 2))) { printError(henv, hdbc); return 1; } // 1行送信する if (isBCPError(L"bcp_sendrow", bcp_sendrow(hdbc))) { printError(henv, hdbc); return 1; } // BULK COPY終了 if (bcp_done(hdbc) != SUCCEED) { printError(henv, hdbc); return 1; } SQLDisconnect(hdbc); SQLFreeHandle(SQL_HANDLE_DBC, hdbc); SQLFreeHandle(SQL_HANDLE_ENV, henv); return 0; }
なお、ロード先のテーブルはこんな感じ。
CREATE TABLE TEST1 ( ID char(4), VALUE varchar(20) )
ビルドについて
追加の依存ファイルに、sqlncli11.libが必要。
SQL Serverの SDK\Lib フォルダの下にある。
DBへの接続
DBに接続するAPIは、SQLConnectとSQLDriverConnectがある。
SQLConnectの方は、あらかじめODBCデータソースを定義しておかなくてはならない。
SQLDriverConnectなら、JDB Driverのように、サーバ名やインスタンス名、ユーザ名、パスワードを直接渡して接続できる。
bcp_bind
bcp_bindは、1列分の値を設定する。
つまり、10列あるテーブルであれば、10回bcp_bindを呼ぶ必要がある。
どの列に設定するかは、最後の引数で列のインデックス(1始まり)を指定する。
bcp_sendrow
bcp_sendrowは、bcp_bindを全列に設定した後に呼び、1行分のデータを送信する。
つまり、100行のデータをロードする場合は、100回bcp_sendrowを呼ぶわけだ。
bcp_done
bcp_doneは、コミットしてロードを終了する。
bcp_batchを使うと、終了せずにコミットだけすることができる。
できてみれば簡単そうだが…
実際には、何ヶ所かはまってしまった。
SQLDriverConnectに渡す接続文字列が分からなかったりとか、bcp_bindで1行分のデータをまとめて渡すと思い込んでいたりとか…。
今度は、実際にこれで大量データをロードしてみたい。
OS | Windows 7 Enterprise |
SQL Server | 2012 |
Visual Studio | 2010 |
Oracle SQL*LoaderのPARTITIONについて
Oracle SQL*Loaderでは、制御ファイルに以下のように書くと、特定のパーティションにロードすることができる。
INSERT INTO SOME_TABLE PARTITION (SOME_PARTITION) VALUES ...
挙動について
ドキュメントによると、
「行が指定のパーティションに対応しない場合、その行は拒否され」
とある。
実際に試してみたが、パーティションの条件に合わない行はロードされず、合う行だけがロードされる。
なお、PARTITIONを指定しない場合は、各行がそれぞれの条件に合うパーティションにロードされる。
パフォーマンスについて
これもドキュメントにある通り、PARTITION指定のロードでは、「従来型パス・ロード」が使用される。
SQL*Loaderには「ダイレクト・パス・ロード」と「従来型パス・ロード」があり、前者は高速だが、後者はそれほどでもない(1行ずつINSERTするよりは高速だが…)。
結論
ロードするデータが、単一のパーティションに含まれると分かっている場合は、PARTITIONを指定しない方がよいだろう(その方高速にロードされるので)。
そうでなく、特定パーティションに含まれない行を除外したい場合のみ、PARTITIONを指定するのがよさそうだ。
jnr-ffiでJavaからCのポインタを使う
jnr-ffiは、Javaから簡単にCのライブラリを呼び出せるフレームワークだ。
JNIのような煩雑なコードを書かなくてよいのがメリット。
前に記事を書いたときは、プリミティブ型とかStringしか試さなかったので、今回はそれ以外のポインタを試してみる。
Pointerクラスを使ってみる
まず、C側のコードを書く。環境は64bitのWindows。
extern "C" __declspec(dllexport) void add(int* arguments, int* result) { *result = arguments[0] + arguments[1]; }
NativeLib.dllという名前でビルドした。
そして、Java側のコードを書く。
public interface Native1 { void add(Pointer arguments, Pointer result); }
import java.nio.ByteBuffer; import jnr.ffi.LibraryLoader; import jnr.ffi.Pointer; import jnr.ffi.Runtime; public class PointerTest1 { public static void main(String[] args) { Native1 native1 = LibraryLoader.create(Native1.class).load("NativeLib"); Pointer in = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(8)); in.putInt(0, 60000); in.putInt(4, 8000); Pointer out = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(4)); native1.add(in, out); System.out.println(out.getInt(0)); } }
jnr-ffi、あまりドキュメントが無いようなのでよく分からないが…、それらしいクラスのそれらしいメソッドを使って書いてみる。
実行の際は、-Djava.library.path=DLLのパス名 を指定する必要がある。
結果は、
2465
う~ん、違うな。
調べたところ、エンディアンの違いのようだ。
ByteBufferはデフォルトでビッグエンディアン。
60000は0x0000EA60、8000は0x00001F40なので、ビッグエンディアンで足すと0x000109A0=68000だが、リトルエンディアンで足すと0x000009A1=2465になってしまう。
... Pointer in = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(8).order(Runtime.getSystemRuntime().byteOrder())); ... Pointer out = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(4).order(Runtime.getSystemRuntime().byteOrder())); ...
のように正しいをエンディアンを指定したら、
68000
と出力された!
ArrayMemoryIOクラスを使ってみる
Pointer#wrapメソッドは、文字通り既存のByteBufferインスタンスをラップするだけで、エンディアンの指定は行ってくれないようだ。
Pointerクラスのサブクラスを調べてみると、ArrayMemoryIOというクラスがあった。
中を見てみると、こちらはエンディアンの指定も行ってくれるっぽいし、簡単そうだ。
上のコードを
... Pointer in = new ArrayMemoryIO(Runtime.getSystemRuntime(), 8); ... Pointer out = new ArrayMemoryIO(Runtime.getSystemRuntime(), 4); ...
のように変えて実行したところ、正しく68000と出力された。
Java側とC側でメモリを共有するには?
C側は
static int* _pointer; extern "C" __declspec(dllexport) void setAddress(int* pointer) { _pointer = pointer; } extern "C" __declspec(dllexport) int getValue() { return *_pointer; }
のように書き、Java側は
public interface Native2 { void setAddress(Pointer pointer); int getValue(); }
public class PointerTest2 { public static void main(String[] args) { Native2 native2 = LibraryLoader.create(Native2.class).load("NativeLib"); Pointer in = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(4).order(Runtime.getSystemRuntime().byteOrder())); native2.setAddress(in); in.putInt(0, 1234567); System.out.println(native2.getValue()); } }
と書く。
つまり、
①Java側のPointerをC側に渡して保存する
②Java側でPointerの指すメモリを書き換える
③C側でPointerの指すメモリを参照する
としたとき、どうなるのか?
結果は
0
で、共有されていないようだ。。
つまり、メソッドを呼び出すたびに、メモリの内容がコピーされているのだろう。
ByteBufferを調べると、allocateDirectというメソッドがある。
これを使うとよさそうだ。
allocateをallocateDirectに変えて実行してみると、
1234567
と出力された!
つまり、Java側とC側とでメモリを共有できた!
OS | Windows 7 Enterprise (64bit) |
Java | 8 |
jnr-ffi | 2.0.3 |
Embulkの環境を丸ごとコピーする
Embulkでプラグインをインストールすると、実行ユーザのホームディレクトリの下に.embulkディレクトリができて、gemがインストールされる。
別の環境でも同じようにEmbulkを動かすには、Javaをインストールして、Embulkの実行ファイルをコピーして、ホームディレクトリもコピーするだけ。
なんだけど、アプリケーションはホームディレクトリではなくてアプリケーション用のディレクトリにまとめたい、ということもある。
そこで、その方法について調べてみた。
Embulkのホームディレクトリ
".embulk"でgrepしてみると、embulk_bundle.rbに以下のコードがあった。
user_home = java.lang.System.properties["user.home"] || ENV['HOME'] unless user_home raise "HOME environment variable is not set." end ENV['GEM_HOME'] = File.expand_path File.join(user_home, '.embulk', Gem.ruby_engine, RbConfig::CONFIG['ruby_version'])
ということは、Javaのシステム変数user.homeを設定してやれば、そこがEmbulkのホームになりそうだ。
embulk "-J-Duser.home=/embulk" gem install embulk-input-mysql
と実行したら、/embulkディレクトリにプラグインがインストールされた!
なお、Linuxの場合はオプション指定を""で囲まなくてもいいはず。
環境を丸ごとコピーしてみる
/embulkディレクトリに、Embulkの実行ファイルを置く。
そして、上で試した通り、そのディレクトリにプラグインをインストールする。
次に、別のマシンにJavaをインストール後、/embulkディレクトリを丸ごとコピーする。
そして、
embulk "-J-Duser.home=/embulk" run test.yml
を実行すると…、
無事実行できた!
Bundlerを使う方法
Embulkでは、Bundlerというものを使って、プラグインを含むEmbulkの実行環境をまとめることができる。
これも試してみた。
まず、
embulk mkbundle embulk1
のように実行する。すると、embulk1ディレクトリにいろいろファイルが作成される。
次に、embulk1/Gemfileを開いて、インストールしたいプラグインを書く。
gem 'embulk-input-mysql'
特定のバージョンを指定することもできるようだ(詳しくはGemfileにコメントが書いてある)。
そして、
cd embulk1 embulk bundle cd ..
を実行すると、embulk-input-mysqlがインストールされた。
実行時は、
embulk run -b embulk1 test.yml
のようにすればOK。
こちらについても、Embulkの実行ファイルとembulk1フォルダを丸ごとコピーして、動くのが確認できた!
EmbulkEmbedを使う場合
EmbulkEmbedを使うと、EmbulkをJavaライブラリ的に使うことができる。
この場合、プラグインはどうなるんだろうか?
import java.io.File; import org.embulk.EmbulkEmbed; import org.embulk.EmbulkEmbed.Bootstrap; import org.embulk.config.ConfigLoader; import org.embulk.config.ConfigSource; public class EmbulkEmbedTest { public static void main(String[] args) throws Exception { Bootstrap bootstrap = new EmbulkEmbed.Bootstrap(); EmbulkEmbed embulk = bootstrap.initializeCloseable(); try { ConfigLoader loader = embulk.newConfigLoader(); ConfigSource source = loader.fromYamlFile(new File(args[0])); embulk.run(source); } finally { embulk.destroy(); } } }
のようなクラスを作って、
java -classpath .;embulk.bat EmbulkEmbedTest test.yml
を実行すると、Embulkを実行することはできる。
しかし、プラグインは読み込まれていないようだ…。
Embulkのソースを追ってみると、通常はまずorg.embulk.cli.Mainが起動され、そこからembulk_bundle.rbが実行される。
そこで環境変数GEM_HOMEが設定され、プラグインのgemが読み込まれ、そこから更にいろいろ経由してEmbulkEmbedが呼ばれる。
EmbulkEmbedを直接呼ぶと、embulk_bundle.rbが実行されないため、プラグインが読み込まれないようだ。
ひょっとして、環境変数GEM_HOMEを手動で設定すればうまくいくかな?と思って、
set GEM_HOME=/embulk/.embulk/jruby/2.2.0 java -classpath .;embulk.bat EmbulkEmbedTest test.yml
と実行したら、うまくいった!
Embulk | 0.7.10 |
OS | Windows |
2015/12/25 Bundlerを使うパターンについて追記しました(hiroysatoさんの記事を参考にしました)。
2015/12/25 EmbulkEmbedを使うケースについて追記しました(ひしだまさんの記事を参考にしました)。
EmbulkでCSVからDBにロードするときのエラーについて調べてみた
以前、Embulkのエラー処理について調べてみるというタイトルで記事を書いたが、あれからずいぶんとバージョンが上がっているので、再度確認してみることにした。
CSVの項目が多過ぎる場合
以下のような警告が出て、スキップされた。
2015-12-11 16:02:29.364 +0900 [WARN] (task-0000): Skipped line 2 (Too many columns): 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999
終了コードは0だった。
ドキュメントを見ると、allow_extra_columns: true を設定すると、余計な項目は無視されて正常な行としてと取り込まれるようだ。
逆に、stop_on_invalid_record: true を設定すると、エラーで終了する。
2015-12-11 16:07:41.280 +0900 [INFO] (cleanup): > 0.02 seconds org.embulk.exec.PartialExecutionException: org.embulk.spi.DataException: Invalid record at line 2: 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999 at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:363) at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:572) ... Error: org.embulk.spi.DataException: Invalid record at line 2: 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999
この場合は、終了コードは1だった。
CSVの項目が少な過ぎる場合
項目が多過ぎる場合とだいたい同じだ。
デフォルトでは、以下のような警告が出て、スキップされる。
2015-12-11 16:10:52.236 +0900 [WARN] (task-0000): Skipped line 2 (Too few columns): 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03
allow_optional_columns: true を設定すると、足りない項目はnullとして取り込まれる。
当然だが、挿入先のDBの列がNOT NULLだとそっちでエラーになるので注意。
項目が多過ぎる場合と同様、stop_on_invalid_record: true を設定すると、エラーで終了する。
(いるかどうかは分からないが…)項目が少な過ぎる場合は警告、多過ぎる場合はエラー、といった設定はできないようだ。
数値列に数値でない値
例えば、CSV Parserでlongと定義されている列に、"X"とか突っ込んでみる。
以前はエラーだったが、警告が出てスキップされるようになっていた。
2015-12-11 16:21:21.736 +0900 [WARN] (task-0000): Skipped line 3 (java.lang.NumberFormatException: For input string: "X"): X,1234567890.12,test9999,TEST9999,2015-12-31,2015-12-31 23:59:59,2015-12-31 23:59:59.999
これについても、stop_on_invalid_record: true を設定すると、エラーで終了する。
DB側のエラーの場合
桁数オーバー、NULL制約違反などである。
DB側でエラーが発生した場合は、Embulkもエラーで終了する。
こちらに関しては、無視したり警告にするようなオプションは今のところ無い。
気になるのは、そのときテーブルの中身がどうなっているのか?、だ。
embulk-output-jdbcには複数のモードがあり、それぞれで挙動が異なる。
insert
このモードでは、複数の一時テーブルにデータをロードし、最後に対象テーブルにUNIONしてINSERTする。
エラー行があった場合は、最後のINSERTが行われないため、対象テーブルは元のままである。
truncate_insert
このモードでは、insertと同様だが、最後に対象テーブルを空にしてからINSERTする。
なお、モードの名前はtruncate_insertだが、発行されるSQLはTRUNCATEではなくDELETEである。
(DBによってはTRUNCATEはロールバックできないからだろう)
insert_direct
このモードでは、対象テーブルに直接INSERTする。
内部的には、1件ずつSQLを発行するのでなく、JDBCのjava.sql.Statement#executeBatchを利用して、ある程度まとめてから発行する(このサイズはbatch_sizeオプションで制御可能)。
Embulkでは大量のデータをロードする前提のため、全件でcommitするのは無理があり、毎回commitしている。
従って、エラー行があった場合は、途中までは対象テーブルにロードされている。
ただし、トランザクションは行単位ではなく複数行が含まれるので、エラー行の直前の行がロードされているとは限らない。
replace
このモードでは、まず一時テーブルにデータをロードする。最後に対象テーブルをDROPしてから、ALTER TABLEにより一時テーブルを対象テーブルに置き換える。
エラー行があった場合は、最後の処理が行われないため、対象テーブルは元のままである。
なお、DROPとALTER TABLEの間でEmbulkが落ちたりした場合、対象テーブルが消えた状態になってしまうので注意が必要である(PostgreSQL等ではDROPがロールバックされるので大丈夫だが)。
merge
このモードでは、まず複数の一時テーブルにデータをロードする。最後に対象テーブルにロードする際、同じキーのレコードが既にあればUPDATE、無ければINSERTする。
エラー行があった場合は、最後の処理が行われないため、対象テーブルは元のままである。
なお、このモードはembulk-output-mysqlとembulk-output-postgresqlで実装されている。
各モードの比較
mergeは他とちょっと異なるし、truncate_insertはinsertとだいたい同じなので、insertとinsert_directとreplaceを比較してみる(insert_directはTRUNCATEしてから実行すれば他と同じ結果になる)。
処理速度について
insert_directとreplaceはほとんど同じだろう(DROP~ALTER TABLEにはそんなに時間が掛からないだろうし)。
insertは、最後にUNIONしてINSERTするところで時間が掛かりそうだ。
ただ、insert_directやreplaceでは1つのテーブル/一時テーブルにロードするのに対し、insertでは複数の一時テーブルにロードする。
その分速くなっているかもしれないな。
いずれ実際に計測してみるか。
データの整合性について
insertでは、対象テーブルの更新が完全にトランザクション内なので、ロード前→ロード後 に直接遷移する。
すなわち、データの不整合が全く無い。
replaceでは、DROP~ALTER TABLEを行うため、ロード前→対象テーブルが無い→ロード後 のように遷移する。
つまり、一時的にデータが無い時間が存在する。
insert_directでは、ロード前→TRUNCATE→ロード中→ロード後 のようになる。
なので、ロードしている間はずっと、中途半端な状態である。
まあ、結局のところ、要件に応じて適切なモードを選ぼう、ということだな。
Embulk | 0.7.10 |
embulk-output-mysql | 0.4.2 |
embulk-output-postgresql | 0.4.2 |
2015/12/17: embulk-output-mysqlもmergeモードに対応していました。
2015/12/18: embulk-output-redshiftはmergeモードが未実装でした。
MySQLのLOAD_FILEではまった
LOAD_FILEがNULLを返す?
MySQLのblob列に画像データを突っ込もうとして、
insert into xxx values( ..., load_file('/data/image.jpg') );
のようなSQLを投げてみたが、どうしてもNULLになってしまう。
ファイルはちゃんとあるのに…、なぜだろう?
パス指定の問題?
Windowsだから、
load_file('C:\\data\\image.jpg')
のように指定する必要があるのかな?、と思ったけれど、これでもだめだった。
なお、'\'は2つ重ねて'\\'のようにエスケープする必要がある。
権限の問題?
調べてみたところ、以下の権限が必要なようだ。
・SQLを実行するMySQLユーザに、FILE権限があること
・MySQLサーバを実行しているOSユーザに、該当ファイルへのアクセス権限があること
両方ちゃんとあるんだけどな。。
システム変数secure_file_privの問題だった
基本に立ち返って、MySQLのdocumentを参照してみた。
If the secure_file_priv system variable is set to a nonempty directory name, the file to be loaded must be located in that directory.
という一文がある。
MySQLにログインして確認してみると…、
mysql> show variables like 'secure_file_priv'; +------------------+------------------------------------------------+ | Variable_name | Value | +------------------+------------------------------------------------+ | secure_file_priv | C:\ProgramData\MySQL\MySQL Server 5.6\Uploads\ | +------------------+------------------------------------------------+ 1 row in set (0.00 sec)
どうやら、インストール時にデフォルトで設定されていたようだ。
my.iniのsecure-file-privを書き換えて、MySQLを再起動したら、無事LOAD_FILEが成功した!
OS | Windows 7 Enterprise |
MySQL | 5.6.27 |