JDynamo
Java用 の Dynamo ライブラリです。 Java8 以上が必要です。
このツールでできること
- put, update, query, scan, batchPut などの一通りの処理
- Fluent interface による直感的なコード
- メタ情報を利用した、Java Class とのマッピング処理(DynamoDBMapper を使わなくて済む)
- Exponential Backoff Algorithm による各処理時のリトライ機構
- 開発環境、本番環境などに応じてテーブル名にprefixを自動で付けてくれる機能
- 文字列を圧縮してDynamoに格納する機能
コードサンプル
// パーティーションキーを指定してクエリを実行
List<Message> list = client.query(table).key("ABC").getAsList();
// フィルタ処理、limitを指定
client.query(table).key("ABC").filter(table.readFlg.eq(true)).limit(20).getAsListDesc();
// 並列スキャン処理を実行
client.scan(table).executeSync(5, (messages, scanContext) -> {
for (Message m : messages) {
log.info(m.toString());
}
});
使い方
Maven の場合は以下の dependency に追加してください(versionは最新に合わせて)。
<dependencies>
<dependency>
<groupId>jp.co.bizreach</groupId>
<artifactId>jdynamo</artifactId>
<version>0.0.8</version>
</dependency>
</dependencies>
Gradle の場合は以下で。
dependencies {
compile 'jp.co.bizreach:jdynamo:0.0.8'
}
事前準備
まず、Dynamo のテーブル定義に応じて2つのクラスを作成する必要があります。
- テーブルの1レコードに対応するJavaBeanクラス
- テーブルメタ情報(インデックスや属性名など)用のクラス
ここでは例として、以下のテーブルを想定します。
- パーティーションキーが文字列型、ソートキーが数値型
まず、JavaBean クラスを以下のように定義します。
例では Lombok を使っていますが、使わなくても可
@Data
@EqualsAndHashCode(callSuper = false)
@NoArgsConstructor
public class Message {
@PartitionKey
private String messageThreadId;
@SortKey
private Long messageId;
private String text;
}
こんな感じで、POJO で作成してDynamoのキー属性および任意属性をフィールドとして定義します(キー属性は必須)。
パーティーションキーとソートキーには、@PartitionKey, @SortKey のアノテーションを付けます。
フィールドの型は、今のところ以下に対応しています。
Java データ型 | Dynamo データ型 | Dynamo データ型(略記) |
---|---|---|
String | 文字列 | S |
Integer | 数値 | N |
Long | 数値 | N |
BigDecimal | 文字列 | S |
Boolean | Boolean | BOOL |
LocalDate | 数値 | N |
LocalDateTime | 数値 | N |
Set<Integer> | 数値セット | NN |
Set<Long> | 数値セット | NN |
List<Long> | リスト | L |
今後も準備追加予定です。
次に、テーブル定義用クラスを作成します。
@Table(baseName = "message")
public class MessageTable extends DynamoMetaTable<Message> {
private static final MessageTable instance = new MessageTable();
public static MessageTable as() {
return instance;
}
//--------------------------------------------------------------- Field
public DynamoConditionAttribute messageThreadId = createStringAttribute("messageThreadId");
public DynamoConditionAttribute messageId = createLongAttribute("messageId");
public DynamoConditionAttribute text = createStringAttribute("text");
}
DynamoMetaTable を継承します。このとき Generics の型は、Beanクラスに合わせます。
@Table のアノテーションも付けてください。 ちなみにここで定義したテーブル名がDynamoテーブル名となりますが 実際には環境に応じてprefixが付きます。
例えば、
- ローカル環境だと「local_message」
- 本番環境だと「rel_message」
のように、です。
あとは各属性毎にフィールドを定義します。 ここちょっと面倒ですがいずれもう少しスマートにする予定です!
これで、下準備は完了です。
使い方
まずは、DynamoClient を初期化します。
AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
new BasicAWSCredentials("xxx", "yyy"));
AmazonDynamoDBClient dynamoClient = new AmazonDynamoDBClient(
credentials, new ClientConfiguration());
// 動作環境に応じて DynamoClient を初期化
// AWSに繋ぐ場合
dynamoClient.withRegion(Regions.AP_NORTHEAST_1);
DynamoClient client = new DynamoClient(dynamoClient, DynamoAppEnvirionment.AWS);
client.setTableNameResolver(new DynamoTableNameResolver.TableNamePrefixResolver("rel_"));
// DynamoLocalに繋ぐ場合
dynamoClient.withEndpoint("http://localhost:8000");
DynamoClient client = new DynamoClient(dynamoClient, DynamoAppEnvirionment.LOCAL);
client.setTableNameResolver(new DynamoTableNameResolver.TableNamePrefixResolver("local_"));
先ほど説明したテーブル名のprefix処理をするのが、最後の TableNameResolver になります。
putItem
いわゆるINSERT処理です。
MessageTable table = MessageTable.as();
Message message = new Message();
message.setMessageThreadId("ABC"); // キー情報は必須
message.setMessageId(1L); // キー情報は必須
message.setXXX(); // その他の属性は任意で
client.put(table, message);
updateItem
UPDATE処理です。
client.updateChain(table, new DynamoKey("ABC", 10L))
.set(table.text.set("updated"))
.set(table.readFlg.set(false))
.set(table.count.add(12)) // 数値のインクリメント
.set(table.messageIds.append(123L)) // LongList へ数値を追加
.set(table.subjects.append("abcde")) // StringSet へ文字列を追加
.set(table.numbers.delete(123L)) // LongSet から数値を削除
.set(table.targetDate.setIfNotExists(LocalDate.of(2017, 7, 1))) // 属性が存在しなかった場合のみ値を設定
.execute();
put とちょっと書き方が違います。 updateChain の引数にテーブルと更新するキーをそれぞれ指定し、チェーンで更新値をセットして最後に execute() を呼び出します。
指定したキーのレコードが無かった場合、新規でレコードを作成します(DynamoDB の仕様)。
レコードの既存属性値はそのまま残ります。 属性を削除したい場合は、clear() を使ってください。
chain.set(table.text.set(null)); // NG。set(null)は無視する仕様です
chain.set(table.text.clear()); // OK
条件付き UPDATE もできます。
chain
.set(table.text.set(null))
.condition(table.text.eq("abcde")) // text 属性 = "abcde" のときのみ、更新を実行
getItem
単一キー指定による1レコード取得処理。
Message message = client.get(table, "ABC", 10L);
batchGetItem
複数キー指定による複数レコード取得処理。
List<DynamoKey> keys = new ArrayList<>();
IntStream.range(1, 10).forEach(i -> {
keys.add(new DynamoKey("ABC" + i, 10L * i));
});
List<Message> messages = client.batchGet(table, keys);
query
キーを指定してのクエリ処理。 パーティーションキーは必須、ソートキーは省略可能です。
// パーティーションキー = "ABC" でクエリ実行
List<Message> messages = client.query(table).key("ABC").getAsList();
// パーティーションキー = "ABC"、ソートキー BETWEEN 10 AND 16 でクエリ実行。取得結果はソートキーの降順
List<Message> messages = client.query(table).keyRange("ABC", 10L, 16L).getAsListDesc();
query filter, limit
クエリ時にフィルタやLIMITを指定できます。
// フィルタ条件「readFlg = true AND applyCount = 3」、最大20レコードまで取得
client.query(table).key("ABC")
.filter(table.readFlg.eq(true))
.filter(table.appltCouny.eq(3))
.limit(20).getAsListDesc();
// readFlg 属性が存在するレコードでのフィルタ
client.query(table).key("ABC")
.filter(table.readFlg.exists())
.getAsListDesc();
query large set
Dynamo の仕様上、一回の Query で取得できるのは最大1MBとなります。 これ以上データを取得したい場合、getAsList() を複数回呼び出します。
List<Message> results = query.getAsList();
while (query.isMoreRecords()) {
results.addAll(query.getAsList());
}
scan
テーブルの一括スキャン処理。
DynamoScanResult scanResult = client.scan(table).executeSync(5, (messages, scanContext) -> {
for (Message m : messages) {
log.info(m.toString());
}
});
第1引数には並列スキャン時のスレッド数、Closure 内にはレコード取得時の処理を記述します。
レコード数が多い場合 Closure は複数回呼び出されます。 スキャンが全て完了するまで executeSync の呼び出しはブロックされます。
スキャン時に例外が発生すると、同じ Closure が呼び出されます。その際、第二引数のscanContext に発生した例外が格納されています。
DynamoScanResult scanResult = client.scan(table).executeSync(5, (messages, scanContext) -> {
if (scanContext.isError()) {
Throwable cause = scanContext.getCauseError();
// ここに例外処理を記述。ほとんどの場合、cause はスループット超過などのException
} else {
for (Message m : messages) {
log.info(m.toString());
}
}
});
また、Closure 内で発生した例外はライブラリで吸収しているので、呼び出し後の ScanResult インスタンスより参照できます。
DynamoScanResult scanResult = client.scan(table).executeSync(5, (messages, scanContext) -> {
// ここの処理内で例外が発生するパターン
});
List<Exception> exceptions = scanResult.getOccurExceptions(); // ここで例外を補足できる
グローバルセカンダリインデックスの使用
これまでの例は全てメインインデックスを使った処理でしたが、 グローバルセカンダリインデックスにも対応しています。
現状、ローカルセカンダリインデックスには未対応
まず、インデックス一つにつきテーブル定義クラスにフィールドを追加します。
public class MessageTable extends DynamoTable<Message> {
...
//--------------------------------------------------------------- Field
public DynamoConditionAttribute targetDate = createDateAttribute("targetDate");
public DynamoConditionAttribute targetTime = createDateTimeAttribute("targetTime");
//--------------------------------------------------------------- Index
public DynamoIndex indexDate = DynamoIndex.create(this, "indexDate", "targetDate");
public DynamoIndex indexDateId = DynamoIndex.create(this, "indexDateId", "targetDate", "messageId");
...
}
上の例では二つインデックスを定義しています。
- "indexDate" という名前のインデックス。パーティーションキーは "targetDate"
- "indexDateId" という名前のインデックス。パーティーションキーは "targetDate"、ソートキーは "messageId"
あとは、クエリを投げるときに以下のようにすればOKです。
client.query(table, table.indexDate).key(LocalDate.of(2016, 7, 7)).getAsListDesc();
このように、query の第二引数に使用するインデックスを指定します。 これだけでOKですが、その際に指定できるキーはインデックスに対応する型である必要があります。 ※ 上の例では LocalDate
Dynamo 属性名の指定
デフォルトでは、テーブル定義クラスのフィールド名がそのままDynamoの属性名になります。 これを変えたい場合は、以下のようにします。
public class MessageTable extends DynamoTable<Message> {
...
//--------------------------------------------------------------- Field
public DynamoConditionAttribute longAttrName = createStringAttribute("longAttrName", "lan");
...
}
このようにすると、Dynamo の属性名は "lan"、Java のフィールド名は longAttrName となります。
その他
リトライ機構
クエリ・スキャンなど各処理の実行時、スループット超過があったときに処理をリトライする機構を用意しています。
client.query(table).key("ABC").withAdjustThroughput().getAsList();
client.scan(table).withAdjustThroughput().executeSync(...);
client.putWithAdjustThroughput(table, record);
このように、チェーン中に withAdjustThroughput() を通すとリトライ機構が発動します。
リトライは Exponential Backoff 形式で行われます。
sleep (ms) = random_between(100, 100 ** attempt)
これによりスループット超過エラーが発生する可能性が減りますが、その分処理に時間が掛かることがあるので それを考慮した上で利用してください。
また、BatchGet, BatchPut にも同じくこの機構がありますが Dynamo の仕様上、この機構を使わずにコマンドを発行した場合にはアイテムの一部がGET/PUTされない可能性があります。 確実に GET/PUT したい場合は、リトライ機構を使ってください。
文字列の圧縮
以下の記述をすることで、文字列を圧縮してDynamoに格納します。
JavaBean側
@Compress
private String compressStr;
メタクラス側
public DynamoConditionAttribute compressStr = createCompressStringAttribute("compressStr");
これにより、文字列をGZIP圧縮してDynamoのBinary型に格納するようになります。
開発ツール
テーブル定義クラスに応じたテーブル(およびインデックス)のCREATE/DELETE機能を提供しています。 この機能はローカル環境でのみ使えるようになっています。
client.useAdminClient().deleteTableByLocal(table);
client.useAdminClient().createTableByLocal(table);
開発方法
こちらの開発環境は以下の通りです。
- OS X El Capitan
- IntelliJ IDEA ULTIMATE 2016.1
- Gradle 3.1
- Maven 3.0.5 (IntelliJ Bundled)
- Groovy 2.4.7, Spock (For Test)
コンパイル、テスト
動作確認、テスト時にはローカルでDynamoLocalを起動しておきます。 ユニットテストは 18000 番ポートに繋ぎにいくようになってますので注意してください。
Docker 環境を持ってる人は、docker-compose 用のymlを用意してあるので以下のコマンドを叩けばDynamoLocalが18000番で起動します。
docker-compose -f docker-compose-for-test.yml up -d
カバレッジ計測
Jacoco を利用してテストカバレッジを計測できます。
Gradle ファイルを用意しています。 ただし設定ファイルの名前(build.gradle)を変更してあるので、実行は以下のようにしてください。
./gradlew -b build.gradle.local test --tests jp.co.bizreach.jdynamo.local.* && gradle -b build.gradle.local jacocoTestReport
上記はローカルの DynamoLocal のみを使ってテストを行いますが 正確なカバレッジを測るためには実際の AWS DynamoDB に繋ぐ必要があります。
- /tmp/accesskey
- /tmp/secretkey
に、AWSのキー情報を格納して以下を実行してください。
./gradlew -b build.gradle.local test --tests jp.co.bizreach.jdynamo.* && gradle -b build.gradle.local jacocoTestReport
- テスト結果は build/reports/tests/index.html で見ることができます
- カバレッジ結果は build/reports/jacoco/test/html/index.html で見ることができます
コード品質を確認する
Checkstyle, PMD, Findbugs, JDepend によるコードチェックができます。
./gradlew -b build.gradle.local check -x test
チェック結果は build/reports/ 以下に格納されます。
Checkstyle の設定ファイルは config/checkstyle/checkstyle.xml にあります。 その他のチェックはデフォルトの設定をそのまま使っています。
コード品質を確認する(SonarQube)
Docker でローカルに SonarQube を立てて各種Qualityを確認するには以下のようにします。
docker run -d --name sonarqube -p 9000:9000 -p 9092:9092 sonarqube
./gradlew -b build.gradle.local sonar -x test
open http://192.168.99.100:9000
Javadoc を生成する
普通に javadoc 作ろうとすると Lombok のところがうまくいかないので delombok を使っています。
./gradlew -b build.gradle.local generateJavadoc && open build/docs/javadoc/index.html
Deploy to Maven Repository
デプロイ先は oss.sonatype.org です。
mvn deploy -DskipTests=true
レポート
テストレポートなどは以下に載せています。