今日もプログラミング

IT技術とかプログラミングのこととか特にJavaを中心に書いていきます

POIでコネクタを描いてみる

POIでコネクタは描けるのか?

SIerExcelが大好きなので、設計書もExcelで作ることが多い(いいか悪いかは置いといて…)。

でも手で全部書くのは面倒なので、Javaのソースから自動生成できると便利だなー、とか思うことがある。

設計書には、オートシェイプとかコネクタを使って図も入れたい。

だけど、POIでコネクタは描けるんだろうか?

 

一応それらしいメソッドはあったが…

Javadocを覗くと、XSSFDrawingクラスにcreateConnectorというメソッドがあった。

これっぽいんだけど…、こいつの引数はXSSFClientAnchorなんだよね。

つまり、座標を指定するんだけど、接続先の図形は指定しなくていいの?

とりあえず、コードを書いて実行してみる。

    try (Workbook workbook = new XSSFWorkbook()) {
        Sheet sheet = workbook.createSheet();
        XSSFDrawing drawing = (XSSFDrawing)sheet.createDrawingPatriarch();

        XSSFClientAnchor anchor1 = drawing.createAnchor(0, 0, 400000, 100000, 1, 1, 1, 1);
        XSSFTextBox textBox1 = drawing.createTextbox(anchor1);
        textBox1.setLineWidth(1);
        textBox1.setLineStyleColor(255, 0, 0);
        textBox1.setFillColor(255, 255, 0);

        XSSFClientAnchor anchor2 = drawing.createAnchor(0, 0, 400000, 100000, 2, 3, 2, 3);
        XSSFTextBox textBox2 = drawing.createTextbox(anchor2);
        textBox2.setLineWidth(1);
        textBox2.setLineStyleColor(255, 0, 0);
        textBox2.setFillColor(255, 255, 0);

        XSSFClientAnchor anchor3 = drawing.createAnchor(200000, 100000, 200000, 0, 1, 1, 2, 3);
        XSSFConnector connector = drawing.createConnector(anchor3);
        connector.setLineWidth(1);
        connector.setLineStyleColor(128, 0, 0);

        File file = new File("connector.xlsx");
        try (OutputStream out = new FileOutputStream(file)) {
            workbook.write(out);
        }
    }

一応それらしいのはできたんだけど…

f:id:hito4_t:20150515175906p:plain

テキストボックスを動かしてみると、線が付いてこない。

f:id:hito4_t:20150515180416p:plain

これは「コネクタ」じゃなくて、ただの「線」だ。

 

コネクタXMLを見てみる

とは言え、XSSFConnectorにそれらしいメソッドは見当たらない。

ただ、XMLレベルでいじれるメソッド(getCTConnector)はあるので、これを使えば何とかなるかも。

という訳で、コネクタが保存されたxlsxファイルのXML構造を調べてみる。

上で作成したxlsxファイルの他、ちゃんとコネクタの接点をつなげたxlsxファイルを用意して、拡張子をzipに変えて展開、xl/drawings/drawing1.xmlを見比べてみる。

すると、後者だけにある怪しい要素が見つかった。

<xdr:wsdr>
    <xdr:twoCellAnchor>
        <xdr:cxnSp>
            <xdr:nvCxnSpPr>
                <xdr:cNvCxnSpPr>
                    <a:stCxn id="2" idx="2"/>
                    <a:endCxn id="3" idx="0"/>
                </xdr:cNvCxnSpPr>
            ...
    

おそらく、idは図形を特定するID、idxは図形における接点の場所(たぶん上が0で下が2)だろう。

 

IDの謎

で、以下のようなコードを追加したんだけど…、うまくいかない…。

    connector.getCTConnector().getNvCxnSpPr().getCNvCxnSpPr().addNewStCxn().setId(textBox1.getCTShape().getNvSpPr().getCNvPr().getId());
    connector.getCTConnector().getNvCxnSpPr().getCNvCxnSpPr().getStCxn().setIdx(2);
    connector.getCTConnector().getNvCxnSpPr().getCNvCxnSpPr().addNewEndCxn().setId(textBox2.getCTShape().getNvSpPr().getCNvPr().getId());
    connector.getCTConnector().getNvCxnSpPr().getCNvCxnSpPr().getEndCxn().setIdx(0);

いろいろ調べてみると、どうもIDがおかしいっぽい。

正しいxlsxの方は、

テキストボックス1 = 2
テキストボックス2 = 3
コネクタ = 4

なんだけど、POIの方は

テキストボックス1 = 1
テキストボックス2 = 2
コネクタ = 1

になっている。

IDが重複しているのがおかしいし、正しいxlsxでは1が欠番になっている。

以下のようにしたら、どうやらうまくいったようだ。

    textBox1.getCTShape().getNvSpPr().getCNvPr().setId(2);
    textBox2.getCTShape().getNvSpPr().getCNvPr().setId(3);
    connector.getCTConnector().getNvCxnSpPr().getCNvPr().setId(4);
    connector.getCTConnector().getNvCxnSpPr().getCNvCxnSpPr().addNewStCxn().setId(textBox1.getCTShape().getNvSpPr().getCNvPr().getId());
    connector.getCTConnector().getNvCxnSpPr().getCNvCxnSpPr().getStCxn().setIdx(2);
    connector.getCTConnector().getNvCxnSpPr().getCNvCxnSpPr().addNewEndCxn().setId(textBox2.getCTShape().getNvSpPr().getCNvPr().getId());
    connector.getCTConnector().getNvCxnSpPr().getCNvCxnSpPr().getEndCxn().setIdx(0);

ちゃんとつながっている!

f:id:hito4_t:20150515190742p:plain

とは言え、また今回も謎が残ってしまった…。

 

OS Windows 7
Office 2010
Java 8u31
Apache POI 3.11

EmbulkのCSV parserのスキーマをDBメタ情報により定義する

DBメタ情報からCSVの列を定義したい

EmbulkCSV parserを使うときは、ymlファイルに列の定義を書く必要がある。

Embulkにはguessという便利な機能があって、CSVファイルから列の定義を出力することもできる。

だけどCSVファイルをDBにロードする場合は、ロード先のDBのメタ情報から動的に列を定義できないかな?と考えたのだ。

  • 項目が追加されたりした場合でもymlを修正しなくてよい
  • テーブルがたくさんある場合でもymlをコピーしてテーブル名を直すだけでよい
  • guessがだまされる特殊なケース(一見YYYYMMDDで日付型っぽいけど実は文字列型で'99999999'のような値がまれに出現する列とか)でも大丈夫

のように、あると便利そうだ。

 

どうやって実現するか?

CSVパーサを一から書くのは大変なので、まずはCsvParserPluginを継承する。

transactionメソッドをオーバーライドすれば、列の定義(Schemaオブジェクト)を差し替えることができそうだ。

 

どうやってDBからSchemaオブジェクトを作るか?

これも一から書くのは大変過ぎるので、embulk-input-jdbcembulk-output-jdbcをうまいこと使おう。

どちらもDBから列の情報を取得しているが、embulk-input-jdbcのAbstractJdbcInputPluginクラスにSchemaを返してくれるメソッドがあった。

    private Schema setupTask(JdbcInputConnection con, PluginTask task) throws SQLException

privateだけど…。リフレクションで呼ぶか…。

DBに出力するんだから、embulk-output-jdbcの方が適切な気もするが、残念ながら適当なメソッドが無かった。

DB→CSV→別のDB のような使用法を考えれば、embulk-input-jdbcでもいいのかな。

と言うか、input側とoutput側で似たようなことをやってるので、一箇所にまとめられればよいのだが。

 

プラグインから別のプラグインを呼ぶ

    ExecSession session = Exec.session();
    InputPlugin input = session.newPlugin(InputPlugin.class, new PluginType(type));

のようにやると、別のプラグインインスタンス化できるようだ。

今回は呼びたいメソッドがprotectedとかprivateだったので、とりあえずリフレクションで呼ぶようにしてしまった。

embulk-input-jdbcが修正されたら動かなくなってしまうかも。。

 

embulk-parser-jdbc-schema-csv

という訳で、embulk-parser-jdbc-schema-csvを作ってみた。

ちょっと名前が長いけど…。

ともかく、使い方はこんな感じ。

in:
  type: file
  path_prefix: 'data/test.csv'
  parser:
    type: jdbc-schema-csv
    delimiter: ","
    header_line: false
    schema:
      type: mysql
      host: localhost
      database: embulk_test
      user: embulk_user
      password: embulk_pass
      table: input_test
out:
  type: mysql
  host: localhost
  database: embulk_test
  user: embulk_user
  password: embulk_pass
  table: input_test
  mode: insert

columnsの代わりにschemaを指定する。

すると、embulk-parser-jdbc-schema-csvがDBのメタ情報からCSV列を定義してくれる。

対応するinputプラグイン(この場合はembulk-input-mysql)もインストールされている必要がある。

 

でもこれだとschemaとoutでかなり定義が重複してしまう。

エイリアスを使うと、すっきりする。

in:
  type: file
  path_prefix: 'data/test.csv'
  parser:
    type: jdbc-schema-csv
    delimiter: ","
    header_line: false
    schema: &OUT
      type: mysql
      host: localhost
      database: embulk_test
      user: embulk_user
      password: embulk_pass
      table: input_test
      mode:  insert
out: *OUT

うん、いい感じだ。

 

Embulk 0.6.5
embulk-input-jdbc 0.4.0
embulk-output-jdbc 0.2.3
embulk-parser-jdbc-schema-csv 0.0.1

Embulkの設定ファイルでエイリアスを使う

Embulkの設定ファイルを書いていると、同じ値が繰り返し現れることがある。

例えば、CSVをTSVに変換する設定ファイルを書いてみると、

in:
  type: file
  path_prefix: 'data/test.csv'
  parser:
    type: csv
    charset: SJIS
    delimiter: ","
    header_line: false
    columns:
    - {name: ID, type: long}
    - {name: STR, type: string}
out:
  type: file
  path_prefix: result
  file_ext: tsv
  formatter:
    type: csv
    charset: SJIS
    delimiter: "\t"
    header_line: false
    columns:
    - {name: ID, type: long}
    - {name: STR, type: string}

のように、parserとformatterとで重複する内容が出てくる。

何とかできないのかなー、と考えていたら、YAMLにはエイリアスの仕組みがあるようだ。

以下のように書けば、OK。

in:
  type: file
  path_prefix: 'data/test.csv'
  parser:
    type: csv
    charset: &CHARSET SJIS
    delimiter: ","
    header_line: false
    columns: &COLUMNS
    - {name: ID, type: long}
    - {name: STR, type: string}
out:
  type: file
  path_prefix: result
  file_ext: tsv
  formatter:
    type: csv
    charset: *CHARSET
    delimiter: "\t"
    header_line: false
    columns: *COLUMNS

 

Oracleでembulk-input-jdbcを試してみた

※2016/6/23: この記事は古いので、 こちらをご参照ください。

 

embulk-output-oracleもだいぶ良くなってきたので、そろそろinputの方も試してみたい。

どうやらembulk-input-oracleは無いようなので、汎用的なembulk-input-jdbcを使ってみることにした。

準備

まず、テーブルを準備する。

とりあえずは、比較的よく使われそうな型を用意した。

CREATE TABLE INPUT_TEST (
    ID     NUMBER(8,0),
    NUM    NUMBER(12,2),
    STR    CHAR(8),
    VARSTR VARCHAR2(8),
    DT     DATE,
    TIME0  TIMESTAMP(0),
    TIME6  TIMESTAMP,
    TIME9  TIMESTAMP(9),
    PRIMARY KEY(ID)
);

で、テストデータを用意して、

1,,,,,,,
2,123.4,chr1,varchr1,2015-04-24,2015-04-24 01:02:03,2015-04-24 01:02:03.12345,2015-04-24 01:02:03.12345678
3,1234567890.12,chr12345,varchr12,2015-12-31,2015-12-31 23:59:59,2015-12-31 23:59:59.123456,2015-12-31 23:59:59.123456789

embulk-output-oracleで流し込む。

in:
  type: file
  path_prefix: 'data/output-oracle.csv'
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    header_line: false
    columns:
    - {name: ID, type: long}
    - {name: NUM, type: string}
    - {name: STR, type: string}
    - {name: VARSTR, type: string}
    - {name: DT, type: timestamp, format: '%Y-%m-%d'}
    - {name: TIME0, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: TIME6, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N'}
    - {name: TIME9, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N'}
out:
    type: oracle
    host: localhost
    database: TESTDB
    user: TEST_USER
    password: test_pw
    table: INPUT_TEST
    mode: insert
    insert_method: normal
    driver_path: 'driver/ojdbc7.jar'

一応SELECTして確認してみると…、あれ?TIME9のマイクロ秒未満が切り捨てられている?

org.embulk.spi.time.TimestampParserのソースを見た感じだと、マイクロ秒未満には対応していないようだ。

かといって、org.embulk.output.jdbc.setter.SqlTimestampColumnSetterを見ると、stringの値をTIMESTAMP列に設定するのにも対応していない。

仕方ないので、とりあえずは直接UPDATEしおくか。

UPDATE INPUT_TEST SET TIME9='2015-04-24 01:02:03.12345678' WHERE ID=2;
UPDATE INPUT_TEST SET TIME9='2015-12-31 23:59:59.123456789' WHERE ID=3;
COMMIT;

 

embulk-input-jdbcでデータを取り出す

embulk-input-jdbcのドキュメントを見ると、csv parserのように列を定義しなくていいみたいだ。

楽ちん。

in:
  type: jdbc
  driver_path: driver/ojdbc7.jar
  driver_class: oracle.jdbc.driver.OracleDriver
  url: jdbc:oracle:thin:@localhost:1521:TESTDB
  user: TEST_USER
  password: test_pw
  table: INPUT_TEST
  select: "*"

out:
  type: file
  path_prefix: input-oracle
  file_ext: csv
  formatter:
    type: csv

で、実行してみた結果がこれ。

ID,NUM,STR,VARSTR,DT,TIME0,TIME6,TIME9
1.0,,,,,,,
2.0,123.4,chr1    ,varchr1,2015-04-24 00:00:00,2015-04-24 01:02:03,2015-04-24 01:02:03,2015-04-23 16:02:03
3.0,1.23456789012E9,chr12345,varchr12,2015-12-31 00:00:00,2015-12-31 23:59:59,2015-12-31 23:59:59,2015-12-31 14:59:59

気になる点が幾つかあるなあ。

 

気になった点

整数なのに小数点以下が出力されている

NUM列はNUMBER(8,0)型なので整数だが、NUMBERはdoubleにマッピングされている。

doubleは小数点以下が0でも、toStringすると.0が付いてしまう。

doubleは有効数字の問題もあるので、本当はBigDecimalマッピングしたいんだけど、embulkにはBigDecimal型が無いので、とりあえずはStringにマッピングしておくのが一番正確か。

 

大きな数値が指数表記で出力されている

3行目のNUMは、元は1234567890.12なんだが、1.23456789012E9のように指数表記で出力された。

数値はdoubleで扱われているので、大きな数値や小さな数値は指数表記になってしまう。

 

DATE型なのに時分秒が出力されている

のだが…、ちょっとこの問題はややこしい。

OracleのDATE型は実は時分秒まで持っている。

例えば、

INSERT INTO INPUT_TEST(ID, DT) VALUES(11, TO_DATE('2015-04-24 11:11:11', 'YYYY-MM-DD HH24:MI:SS'));
INSERT INTO INPUT_TEST(ID, DT) VALUES(22, TO_DATE('2015-04-24 22:22:22', 'YYYY-MM-DD HH24:MI:SS'));

のように挿入して、単純にSELECTすると、

SELECT ID, DT FROM INPUT_TEST;
        ID DT
---------- --------
        11 15-04-24
        22 15-04-24

日付までしか表示されないのだが、TO_CHARを使うと、

SELECT ID, TO_CHAR(DT, 'YYYY-MM-DD HH24:MI:SS') FROM INPUT_TEST;
        ID DT
---------- --------
        11 2015-04-24 11:11:11
        22 2015-04-24 22:22:22

ちゃんと時分秒まで格納されているのが分かる。

デフォルトでは日付まで出力して、設定により秒まで出力する、とかできるといいのだが…。

 

なお、DATE型のデフォルトのフォーマットは、NLS_DATE_FORMATというパラメータで定義されるらしい。

 

TIMESTAMPの秒未満値が出力されない

org.embulk.standards.CsvFormatterPluginのソースを見ると、schemaの中にフォーマットの情報が格納されているようだ。

    TimestampType tt = (TimestampType) column.getType();
    builder.put(column.getIndex(), new TimestampFormatter(tt.getFormat(), task));

で、schemaを作っているのは、org.embulk.input.jdbc.AbstractJdbcInputPlugin#setupTaskあたり。

ここで、TimestampColumnGetter#getToTypeが呼ばれ、TimestampTypeが返される。

TimestampColumnGetter(org.embulk.input.jdbc.getter.ColumnGettersの内部クラス)のソースを見ると…、

    return Types.TIMESTAMP.withFormat("%Y-%m-%d %H:%M:%S")

となっており、固定で秒までのフォーマットを返していた。

列のメタデータから桁数をとって、それに応じたフォーマットを返すようにできないかな。

 

型の定義とかJDBCで取れるメタデータDBMS毎に異なるので、やはりOracle用のembulk-input-oracleを作った方がよいかもしれない。

 

embulk 0.6.5
embulk-input-jdbc 0.4.0

 

2015/4/27: 「大きな数値が指数表記で出力されている」を追記しました。

 

embulk-output-oracleのパフォーマンスを計測してみた

embulk-output-oracleとは?

embulk-output-oracleについては何度か書いているが、一応概要を。

embulkは、オープンソースのバルクデータ転送ツールで、プラグインにより様々な入出力に対応することができる。

embulk-output-oracleも出力プラグインの1つで、CSVファイルなどをOracleにロードするのに使える。

 

計測環境

計測にはAWSのm1.xlargeインスタンスを使った。

コア数は4で、メモリは7.5GB。

入力元のCSVファイルと出力先のOracleのデータファイルは、同じディスク(インスタンスストア)に置いた。

データ量は、50,000,000件(約21GB)。

 

目標値

別のあるツールでも同じデータをロードし、その時間を目標値として設定した。

 

計測パターン

embulk-output-oracleには、3つのINSERT方法がある。

  • normal - 通常のINSERT。
  • direct - ダイレクトパスINSERT。normalより速い。
  • oci - ネイティブライブラリを使ったダイレクトパスINSERT。directより更に速い。

まずはこれらについて計測した。

 

更に、ファイルを分割して入力するembulk-input-filesplitプラグインを組み合わせると、もっと速くなる。

デフォルトでは8分割(コア数×2)だが、2分割、4分割についても計測してみた。

3つのINSERT方法全てと組み合わせるのは面倒なので…、ociだけと組み合わせた。

 

それから、入力ファイルを別のディスク(EBS)に置くパターンもやってみた。

後で説明するが、ファイルの入力にかなり時間が掛かっていたためである。

 

計測結果

入力元と出力先が同一ディスクの場合。

INSERT方法embulk-input-filesplit実行時間(分)
normal - 60
direct - 32
oci - 26
oci 2 15
oci 4 12
oci 8 13
目標値 - 9

 

入力元と出力先が異なるディスクの場合。

INSERT方法embulk-input-filesplit実行時間(分)
oci 8 10
目標値 - 8

 

normalに比べると、oci+embulk-input-filesplitは劇的に速くなっている!

とは言え、目標値には届いていないので、もう少し速くしたいところだ。

 

分割数ほどには速くなっていないのは、ファイル入力やOracleへのINSERTがボトルネックになるからだろう。

ここをもうちょっと突っ込んで調べてみたい。

 

プロファイル結果

という訳で、YourKitを使って分析してみた。

ありがたいことに、Open source project licenseという無料のライセンスがある。

 

同一ディスクと別ディスクの2パターンで、10%以上時間が掛かっているメソッドを抽出してみた。

ディスクINSERT方法embulk-input-filesplitメソッド時間説明
同一 oci 8 PushbackInputStream#read 46 入力ファイルの読み込み
OCI#loadBuffer 20 OracleへのダイレクトパスINSERT
Charset#encode 12 入力文字列を出力バッファに書き込む際の文字コード変換
 
oci 8 Charset#encode 25 入力文字列を出力バッファに書き込む際の文字コード変換
OCI#loadBuffer 19 OracleへのダイレクトパスINSERT
CSVTokenizer#nextColumn 13 CSVの次の項目の取り出し
PushbackInputStream#read 10 入力ファイルの読み込み

やはり入出力まわりに時間が掛かっている。

OracleへのINSERTはこれ以上無理っぽい気もするが、ファイルの分割入力の方はまだ改善の余地があるかもしれない。OSによる違いもあるかもしれないなあ。

文字コード変換にも結構時間が掛かっているが、これも何か高速化テクニックがあるのかも。

CSVTokenizerももう少し改善できそうだ(なお、同一ディスクの方では6%くらいだった)。

 

ちなみに、同時にとったWindowsのパフォーマンスカウンタはこんな感じ。

ディスクProcessor TimeAvg. Disk Read Queue LengthAvg. Disk Write Queue Length
同一 60% 5.7 0.8
80% 1.2 0.5

 

(4/22追記)

参考まで、PushbackInputStreamとCharsetの呼び出し階層はこんな感じ。

org.embulk.spi.util.LineDecoder#poll
→ java.io.BufferedReader#readLine
  → org.embulk.spi.util.FileInputInputStream#read
    → org.embulk.spi.util.FileInputInputStream#nextBuffer
      → org.embulk.spi.util.InputStreamFileInput#poll
        → org.embulk.input.filesplit.PartialFileInputStream#read
          → java.io.PushbackInputStream#read
org.embulk.output.jdbc.AbstractJdbcOutputPlugin$PluginPageOutput#add
→ org.embulk.spi.Column#visit
  → org.embulk.output.jdbc.setter.ColumnSetter#stringColumn
    → org.embulk.output.jdbc.setter.StringColumnSetter#stringValue
      → org.embulk.output.oracle.DirectBatchInsert#setString
        → org.embulk.output.oracle.oci.RowBuffer#addValue
          → java.nio.charset.Charset#encode

 

まとめ

以前よりはだいぶ速くなったけど、まだ目標値には差がある。

何とかあと2割くらい速くしたい!

 

Embulk 0.6.0
embulk-output-oracle 0.2.2
Oracle 12c

 

Hadoopでプロキシ経由でAmazon S3にアクセスする

Hadoopと言うか、HadoopのFileSystemクラスを利用してS3にアクセスしたいのである。

AWS SDKがあるのに、なんで?かと言うと、FileSystemを使うとローカルファイルとかHDFSとかと同じインターフェイスでアクセスできるので、汎用的なツールを作るのに便利だからだ。

 

Hadoop 2.6.0のhadoop-tools/hadoop-awsの下を探してみると、S3FileSystem(s3:)、NativeS3FileSystem(s3n:)、S3AFileSystem(s3a:)の3つのFileSystemクラスがあった。

このうちs3は特殊な用途らしいので、s3n:とs3a:について調べてみる。

 

NativeS3FileSystem(s3n:)の場合

まず、何も考えずに

Configuration configuration = new Configuration();
Path path = new Path("s3n://xxx/yyy/zzz.txt");
FileSystem fs = path.getFileSystem(configuration);
System.out.println(fs.getFileStatus(path));

とすると、

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)

というエラーが出た。

当然だが、AWSにアクセスするには、キーが必要である。

ので、キーを設定して再度実行してみる。

Configuration configuration = new Configuration();
configuration.set("fs.s3n.awsAccessKeyId", "XXXXXXXXXXXXXX");
configuration.set("fs.s3n.awsSecretAccessKey", "XXXXXXXXXXXXXX");
Path path = new Path("s3n://xxx/yyy/zzz.txt");
FileSystem fs = path.getFileSystem(configuration);
System.out.println(fs.getFileStatus(path));

今度は、

Exception in thread "main" java.net.UnknownHostException: xxx.s3.amazonaws.com

というエラーだ。プロキシの設定をしていないので、当然アクセスできない。

s3n:ではjets3tというライブラリを使っているので、そいつのプロキシ設定について調べてみる。

ググってみたところ、このサイトに説明があった。

jets3t.propertiesというファイルを作って、

httpclient.proxy-autodetect=false
httpclient.proxy-host=プロキシサーバのアドレス
httpclient.proxy-port=プロキシのポート番号
httpclient.proxy-user=プロキシのユーザ名
httpclient.proxy-password=プロキシのパスワード

のよう設定すればよい。

再度試したら、無事成功した!

(実は以前はhttpclient.proxy-autodetect=falseが無くてもOKだったのだが、変わっていたようで、はまった…)

 

S3AFileSystem(s3a:)の場合

同じように

Configuration configuration = new Configuration();
configuration.set("fs.s3a.awsAccessKeyId", "XXXXXXXXXXXXXX");
configuration.set("fs.s3a.awsSecretAccessKey", "XXXXXXXXXXXXXX");
Path path = new Path("s3a://xxx/yyy/zzz.txt");
FileSystem fs = path.getFileSystem(configuration);
System.out.println(fs.getFileStatus(path));

として実行してみると、

Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)

というようなエラーが出た。

調べてみたところ、どうもキーを設定する場所が変わったようで、

configuration.set("fs.s3a.access.key", "XXXXXXXXXXXXXX");
configuration.set("fs.s3a.secret.key", "XXXXXXXXXXXXXX");

のようにするようだ。

修正して実行すると、………帰ってこないな。

まあ、プロキシの設定をしていないから、アクセスできないのは当然なんだけど。

 

ソースを見ると、s3a:ではAWS SDKを使ってアクセスするようだ。

初期化部はこんな感じ。

public void initialize(URI name, Configuration conf) throws IOException {
    super.initialize(name, conf);
    ...

    ClientConfiguration awsConf = new ClientConfiguration();
    awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, 
      DEFAULT_MAXIMUM_CONNECTIONS));
    awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS, 
      DEFAULT_SECURE_CONNECTIONS) ?  Protocol.HTTPS : Protocol.HTTP);
    awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, 
      DEFAULT_MAX_ERROR_RETRIES));
    awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, 
      DEFAULT_SOCKET_TIMEOUT));

    s3 = new AmazonS3Client(credentials, awsConf);

AWS SDK自体はプロキシに対応していて、

    awsConf.setProxyHost(プロキシサーバのアドレス);
    awsConf.setProxyPort(プロキシのポート番号);
    awsConf.setProxyUsername(プロキシのユーザ名);
    awsConf.setProxyPassword(プロキシのパスワード);

とすればよいのだが…、この処理が入っていない…、ということは、プロキシには対応していないということだな。

 

まとめ

  • s3n:は、jets3t.propertiesというファイルにプロキシ設定を書くとアクセスできる
  • s3a:は、プロキシに対応していないようだ

 

Embulkのエラー処理について調べてみる

自分はEmbulkを業務システムに組み込むことを考えている。

となると、気になるのはエラー処理。

エラー時にどんな挙動になるかを確認しておきたい。

Embulkのプラグインの組み合わせは多数あるが、とりあえず想定しているのはCSVファイルを読み込んでRDB(例えばOracle)に突っ込む処理なので、この組み合わせで確認してみる。

 

正常終了

一応正常終了時の終了コードも確認しておく。

当然、0だった。

 

CSVファイルのエラー

列が多い行がある場合

こんなログが出力された。

2015-04-08 22:48:46.731 +0900 [WARN] (task-0000): Skipped (line 18): 17,1360,...
org.embulk.standards.CsvTokenizer$TooManyColumnsException: Too many columns
        at org.embulk.standards.CsvTokenizer.nextRecord(CsvTokenizer.java:84)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:146)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:145)
        ...
2015-04-08 22:48:46.760 +0900 [INFO] (task-0000): Loading 100 rows
2015-04-08 22:48:46.783 +0900 [INFO] (task-0000): > 0.02 seconds (loaded 100 row
s in total)
2015-04-08 22:48:46.784 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-08 22:48:46.808 +0900 [INFO] (main): Committed.

列の多い行はスキップされてWARNログが出力され、それ以外の行はコミットされている。

ん?よく見ると100行全部ロードされているな。

DBの中を見ると、列が多い行も入っている。

ソースを見たところでは、定義された列数分は正常な行として読み込まれ、余分な列のみがエラーとなるようだ。

終了コードを確認したところ、0だった。

 

列が少ない行がある場合

こんなログが出力された。

2015-04-08 22:55:15.677 +0900 [WARN] (task-0000): Skipped (line 18): 17,1360,...
org.embulk.standards.CsvTokenizer$TooFewColumnsException: Too few columns
        at org.embulk.standards.CsvTokenizer.nextColumn(CsvTokenizer.java:124)
        at org.embulk.standards.CsvParserPlugin.nextColumn(CsvParserPlugin.java:236)
        at org.embulk.standards.CsvParserPlugin.access$000(CsvParserPlugin.java:26)
        at org.embulk.standards.CsvParserPlugin$1.stringColumn(CsvParserPlugin.java:193)
        at org.embulk.spi.Column.visit(Column.java:57)
        at org.embulk.spi.Schema.visitColumns(Schema.java:48)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:150)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:145)
        ...
2015-04-08 22:55:15.715 +0900 [INFO] (task-0000): Loading 99 rows
2015-04-08 22:55:15.731 +0900 [INFO] (task-0000): > 0.01 seconds (loaded 99 rows
 in total)
2015-04-08 22:55:15.732 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-08 22:55:15.758 +0900 [INFO] (main): Committed.

こちらの場合は、列の少ない行はスキップされ、それ以外の行がコミットされた。

終了コードは、やはり0だった。

 

embulk-input-filesplitを使う場合

ふと気になって、embulk-input-filesplitを使って試してみた。

やはり、ログに出る行番号は、分割位置からの行番号になっていた。

embulk-input-filesplitでファイルを読み込み→CSV Parserでエラー という順序なので、本当の(ファイルの先頭からの)行番号を出すのは無理っぽいな…。

各タスクの開始行番号をログに出すくらいか。

2箇所ログを見なくてはいけないが、一応スキップ位置は特定できる。

 

数値列が数値でない場合

こんなログが出力された。

2015-04-08 22:59:12.985 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-08 22:59:13.008 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "
a1360"
        at org.embulk.exec.LocalExecutor$ProcessState.buildPartialExecuteException(org/embulk/exec/LocalExecutor.java:333)
        at org.embulk.exec.LocalExecutor.doRun(org/embulk/exec/LocalExecutor.java:483)
        ...
        at org.embulk.cli.Main.main(org/embulk/cli/Main.java:13)
Caused by: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "a1360"
        at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:171)
        at org.embulk.spi.Column.visit(Column.java:53)
        at org.embulk.spi.Schema.visitColumns(Schema.java:48)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:150)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:145)
        ...

Error: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "a1360"

この場合は、例外がcatchされず、異常終了した。

残念ながら、ログを見てもエラーの行番号は分からない。

トランザクションはコミットされておらず、DBは0件だった。

未捕捉例外で終了したので、終了コードは1だった。

 

なお、embulk-output-jdbcでは、ある程度レコードがたまったらコミットする。

上は100件しかないので全件でコミットされたが、もっと件数が多い場合はどうなるんだろうか?

10万件で試したところ、エラー行の無いトランザクションまではコミットされ、エラー行を含むトランザクションロールバックされて終了、という結果になった。

DBを見ると、ログ通り、33,390件入っていた。

2015-04-08 23:11:05.483 +0900 [INFO] (task-0000): Loading 16,704 rows
2015-04-08 23:11:06.073 +0900 [INFO] (task-0000): > 0.58 seconds (loaded 16,704rows in total)
2015-04-08 23:11:06.323 +0900 [INFO] (task-0000): Loading 16,686 rows
2015-04-08 23:11:06.773 +0900 [INFO] (task-0000): > 0.45 seconds (loaded 33,390rows in total)
2015-04-08 23:11:06.793 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-08 23:11:06.803 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "
a34750"
        at org.embulk.exec.LocalExecutor$ProcessState.buildPartialExecuteExcepti
        ...

 

DBでのエラー

embulk-output-oracleの場合は、普通にJDBCでINSERTするモード(normal)、ダイレクト・パス・インサートするモード(direct)と、ネイティブライブラリのOCI(Oracle Call Interface)経由でダイレクト・パス・インサートするモード(oci)がある。

それぞれについて確認した。

通常のINSERT(normal)

桁数オーバー

未捕捉例外により異常終了した。

2015-04-09 09:13:48.906 +0900 [INFO] (task-0000): Loading 16,704 rows
2015-04-09 09:13:49.476 +0900 [INFO] (task-0000): > 0.57 seconds (loaded 16,704rows in total)
2015-04-09 09:13:49.716 +0900 [INFO] (task-0000): Loading 16,686 rows
2015-04-09 09:13:50.116 +0900 [INFO] (task-0000): > 0.40 seconds (loaded 33,390rows in total)
2015-04-09 09:13:50.346 +0900 [INFO] (task-0000): Loading 16,686 rows
2015-04-09 09:13:50.396 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-09 09:13:50.416 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: java.sql.BatchUpdateException: ORA-01438: この列に許容される指定精度より大きな値です

        at org.embulk.exec.LocalExecutor$ProcessState.buildPartialExecuteException(org/embulk/exec/LocalExecutor.java:333)
        at org.embulk.exec.LocalExecutor.doRun(org/embulk/exec/LocalExecutor.java:483)
        ...
        at org.embulk.cli.Main.main(org/embulk/cli/Main.java:13)
Caused by: java.lang.RuntimeException: java.sql.BatchUpdateException: ORA-01438: この列に許容される指定精度より大きな値です

        at org.embulk.output.jdbc.AbstractJdbcOutputPlugin$PluginPageOutput.add(AbstractJdbcOutputPlugin.java:584)
        at org.embulk.spi.PageBuilder.doFlush(PageBuilder.java:197)
        at org.embulk.spi.PageBuilder.flush(PageBuilder.java:203)
        at org.embulk.spi.PageBuilder.addRecord(PageBuilder.java:182)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:216)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:145)
        ...

Error: java.lang.RuntimeException: java.sql.BatchUpdateException: ORA-01438: この列に許容される指定精度より大きな値です

DBを見ると、途中まではコミットされていた。

エラーの行番号は出ていない。

たぶん、BatchUpdateException#getUpdateCountsを使えば出せそう。

 

キー重複

あらかじめ1レコード入れてから実行してみた。

桁数オーバーのときと同様、未捕捉例外により異常終了した。

 

ダイレクト・パス・インサート(direct)

normalと同じだった。

 

OCI経由でのダイレクト・パス・インサート(oci)

桁数オーバー

未捕捉例外により異常終了した。

2015-04-09 18:57:34.298 +0900 [INFO] (task-0000): Loading 26,886 rows
2015-04-09 18:57:34.471 +0900 [INFO] (task-0000): > 0.17 seconds (loaded 26,886rows in total)
2015-04-09 18:57:34.876 +0900 [INFO] (task-0000): Loading 26,886 rows
2015-04-09 18:57:34.929 +0900 [ERROR] (task-0000): OCI : OCIDirPathColArrayToStream failed. ORA-01438: value larger than specified precision allowed for this column

2015-04-09 18:57:34.930 +0900 [INFO] (task-0000): OCI : close for [localhost:1521/TESTDB, TEST_USER, EXAMPLE].
2015-04-09 18:57:34.930 +0900 [INFO] (task-0000): OCI : start to rollback.
2015-04-09 18:57:34.936 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-09 18:57:34.967 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: java.sql.SQLException: OCI : OCIDirPathColArrayToStream failed. ORA-01438: value larger than specified precision allowed for this column

DBの件数を見ると、0件だった。

ociモードでは、途中ではコミットせず、最後にコミットまたはロールバックをするようにしているためである。

こちらもエラー行番号は出ていないが、たぶんOCIにエラー位置を取れるAPIはあると思う。

 

キー重複

これも異常終了…、と思いきや、正常終了してしまった。。

DBを見ると、確かに同一キーのレコードが入っている。

SQL> SELECT ID,NUM FROM EXAMPLE WHERE ID=34751;

        ID        NUM
---------- ----------
     34751          1
     34751       7038

ただし、この後にINSERT文を実行したりすると、エラーになってしまう。

INSERT INTO EXAMPLE VALUES(99999999,1,'1','1','1','1','1','1','1','1','1','1')
*
行1でエラーが発生しました。:
ORA-01502:
索引'TEST_USER.SYS_C009854'またはそのパーティションが使用不可の状態です。

Oracleのドキュメントを見ると、以下のような記述があった。

ダイレクト・パス・ロード時にUNIQUE制約が使用可能であっても、その制約に違反する行もロードされます。
(これは、そのような行が拒否される従来型パスとは異なります。)
UNIQUE制約は、ダイレクト・パス・ロードの最後で索引が再作成されるときに検証されます。
違反が検出されると、索引は索引使用禁止状態のままになります。
詳細は、「使用禁止状態(Index Unusable)のままの索引」を参照してください。

なお、SQL*LoaderでもAPPENDでロードすると、同じ現象が発生することがある。

 

ほしいものリスト

調べた結果、ほしくなったものをまとめてみる。

  1. 列の過不足がある場合にスキップするかエラーにするかを選べるようにしたい。
  2. 警告がある場合は終了コードを変えた方がよいかも。
  3. どの行でエラーが発生したか分かるようにしたい。
  4. OCIモードでキー重複が発生した場合に検知したい(この辺見るとできそう)。

1ができれば、2は無くてもいいかな?

大抵はTRUNCATEしてからロードするから、4もあまり必要無いかも。

3はぜひほしいけど、結構めんどいかも。。

がんばってコード書いてみるか…。

 

Embulk 0.5.4
embulk-input-filesplit 0.1.2
embulk-output-oracle 0.2.2