Description
Bug Report
Current Behavior
The event loop thread gets blocked during disconnection/reconnection.
Input Code
public class MultiThreadSyncGet {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultiThreadSyncGet.class);

    private static final int LOOP_NUM = 10_000_000;

    private static final int BATCH_SIZE = 1000;

    private static final int DIGIT_NUM = 9;

    private static final String KEY_FORMATTER = String.format("key-%%0%dd", DIGIT_NUM);

    static {
        // noinspection ConstantValue
        LettuceAssert.assertState(DIGIT_NUM >= String.valueOf(LOOP_NUM).length() + 1, "digit num is not large enough");
    }

    void test() {
        RedisURI uri = RedisURI.create("redis.dev-d-okex.svc.dev.local", 6379);
        uri.setCredentialsProvider(new StaticCredentialsProvider(null, "123qweasd!@#".toCharArray()));
        try (RedisClient redisClient = RedisClient.create(uri)) {
            final ClientOptions.Builder optsBuilder = ClientOptions.builder()
                    .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(60)).build());
            redisClient.setOptions(optsBuilder.build());
            final StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(ByteArrayCodec.INSTANCE);
            // Commands are buffered and only written on flushCommands().
            connection.setAutoFlushCommands(false);

            // Close the connection from another thread while commands are still being queued.
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    logger.error("interrupted", e);
                }
                connection.close();
            }).start();

            for (int j = 0; j < LOOP_NUM && connection.isOpen(); j++) {
                for (int i = 0; i < BATCH_SIZE; i++) {
                    connection.async().get(genKey(j));
                }
                connection.flushCommands();
            }
        }
    }

    private byte[] genKey(int j) {
        return String.format(KEY_FORMATTER, j).getBytes();
    }

    public static void main(String[] args) {
        new MultiThreadSyncGet().test();
        logger.info("=====================================");
    }
}
I throttled the network with macOS's Network Link Conditioner, setting Downlink Delay to 1000 ms, and then set a debugger breakpoint at stack.remove(command):
public void operationComplete(Future<Void> future) {
    try {
        if (!future.isSuccess()) {
            stack.remove(command);
        }
    } finally {
        recycle();
    }
}
At the breakpoint, evaluating new ArrayList<>(stack).indexOf(command) returned 3971, and evaluating stack.size() returned 85000.
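From this we can conclude that, if every command from index 3971 onward fails to flush, removing all of them costs O(3971 × (85000 − 3971)), roughly 3.2 × 10^8 comparisons, all on the event loop thread. Each stack.remove(command) call scans past the 3971 successfully flushed commands at the head of the queue before it reaches the failed one. If there are more successfully flushed commands before the first failed-to-flush command, the cost could be even higher.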
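The estimate above can be checked with a small standalone program. This is only a sketch, not Lettuce code: `DequeRemovalCost` and `Cmd` are hypothetical names, and it assumes that every command from the first failed index onward fails and that the failures complete in queue order. It counts how many `equals` calls `ArrayDeque#remove(Object)` makes while removing the failed commands one at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class DequeRemovalCost {

    /** Hypothetical stand-in for a queued command; counts equals() invocations. */
    static final class Cmd {
        static long equalsCalls = 0;

        @Override
        public boolean equals(Object o) {
            equalsCalls++;
            return this == o;
        }

        @Override
        public int hashCode() {
            return System.identityHashCode(this);
        }
    }

    /**
     * Fills a deque with `total` commands, then removes every command at
     * index >= firstFailed via remove(Object), in queue order.
     * Returns the number of equals() calls made by the linear scans.
     */
    static long countComparisons(int total, int firstFailed) {
        ArrayDeque<Cmd> stack = new ArrayDeque<>(total);
        List<Cmd> failed = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            Cmd c = new Cmd();
            stack.add(c);
            if (i >= firstFailed) {
                failed.add(c);
            }
        }
        Cmd.equalsCalls = 0;
        for (Cmd c : failed) {
            // Scans from the head: firstFailed misses, then one hit.
            stack.remove(c);
        }
        return Cmd.equalsCalls;
    }

    public static void main(String[] args) {
        // Numbers observed in the debugger session described above.
        long calls = countComparisons(85_000, 3_971);
        System.out.println("equals() calls: " + calls);
    }
}
```

Under these assumptions each removal performs `firstFailed + 1` comparisons, so the total is `(total - firstFailed) * (firstFailed + 1)`, which matches the order of magnitude given above. The element shifting that `ArrayDeque` does after each removal is not counted here.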
Expected behavior/code
The event loop thread is not blocked.
Environment
- Lettuce version(s): git_sha_dee8020d92e6bff562cef723dcacdea714b89982
- Redis version: 7.0.0
Possible Solution
Use HashIndexedQueue instead, which removes an element from the queue in O(1).
The root cause is that ArrayDeque#remove is O(n): if many tasks fail in AddToStack#operationComplete, the event loop thread may be blocked for too long.
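To illustrate the idea of a FIFO queue with constant-time removal of an arbitrary element, here is a minimal sketch built on `java.util.LinkedHashSet`. It is not the HashIndexedQueue proposed above, and `HashedFifo` is a hypothetical name used only for this example. It assumes that each queued element is distinct and has a stable `hashCode`/`equals`, since a set silently rejects duplicates.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

/** Hypothetical FIFO queue with expected O(1) offer, poll and remove(Object). */
final class HashedFifo<E> {

    // LinkedHashSet keeps insertion order and supports hash-based removal.
    private final LinkedHashSet<E> items = new LinkedHashSet<>();

    /** Appends an element; returns false if an equal element is already queued. */
    boolean offer(E e) {
        return items.add(e);
    }

    /** Removes and returns the oldest element, or null if the queue is empty. */
    E poll() {
        Iterator<E> it = items.iterator();
        if (!it.hasNext()) {
            return null;
        }
        E head = it.next();
        it.remove();
        return head;
    }

    /** Removes the given element wherever it sits, without scanning the queue. */
    boolean remove(Object o) {
        return items.remove(o);
    }

    int size() {
        return items.size();
    }

    public static void main(String[] args) {
        HashedFifo<String> queue = new HashedFifo<>();
        queue.offer("get-1");
        queue.offer("get-2");
        queue.offer("get-3");
        queue.remove("get-2");            // hash lookup instead of a linear scan
        System.out.println(queue.poll()); // oldest remaining element
        System.out.println(queue.size());
    }
}
```

With a structure like this, the cost of dropping a failed command no longer depends on how many successfully flushed commands sit in front of it. The trade-off is extra memory per element for the hash table and linked entries.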
Additional context
Problematic code:
static class AddToStack implements GenericFutureListener<Future<Void>> {
    ...
    AddToStack(Recycler.Handle<AddToStack> handle) {
        this.handle = handle;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void operationComplete(Future<Void> future) {
        try {
            if (!future.isSuccess()) {
                // Linear scan of the command stack for every failed write.
                stack.remove(command);
            }
        } finally {
            recycle();
        }
    }
}