diff --git a/README.md b/README.md index 80a709c18..f5d0c16d7 100644 --- a/README.md +++ b/README.md @@ -244,6 +244,60 @@ class Scratch { } ``` +#### Using custom sharding function + +A custom sharding function can be used to determine the bucket number - location in the cluster - and used further in the cluster operations. +For this purpose you need: +1) a hash function + As an example, a default function from tarantool/vshard - [crc32](https://www.tarantool.io/en/doc/latest/reference/reference_lua/digest/#lua-function.digest.crc32) with specific polynomial value. + Java doesn't have crc32 out of the box with the ability to pass a polynomial value, so we'll implement our own: + ```java + private static long crc32(byte[] data) { + BitSet bitSet = BitSet.valueOf(data); + int crc32 = 0xFFFFFFFF; // initial value + for (int i = 0; i < data.length * 8; i++) { + if (((crc32 >>> 31) & 1) != (bitSet.get(i) ? 1 : 0)) { + crc32 = (crc32 << 1) ^ 0x1EDC6F41; // xor with polynomial + } else { + crc32 = crc32 << 1; + } + } + crc32 = Integer.reverse(crc32); // result reflect + return crc32 & 0x00000000ffffffffL; // the unsigned java problem + } + ``` +2) the number of buckets + This number can be obtained from Tarantool via `vshard.router.bucket_count` function out of [vshard module](https://github.com/tarantool/vshard) + ```java + public static > Integer getBucketCount( + TarantoolClient client) throws ExecutionException, InterruptedException { + if (!bucketCount.isPresent()) { + bucketCount = Optional.ofNullable( + client.callForSingleResult("vshard.router.bucket_count", Integer.class).get() + ); + } + bucketCount.orElseThrow(() -> new TarantoolClientException("Failed to get bucket count")); + } + ``` + +Then we can determine bucket id by passing your key through hash function and get the remainder of the division by number of buckets: +```java +TarantoolTuple tarantoolTuple = tupleFactory.create(1, null, "FIO", 50, 100); +byte[] key = getBytesFromList(Arrays.asList(tarantoolTuple.getInteger(0), tarantoolTuple.getInteger(2))); +Integer bucketId = (crc32(key) % getBucketCount(client)) + 1; +``` + +After that we may apply it in operations: +```java +InsertOptions insertOptions = ProxyInsertOptions.create().withBucketId(bucketId); +insertResult = profileSpace.insert(tarantoolTuple, insertOptions).get(); + +ProxySelectOptions selectOptions = ProxySelectOptions.create().withBucketId(bucketId); +selectResult = profileSpace.select(condition, selectOptions).get(); +``` + +You can see the sources of this example in the [tests](src/test/java/io/tarantool/driver/integration/proxy/options/ProxySpaceInsertOptionsIT.java) + ### Retrying Tarantool client For the cases of reliable communication with a Cartridge cluster under heavy load or in a case of some failure causing diff --git a/src/main/java/io/tarantool/driver/api/metadata/TarantoolMetadataOperations.java b/src/main/java/io/tarantool/driver/api/metadata/TarantoolMetadataOperations.java index cf98466e4..82825753e 100644 --- a/src/main/java/io/tarantool/driver/api/metadata/TarantoolMetadataOperations.java +++ b/src/main/java/io/tarantool/driver/api/metadata/TarantoolMetadataOperations.java @@ -19,6 +19,7 @@ public interface TarantoolMetadataOperations { /** * Refresh metadata cache + * * @return future with empty value for tracking the refresh progress * @throws TarantoolClientException if fetching data failed with error */ @@ -26,6 +27,7 @@ public interface TarantoolMetadataOperations { /** * Get metadata for the space specified by name + * * @param spaceName the space name, must not be null or empty * @return nullable space metadata wrapped in {@link Optional} */ @@ -33,7 +35,8 @@ public interface TarantoolMetadataOperations { /** * Get metadata for index from the specified space by name - * @param spaceId the space ID, must be greater than 0 + * + * @param spaceId the space ID, must be greater than 0 * @param indexName index name, must not be null or empty * @return nullable index metadata wrapped in {@link Optional} */ @@ -41,6 +44,7 @@ public interface TarantoolMetadataOperations { /** * Get metadata for index from the specified space by name + * * @param spaceName the space name, must not be null or empty * @param indexName index name, must not be null or empty * @return nullable index metadata wrapped in {@link Optional} @@ -49,14 +53,16 @@ public interface TarantoolMetadataOperations { /** * Get metadata for index from the specified space by index ID + * * @param spaceName the space name, must not be null or empty - * @param indexId index ID, must not be must be greater or equal than 0 + * @param indexId index ID, must not be must be greater or equal than 0 * @return nullable index metadata wrapped in {@link Optional} */ Optional getIndexById(String spaceName, int indexId); /** * Get metadata for index from the specified space by index ID + * * @param spaceId the space ID, must be greater than 0 * @param indexId index ID, must not be must be greater or equal than 0 * @return nullable index metadata wrapped in {@link Optional} @@ -65,6 +71,7 @@ public interface TarantoolMetadataOperations { /** * Get metadata for the space specified by id + * * @param spaceId the space ID, must be greater than 0 * @return nullable space metadata wrapped in {@link Optional} */ @@ -72,6 +79,7 @@ public interface TarantoolMetadataOperations { /** * Get metadata for all indexes for space specified by id + * * @param spaceId the space ID, must be greater than 0 * @return nullable map of index names to index metadata wrapped in {@link Optional} */ @@ -79,6 +87,7 @@ public interface TarantoolMetadataOperations { /** * Get metadata for all indexes for space specified by name + * * @param spaceName the space name, must not be null or empty * @return nullable map of index names to index metadata wrapped in {@link Optional} */ diff --git a/src/test/java/io/tarantool/driver/integration/Utils.java b/src/test/java/io/tarantool/driver/integration/Utils.java index 8255be6e7..aa51f836b 100644 --- a/src/test/java/io/tarantool/driver/integration/Utils.java +++ b/src/test/java/io/tarantool/driver/integration/Utils.java @@ -1,30 +1,108 @@ package io.tarantool.driver.integration; +import io.tarantool.driver.api.TarantoolClient; import io.tarantool.driver.api.TarantoolResult; import io.tarantool.driver.api.conditions.Conditions; import io.tarantool.driver.api.space.TarantoolSpaceOperations; import io.tarantool.driver.api.tuple.TarantoolTuple; import io.tarantool.driver.exceptions.TarantoolClientException; +import io.tarantool.driver.exceptions.TarantoolException; +import io.tarantool.driver.exceptions.TarantoolSpaceFieldNotFoundException; +import io.tarantool.driver.protocol.Packable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.BitSet; +import java.util.Collection; import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; /** * @author Ivan Dneprov + * @author Artyom Dubinin */ public final class Utils { + private static Optional bucketCount = Optional.empty(); + private Utils() { } /** - * Checks if the space is empty. - * - * @param testSpace space to check - */ - static void checkSpaceIsEmpty(TarantoolSpaceOperations> testSpace) { + * Checks if the space is empty. + * + * @param testSpace space to check + */ + static void checkSpaceIsEmpty(TarantoolSpaceOperations> testSpace) { assertEquals(0, testSpace.select(Conditions.any()).thenApply(List::size).join()); } + + /** + * Get number of buckets in vshard cluster. + * + * @param client Tarantool client for with access to vshard router + * @param target tuple type + * @param target tuple collection type + * @return number of buckets + */ + public static > Integer getBucketCount( + TarantoolClient client) throws ExecutionException, InterruptedException { + if (!bucketCount.isPresent()) { + bucketCount = Optional.ofNullable( + client.callForSingleResult("vshard.router.bucket_count", Integer.class).get() + ); + } + return bucketCount.orElseThrow(() -> new TarantoolClientException("Failed to get bucket count")); + } + + /** + * Get bucket_id via crc32 hash function. + * You can't use null, because null is packed to box.NULL((void *) 0) and java doesn't have equivalent. + * + * @param client Tarantool client for with access to vshard router + * @param key key that will be used to calculate bucketId + * @param target tuple type + * @param target tuple collection type + * @return bucketId number determining the location in the cluster + */ + public static > Integer getBucketIdStrCRC32( + TarantoolClient client, List key) throws ExecutionException, InterruptedException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + for (Object part : key) { + try { + if (part != null) { + outputStream.write(part.toString().getBytes()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return Math.toIntExact( + (crc32(outputStream.toByteArray()) % getBucketCount(client)) + 1 + ); + } + + /** + * Implementation of crc32 partially was taken from + * + * github.com/TheAlgorithms + * + * @param data input bytes array + * @return hash response in decimal view + */ + private static long crc32(byte[] data) { + BitSet bitSet = BitSet.valueOf(data); + int crc32 = 0xFFFFFFFF; // initial value + for (int i = 0; i < data.length * 8; i++) { + if (((crc32 >>> 31) & 1) != (bitSet.get(i) ? 1 : 0)) { + crc32 = (crc32 << 1) ^ 0x1EDC6F41; // xor with polynomial + } else { + crc32 = crc32 << 1; + } + } + crc32 = Integer.reverse(crc32); // result reflect + return crc32 & 0x00000000ffffffffL; // the unsigned java problem + } } diff --git a/src/test/java/io/tarantool/driver/integration/proxy/options/ProxySpaceInsertOptionsIT.java b/src/test/java/io/tarantool/driver/integration/proxy/options/ProxySpaceInsertOptionsIT.java index 1f9d7daca..abc45b174 100644 --- a/src/test/java/io/tarantool/driver/integration/proxy/options/ProxySpaceInsertOptionsIT.java +++ b/src/test/java/io/tarantool/driver/integration/proxy/options/ProxySpaceInsertOptionsIT.java @@ -6,6 +6,7 @@ import io.tarantool.driver.api.conditions.Conditions; import io.tarantool.driver.api.space.TarantoolSpaceOperations; import io.tarantool.driver.api.space.options.InsertOptions; +import io.tarantool.driver.api.space.options.SelectOptions; import io.tarantool.driver.api.space.options.proxy.ProxySelectOptions; import io.tarantool.driver.api.tuple.DefaultTarantoolTupleFactory; import io.tarantool.driver.api.tuple.TarantoolTuple; @@ -14,14 +15,15 @@ import io.tarantool.driver.core.ClusterTarantoolTupleClient; import io.tarantool.driver.core.ProxyTarantoolTupleClient; import io.tarantool.driver.api.space.options.proxy.ProxyInsertOptions; -import io.tarantool.driver.exceptions.TarantoolConnectionException; import io.tarantool.driver.exceptions.TarantoolInternalException; import io.tarantool.driver.integration.SharedCartridgeContainer; +import io.tarantool.driver.integration.Utils; import io.tarantool.driver.mappers.DefaultMessagePackMapperFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -135,13 +137,56 @@ public void withBucketIdTest() throws ExecutionException, InterruptedException { } @Test - public void withBucketIdMoreThanLimitTest() throws ExecutionException, InterruptedException { + public void withBucketIdFromClientTest() throws ExecutionException, InterruptedException { TarantoolSpaceOperations> profileSpace = client.space(TEST_SPACE_NAME); TarantoolTuple tarantoolTuple = tupleFactory.create(1, null, "FIO", 50, 100); Conditions condition = Conditions.equals(PK_FIELD_NAME, 1); + Integer bucketId = Utils.getBucketIdStrCRC32(client, + Collections.singletonList(tarantoolTuple.getInteger(0))); + InsertOptions insertOptions = ProxyInsertOptions.create().withBucketId(bucketId); + + TarantoolResult insertResult = profileSpace.insert(tarantoolTuple, insertOptions).get(); + assertEquals(1, insertResult.size()); + + TarantoolResult selectResult = profileSpace.select(condition).get(); + assertEquals(1, selectResult.size()); + + SelectOptions selectOptions = ProxySelectOptions.create().withBucketId(bucketId); + selectResult = profileSpace.select(condition, selectOptions).get(); + assertEquals(1, selectResult.size()); + } + + private Integer getBucketIdFromTarantool(List key) throws ExecutionException, InterruptedException { + return client.callForSingleResult( + "vshard.router.bucket_id_strcrc32", + Collections.singletonList(key), + Integer.class + ).get(); + } + + @Test + public void withBucketIdClientComputationTest() throws ExecutionException, InterruptedException { + List> keys = Arrays.asList( + Collections.singletonList(1), + Arrays.asList(1, "FIO"), + Arrays.asList(1, true, "FIO", 'm', 100.123) + ); + + for (List key : keys) { + assertEquals(Utils.getBucketIdStrCRC32(client, key), getBucketIdFromTarantool(key)); + } + } + + @Test + public void withBucketIdMoreThanLimitTest() throws ExecutionException, InterruptedException { + TarantoolSpaceOperations> profileSpace = + client.space(TEST_SPACE_NAME); + + TarantoolTuple tarantoolTuple = tupleFactory.create(1, null, "FIO", 50, 100); + Integer bucketsCount = client.callForSingleResult("vshard.router.bucket_count", Integer.class).get(); InsertOptions insertOptions = ProxyInsertOptions.create().withBucketId(bucketsCount * 2);