Skip to content

Commit d534cf5

Browse files
committed
Add async call in retrying client
Closes #299
1 parent ad6203b commit d534cf5

4 files changed

Lines changed: 79 additions & 38 deletions

File tree

src/main/java/io/tarantool/driver/api/retry/RequestRetryPolicy.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,28 +49,31 @@ default long getRequestTimeout() {
4949
* See {@link TarantoolRequestRetryPolicies.InfiniteRetryPolicy} for example of implementation.
5050
*
5151
* @param operation supplier for the operation to perform. Must return a new operation instance
52-
* @param executor executor in which the retry callbacks will be scheduled
5352
* @param <T> operation result type
5453
* @return {@link CompletableFuture} with the same type as the operation result type
5554
*/
5655
default <T> CompletableFuture<T> wrapOperation(Supplier<CompletableFuture<T>> operation, Executor executor) {
5756
Assert.notNull(operation, "Operation must not be null");
5857
Assert.notNull(executor, "Executor must not be null");
5958

60-
return CompletableFuture.supplyAsync(() -> {
61-
Throwable ex;
62-
do {
63-
CompletableFuture<T> f = operation.get();
64-
try {
65-
return f.get(getRequestTimeout(), TimeUnit.MILLISECONDS);
66-
} catch (TimeoutException | InterruptedException e) {
67-
ex = e;
68-
} catch (ExecutionException e) {
69-
ex = e.getCause();
70-
}
71-
72-
} while (this.canRetryRequest(ex));
73-
throw new CompletionException(ex);
59+
CompletableFuture<T> result = new CompletableFuture<>();
60+
CompletableFuture.runAsync(() -> {
61+
doRequest(operation, result);
7462
}, executor);
63+
return result;
64+
}
65+
66+
default <T> void doRequest(Supplier<CompletableFuture<T>> operation, CompletableFuture<T> resultFuture) {
67+
CompletableFuture<T> future = operation.get();
68+
future.handle((res, ex) -> {
69+
if (ex == null) {
70+
return resultFuture.complete(res);
71+
}
72+
if (!this.canRetryRequest(ex)) {
73+
resultFuture.completeExceptionally(ex);
74+
}
75+
doRequest(operation, resultFuture);
76+
return null;
77+
});
7578
}
7679
}

src/main/java/io/tarantool/driver/api/retry/TarantoolRequestRetryPolicies.java

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
package io.tarantool.driver.api.retry;
22

3+
import io.tarantool.driver.core.TarantoolDaemonThreadFactory;
34
import io.tarantool.driver.exceptions.TarantoolAttemptsLimitException;
45
import io.tarantool.driver.exceptions.TarantoolClientException;
56
import io.tarantool.driver.exceptions.TarantoolConnectionException;
67
import io.tarantool.driver.exceptions.TarantoolInternalNetworkException;
78
import io.tarantool.driver.exceptions.TarantoolNoSuchProcedureException;
8-
import io.tarantool.driver.exceptions.TarantoolTimeoutException;
99
import io.tarantool.driver.utils.Assert;
1010

1111
import java.util.concurrent.CompletableFuture;
1212
import java.util.concurrent.CompletionException;
1313
import java.util.concurrent.ExecutionException;
1414
import java.util.concurrent.Executor;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.ScheduledExecutorService;
1517
import java.util.concurrent.TimeUnit;
1618
import java.util.concurrent.TimeoutException;
1719
import java.util.function.Predicate;
@@ -69,6 +71,7 @@ public static final class InfiniteRetryPolicy<T extends Predicate<Throwable>> im
6971
private final long operationTimeout; //ms
7072
private final long delay; //ms
7173
private final T exceptionCheck;
74+
private final ScheduledExecutorService timeoutScheduler;
7275

7376
/**
7477
* Basic constructor
@@ -89,6 +92,8 @@ public InfiniteRetryPolicy(long requestTimeout, long operationTimeout, long dela
8992
this.operationTimeout = operationTimeout;
9093
this.delay = delay;
9194
this.exceptionCheck = exceptionCheck;
95+
this.timeoutScheduler =
96+
Executors.newSingleThreadScheduledExecutor(new TarantoolDaemonThreadFactory("tarantool-retry-timeout"));
9297
}
9398

9499
@Override
@@ -113,33 +118,20 @@ public long getOperationTimeout() {
113118

114119
@Override
115120
public <R> CompletableFuture<R> wrapOperation(Supplier<CompletableFuture<R>> operation, Executor executor) {
116-
117121
Assert.notNull(operation, "Operation must not be null");
118122
Assert.notNull(executor, "Executor must not be null");
119123

120-
return CompletableFuture.supplyAsync(() -> {
121-
long timeElapsed = 0;
122-
long tStart;
123-
Throwable ex;
124-
do {
125-
tStart = System.nanoTime();
126-
try {
127-
return operation.get().get(getRequestTimeout(), TimeUnit.MILLISECONDS);
128-
} catch (TimeoutException | InterruptedException e) {
129-
ex = e;
130-
} catch (ExecutionException e) {
131-
ex = e.getCause();
124+
CompletableFuture<R> resultFuture = new CompletableFuture<>();
125+
CompletableFuture.runAsync(() -> {
126+
doRequest(operation, resultFuture);
127+
timeoutScheduler.schedule(() -> {
128+
if (!resultFuture.isDone()) {
129+
resultFuture.completeExceptionally(new TimeoutException(String.format(
130+
"Failed to get retrying response within %d ms", operationTimeout)));
132131
}
133-
timeElapsed = timeElapsed + (System.nanoTime() - tStart) / 1_000_000L;
134-
if (timeElapsed >= getOperationTimeout()) {
135-
ex = new TarantoolTimeoutException(
136-
timeElapsed,
137-
ex);
138-
break;
139-
}
140-
} while (this.canRetryRequest(ex));
141-
throw new CompletionException(ex);
132+
}, operationTimeout, TimeUnit.MILLISECONDS);
142133
}, executor);
134+
return resultFuture;
143135
}
144136
}
145137

src/test/java/io/tarantool/driver/integration/RetryingTarantoolTupleClientIT.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
package io.tarantool.driver.integration;
22

3+
import io.tarantool.driver.api.TarantoolClient;
34
import io.tarantool.driver.api.TarantoolClientConfig;
5+
import io.tarantool.driver.api.TarantoolClientFactory;
6+
import io.tarantool.driver.api.TarantoolResult;
7+
import io.tarantool.driver.api.retry.RequestRetryPolicy;
8+
import io.tarantool.driver.api.retry.RequestRetryPolicyFactory;
49
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies;
10+
import io.tarantool.driver.api.tuple.TarantoolTuple;
511
import io.tarantool.driver.auth.SimpleTarantoolCredentials;
612
import io.tarantool.driver.core.ClusterTarantoolTupleClient;
713
import io.tarantool.driver.core.ProxyTarantoolTupleClient;
@@ -13,6 +19,7 @@
1319
import org.junit.jupiter.api.BeforeAll;
1420
import org.junit.jupiter.api.Test;
1521

22+
import java.util.ArrayList;
1623
import java.util.Collections;
1724
import java.util.List;
1825
import java.util.concurrent.CompletableFuture;
@@ -218,4 +225,37 @@ void testInfiniteRetryTimeoutReached_withTimeout() throws Exception {
218225
assertEquals(TimeoutException.class, e.getCause().getCause().getClass());
219226
}
220227
}
228+
229+
@Test
230+
void testAsync() throws Exception {
231+
// RequestRetryPolicy policy = throwable -> true;
232+
// RequestRetryPolicyFactory policyFactory = () -> policy;
233+
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> client = TarantoolClientFactory.createClient()
234+
.withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD))
235+
.withAddress(container.getRouterHost(), container.getRouterPort())
236+
.withConnectTimeout(1000)
237+
.withReadTimeout(1000)
238+
// .withRetrying(policyFactory)
239+
.withRetryingIndefinitely(policy -> policy.withDelay(500)
240+
.withRequestTimeout(2000)
241+
.withOperationTimeout(2000))
242+
.build();
243+
244+
client.call("reset_request_counters").join();
245+
246+
ArrayList<CompletableFuture<List<?>>> list = new ArrayList<>(1000);
247+
int batchSize = 1000;
248+
249+
for (int i = 0; i < 5; i++) {
250+
for (int j = 0; j < batchSize; j++) {
251+
CompletableFuture<List<?>> future = client
252+
.call("simple_long_running_function", Collections.singletonList(0.01));
253+
list.add(j, future);
254+
}
255+
for (int j = 0; j < batchSize; j++) {
256+
assertEquals(list.get(j).join().get(0), true);
257+
}
258+
list.clear();
259+
}
260+
}
221261
}

src/test/resources/cartridge/app/roles/api_router.lua

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ local function get_router_name()
108108
return string.sub(box.cfg.custom_proc_title, 9)
109109
end
110110

111+
local function simple_long_running_function(seconds_to_sleep)
112+
fiber.sleep(seconds_to_sleep)
113+
return true
114+
end
115+
111116
local function long_running_function(values)
112117
local seconds_to_sleep = 0
113118
local disabled_router_name = ""
@@ -203,6 +208,7 @@ local function init()
203208
rawset(_G, 'reset_request_counters', reset_request_counters)
204209
rawset(_G, 'get_router_name', get_router_name)
205210
rawset(_G, 'long_running_function', long_running_function)
211+
rawset(_G, 'simple_long_running_function', simple_long_running_function)
206212
rawset(_G, 'get_request_count', get_request_count)
207213
rawset(_G, 'box_error_unpack_no_connection', box_error_unpack_no_connection)
208214
rawset(_G, 'box_error_unpack_timeout', box_error_unpack_timeout)

0 commit comments

Comments
 (0)