今日もプログラミング

IT技術とかプログラミングのこととか特にJavaを中心に書いていきます

データベースのタイムゾーン付型について調べてみた

embulk-input-jdbcこんな問題が上がっていたので、データベースのタイムゾーン付の型について調べてみた。

MySQL (5.6)

ドキュメントを見ると、TIMESTAMP型はタイムゾーンに対応しているらしい。

DB内ではUTC、クライアント側ではtime_zoneシステム変数で指定されたタイムゾーンになるようだ。

実際に試してみる。

mysql> show variables like '%time_zone%';
+------------------+-------+
| Variable_name    | Value |
+------------------+-------+
| system_time_zone | UTC   |
| time_zone        | UTC   |
+------------------+-------+
2 rows in set (0.19 sec)

mysql> create table test_tz (
    ->   id char(2),
    ->   dt timestamp
    -> );
Query OK, 0 rows affected (0.59 sec)

mysql> insert into test_tz values('01', '2016-03-13 14:00:00');
Query OK, 1 row affected (0.23 sec)

mysql> select * from test_tz;
+------+---------------------+
| id   | dt                  |
+------+---------------------+
| 01   | 2016-03-13 14:00:00 |
+------+---------------------+
1 row in set (0.16 sec)

time_zoneシステム変数がUTCなので、日時はUTCとして扱われる。

mysql> set time_zone = '+09:00';
Query OK, 0 rows affected (0.19 sec)

mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name    | Value  |
+------------------+--------+
| system_time_zone | UTC    |
| time_zone        | +09:00 |
+------------------+--------+
2 rows in set (0.19 sec)

mysql> select * from test_tz;
+------+---------------------+
| id   | dt                  |
+------+---------------------+
| 01   | 2016-03-13 23:00:00 |
+------+---------------------+
1 row in set (0.17 sec)

time_zoneを変えると、日時はそのタイムゾーンで扱われる。

 

Oracle (12c)

Oracleでは、TIMESTAMP WITH TIME ZONE型とTIMESTAMP WITH LOCAL TIME ZONE型というのがある。

この辺りに詳しい説明があった。

まず、TIMESTAMP WITH TIME ZONE型を試してみる。

SQL> create table test_tz (
  2    id char(2),
  3    dt timestamp with time zone
  4  );

表が作成されました。

SQL> insert into test_tz values('01', '2016-03-13 14:00:00');
insert into test_tz values('01', '2016-03-13 14:00:00')
                               *
行1でエラーが発生しました。:
ORA-01840: 入力した値の長さが日付書式に対して不足しています

明示的にタイムゾーンを指定しないといけないようだ。

SQL> insert into test_tz values('01', '2016-03-13 14:00:00+09:00');

1行が作成されました。

SQL> select * from test_tz;

ID   DT
---- ---------------------------------------------------------------------------

01   16-03-13 14:00:00.000000 +09:00

JSTで格納されているが、UTCで検索することもできる。

SQL> select * from test_tz where dt='2016-03-13 05:00:00+00:00';

ID   DT
---- ---------------------------------------------------------------------------

01   16-03-13 14:00:00.000000 +09:00


次に、TIMESTAMP WITH LOCAL TIME ZONE型を試してみる。

SQL> create table test_ltz (
  2    id char(2),
  3    dt timestamp with local time zone
  4  );

表が作成されました。

SQL> insert into test_ltz values('01', '2016-03-13 14:00:00');

1行が作成されました。

SQL> select * from test_ltz;

ID   DT
---- ---------------------------------------------------------------------------

01   16-03-13 14:00:00.000000

こちらではタイムゾーンの指定は必要ない。

selectすると、insertした通りの日時で表示される。

 

これを、UTC環境のOracleクライアントから参照してみる。

SQL> select * from test_ltz;

ID DT
-- ---------------------------------------------------------------------------
01 13-MAR-16 05.00.00.000000 AM

そうすると、UTCに変換されて表示された。

つまり、クライアント環境のタイムゾーンで扱われるということだ。

 

SQL Server (2012)

ドキュメントによると、datetimeoffset型がタイムゾーンを保持できる。

2> create table test_tz (
3>   id char(2),
4>   dt datetimeoffset
5> );
6> go
1>
2> select * from test_tz
3> go
 id   dt
 ---- ----------------------------------

(0 行処理されました)
1> insert into test_tz values('01', '2016-03-13 14:00:00')
2> go
(1 行処理されました)
1> select * from test_tz
2> go
 id   dt
 ---- ----------------------------------
 01   2016-03-13 14:00:00.0000000 +00:00

(1 行処理されました)

JST環境のSQL Serverだが、デフォルトではUTCになるようだ。

タイムゾーンを明示的に指定することもできる。

1> insert into test_tz values('01', '2016-03-13 14:00:00+09:00')
2> go
(1 行処理されました)
1> select * from test_tz
2> go
 id   dt
 ---- ----------------------------------
 01   2016-03-13 14:00:00.0000000 +00:00
 01   2016-03-13 14:00:00.0000000 +09:00

(2 行処理されました)

 

PostgreSQL (9.4)

ドキュメントによると、DB内ではUTC、クライアント側ではTimeZoneシステムパラメータで指定されたタイムゾーンになるようだ。

実際に試してみる。

postgres=> show timezone;
 TimeZone
----------
 UTC
(1 行)

postgres=> create table test_tz (
postgres(>   id char(2),
postgres(>   dt timestamp with time zone
postgres(> );
CREATE TABLE
                                                ^
postgres=> insert into test_tz values('01', '2016-03-13 14:00:00');
INSERT 0 1
postgres=> select * from test_tz;
 id |           dt
----+------------------------
 01 | 2016-03-13 14:00:00+00
(1 行)

たしかに、TimeZoneの値であるUTCで解釈されている。

タイムゾーンを明示的に指定してinsertすることもできる。

postgres=> insert into test_tz values('01', '2016-03-13 14:00:00+09:00');
INSERT 0 1
postgres=> select * from test_tz;
 id |           dt
----+------------------------
 01 | 2016-03-13 14:00:00+00
 01 | 2016-03-13 05:00:00+00
(2 行)

JSTからUTCに変換されて格納されているようだ。

TimeZoneパラメータを変えると、そのタイムゾーンで解釈されるようになる。

postgres=> show timezone;
 TimeZone
----------
 Japan
(1 行)

postgres=> select * from test_tz;
 id |           dt
----+------------------------
 01 | 2016-03-13 23:00:00+09
 01 | 2016-03-13 14:00:00+09
(2 行)

postgres=> insert into test_tz values('03', '2016-03-13 15:00:00');
INSERT 0 1
postgres=> select * from test_tz;
 id |           dt
----+------------------------
 01 | 2016-03-13 23:00:00+09
 01 | 2016-03-13 14:00:00+09
 03 | 2016-03-13 15:00:00+09
(3 行)

 

Redshift

ドキュメントによると、タイムゾーン付のTIMESTAMP型はまだサポートされていないようだ。

 

まとめ

DBMSにおいて、INSERTする日時とかSELECTされた日時がどのタイムゾーンになるかをまとめてみた。

DBMSタイムゾーン
MySQL TIMESTAMP time_zoneシステム変数による
Oracle TIMESTAMP WITH TIME ZONE 明示する
TIMESTAMP WITH LOCAL TIME ZONE クライアント環境のタイムゾーン
SQL Server DATETIMEOFFSET UTC
PostgreSQL time/timestamp with time zone TimeZoneシステムパラメータによる
Redshift - -

 

SQL ServerのNative Clientを使ってバルクロードしてみる

embulk-output-sqlserverをリリースしました

先日、バルクデータローダEmbulkのプラグインとして、SQL Serverにロードするためのembulk-output-sqlserverをリリースした。

しかし、embulk-output-sqlserverは単純にINSERT文でデータをロードするので、たぶん速くない。

 

SQL ServerにはBCPという高速なバルクロードユーティリティがある。

その機能を使えるライブラリもあるようなので、それを試してみることにした。

 

ちなみに、embulk-output-oracleOracleのバルクロード機能を呼び出して高速化している。

 

SQL Server Native Client (ODBC)

Microsoftのドキュメントを見ると、SQL Server Native Clientには「OLE DB」と「ODBC」の2種類があるらしい。

だが…、どうもOLE DBの方は非推奨で、サポートされなくなるらしい。

というわけで、ODBCの方を使おう。

 

サンプルを見てみる

APIドキュメントとかも見てみたが、いまいち使い方がよく分からない。

やはりサンプ見るのが一番!

というわけで、サンプルをダウンロードしてみた。

サンプルをインストールしてみると、C:\Program Files\Microsoft SQL Server\100\Samples\Engine\Data Access\odbc にそれらしいサンプルが展開されていた。

 

サンプルを作ってみる

で、サンプルを参考にしつつ、自分の環境に合わせて作ったのが以下のコードだ。

#include <tchar.h>
#include <locale.h>
#include <stdio.h>
#include <string.h>
#include <windows.h>
#include <sql.h>
#include <sqlext.h>
#include <odbcss.h>

BOOLEAN isSQLError(TCHAR* message, SQLRETURN ret)
{
    if (ret != SQL_SUCCESS_WITH_INFO && ret != SQL_SUCCESS) {
        wprintf(L"%s\r\n", message);
        return TRUE;
    }
    return FALSE;
}

BOOLEAN isBCPError(TCHAR* message, RETCODE ret)
{
    if (ret != SUCCEED) {
        wprintf(L"%s\r\n", message);
        return TRUE;
    }
    return FALSE;
}

void printError(SQLHENV henv, SQLHDBC hdbc)
{
    SQLWCHAR sqlState[6];
    SQLINTEGER nativeError;
    SQLWCHAR errorMessage[256];
    SQLSMALLINT errorMessageLen;
    SQLRETURN ret = SQLError(henv, hdbc, NULL, sqlState, &nativeError, errorMessage, 256, &errorMessageLen);
    wprintf(L"%s\r\n", errorMessage);
}

int _tmain(int argc, _TCHAR* argv[])
{
    _tsetlocale(LC_ALL, _T(""));

    SQLHENV henv = SQL_NULL_HENV;
    if (isSQLError(L"SQLAllocHandle(SQL_HANDLE_ENV)", SQLAllocHandle(SQL_HANDLE_ENV, NULL, &henv))) {
        return 1;
    }

    // ODBCバージョンの設定
    if (isSQLError(L"SQLSetEnvAttr(SQL_ATTR_ODBC_VERSION)", SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, SQL_IS_INTEGER))) {
        return 1;
    }

    // ODBC接続ハンドル
    HDBC hdbc = SQL_NULL_HDBC;
    if (isSQLError(L"SQLAllocHandle(SQL_HANDLE_DBC)", SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc))) {
        return 1;
    }

    // BULK COPYモードの設定
    if (isSQLError(L"SQLSetConnectAttr(SQL_COPT_SS_BCP)", SQLSetConnectAttr(hdbc, SQL_COPT_SS_BCP, (SQLPOINTER)SQL_BCP_ON, SQL_IS_INTEGER))) {
        return 1;
    }

    // DBに接続
    //SQLDriverConnect(
    if (isSQLError(L"SQLDriverConnect", SQLDriverConnect(hdbc, NULL, L"Driver={SQL Server Native Client 11.0};Server=localhost,1433\\SQLEXPRESS;UID=user;PWD=password;", SQL_NTS, NULL, SQL_NTS, NULL, SQL_DRIVER_NOPROMPT))) {
        printError(henv, hdbc);
        return 1;
    }

    // BULK COPY開始
    if (isBCPError(L"bcp_init", bcp_init(hdbc, L"TESTDB.dbo.TEST1", NULL, NULL, DB_IN))) {
        printError(henv, hdbc);
        return 1;
    }

    // 列1を設定する
    if (isBCPError(L"bcp_bind(1)", bcp_bind(hdbc, (LPCBYTE)"XXX", 0, SQL_VARLEN_DATA, (LPCBYTE)"", 1, SQLCHARACTER, 1))) {
        printError(henv, hdbc);
        return 1;
    }
    // 列2を設定する
    if (isBCPError(L"bcp_bind(2)", bcp_bind(hdbc, (LPCBYTE)"YYY", 0, SQL_VARLEN_DATA, (LPCBYTE)"", 1, SQLCHARACTER, 2))) {
        printError(henv, hdbc);
        return 1;
    }

    // 1行送信する
    if (isBCPError(L"bcp_sendrow", bcp_sendrow(hdbc))) {
        printError(henv, hdbc);
        return 1;
    }

    // BULK COPY終了
    if (bcp_done(hdbc) != SUCCEED) {
        printError(henv, hdbc);
        return 1;
    }

    SQLDisconnect(hdbc);
    SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
    SQLFreeHandle(SQL_HANDLE_ENV, henv);

    return 0;
}

なお、ロード先のテーブルはこんな感じ。

CREATE TABLE TEST1 (
	ID char(4),
	VALUE varchar(20)
)

 

ビルドについて

追加の依存ファイルに、sqlncli11.libが必要。

SQL ServerSDK\Lib フォルダの下にある。

 

DBへの接続

DBに接続するAPIは、SQLConnectとSQLDriverConnectがある。

SQLConnectの方は、あらかじめODBCデータソースを定義しておかなくてはならない。

SQLDriverConnectなら、JDB Driverのように、サーバ名やインスタンス名、ユーザ名、パスワードを直接渡して接続できる。

 

bcp_bind

bcp_bindは、1列分の値を設定する。

つまり、10列あるテーブルであれば、10回bcp_bindを呼ぶ必要がある。

どの列に設定するかは、最後の引数で列のインデックス(1始まり)を指定する。

 

bcp_sendrow

bcp_sendrowは、bcp_bindを全列に設定した後に呼び、1行分のデータを送信する。

つまり、100行のデータをロードする場合は、100回bcp_sendrowを呼ぶわけだ。

 

bcp_done

bcp_doneは、コミットしてロードを終了する。

bcp_batchを使うと、終了せずにコミットだけすることができる。

 

できてみれば簡単そうだが…

実際には、何ヶ所かはまってしまった。

SQLDriverConnectに渡す接続文字列が分からなかったりとか、bcp_bindで1行分のデータをまとめて渡すと思い込んでいたりとか…。

 

今度は、実際にこれで大量データをロードしてみたい。

 

OS Windows 7 Enterprise
SQL Server 2012
Visual Studio 2010

Oracle SQL*LoaderのPARTITIONについて

Oracle SQL*Loaderでは、制御ファイルに以下のように書くと、特定のパーティションにロードすることができる。

INSERT INTO SOME_TABLE PARTITION (SOME_PARTITION) VALUES ... 

 

挙動について

ドキュメントによると、

「行が指定のパーティションに対応しない場合、その行は拒否され」

とある。

実際に試してみたが、パーティションの条件に合わない行はロードされず、合う行だけがロードされる。

 

なお、PARTITIONを指定しない場合は、各行がそれぞれの条件に合うパーティションにロードされる。

 

パフォーマンスについて

これもドキュメントにある通り、PARTITION指定のロードでは、「従来型パス・ロード」が使用される。

SQL*Loaderには「ダイレクト・パス・ロード」と「従来型パス・ロード」があり、前者は高速だが、後者はそれほどでもない(1行ずつINSERTするよりは高速だが…)。

 

結論

ロードするデータが、単一のパーティションに含まれると分かっている場合は、PARTITIONを指定しない方がよいだろう(その方高速にロードされるので)。

そうでなく、特定パーティションに含まれない行を除外したい場合のみ、PARTITIONを指定するのがよさそうだ。

 

jnr-ffiでJavaからCのポインタを使う

jnr-ffiは、Javaから簡単にCのライブラリを呼び出せるフレームワークだ。

JNIのような煩雑なコードを書かなくてよいのがメリット。

前に記事を書いたときは、プリミティブ型とかStringしか試さなかったので、今回はそれ以外のポインタを試してみる。

 

Pointerクラスを使ってみる

まず、C側のコードを書く。環境は64bitのWindows

extern "C" __declspec(dllexport) void add(int* arguments, int* result)
{
	*result = arguments[0] + arguments[1];
}

NativeLib.dllという名前でビルドした。

 

そして、Java側のコードを書く。

public interface Native1 {
    void add(Pointer arguments, Pointer result);
}

 

import java.nio.ByteBuffer;
import jnr.ffi.LibraryLoader;
import jnr.ffi.Pointer;
import jnr.ffi.Runtime;

public class PointerTest1 {

    public static void main(String[] args) {
        Native1 native1 = LibraryLoader.create(Native1.class).load("NativeLib");

        Pointer in = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(8));
        in.putInt(0, 60000);
        in.putInt(4, 8000);
        Pointer out = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(4));

        native1.add(in, out);
        System.out.println(out.getInt(0));
    }

}

jnr-ffi、あまりドキュメントが無いようなのでよく分からないが…、それらしいクラスのそれらしいメソッドを使って書いてみる。

実行の際は、-Djava.library.path=DLLのパス名 を指定する必要がある。

結果は、

2465

う~ん、違うな。

 

調べたところ、エンディアンの違いのようだ。

ByteBufferはデフォルトでビッグエンディアン

x64はリトルエンディアン。

60000は0x0000EA60、8000は0x00001F40なので、ビッグエンディアンで足すと0x000109A0=68000だが、リトルエンディアンで足すと0x000009A1=2465になってしまう。

 

        ...
        Pointer in = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(8).order(Runtime.getSystemRuntime().byteOrder()));
        ...
        Pointer out = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(4).order(Runtime.getSystemRuntime().byteOrder()));
        ...

のように正しいをエンディアンを指定したら、

68000

と出力された!

 

ArrayMemoryIOクラスを使ってみる

Pointer#wrapメソッドは、文字通り既存のByteBufferインスタンスをラップするだけで、エンディアンの指定は行ってくれないようだ。

Pointerクラスのサブクラスを調べてみると、ArrayMemoryIOというクラスがあった。

中を見てみると、こちらはエンディアンの指定も行ってくれるっぽいし、簡単そうだ。

上のコードを

        ...
        Pointer in = new ArrayMemoryIO(Runtime.getSystemRuntime(), 8);
        ...
        Pointer out = new ArrayMemoryIO(Runtime.getSystemRuntime(), 4);
        ...

のように変えて実行したところ、正しく68000と出力された。

 

Java側とC側でメモリを共有するには?

C側は

static int* _pointer;

extern "C" __declspec(dllexport) void setAddress(int* pointer)
{
	_pointer = pointer;
}

extern "C" __declspec(dllexport) int getValue()
{
	return *_pointer;
}

のように書き、Java側は

public interface Native2 {
    void setAddress(Pointer pointer);
    int getValue();
}

 

public class PointerTest2 {
    public static void main(String[] args) {
        Native2 native2 = LibraryLoader.create(Native2.class).load("NativeLib");

        Pointer in = Pointer.wrap(Runtime.getSystemRuntime(), ByteBuffer.allocate(4).order(Runtime.getSystemRuntime().byteOrder()));
        native2.setAddress(in);
        in.putInt(0, 1234567);

        System.out.println(native2.getValue());
    }
}

と書く。

つまり、

Java側のPointerをC側に渡して保存する

Java側でPointerの指すメモリを書き換える

③C側でPointerの指すメモリを参照する

としたとき、どうなるのか?

結果は

0

で、共有されていないようだ。。

つまり、メソッドを呼び出すたびに、メモリの内容がコピーされているのだろう。

 

ByteBufferを調べると、allocateDirectというメソッドがある。

これを使うとよさそうだ。

allocateをallocateDirectに変えて実行してみると、

1234567

と出力された!

つまり、Java側とC側とでメモリを共有できた!

 

OS Windows 7 Enterprise (64bit)
Java 8
jnr-ffi 2.0.3

Embulkの環境を丸ごとコピーする

Embulkでプラグインをインストールすると、実行ユーザのホームディレクトリの下に.embulkディレクトリができて、gemがインストールされる。

 

別の環境でも同じようにEmbulkを動かすには、Javaをインストールして、Embulkの実行ファイルをコピーして、ホームディレクトリもコピーするだけ。

なんだけど、アプリケーションはホームディレクトリではなくてアプリケーション用のディレクトリにまとめたい、ということもある。

そこで、その方法について調べてみた。

 

Embulkのホームディレクトリ

".embulk"でgrepしてみると、embulk_bundle.rbに以下のコードがあった。

  user_home = java.lang.System.properties["user.home"] || ENV['HOME']
  unless user_home
    raise "HOME environment variable is not set."
  end
  ENV['GEM_HOME'] = File.expand_path File.join(user_home, '.embulk', Gem.ruby_engine, RbConfig::CONFIG['ruby_version'])

ということは、Javaのシステム変数user.homeを設定してやれば、そこがEmbulkのホームになりそうだ。

 

embulk "-J-Duser.home=/embulk" gem install embulk-input-mysql

と実行したら、/embulkディレクトリにプラグインがインストールされた!

なお、Linuxの場合はオプション指定を""で囲まなくてもいいはず。

 

環境を丸ごとコピーしてみる

/embulkディレクトリに、Embulkの実行ファイルを置く。

そして、上で試した通り、そのディレクトリにプラグインをインストールする。

 

次に、別のマシンにJavaをインストール後、/embulkディレクトリを丸ごとコピーする。

そして、

embulk "-J-Duser.home=/embulk" run test.yml

を実行すると…、

無事実行できた!

 

Bundlerを使う方法

Embulkでは、Bundlerというものを使って、プラグインを含むEmbulkの実行環境をまとめることができる。

これも試してみた。

 

まず、

embulk mkbundle embulk1

のように実行する。すると、embulk1ディレクトリにいろいろファイルが作成される。

次に、embulk1/Gemfileを開いて、インストールしたいプラグインを書く。

gem 'embulk-input-mysql'

特定のバージョンを指定することもできるようだ(詳しくはGemfileにコメントが書いてある)。

そして、

cd embulk1
embulk bundle
cd ..

を実行すると、embulk-input-mysqlがインストールされた。

実行時は、

embulk run -b embulk1 test.yml

のようにすればOK。

 

こちらについても、Embulkの実行ファイルとembulk1フォルダを丸ごとコピーして、動くのが確認できた!

 

EmbulkEmbedを使う場合

EmbulkEmbedを使うと、EmbulkをJavaライブラリ的に使うことができる。

この場合、プラグインはどうなるんだろうか?

 

import java.io.File;
import org.embulk.EmbulkEmbed;
import org.embulk.EmbulkEmbed.Bootstrap;
import org.embulk.config.ConfigLoader;
import org.embulk.config.ConfigSource;

public class EmbulkEmbedTest {
    public static void main(String[] args) throws Exception {
        Bootstrap bootstrap = new EmbulkEmbed.Bootstrap();
        EmbulkEmbed embulk = bootstrap.initializeCloseable();
        try {
            ConfigLoader loader = embulk.newConfigLoader();
            ConfigSource source = loader.fromYamlFile(new File(args[0]));
            embulk.run(source);
        } finally {
            embulk.destroy();
        }
    }
}

のようなクラスを作って、

java -classpath .;embulk.bat EmbulkEmbedTest test.yml

を実行すると、Embulkを実行することはできる。

しかし、プラグインは読み込まれていないようだ…。

 

Embulkのソースを追ってみると、通常はまずorg.embulk.cli.Mainが起動され、そこからembulk_bundle.rbが実行される。

そこで環境変数GEM_HOMEが設定され、プラグインのgemが読み込まれ、そこから更にいろいろ経由してEmbulkEmbedが呼ばれる。

EmbulkEmbedを直接呼ぶと、embulk_bundle.rbが実行されないため、プラグインが読み込まれないようだ。

ひょっとして、環境変数GEM_HOMEを手動で設定すればうまくいくかな?と思って、

set GEM_HOME=/embulk/.embulk/jruby/2.2.0
java -classpath .;embulk.bat EmbulkEmbedTest test.yml

と実行したら、うまくいった!

 

Embulk 0.7.10
OS Windows

 

2015/12/25 Bundlerを使うパターンについて追記しました(hiroysatoさんの記事を参考にしました)。

2015/12/25 EmbulkEmbedを使うケースについて追記しました(ひしだまさんの記事を参考にしました)。

EmbulkでCSVからDBにロードするときのエラーについて調べてみた

以前、Embulkのエラー処理について調べてみるというタイトルで記事を書いたが、あれからずいぶんとバージョンが上がっているので、再度確認してみることにした。

CSVの項目が多過ぎる場合

以下のような警告が出て、スキップされた。

2015-12-11 16:02:29.364 +0900 [WARN] (task-0000): Skipped line 2 (Too many columns): 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999

終了コードは0だった。

 

ドキュメントを見ると、allow_extra_columns: true を設定すると、余計な項目は無視されて正常な行としてと取り込まれるようだ。

 

逆に、stop_on_invalid_record: true を設定すると、エラーで終了する。

2015-12-11 16:07:41.280 +0900 [INFO] (cleanup): > 0.02 seconds
org.embulk.exec.PartialExecutionException: org.embulk.spi.DataException: Invalid record at line 2: 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999
        at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:363)
        at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:572)
        ...

Error: org.embulk.spi.DataException: Invalid record at line 2: 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.123,999

この場合は、終了コードは1だった。

 

CSVの項目が少な過ぎる場合

項目が多過ぎる場合とだいたい同じだ。

デフォルトでは、以下のような警告が出て、スキップされる。

2015-12-11 16:10:52.236 +0900 [WARN] (task-0000): Skipped line 2 (Too few columns): 1,123.40,test1,TEST1,2015-04-24,2015-04-24 01:02:03

 

allow_optional_columns: true を設定すると、足りない項目はnullとして取り込まれる。

当然だが、挿入先のDBの列がNOT NULLだとそっちでエラーになるので注意。

 

項目が多過ぎる場合と同様、stop_on_invalid_record: true を設定すると、エラーで終了する。

(いるかどうかは分からないが…)項目が少な過ぎる場合は警告、多過ぎる場合はエラー、といった設定はできないようだ。

 

数値列に数値でない値

例えば、CSV Parserでlongと定義されている列に、"X"とか突っ込んでみる。

以前はエラーだったが、警告が出てスキップされるようになっていた。

2015-12-11 16:21:21.736 +0900 [WARN] (task-0000): Skipped line 3 (java.lang.NumberFormatException: For input string: "X"): X,1234567890.12,test9999,TEST9999,2015-12-31,2015-12-31 23:59:59,2015-12-31 23:59:59.999

 

これについても、stop_on_invalid_record: true を設定すると、エラーで終了する。

 

DB側のエラーの場合

桁数オーバー、NULL制約違反などである。

DB側でエラーが発生した場合は、Embulkもエラーで終了する。

こちらに関しては、無視したり警告にするようなオプションは今のところ無い。

 

気になるのは、そのときテーブルの中身がどうなっているのか?、だ。

embulk-output-jdbcには複数のモードがあり、それぞれで挙動が異なる。

insert

このモードでは、複数の一時テーブルにデータをロードし、最後に対象テーブルにUNIONしてINSERTする。

エラー行があった場合は、最後のINSERTが行われないため、対象テーブルは元のままである。

 

truncate_insert

このモードでは、insertと同様だが、最後に対象テーブルを空にしてからINSERTする。

なお、モードの名前はtruncate_insertだが、発行されるSQLはTRUNCATEではなくDELETEである。

(DBによってはTRUNCATEはロールバックできないからだろう)

 

insert_direct

このモードでは、対象テーブルに直接INSERTする。

内部的には、1件ずつSQLを発行するのでなく、JDBCjava.sql.Statement#executeBatchを利用して、ある程度まとめてから発行する(このサイズはbatch_sizeオプションで制御可能)。

Embulkでは大量のデータをロードする前提のため、全件でcommitするのは無理があり、毎回commitしている。

従って、エラー行があった場合は、途中までは対象テーブルにロードされている。

ただし、トランザクションは行単位ではなく複数行が含まれるので、エラー行の直前の行がロードされているとは限らない。

 

replace

このモードでは、まず一時テーブルにデータをロードする。最後に対象テーブルをDROPしてから、ALTER TABLEにより一時テーブルを対象テーブルに置き換える。

エラー行があった場合は、最後の処理が行われないため、対象テーブルは元のままである。

なお、DROPとALTER TABLEの間でEmbulkが落ちたりした場合、対象テーブルが消えた状態になってしまうので注意が必要である(PostgreSQL等ではDROPがロールバックされるので大丈夫だが)。

 

merge

このモードでは、まず複数の一時テーブルにデータをロードする。最後に対象テーブルにロードする際、同じキーのレコードが既にあればUPDATE、無ければINSERTする。

エラー行があった場合は、最後の処理が行われないため、対象テーブルは元のままである。

なお、このモードはembulk-output-mysqlとembulk-output-postgresqlで実装されている。

 

各モードの比較

mergeは他とちょっと異なるし、truncate_insertはinsertとだいたい同じなので、insertとinsert_directとreplaceを比較してみる(insert_directはTRUNCATEしてから実行すれば他と同じ結果になる)。

 

処理速度について

insert_directとreplaceはほとんど同じだろう(DROP~ALTER TABLEにはそんなに時間が掛からないだろうし)。

insertは、最後にUNIONしてINSERTするところで時間が掛かりそうだ。

ただ、insert_directやreplaceでは1つのテーブル/一時テーブルにロードするのに対し、insertでは複数の一時テーブルにロードする。

その分速くなっているかもしれないな。

いずれ実際に計測してみるか。

 

データの整合性について

insertでは、対象テーブルの更新が完全にトランザクション内なので、ロード前→ロード後 に直接遷移する。

すなわち、データの不整合が全く無い。

replaceでは、DROP~ALTER TABLEを行うため、ロード前→対象テーブルが無い→ロード後 のように遷移する。

つまり、一時的にデータが無い時間が存在する。

insert_directでは、ロード前→TRUNCATE→ロード中→ロード後 のようになる。

なので、ロードしている間はずっと、中途半端な状態である。

 

まあ、結局のところ、要件に応じて適切なモードを選ぼう、ということだな。

 

Embulk 0.7.10
embulk-output-mysql 0.4.2
embulk-output-postgresql 0.4.2

 

2015/12/17: embulk-output-mysqlもmergeモードに対応していました。

2015/12/18: embulk-output-redshiftはmergeモードが未実装でした。

MySQLのLOAD_FILEではまった

LOAD_FILEがNULLを返す?

 

MySQLのblob列に画像データを突っ込もうとして、

insert into xxx values(
    ...,
    load_file('/data/image.jpg') );

のようなSQLを投げてみたが、どうしてもNULLになってしまう。

ファイルはちゃんとあるのに…、なぜだろう?

 

パス指定の問題?

Windowsだから、

load_file('C:\\data\\image.jpg')

のように指定する必要があるのかな?、と思ったけれど、これでもだめだった。

なお、'\'は2つ重ねて'\\'のようにエスケープする必要がある。

 

権限の問題?

調べてみたところ、以下の権限が必要なようだ。

SQLを実行するMySQLユーザに、FILE権限があること

MySQLサーバを実行しているOSユーザに、該当ファイルへのアクセス権限があること

両方ちゃんとあるんだけどな。。

 

システム変数secure_file_privの問題だった

基本に立ち返って、MySQLdocumentを参照してみた。

If the secure_file_priv system variable is set to a nonempty directory name, the file to be loaded must be located in that directory.

という一文がある。

MySQLにログインして確認してみると…、

mysql> show variables like 'secure_file_priv';
+------------------+------------------------------------------------+
| Variable_name    | Value                                          |
+------------------+------------------------------------------------+
| secure_file_priv | C:\ProgramData\MySQL\MySQL Server 5.6\Uploads\ |
+------------------+------------------------------------------------+
1 row in set (0.00 sec)

どうやら、インストール時にデフォルトで設定されていたようだ。

my.iniのsecure-file-privを書き換えて、MySQLを再起動したら、無事LOAD_FILEが成功した!

 

OS Windows 7 Enterprise
MySQL 5.6.27