54 changes: 54 additions & 0 deletions README.md
@@ -244,6 +244,60 @@ class Scratch {
}
```

#### Using custom sharding function

A custom sharding function can be used to compute the bucket number (the data's location in the cluster) and pass it to subsequent cluster operations.
For this you need:
1) a hash function
As an example, take the default function from tarantool/vshard: [crc32](https://www.tarantool.io/en/doc/latest/reference/reference_lua/digest/#lua-function.digest.crc32) with a specific polynomial value.
Java has no built-in crc32 that accepts a custom polynomial value, so we'll implement our own:
```java
private static long crc32(byte[] data) {
    BitSet bitSet = BitSet.valueOf(data);
    int crc32 = 0xFFFFFFFF; // initial value
    for (int i = 0; i < data.length * 8; i++) {
        if (((crc32 >>> 31) & 1) != (bitSet.get(i) ? 1 : 0)) {
            crc32 = (crc32 << 1) ^ 0x1EDC6F41; // xor with polynomial
        } else {
            crc32 = crc32 << 1;
        }
    }
    crc32 = Integer.reverse(crc32); // reflect the result
    return crc32 & 0x00000000ffffffffL; // Java has no unsigned int, so widen to long
}
```
2) the number of buckets
This number can be obtained from Tarantool via the `vshard.router.bucket_count` function of the [vshard module](https://github.com/tarantool/vshard):
```java
public static <T extends Packable, R extends Collection<T>> Integer getBucketCount(
    TarantoolClient<T, R> client) throws ExecutionException, InterruptedException {
    if (!bucketCount.isPresent()) {
        bucketCount = Optional.ofNullable(
            client.callForSingleResult("vshard.router.bucket_count", Integer.class).get()
        );
    }
    bucketCount.orElseThrow(() -> new TarantoolClientException("Failed to get bucket count"));
    // Review comment (Collaborator): return is missing
}
```
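
For step 1, on Java 9 or newer the JDK class `java.util.zip.CRC32C` uses the same 0x1EDC6F41 polynomial, so a hand-written loop may not be strictly necessary. The sketch below is a comparison under stated assumptions, not part of the driver: the class name `Crc32Comparison` and both method names are hypothetical, and `viaJdk` assumes the only difference from the bit-by-bit loop is the final XOR with 0xFFFFFFFF that `CRC32C` applies.

```java
import java.util.BitSet;
import java.util.zip.CRC32C;

final class Crc32Comparison {

    /** Bit-by-bit loop with the same logic as the crc32 snippet in this section. */
    static long bitwise(byte[] data) {
        BitSet bits = BitSet.valueOf(data); // bit i is bit (i % 8) of byte (i / 8)
        int crc = 0xFFFFFFFF;               // initial value
        for (int i = 0; i < data.length * 8; i++) {
            if (((crc >>> 31) & 1) != (bits.get(i) ? 1 : 0)) {
                crc = (crc << 1) ^ 0x1EDC6F41; // xor with polynomial
            } else {
                crc = crc << 1;
            }
        }
        return Integer.reverse(crc) & 0xFFFFFFFFL; // reflect, then widen to long
    }

    /** JDK CRC-32C with its final XOR undone (assumption: the target hash has none). */
    static long viaJdk(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        return crc.getValue() ^ 0xFFFFFFFFL;
    }
}
```

If both methods agree on your keys, `viaJdk` can stand in for the loop; on Java 8 the loop remains the only option of the two.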

Then you can determine the bucket id by passing your key through the hash function, taking the remainder of its division by the number of buckets, and adding one:
```java
TarantoolTuple tarantoolTuple = tupleFactory.create(1, null, "FIO", 50, 100);
byte[] key = getBytesFromList(Arrays.asList(tarantoolTuple.getInteger(0), tarantoolTuple.getInteger(2)));
// Review comment (Collaborator): it may be good to link to the getBytesFromList method
// in the tests from the comment above, because it is not a built-in and therefore raises questions
// crc32 returns long, so the result is narrowed explicitly
Integer bucketId = Math.toIntExact((crc32(key) % getBucketCount(client)) + 1);
```
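
The `getBytesFromList` helper is not shown in this section. The following is a minimal sketch of the whole key-to-bucket computation, assuming the key parts are concatenated in their string form with nulls skipped, and assuming Java 9+ `java.util.zip.CRC32C` with its final XOR undone matches the hash described above. `BucketIdSketch`, `keyBytes`, and `bucketId` are hypothetical names, not driver API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32C;

final class BucketIdSketch {

    /** Concatenates the string form of every non-null key part; nulls are skipped. */
    static byte[] keyBytes(List<?> key) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Object part : key) {
            if (part != null) {
                byte[] bytes = part.toString().getBytes(StandardCharsets.UTF_8);
                out.write(bytes, 0, bytes.length);
            }
        }
        return out.toByteArray();
    }

    /** Maps a key to a bucket id in the range 1..bucketCount. */
    static int bucketId(List<?> key, int bucketCount) {
        byte[] bytes = keyBytes(key);
        CRC32C crc = new CRC32C();
        crc.update(bytes, 0, bytes.length);
        // Assumption: the server-side hash omits the final XOR that CRC32C applies.
        long hash = crc.getValue() ^ 0xFFFFFFFFL;
        return Math.toIntExact(hash % bucketCount + 1);
    }
}
```

With `bucketCount` taken from `getBucketCount(client)`, a call such as `BucketIdSketch.bucketId(Arrays.asList(1, "FIO"), bucketCount)` yields a value between 1 and `bucketCount`. The result only matches the server-side value if each part's Java string form equals the string Tarantool produces for it, which is worth checking for floating-point values in particular.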

After that, the bucket id can be passed to operations:
```java
InsertOptions insertOptions = ProxyInsertOptions.create().withBucketId(bucketId);
insertResult = profileSpace.insert(tarantoolTuple, insertOptions).get();

ProxySelectOptions selectOptions = ProxySelectOptions.create().withBucketId(bucketId);
selectResult = profileSpace.select(condition, selectOptions).get();
```

You can see the sources of this example in the [tests](src/test/java/io/tarantool/driver/integration/proxy/options/ProxySpaceInsertOptionsIT.java).

### Retrying Tarantool client

For the cases of reliable communication with a Cartridge cluster under heavy load or in a case of some failure causing
@@ -19,28 +19,32 @@ public interface TarantoolMetadataOperations {

/**
* Refresh metadata cache
*
* @return future with empty value for tracking the refresh progress
* @throws TarantoolClientException if fetching data failed with error
*/
CompletableFuture<Void> refresh() throws TarantoolClientException;

/**
* Get metadata for the space specified by name
*
* @param spaceName the space name, must not be null or empty
* @return nullable space metadata wrapped in {@link Optional}
*/
Optional<TarantoolSpaceMetadata> getSpaceByName(String spaceName);

/**
* Get metadata for index from the specified space by name
*
* @param spaceId the space ID, must be greater than 0
* @param indexName index name, must not be null or empty
* @return nullable index metadata wrapped in {@link Optional}
*/
Optional<TarantoolIndexMetadata> getIndexByName(int spaceId, String indexName);

/**
* Get metadata for index from the specified space by name
*
* @param spaceName the space name, must not be null or empty
* @param indexName index name, must not be null or empty
* @return nullable index metadata wrapped in {@link Optional}
@@ -49,14 +53,16 @@ public interface TarantoolMetadataOperations {

/**
* Get metadata for index from the specified space by index ID
*
* @param spaceName the space name, must not be null or empty
* @param indexId index ID, must be greater than or equal to 0
* @return nullable index metadata wrapped in {@link Optional}
*/
Optional<TarantoolIndexMetadata> getIndexById(String spaceName, int indexId);

/**
* Get metadata for index from the specified space by index ID
*
* @param spaceId the space ID, must be greater than 0
* @param indexId index ID, must be greater than or equal to 0
* @return nullable index metadata wrapped in {@link Optional}
@@ -65,20 +71,23 @@ public interface TarantoolMetadataOperations {

/**
* Get metadata for the space specified by id
*
* @param spaceId the space ID, must be greater than 0
* @return nullable space metadata wrapped in {@link Optional}
*/
Optional<TarantoolSpaceMetadata> getSpaceById(int spaceId);

/**
* Get metadata for all indexes for space specified by id
*
* @param spaceId the space ID, must be greater than 0
* @return nullable map of index names to index metadata wrapped in {@link Optional}
*/
Optional<Map<String, TarantoolIndexMetadata>> getSpaceIndexes(int spaceId);

/**
* Get metadata for all indexes for space specified by name
*
* @param spaceName the space name, must not be null or empty
* @return nullable map of index names to index metadata wrapped in {@link Optional}
*/
92 changes: 85 additions & 7 deletions src/test/java/io/tarantool/driver/integration/Utils.java
@@ -1,30 +1,108 @@
package io.tarantool.driver.integration;

import io.tarantool.driver.api.TarantoolClient;
import io.tarantool.driver.api.TarantoolResult;
import io.tarantool.driver.api.conditions.Conditions;
import io.tarantool.driver.api.space.TarantoolSpaceOperations;
import io.tarantool.driver.api.tuple.TarantoolTuple;
import io.tarantool.driver.exceptions.TarantoolClientException;
import io.tarantool.driver.exceptions.TarantoolException;
import io.tarantool.driver.exceptions.TarantoolSpaceFieldNotFoundException;
import io.tarantool.driver.protocol.Packable;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

/**
* @author Ivan Dneprov
* @author Artyom Dubinin
*/
public final class Utils {
private static Optional<Integer> bucketCount = Optional.empty();

private Utils() {
}

/**
* Checks if the space is empty.
*
* @param testSpace space to check
*/
static void checkSpaceIsEmpty(TarantoolSpaceOperations<TarantoolTuple, TarantoolResult<TarantoolTuple>> testSpace) {
assertEquals(0, testSpace.select(Conditions.any()).thenApply(List::size).join());
}

/**
* Get number of buckets in vshard cluster.
*
* @param client Tarantool client with access to vshard router
* @param <T> target tuple type
* @param <R> target tuple collection type
* @return number of buckets
*/
public static <T extends Packable, R extends Collection<T>> Integer getBucketCount(
TarantoolClient<T, R> client) throws ExecutionException, InterruptedException {
if (!bucketCount.isPresent()) {
bucketCount = Optional.ofNullable(
client.callForSingleResult("vshard.router.bucket_count", Integer.class).get()
);
}
return bucketCount.orElseThrow(() -> new TarantoolClientException("Failed to get bucket count"));
}

/**
* Get bucket_id via crc32 hash function.
* You can't use null, because null is packed to box.NULL ((void *) 0) and Java doesn't have an equivalent.
*
* @param client Tarantool client with access to vshard router
* @param key key that will be used to calculate bucketId
* @param <T> target tuple type
* @param <R> target tuple collection type
* @return bucketId number determining the location in the cluster
*/
public static <T extends Packable, R extends Collection<T>> Integer getBucketIdStrCRC32(
TarantoolClient<T, R> client, List<Object> key) throws ExecutionException, InterruptedException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (Object part : key) {
try {
if (part != null) {
outputStream.write(part.toString().getBytes());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return Math.toIntExact(
(crc32(outputStream.toByteArray()) % getBucketCount(client)) + 1
);
}

/**
* This crc32 implementation was partially taken from
* <a href="https://github.com/TheAlgorithms/Java/blob/master/src/main/java/com/thealgorithms/others/CRC32.java">
* github.com/TheAlgorithms</a>
*
* @param data input bytes array
* @return hash response in decimal view
*/
private static long crc32(byte[] data) {
BitSet bitSet = BitSet.valueOf(data);
int crc32 = 0xFFFFFFFF; // initial value
for (int i = 0; i < data.length * 8; i++) {
if (((crc32 >>> 31) & 1) != (bitSet.get(i) ? 1 : 0)) {
crc32 = (crc32 << 1) ^ 0x1EDC6F41; // xor with polynomial
} else {
crc32 = crc32 << 1;
}
}
crc32 = Integer.reverse(crc32); // result reflect
return crc32 & 0x00000000ffffffffL; // the unsigned java problem
}
}
@@ -6,6 +6,7 @@
import io.tarantool.driver.api.conditions.Conditions;
import io.tarantool.driver.api.space.TarantoolSpaceOperations;
import io.tarantool.driver.api.space.options.InsertOptions;
import io.tarantool.driver.api.space.options.SelectOptions;
import io.tarantool.driver.api.space.options.proxy.ProxySelectOptions;
import io.tarantool.driver.api.tuple.DefaultTarantoolTupleFactory;
import io.tarantool.driver.api.tuple.TarantoolTuple;
@@ -14,14 +15,15 @@
import io.tarantool.driver.core.ClusterTarantoolTupleClient;
import io.tarantool.driver.core.ProxyTarantoolTupleClient;
import io.tarantool.driver.api.space.options.proxy.ProxyInsertOptions;
import io.tarantool.driver.exceptions.TarantoolConnectionException;
import io.tarantool.driver.exceptions.TarantoolInternalException;
import io.tarantool.driver.integration.SharedCartridgeContainer;
import io.tarantool.driver.integration.Utils;
import io.tarantool.driver.mappers.DefaultMessagePackMapperFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -135,13 +137,56 @@ public void withBucketIdTest() throws ExecutionException, InterruptedException {
}

@Test
public void withBucketIdFromClientTest() throws ExecutionException, InterruptedException {
TarantoolSpaceOperations<TarantoolTuple, TarantoolResult<TarantoolTuple>> profileSpace =
client.space(TEST_SPACE_NAME);

TarantoolTuple tarantoolTuple = tupleFactory.create(1, null, "FIO", 50, 100);
Conditions condition = Conditions.equals(PK_FIELD_NAME, 1);

Integer bucketId = Utils.getBucketIdStrCRC32(client,
Collections.singletonList(tarantoolTuple.getInteger(0)));
InsertOptions insertOptions = ProxyInsertOptions.create().withBucketId(bucketId);

TarantoolResult<TarantoolTuple> insertResult = profileSpace.insert(tarantoolTuple, insertOptions).get();
assertEquals(1, insertResult.size());

TarantoolResult<TarantoolTuple> selectResult = profileSpace.select(condition).get();
assertEquals(1, selectResult.size());

SelectOptions selectOptions = ProxySelectOptions.create().withBucketId(bucketId);
selectResult = profileSpace.select(condition, selectOptions).get();
assertEquals(1, selectResult.size());
}

private Integer getBucketIdFromTarantool(List<Object> key) throws ExecutionException, InterruptedException {
return client.callForSingleResult(
"vshard.router.bucket_id_strcrc32",
Collections.singletonList(key),
Integer.class
).get();
}

@Test
public void withBucketIdClientComputationTest() throws ExecutionException, InterruptedException {
List<List<Object>> keys = Arrays.asList(
Collections.singletonList(1),
Arrays.asList(1, "FIO"),
Arrays.asList(1, true, "FIO", 'm', 100.123)
);

for (List<Object> key : keys) {
assertEquals(Utils.getBucketIdStrCRC32(client, key), getBucketIdFromTarantool(key));
}
}

@Test
public void withBucketIdMoreThanLimitTest() throws ExecutionException, InterruptedException {
TarantoolSpaceOperations<TarantoolTuple, TarantoolResult<TarantoolTuple>> profileSpace =
client.space(TEST_SPACE_NAME);

TarantoolTuple tarantoolTuple = tupleFactory.create(1, null, "FIO", 50, 100);

Integer bucketsCount = client.callForSingleResult("vshard.router.bucket_count", Integer.class).get();
InsertOptions insertOptions = ProxyInsertOptions.create().withBucketId(bucketsCount * 2);
