今日もプログラミング

IT技術とかプログラミングのこととか特にJavaを中心に書いていきます

EmbulkでRedshiftにデータをロードしてみる

今回は、EmbulkRedshiftにデータをロードするのを試してみたい。

 

マシンの準備

まずはRedshiftのインスタンスを準備する。

とりあえずは動作確認なので、一番安いdw2.largeにした。

 

うちの社内からデータをロードしようとすると、ユーザ認証が必要なプロキシを越えなきゃいけないので、めんどい。

そこで、EC2のインスタンスを立ち上げ、そこからロードすることにした。

 

テーブルの準備

JDBCドライバをダウンロードし、Redshiftのセキュリティ設定をして、SQLクライアントから接続する。

そこそこいろいろな型があるテーブルを作ってみた。

create table test1 (
    id             char(4),
    varchar_item   varchar(20),
    integer_item   int,
    numeric_item   numeric(10,2),
    date_item      date,
    timestamp_item timestamp,
    primary key (id)
);

 

なお、テーブル名とかカラム名は小文字にするのがよいみたいだ。

というか、大文字にしても自動的に小文字に変換されてしまう。

ので、大文字でEmbulkを実行すると、テーブルのメタデータが取得できなくて失敗してしまう。

 

Embulkを使わないデータのロード

まずは普通にINSERT文を実行してみる。

insert into test1 values('A001', 'あいうえお', 123, 123456.78, '2015-05-20', '2015-05-20 12:01:02');
insert into test1 values('A002', '12345678901234567890', 123456789, 12345678.12, '2015-12-31', '2015-12-31 23:59:59');
insert into test1 values('A003', null, null, null, null, null);

うん、無事成功した。

 

また、Redshiftでは、copyコマンドによりS3上のファイルをロードできるらしい。

このようなファイルをS3に上げて、

A001,あいうえお,123,123456.78,2015-05-20,2015-05-20 12:01:02
A002,12345678901234567890,123456789,12345678.12,2015-12-31,2015-12-31 23:59:59
A003,,,,,

SQLクライアントでcopyを実行する。

copy test1
from 's3://xxx/data/data1.csv'
credentials 'aws_access_key_id=XXXXXXXX;aws_secret_access_key=XXXXXXXXXXXXXXXX'
csv;

これも無事成功した。

 

embulk-output-redshiftでロードしてみる

Embulkには、embulk-output-redshiftというRedshift用のプラグインがあるので、これを使ってみる。

embulk-output-redshiftは、一旦S3にファイルをアップロードしてから、copyでRedshiftにロードする仕組みらしい。

ので、ymlファイルにはAWSのキーも記述する必要がある。

in:
  type: file
  path_prefix: 'data/data1.csv'
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    columns:
    - {name: id, type: string}
    - {name: varchar_item, type: string}
    - {name: integer_item, type: long}
    - {name: NUMERIC_ITEM, type: string}
    - {name: date_item, type: timestamp, format: '%Y-%m-%d'}
    - {name: timestamp_item, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out:
    type: redshift
    host: xxxxxxxx.us-east-1.redshift.amazonaws.com
    database: dev
    user: xxxxxxxx
    password: xxxxxxxx
    table: test1
    mode: insert_direct
    access_key_id: XXXXXXXX
    secret_access_key: XXXXXXXXXXXXXXXX
    s3_bucket: xxx

で、実行してみたのだが、エラーだ。。

iam_user_nameは必須のようだ。

そこで、IAMユーザを準備する。

よく分からないが、とりあえずS3とRedshiftのアクセス権限は必要だろう。

in:
  type: file
  path_prefix: 'data/data1.csv'
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    columns:
    - {name: id, type: string}
    - {name: varchar_item, type: string}
    - {name: integer_item, type: long}
    - {name: NUMERIC_ITEM, type: string}
    - {name: date_item, type: timestamp, format: '%Y-%m-%d'}
    - {name: timestamp_item, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out:
    type: redshift
    host: xxxxxxxx.us-east-1.redshift.amazonaws.com
    database: dev
    user: xxxxxxxx
    password: xxxxxxxx
    table: test1
    mode: insert_direct
    iam_user_name: redshift
    access_key_id: XXXXXXXX
    secret_access_key: XXXXXXXXXXXXXXXX
    s3_bucket: xxx

で、もう一度実行してみたのだが、またエラーだ。。

com.amazonaws.AmazonServiceException:
 User: arn:aws:iam::xxxxxxxx:user/redshift is not authorized to perform: 
 sts:GetFederationToken on resource: arn:aws:sts::xxxxxxxx:federated-user/redshift
 (Service: AWSSecurityTokenService; Status Code: 403; Error Code: AccessDenied; Request ID: xxxxxxxx)

どうやらIAMユーザの権限が足りないようだ。

いろいろ調べた結果、実はRedshiftの権限は不要(AWSAPIは使わずJDBCドライバでアクセスするだけなので)だったりして、結局以下のようなPolicyを定義した。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:ListBucket",
        "sts:GetFederationToken"
      ],
      "Resource": "*"
    }
  ]
}

再度実行したら…、今度は成功だ!

 

embulk-output-jdbcでロードしてみる

embulk-output-redshiftはcopyを使うが、普通にinsertでもロードできるのではなかろうか?

と思って、embulk-output-jdbcも試してみることにした。

embulk-output-jdbcの場合は、JDBCドライバのパスやドライバクラス名、URLを全て記述する必要がある。

in:
  type: file
  path_prefix: 'data/data1.csv'
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    columns:
    - {name: id, type: string}
    - {name: varchar_item, type: string}
    - {name: integer_item, type: long}
    - {name: numeric_item, type: string}
    - {name: date_item, type: timestamp, format: '%Y-%m-%d'}
    - {name: timestamp_item, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out:
    type: jdbc
    driver_path: driver/RedshiftJDBC41.jar
    driver_class: com.amazon.redshift.jdbc41.Driver
    url: jdbc:redshift://xxxxxxxx.us-east-1.redshift.amazonaws.com:5439/dev
    database: dev
    user: xxxxxxxx
    password: xxxxxxxx
    table: test1
    mode: insert_direct

で、実行してみると…、

2015-05-21 02:30:27.421 +0000 [INFO] (task-0000): Prepared SQL: INSERT INTO "test1" ("id", "varchar_item", "integer_item", "numeric_item", "date_item", "timesta
mp_item") VALUES (?, ?, ?, ?, ?, ?)
2015-05-21 02:30:27.733 +0000 [INFO] (task-0000): Loading 3 rows

…止まってるな…。

 

いろいろ調べたところ、どうもデータに日本語が含まれてると止まるようだ。

また、PreparedStatement#addBatchを使わずにPreparedStatement#executeUpdateで1件ずつ更新しようとすると、以下のようなエラーが出た。

Caused by: java.sql.SQLException: [Amazon](500310) Invalid operation: String contains invalid or unsupported UTF8 codepoints. Bad UTF8 hex sequence: 82 (error 3);
        at com.amazon.redshift.client.messages.inbound.ErrorResponse.toErrorException(Unknown Source)
        ...
        at com.amazon.jdbc.common.SPreparedStatement.executeUpdate(Unknown Source)

82?

"あ"はUTF8だとE38182だが…、

シフトJISだと82A0だ。

ひょっとして、UTF8ではなく、WindowsデフォルトのシフトJISエンコーディングされている?

試しに、エンコーディングにUTF8を指定して実行してみるか。

embulk -J-Dfile.encoding=UTF-8 run jdbc-redshift.yml

のようにすればよいはずだが、Windows版はまだ-Jオプションに対応していないようだ。

仕方ないので、以下のようにしてみた。

java -Dfile.encoding=UTF-8 -jar embulk.bat run jdbc-redshift.yml

ようやく成功した!

※なんかRedshiftのJDBCドライバのバグっぽい気がするなあ。もう少し調べるか…

 

embulk-output-postgresqlではロードできなかった

RedshiftはPostgreSQLベースで作られているらしいので、embulk-output-postgresqlも試してみることにした。

PostgreSQLのデフォルトポート番号は5432だが、Redshiftは5439なので、ポート番号は明示的に指定する必要がある。

in:
  type: file
  path_prefix: 'data/data1.csv'
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    columns:
    - {name: id, type: string}
    - {name: varchar_item, type: string}
    - {name: integer_item, type: long}
    - {name: NUMERIC_ITEM, type: string}
    - {name: date_item, type: timestamp, format: '%Y-%m-%d'}
    - {name: timestamp_item, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out:
    type: postgresql
    host: xxxxxxxx.us-east-1.redshift.amazonaws.com
    port: 5439
    database: dev
    user: xxxxxxxx
    password: xxxxxxxx
    table: test1
    mode: insert_direct

で、実行してみると、エラーだ…。

2015-05-28 05:59:49.431 +0000 [INFO] (task-0000): Copy SQL: COPY "test1" ("id", "varchar_item", "integer_item", "date_item", "timestamp_item") FROM STDIN
2015-05-28 05:59:49.556 +0000 [INFO] (task-0000): Loading 3 rows (144 bytes)
2015-05-28 05:59:50.711 +0000 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-05-28 05:59:50.726 +0000 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: org.postgresql.util.PSQLException: ERROR: LOAD source is not supported. (Hint: only S3 or DynamoDB or EMR based load is allowed)
        at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:331)
        at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:526)
        ...

embulk-output-postgresqlではCOPY ... FROM STDINによりデータをロードするが、どうやらRedshiftはサポートしていないようだ。

残念。

 

まとめ

Redshiftには、embulk-output-redshiftはもちろんのこと、embulk-output-jdbcでもロードすることができた。

 

次回はこれらのパフォーマンス等を比較してみたい。

 

OS Windows Server 2008 R2
Embulk 0.6.8
embulk-output-jdbc 0.3.0
embulk-output-redshift 0.3.0
embulk-output-postgresql 0.3.0

 

2015/5/28: embulk-output-postgresqlでの実行結果を追記しました。