今日もプログラミング

IT技術とかプログラミングのこととか特にJavaを中心に書いていきます

SQL ServerのNative Clientを使ってバルクロードしてみる

embulk-output-sqlserverをリリースしました

先日、バルクデータローダEmbulkのプラグインとして、SQL Serverにロードするためのembulk-output-sqlserverをリリースした。

しかし、embulk-output-sqlserverは単純にINSERT文でデータをロードするので、たぶん速くない。

 

SQL ServerにはBCPという高速なバルクロードユーティリティがある。

その機能を使えるライブラリもあるようなので、それを試してみることにした。

 

ちなみに、embulk-output-oracleOracleのバルクロード機能を呼び出して高速化している。

 

SQL Server Native Client (ODBC)

Microsoftのドキュメントを見ると、SQL Server Native Clientには「OLE DB」と「ODBC」の2種類があるらしい。

だが…、どうもOLE DBの方は非推奨で、サポートされなくなるらしい。

というわけで、ODBCの方を使おう。

 

サンプルを見てみる

APIドキュメントとかも見てみたが、いまいち使い方がよく分からない。

やはりサンプ見るのが一番!

というわけで、サンプルをダウンロードしてみた。

サンプルをインストールしてみると、C:\Program Files\Microsoft SQL Server\100\Samples\Engine\Data Access\odbc にそれらしいサンプルが展開されていた。

 

サンプルを作ってみる

で、サンプルを参考にしつつ、自分の環境に合わせて作ったのが以下のコードだ。

#include <tchar.h>
#include <locale.h>
#include <stdio.h>
#include <string.h>
#include <windows.h>
#include <sql.h>
#include <sqlext.h>
#include <odbcss.h>

BOOLEAN isSQLError(TCHAR* message, SQLRETURN ret)
{
    if (ret != SQL_SUCCESS_WITH_INFO && ret != SQL_SUCCESS) {
        wprintf(L"%s\r\n", message);
        return TRUE;
    }
    return FALSE;
}

BOOLEAN isBCPError(TCHAR* message, RETCODE ret)
{
    if (ret != SUCCEED) {
        wprintf(L"%s\r\n", message);
        return TRUE;
    }
    return FALSE;
}

void printError(SQLHENV henv, SQLHDBC hdbc)
{
    SQLWCHAR sqlState[6];
    SQLINTEGER nativeError;
    SQLWCHAR errorMessage[256];
    SQLSMALLINT errorMessageLen;
    SQLRETURN ret = SQLError(henv, hdbc, NULL, sqlState, &nativeError, errorMessage, 256, &errorMessageLen);
    wprintf(L"%s\r\n", errorMessage);
}

int _tmain(int argc, _TCHAR* argv[])
{
    _tsetlocale(LC_ALL, _T(""));

    SQLHENV henv = SQL_NULL_HENV;
    if (isSQLError(L"SQLAllocHandle(SQL_HANDLE_ENV)", SQLAllocHandle(SQL_HANDLE_ENV, NULL, &henv))) {
        return 1;
    }

    // ODBCバージョンの設定
    if (isSQLError(L"SQLSetEnvAttr(SQL_ATTR_ODBC_VERSION)", SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, SQL_IS_INTEGER))) {
        return 1;
    }

    // ODBC接続ハンドル
    HDBC hdbc = SQL_NULL_HDBC;
    if (isSQLError(L"SQLAllocHandle(SQL_HANDLE_DBC)", SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc))) {
        return 1;
    }

    // BULK COPYモードの設定
    if (isSQLError(L"SQLSetConnectAttr(SQL_COPT_SS_BCP)", SQLSetConnectAttr(hdbc, SQL_COPT_SS_BCP, (SQLPOINTER)SQL_BCP_ON, SQL_IS_INTEGER))) {
        return 1;
    }

    // DBに接続
    //SQLDriverConnect(
    if (isSQLError(L"SQLDriverConnect", SQLDriverConnect(hdbc, NULL, L"Driver={SQL Server Native Client 11.0};Server=localhost,1433\\SQLEXPRESS;UID=user;PWD=password;", SQL_NTS, NULL, SQL_NTS, NULL, SQL_DRIVER_NOPROMPT))) {
        printError(henv, hdbc);
        return 1;
    }

    // BULK COPY開始
    if (isBCPError(L"bcp_init", bcp_init(hdbc, L"TESTDB.dbo.TEST1", NULL, NULL, DB_IN))) {
        printError(henv, hdbc);
        return 1;
    }

    // 列1を設定する
    if (isBCPError(L"bcp_bind(1)", bcp_bind(hdbc, (LPCBYTE)"XXX", 0, SQL_VARLEN_DATA, (LPCBYTE)"", 1, SQLCHARACTER, 1))) {
        printError(henv, hdbc);
        return 1;
    }
    // 列2を設定する
    if (isBCPError(L"bcp_bind(2)", bcp_bind(hdbc, (LPCBYTE)"YYY", 0, SQL_VARLEN_DATA, (LPCBYTE)"", 1, SQLCHARACTER, 2))) {
        printError(henv, hdbc);
        return 1;
    }

    // 1行送信する
    if (isBCPError(L"bcp_sendrow", bcp_sendrow(hdbc))) {
        printError(henv, hdbc);
        return 1;
    }

    // BULK COPY終了
    if (bcp_done(hdbc) != SUCCEED) {
        printError(henv, hdbc);
        return 1;
    }

    SQLDisconnect(hdbc);
    SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
    SQLFreeHandle(SQL_HANDLE_ENV, henv);

    return 0;
}

なお、ロード先のテーブルはこんな感じ。

CREATE TABLE TEST1 (
	ID char(4),
	VALUE varchar(20)
)

 

ビルドについて

追加の依存ファイルに、sqlncli11.libが必要。

SQL ServerSDK\Lib フォルダの下にある。

 

DBへの接続

DBに接続するAPIは、SQLConnectとSQLDriverConnectがある。

SQLConnectの方は、あらかじめODBCデータソースを定義しておかなくてはならない。

SQLDriverConnectなら、JDB Driverのように、サーバ名やインスタンス名、ユーザ名、パスワードを直接渡して接続できる。

 

bcp_bind

bcp_bindは、1列分の値を設定する。

つまり、10列あるテーブルであれば、10回bcp_bindを呼ぶ必要がある。

どの列に設定するかは、最後の引数で列のインデックス(1始まり)を指定する。

 

bcp_sendrow

bcp_sendrowは、bcp_bindを全列に設定した後に呼び、1行分のデータを送信する。

つまり、100行のデータをロードする場合は、100回bcp_sendrowを呼ぶわけだ。

 

bcp_done

bcp_doneは、コミットしてロードを終了する。

bcp_batchを使うと、終了せずにコミットだけすることができる。

 

できてみれば簡単そうだが…

実際には、何ヶ所かはまってしまった。

SQLDriverConnectに渡す接続文字列が分からなかったりとか、bcp_bindで1行分のデータをまとめて渡すと思い込んでいたりとか…。

 

今度は、実際にこれで大量データをロードしてみたい。

 

OS Windows 7 Enterprise
SQL Server 2012
Visual Studio 2010