今日もプログラミング

IT技術とかプログラミングのこととか特にJavaを中心に書いていきます

embulk-output-oracleを高速化したので使い方についてまとめる

embulk-output-oracleを劇的に高速化した0.2.2がリリースされたので、使い方について書いてみる。

どのくらい速くなったかは改めてまとめる予定だけど、これに沿った感じになると思う。

 

3つの挿入モード

READMEにも書いたけど、embulk-output-oracleには

  • normal
  • direct
  • oci

の3つの挿入モードがあり、insert_methodで指定する。

normal

"normal"は文字通り、通常のINSERT文による挿入。

direct

"direct"は、ダイレクト・パス・インサートにより挿入する。これは、いろいろ制約があるものの通常のINSERTより高速だ(SQL*Loaderに近い)。

ちなみに、INSERT文に/* APPEND_VALUES */ヒントを付けることにより、ダイレクト・パス・インサートにすることができる。

oci

"oci"は、Oracleのネイティブクライアント(Oracle Instant Client)をOCI(Oracle Call Interface)を経由で使ってダイレクト・パス・インサートを実行する。JDBC経由の"direct"よりさらに高速だ。

なお、"oci"を使う場合でも、データベースのメタ情報の取得などにJDBCを使うので、JDBCドライバは必須だ。

 

接続の設定

host、port、database、user、passwordを指定する。

host、port、databaseの代わりに、urlを指定してもよい。

ただし、insert_methodが"oci"の場合は、url指定でもhost、port、databaseは必須だ。OCIでの接続にこれらが必要だからだ。

 

JDBCドライバ

embulk-output-oracleJDBCドライバを同梱していないので、別途ダウンロードする必要がある。

driver_pathに、ドライバのパスを指定する。

動作するバージョンについては、後述。

 

データ型

データ型については、0.2.1のときと変わっていないので、こちらを参照。

 

ymlのサンプル

ymlのサンプルはこんな感じ。

in:
  type: file
  path_prefix: '/data/example.csv'
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    columns:
    - {name: ID, type: string}
    - {name: VARCHAR2_ITEM, type: string}
    - {name: INTEGER_ITEM, type: long}
    - {name: NUMBER_item, type: string}
    - {name: DATE_ITEM, type: timestamp, format: '%Y/%m/%d'}
    - {name: TIMESTAMP_ITEM, type: timestamp, format: '%Y/%m/%d %H:%M:%S'}
out:
    type: oracle
    host: localhost
    database: TESTDB
    user: TEST_USER
    password: test_pw
    table: EXAMPLE
    mode: insert
    insert_method: direct
    driver_path: /drivers/ojdbc7.jar

 

embulk-input-filesplitにより更に高速化

入力がテキストファイルの場合は、embulk-input-filesplitによりマルチスレッドでファイルを読み込むことにより、更に高速化する。

ymlのサンプルはこんな感じ。

in:
  type: filesplit
  path: '/data/example.csv'
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    columns:
    - {name: ID, type: string}
    - {name: VARCHAR2_ITEM, type: string}
    - {name: INTEGER_ITEM, type: long}
    - {name: NUMBER_item, type: string}
    - {name: DATE_ITEM, type: timestamp, format: '%Y/%m/%d'}
    - {name: TIMESTAMP_ITEM, type: timestamp, format: '%Y/%m/%d %H:%M:%S'}
out:
    type: oracle
    host: localhost
    database: TESTDB
    user: TEST_USER
    password: test_pw
    table: EXAMPLE
    mode: insert
    insert_method: direct
    driver_path: /drivers/ojdbc7.jar

 

動作環境について

全ての組み合わせで動作確認を行うことはできていないが、とりあえず分かってる範囲で書いてみる。

サーバ側は12cで確認したが、前のバージョンでも動くと思う。たぶん。

normal(通常のINSERT)

12cのJDBCドライバ、11gR2のJDBCドライバでは正常に動作したが、11gR1のJDBCドライバでは以下のエラーが発生した。

Caused by: java.sql.BatchUpdateException: ORA-00928: SELECTキーワードがありません。
    at oracle.jdbc.driver.DatabaseError.throwBatchUpdateException(DatabaseError.java:629)
    at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9409)
    at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:211)
    at org.embulk.output.jdbc.StandardBatchInsert.flush(StandardBatchInsert.java:64)

10gのJDBCドライバではそもそも接続できなかった(Java1.4の時代のだし…)。

 

direct(ダイレクト・パス・インサート)

12cのJDBCドライバで以下のエラーが発生。11gR2、11gR1のJDBCドライバでは正常に動作した。

Caused by: java.lang.ArrayIndexOutOfBoundsException
    at java.lang.System.arraycopy(Native Method)
    at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:12208)
    at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:246)
    at org.embulk.output.jdbc.StandardBatchInsert.flush(StandardBatchInsert.java:64)

 

oci(OCIによるダイレクト・パス・インサート)

"oci"では、embulk-output-oracle側もネイティブライブラリが必要になる。とりあえずWindows(x68-64)用とLinux(x68-64)用しか同梱していないので…、他の環境の人はここの下にあるビルドスクリプトを参考にご自分でビルドしてください。。

また、"oci"を使うにはOracle Instant Clientをインストールする必要がある。

Windowsの場合はPATH、Linuxの場合はLD_LIBRARY_PATHを設定する。

また、Linuxの場合は

ln -s libclntsh.so.12.1 libclntsh.so
ln -s libocci.so.12.1 libocci.so

も必要。embulk-output-oracleのネイティブライブラリは11gのライブラリを使ってビルドしているので、12cのクライアントを使う人は更に

ln -n libocci.so.12.1 libocci.so.11.1
ln -n libclntsh.so.12.1 libclntsh.so.11.1

も必要。

 

動作確認したところでは、11gR1のLinux版のクライアントで

Caused by: java.sql.SQLException: OCI : OCIDirPathPrepare failed. ORA-01009: missing mandatory parameter

        at org.embulk.output.oracle.oci.OCIWrapper.throwException(OCIWrapper.java:74)
        at org.embulk.output.oracle.oci.OCIWrapper.prepareLoad(OCIWrapper.java:40)
        at org.embulk.output.oracle.oci.OCIManager.open(OCIManager.java:34)

というエラーが発生した。

 

まとめると、こんな感じ。

モードバージョン
11gR111gR212c
normal ×
direct ×
oci Linuxは×

結構×があるな…。

 

まとめ

embulk-output-oracle(0.2.2)では、通常のINSERTの他に、ダイレクト・パス・インサート、OCIによるダイレクト・パス・インサートもできるようになった。

OCIはちょっと環境構築が面倒だけどこの中で最速で、embulk-input-filesplitを組み合わせると更に高速化するのでお勧め。

どのくらい速くなるかは、近いうちに改めてまとめる予定。

 

あと、embulk-output-jdbcのcommitterになりました。

 

Embulk 0.5.4
embulk-output-oracle 0.2.2

 

2015/4/17: ymlファイル中のcolumnsのnameを小文字から大文字に修正しました。Oracle上のカラム名はおそらく大文字の場合が多いと思いますが、大文字/小文字も含めてカラム名が一致しないと無視されてしまいます。

JDBCからストアドプロシージャを呼び出す(Oracle、SQL Server、MySQL)

RDBMSの差異を吸収してくれるのがJDBC…、のはずが、なかなかそううまくはいかない。。

ストアドプロシージャの呼び出しも、RDMBSによって細かな違いがあるようだ。

 

MySQLのストアドプロシージャを呼び出す

まず、テスト用のストアドプロシージャを準備する。

DELIMITER //
CREATE PROCEDURE TEST1(
	IN    n1 INT,
	INOUT n2 INT,
	OUT   n3 INT)
BEGIN
	SET n3 := n1 * n2;
	SET n2 := n2 + 1;
END;
//
DELIMITER ;

ストアドプロシージャの呼び出しには、CallableStatementを利用する。

Connection#prepareCallにより得ることができる。

INの設定にはsetXXX、OUTの取得にはgetXXXを使う。

try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/TEST", "testuser", "testpass")) {
    try (CallableStatement statement = connection.prepareCall("CALL TEST1(?, ?, ?)")) {
        // メタ情報を取得するテスト
        for (int i = 1; i <= statement.getParameterMetaData().getParameterCount(); i++) {
            System.out.println(String.format("Parameter #%d : mode = %d", i, statement.getParameterMetaData().getParameterMode(i)));
        }

        //statement.registerOutParameter(1, Types.INTEGER);
        statement.setInt(1, 5);
        statement.setInt(2, 7);

        statement.execute();

        System.out.println(String.format("n2 = %d", statement.getInt(2)));
        System.out.println(String.format("n3 = %d", statement.getInt(3)));
    }
}

registerOutParameterを呼んでおかなくても、OUTの値は取れた。

 

Oracleのストアドプロシージャを呼び出す

同じように、テスト用のストアドプロシージャを準備する。

CREATE OR REPLACE PROCEDURE TEST1(
	n1 IN     INT,
	n2 IN OUT INT,
	n3 OUT    INT) AS
BEGIN
	n3 := n1 * n2;
	n2 := n2 + 1;
END;
/

やっぱりMySQLとは文法が違う。

「IN」とか「OUT」を指定する順番が違うのが、ちょっと混乱する。

 

ストアドプロシージャの呼び出しは、こんな感じ。

try (Connection connection = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:TESTDB", "TEST", "testpass")) {
    try (CallableStatement statement = connection.prepareCall("CALL TEST1(?, ?, ?)")) {
        // メタ情報を取得するテスト
        for (int i = 1; i <= statement.getParameterMetaData().getParameterCount(); i++) {
            // サポートされていない
            // System.out.println(String.format("Parameter #%d : mode = %d", i, statement.getParameterMetaData().getParameterMode(i)));
        }

        // 呼ばないとエラー
        statement.registerOutParameter(2, Types.INTEGER);
        statement.registerOutParameter(3, Types.INTEGER);

        statement.setInt(1, 5);
        statement.setInt(2, 7);

        statement.execute();

        System.out.println(String.format("n2 = %d", statement.getInt(2)));
        System.out.println(String.format("n3 = %d", statement.getInt(3)));
    }
}

残念ながら、ParameterMetaData#getParameterModeはサポートしていなかった。

汎用的なツールを作るときとか、ちょっと不便だなあ…。

それから、registerOutParameterを呼んでおかないと、エラーになってしまう。

(全ての引数について、setXXXかregisterOutParameterのどちらかを呼ばないといけないようだ。なので、INOUTの引数については呼ばなくても動いた。)

 

SQL Serverのストアドプロシージャを呼び出す

こちらも同様に、テスト用のストアドプロシージャを準備する。

CREATE PROCEDURE TEST1(
	@n1	INT,
	@n2	INT,
	@n3 INT OUT) AS
BEGIN
	SET @n3 = @n1 * @n2;
END;

SQL Serverには、INOUTの引数は無いのかな?

ドキュメントを見たけど、見つからなかった。

 

ストアドプロシージャの呼び出しは、こんな感じ。

try (Connection connection = DriverManager.getConnection("jdbc:sqlserver://localhost\\SQLEXPRESS:1433;databasename=TEST", "testuser", "testpass")) {
    try (CallableStatement statement = connection.prepareCall("EXEC TEST1 ?, ?, ?")) {
        // メタ情報を取得するテスト
        for (int i = 1; i <= statement.getParameterMetaData().getParameterCount(); i++) {
            System.out.println(String.format("Parameter #%d : mode = %d", i, statement.getParameterMetaData().getParameterMode(i)));
        }

        // 呼ばないとエラー
        statement.registerOutParameter(3, Types.INTEGER);

        statement.setInt(1, 5);
        statement.setInt(2, 7);

        statement.execute();

        //System.out.println(String.format("n2 = %d", statement.getInt(2)));
        System.out.println(String.format("n3 = %d", statement.getInt(3)));
    }
}

SQL Serverの場合は、「CALL」ではなく「EXECUTE」または「EXEC」で呼び出す。また、括弧を付けるとエラーになる。

Oracleと同様、registerOutParameterを呼んでおかないとエラーになる。

 

JDBCについて調べると、いつもこんな感じだな。。

 

OS Windos 7
Java 8u31
Oracle 12c
JDBC Driver 12.1.0.2
JDBC Driver 11.2.0.4
SQL Server 2012
JDBC Driver 4.1
MySQL 5.6
JDBC Driver 5.1.34

POIで正方形を描画する

ちょっと間が空いてしまったが、この前の続き。

今回はPOIを使って、正方形を描いてみたい。

テキストボックスを描くのは簡単だが、問題は座標だ。

 

POIの座標指定

POIでテキストボックスを描くには、まず

Sheet#createDrawingPatriarch() 

により、XSSFDrawingインスタンスを取得する(戻り値の型はDrawingだが、ダウンキャストできる)。そして、

XSSFDrawing#createTextbox(XSSFClientAnchor)

によりテキストボックスを追加する。このXSSFClientAnchorが座標を表している。で、こいつを得るには、

XSSFDrawing#createAnchor(int dx1, int dy1, int dx2, int dy2, int col1, int row1, int col2, int row2)

を呼ぶ。このcol1とかcol2は列のインデックス、row1とかrow2は行のインデックス、そしてdx1、dx2、dy1、dy2はセル内のX座標やY座標だ。

つまり、シートの左上からの絶対座標ではなく、セルの位置+セル内の相対座標ということだ。

 

確かに、Excelで列幅とか行の高さを変えると、それにまたがる図形の大きさも変わる。

それはそれで便利なのだが、セルとは関係なく絶対座標で図形を描きたい場合にはちょっと面倒だ。

 

POIで作成したExcelブックのフォントサイズは?

絶対座標をXSSFClientAnchorに変換するには、列の幅や行の高さが必要だ。

この前調べたように、デフォルトの列の幅や行の高さはフォントサイズに依存する。

 

POIでExcelブックを作成すると、フォントサイズは11ポイントになっている。

これは、自分の環境ではExcelのデフォルトフォントサイズと同じだ。

では、Excelのデフォルトフォントサイズを変えるとどうなるか?

「オプション」→「基本設定」で、「新しいブックの作成時」の「フォントサイズ」を16にして、再度POIでブックを作成してみる。

すると…、やはり11ポイントだった。

 

Excelの設定は反映されないのかな?

デバッガを使いつつソースを追ってみると…、

new XSSFWorkbook (org.apache.poi.xssf.usermodel)
↓
XSSFWorkbook#onWorkbookCreate
↓
...
↓
new StylesTable (org.apache.poi.xssf.model)
↓
StylesTable#initialize
↓
StylesTable#createDefaultFont
↓
XSSFFont#setFontHeightInPoints(XSSFFont.DEFAULT_FONT_SIZE)

のように呼んでいて、この定数の値は11だった。

ということで、Excelの設定にかかわらず11ポイントになるということだな。

だったら、列の幅とか行の高さのデフォルト値は、決め打ちでいいかな…。

 

X座標の計算

マジックナンバーだらけだが…、以下のようなメソッドを作ってみた。

    private static int[] getAnchorX(Sheet sheet, int xInPixel) {
        int xInEMU = xInPixel * XSSFShape.EMU_PER_PIXEL;
        int currentInXEMU = 0;

        Row row = sheet.getRow(0);
        if (row == null) {
            row = sheet.createRow(0);
        }
        for (int columnNum = 0;; columnNum++) {
            if (row.getCell(columnNum) == null) {
                row.createCell(columnNum);
            }

            if (sheet.getColumnWidth(columnNum) == sheet.getDefaultColumnWidth() * 256) {
                // Excelのデフォルト値は違うので明示的に設定する
                sheet.setColumnWidth(columnNum, 9 * 256);
            }

            // Excelでは256columnWidthが8ピクセル
            int columnWidthInEMU = sheet.getColumnWidth(columnNum) / 256 * 8 * XSSFShape.EMU_PER_PIXEL;
            if (xInEMU < currentInXEMU + columnWidthInEMU) {
                return new int[]{columnNum, xInEMU - currentInXEMU};
            }
            currentInXEMU += columnWidthInEMU;
        }
    }

絶対X座標を渡すと、列のインデックスとセル内の相対X座標が返る。

(本当はそれ用のクラスを作って返した方がいいんだけど、サンプルなので)

順番に列幅を計算しながら、X座標から引いていく。引けなくなった時点で値を返せばよい。

 

デフォルトの列幅の処理が気持ち悪いが…、この前調べたように、POIのデフォルトの列幅がExcelと違っているためで、仕方ない…。

 

Y座標の計算

こちらも似たような感じ。

    private static int[] getAnchorY(Sheet sheet, int yInPixel) {
        int yInEMU = yInPixel * XSSFShape.EMU_PER_PIXEL;
        int currentYInEMU = 0;

        for (int rowNum = 0;;rowNum++) {
            Row currentRow = sheet.getRow(rowNum);
            if (currentRow == null) {
                currentRow = sheet.createRow(rowNum);
            }

            if (currentRow.getHeight() == sheet.getDefaultRowHeight()) {
                // Excelのデフォルト値は違うので明示的に設定する
                currentRow.setHeightInPoints(13.5f);
            }

            // 行の高さ(EMU)
            // ※row.getHeight()は1/20ポイント単位
            int rowHeightInEMU = (int)(currentRow.getHeight() / 20.0D * XSSFShape.EMU_PER_POINT);
            if (yInEMU < currentYInEMU + rowHeightInEMU) {
                return new int[]{rowNum, (yInEMU - currentYInEMU)};
            }
            currentYInEMU += rowHeightInEMU;
        }
    }

 

POIで正方形を描く

ここまでできれば、後は簡単。

100ピクセル×100ピクセルの正方形を描いてみる。

    try (Workbook workbook = new XSSFWorkbook()) {
        Sheet sheet = workbook.createSheet();
        XSSFDrawing drawing = (XSSFDrawing)sheet.createDrawingPatriarch();

        int[] startX = getAnchorX(sheet, 100);
        int[] endX = getAnchorX(sheet, 200);
        int[] startY = getAnchorY(sheet, 100);
        int[] endY = getAnchorY(sheet, 200);

        XSSFClientAnchor anchor = drawing.createAnchor(startX[1], startY[1], endX[1], endY[1], startX[0], startY[0], endX[0], endY[0]);
        XSSFTextBox textBox = drawing.createTextbox(anchor);
        textBox.setText("Hello World!");
        textBox.setLineWidth(1);
        textBox.setLineStyleColor(255, 0, 0);

        File file = new File("test.xlsx");
        try (OutputStream out = new FileOutputStream(file)) {
            workbook.write(out);
        }
    }

 

できたExcelブックはこれ。

f:id:hito4_t:20150330220031p:plain

 

本当に正方形になっているのかな?と思って、ペイントで100×100の正方形を描いて、隣に置いてみた。

ぴったり!

f:id:hito4_t:20150330220035p:plain

 

でも、マジックナンバーとかデフォルト値の処理とか、汚らしいソースになってしまった…。

もっときれいにできる方法は無いのかなあ。

 

OS Windows 7
Office 2010
Java 8u31
Apache POI 3.11

OCIを使ってOracleに高速にデータをロードする

最近こればっかりだが…、現在embulk-output-oracleの高速化に励んでいる。

どうやったら大量データをOracleに高速に突っ込めるか?というのをいろいろ試してみた。

 

SQL*Loader

Oracleが提供する、大量データロード用のツール

その目的に作られているだけあって、さすがに速い。

 

JDBCからダイレクト・パス・ロード

SQL*Loaderでは、普通のINSERTではなく、ダイレクト・パス・ロードという特別な方法で高速にデータをロードしている。

実はこれ、JDBC経由でも使える。

APPEND_VALUESヒントを付けるのだ。

INSERT /*+ APPEND_VALUES */ INTO <テーブル名> ...

普通のINSERTに比べるとだいぶ速い。

とは言え、SQL*Loaderにはまだまだ敵わない。

 

OCI(Oracle Call Interface)でダイレクト・パス・ロード

速度を追求するなら、やはりJavaよりC。

OracleはCで使えるライブラリを提供していて、そこにダイレクト・パス・ロード用のAPIも含まれている。

OracleのInstant Client SDKとかいうやつをダウンロードすればよい。

ドキュメントはこれ。自分が使ったのは12cだが、なぜか11gのドキュメントしか見つからなかった。

書いてある通りなんだけど、少し補足してみる。

 

環境の準備

自分の場合はWindowsVisual Studio利用だが、どの環境でも同じようなものだと思う。

  • コンパイル時のインクルードパスに"oci.h"のパスを追加
  • リンク時のライブラリパスに"oci.lib"のパスを追加
  • リンク対象の"oci.lib"を追加

 

データベースへの接続

データベースへの接続には、OCILogonとかOCILogon2を使えばよい。

でもこいつの引数にはデータベース名、ユーザ名、パスワードしかない。

サーバ名はどうやって指定するの?と思ったが、2通りあるようだ。

  • tnsname.oraでマッピングを定義する
  • データベース名に「<サーバ名>:<ポート番号>/データベース名」の形式で渡す

後者が使えるのが嬉しい。

 

ロードの準備

OCIDirPathPrepareによりロードの準備をするのだが、その前にいろいろ属性を設定しなくてはいけない。

ドキュメントには「操作するオブジェクトの名前、列データの外部属性、およびすべてのロード・オプション」とあるが、具体的には最低限何を設定すればよいのか調べてみた。

  • テーブル名 (OCI_ATTR_NAME)
  • 列数 (OCI_ATTR_NUM_COLS)
  • 各列の列名 (OCI_ATTR_NAME)
  • 各列の型 (OCI_ATTR_DATA_TYPE)
  • 各列のサイズ (OCI_ATTR_DATA_SIZE)

 

ロード処理の流れ

基本的には、

  1. OCIDirPathColArrayEntrySetにより配列にデータを設定する
  2. OCIDirPathColArrayToStreamにより配列をストリームに変換する
  3. OCIDirPathLoadStreamによりストリームをロードする
  4. まだデータがある場合はOCIDirPathStreamResetによりストリームをリセットし、1に戻る
  5. OCIDirPathFinishにより終了する

という流れになる。

配列とストリーム、と2段階になるのがちょっとややこしい。

配列のサイズは、OCIAttrGet関数で取得できる(属性の名前はOCI_ATTR_NUM_ROWS)。

 

実は、配列のキャパシティとストリームのキャパシティは異なり、2で入り切らない場合がある(OCI_CONTINUEが返される)。

その場合は、3、4と進んだ後、2に戻って残りの配列をストリームに変換する。

何行変換されたかは、OCIAttrGet関数で取得できる(属性の名前はOCI_ATTR_ROW_COUNT)。

OCIDirPathColArrayToStreamにはrowcnt(配列内の行数)とrowoff(配列内の開始インデックス)を渡すことができるので、rowoffに足してやればよい。

rowcntの方は、残りの行数ではなくて配列全体の行数なので、変える必要はない(ここを間違えてはまってしまった)。

 

さすがに速い!!!

で、実際にロードしてみたら、さすがに速かった。

SQL*Loaderよりちょっと遅いくらい。

 

まとめ

OCIでのダイレクト・パス・ロードはかなり速いので、スピード狂の人はお試しあれ。

 

OS Windows 7 (64bit)
Oracle 12c
Instant Client SDK 12.1.0.2.0 (Windows x64)
Visual Studio 2010

JNIでC側のポインタを保持する方法について

Cのポインタを保持したい

現在embulk-output-oracleを高速化するために、JNI(Java Native Interface)を使ってプログラミングしている。

 

その時ちょっと悩んだのが、Cのポインタをどうやって保持するか、だ。

フローとしては、

Java C
初期化処理 必要なメモリを確保
処理本体 メモリを使っていろいろ処理
後処理 メモリを解放

のような感じだ。

初期化時に確保したメモリのポインタを、後続の処理に引き継ぐ必要がある。

 

どうやってポインタを保持するか

まず、C側でグローバル変数で持つ方法。

これだと並列処理に対応できないからだめだ。

 

Java側にポインタを返し、後続の処理で渡し直してもらう方法がいいだろう。

と思ったが、Javaにはポインタ型というのが無い。intとかlongにキャストすればいいのかな?とも思ったが、処理系依存になるし気持ち悪いなあ。

 

ちょっと面倒になるが、Java側にはintのIDを返し、C側でIDとポインタとのマップを管理する方法も考えられる。

STL(Standard Template Library)を使えば、mapも簡単。

と思ったが…、どうもスレッドセーフではないっぽい。別のライブラリで排他制御を行わないといけないようだ。

Cで排他制御をおこなうライブラリは…?うーん、めんどくさくなってきた。

 

バイト配列で保持すればいいんじゃないだろうか

結局、処理系によってポインタのサイズは違うかもしれないが、バイト配列で持っちゃえばいいんじゃないかな?と考えたのだ。

 

ポインタからバイト配列の変換はこんな感じ。

JNIEXPORT jbyteArray JNICALL initialize(JNIEnv *env, jobject)
{
    void *buffer = ...
    jbyteArray pointer = env->NewByteArray(sizeof(void*));
    env->SetByteArrayRegion(pointer, 0, sizeof(void*), (jbyte*)&buffer);
    return pointer;
}

 

Java側では byte[] pointer のように持つことができる。

 

バイト配列をポインタに戻すのはこんな感じ。

JNIEXPORT void JNICALL release(JNIEnv *env, jobject, jbyteArray pointer)
{
    void *buffer;
    env->GetByteArrayRegion(pointer, 0, sizeof(buffer), (jbyte*)&buffer);
    ...
}

 

一手間掛かるけど、これなら処理系によらずJava側のコードは統一でよい、はず。たぶん。

 

embulk-output-oracle(0.2.1)の仕様について

embulk-output-oracleが一応公開されたが、とりあえず動く、というレベルのもので、まだ完成度は低い。

とは言え、現状の仕様についてまとめてみる。

 

Oracleのバージョン

12c(Windows)で検証したが、たぶん他のでも動くと思う。

 

driver_path

ymlファイル中のdriver_pathにOracleJDBCドライバのパスを指定する必要がある。

これは、OracleJDBCドライバをプラグインと一緒に配布してよいかどうか分からなかったため。

 

自動的に作成されるテーブルの型

出力先のテーブルが存在しない場合は、自動的に作成される。

その際の型についてまとめる。

embulkの型Oracleの型
boolean 未対応(エラーになる)
long NUMBER(19)
double 未対応(エラーになる)
string CLOB
timestamp TIMESTAMP(6)

 

出力可能な型

次に、実際に出力可能なOracleの型。

Embulkで入力された値は型変換されてOracleに出力される。可能な型変換について表にまとめた。

embulkの型Oracleの型
CHARVARCHAR2LONGCLOBNUMBERFLOATDATETIMESTAMP
boolean "true" / "false" 1.0 / 0.0 NULL
long NULL
double NULL
string NULL
timestamp NULL NULL

実は内部的には、NUMBER型にはStringで突っ込んでいる(longだと小数に対応できないし、floatだと桁数が大きいときに誤差が出るし、EmbulkにはBigDecimalは無いし、なので)。

NCHAR、NVARCHAR2などは未対応。

 

今後、いろいろな型に対応できるようにしていきたい。

それから、パフォーマンスについても検証中。

 

OS Windows 7
Java JDK 8u31
Embulk 0.5.2
embulk-output-oracle 0.2.1
Oracle 12c

ファイルを分割して入力するEmbulkプラグインを作ってみた

Embulkの並列処理

Embulkは、処理を複数のタスクに分割して並列に実行する仕組みを備えている。

しかし、標準のファイル入力プラグインでは、単純に1つのファイルを入力すると1タスクにしかならないようだ(こちら参照)。

ソースを読んでみると、複数ファイルを読むと複数タスクになるようだ。

試しにこんな感じに4ファイルを用意して、

/test
 └in
  ├in1.csv
  ├in2.csv
  ├in3.csv
  └in4.csv

こんなymlファイルを用意して実行したら、

in: 
  type: file 
  path_prefix: '/test/in' 
  parser:
    type: csv
    columns: 
    - {name: id, type: string} 
    - {name: name, type: string} 
out: 
  type: file 
  path_prefix: '/test/out' 
  file_ext: .csv 
  formatter: 
    type: csv

こんな風に4ファイル出力された。ログも4タスク分出力されている。

/test
 ├out.000.00.csv
 ├out.001.00.csv
 ├out.002.00.csv
 └out.003.00.csv

 

出力ファイルを見たところ、入力ファイルにそのまま対応しているようだ。

おそらく、こんな感じで実行されているのだろう。

入力ファイル1 → 入力タスク1 → 出力タスク1 → 出力ファイル1
入力ファイル2 → 入力タスク2 → 出力タスク2 → 出力ファイル2
入力ファイル3 → 入力タスク3 → 出力タスク3 → 出力ファイル3
入力ファイル4 → 入力タスク4 → 出力タスク4 → 出力ファイル4

 

ファイル分割入力プラグイン

という訳で、入力ファイルを複数のタスクで分割して読み込むプラグインを作ってみた。Hadoopとかも参考にしている。

このプラグインにより、入力が1ファイルでも並列で実行される。

ソースはこちら

設定項目はこんな感じ。

項目説明デフォルト値
tasks タスク数 CPUコア数×2
path 入力ファイルのパス CPUコア数×2
header_line 先頭行はヘッダか false

 

設定ファイルの例は、こんな感じ。

in:
  type: filesplit
  tasks: 4
  path: '/test/in.csv'
  parser:
    type: csv
    columns:
    - {name: id, type: string}
    - {name: name, type: string}
    - {name: value1, type: string}
    - {name: value2, type: string}
out: 
    type: file
    path_prefix: '/test/out'
    file_ext: .csv
    formatter:
      type: csv

 

例えば、以下のようなファイル(改行はCRLFで、全45バイト)を4タスクで分割すると、

a,a,a,a
b,b,b,b
c,c,c,c
d,d,d,d
e,e,e,e

1~11バイト、12~22バイト、23~34バイト、35バイト~45バイトに分割される。

a,a,a,a
b,|b,b,b
c,c,|c,c
d,d,d,|d
e,e,e,e|

これだと行が分断されてしまうので、改行位置までずらし、最終的には以下のように分割される。

a,a,a,a
b,b,b,b|
c,c,c,c|
d,d,d,d|
e,e,e,e|

改行コードは、CRLF/CR/LFのいずれにも対応している。

 

header_lineは、先頭行がヘッダかどうかを指定する。

parserでheader_lineがtrueだと、分割ファイルの先頭行(つまり、元のファイルの途中の行)がヘッダとみなされてスキップされてしまう。

そこで、ダミーのヘッダ行を付加してから後ろに渡すようにしている。

parserと重複して指定しなくてはいけないのが気持ち悪いが…、仕方ないか。

 

並列実行の威力!

これを使ってMySQLに突っ込んだところ、結構速くなった。

4並列でさすがに4倍まではいかなかったが、3.2倍くらいになった。

詳細はこちら

 

今後の課題

現状では、CRLF/CR/LFのいずれかが出現すると、改行として認識する。

しかし、ExcelからCSV保存したときなんかは、改行はCRLF、項目内の折り返しはLF、のように使い分けてたりする。

1,aaaa,xxxx<CRLF>
2,"bb<LF>bb"yyyy<CRLF>

これに対応するには、改行コードを明示的に指定できるようにする必要があるな。

それから、今は1ファイルしか入力できないが、複数ファイルの入力にも対応したい。

 

OS Windows 7
Java JDK 8u31
Embulk 0.5.0
embulk-input-filesplit 0.1.1