Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
*/
package io.streamnative.pulsar.handlers.kop.storage;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -51,30 +54,73 @@ public class PulsarTopicProducerStateManagerSnapshotBuffer implements ProducerSt

private CompletableFuture<Void> currentReadHandle;

private synchronized CompletableFuture<Reader<ByteBuffer>> ensureReaderHandle() {
@VisibleForTesting
public synchronized CompletableFuture<Reader<ByteBuffer>> ensureReaderHandle() {
if (reader == null) {
reader = pulsarClient.newReaderBuilder()
CompletableFuture<Reader<ByteBuffer>> newReader = pulsarClient.newReaderBuilder()
.topic(topic)
.startMessageId(MessageId.earliest)
.readCompacted(true)
.createAsync();
reader = newReader;

newReader.whenComplete((r, error) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a unit test for this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for not getting back to you sooner. I found this change may cause NPE problems, I add NPE checks and unit tests, maybe we don't need to merge this PR immediately.

if (error != null) {
discardReader(newReader);
}
});
}
return reader;
}

private synchronized CompletableFuture<Producer<ByteBuffer>> ensureProducerHandle() {
private synchronized void discardReader(CompletableFuture<Reader<ByteBuffer>> oldReader) {
if (reader == oldReader || (reader != null && reader.isCompletedExceptionally())) {
reader = null;
log.info("discard broken reader for {}", topic);
}
}

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
private synchronized void discardReader(Reader<ByteBuffer> oldReader) {
if (reader == null) {
return;
}
if (reader.isCompletedExceptionally() || (reader.isDone()
&& !reader.isCompletedExceptionally()
&& reader.getNow(null) == oldReader)) {
log.info("discard broken reader for {}", topic);
reader = null;
}
}

@VisibleForTesting
public synchronized CompletableFuture<Producer<ByteBuffer>> ensureProducerHandle() {
if (producer == null) {
producer = pulsarClient.newProducerBuilder()
CompletableFuture<Producer<ByteBuffer>> newProducer = pulsarClient.newProducerBuilder()
.enableBatching(false)
.topic(topic)
.blockIfQueueFull(true)
.createAsync();

producer = newProducer;

newProducer.whenComplete((r, error) -> {
if (error != null) {
discardProducer(newProducer);
}
});
}
return producer;
}

private synchronized void discardProducer(CompletableFuture<Producer<ByteBuffer>> oldProducer) {
if (producer == oldProducer) {
producer = null;
}
}

private CompletableFuture<Void> readNextMessageIfAvailable(Reader<ByteBuffer> reader) {
return reader
CompletableFuture<Void> result = reader
.hasMessageAvailableAsync()
.thenCompose(hasMessageAvailable -> {
if (hasMessageAvailable == null
Expand All @@ -88,11 +134,19 @@ private CompletableFuture<Void> readNextMessageIfAvailable(Reader<ByteBuffer> re
});
}
});

result.whenComplete((r, error) -> {
if (error != null) {
discardReader(reader);
}
});

return result;
}


private synchronized CompletableFuture<Void> ensureLatestData(boolean beforeWrite) {
if (currentReadHandle != null) {
if (currentReadHandle != null && !currentReadHandle.isCompletedExceptionally()) {
if (beforeWrite) {
// we are inside a write loop, so
// we must ensure that we start to read now
Expand All @@ -110,9 +164,19 @@ private synchronized CompletableFuture<Void> ensureLatestData(boolean beforeWrit
// please note that the read operation is async,
// and it is not execute inside this synchronized block
CompletableFuture<Reader<ByteBuffer>> readerHandle = ensureReaderHandle();
if (readerHandle == null) {
return CompletableFuture.failedFuture(
new KoPTopicException("Failed to create reader handle for " + topic));
}
final CompletableFuture<Void> newReadHandle =
readerHandle.thenCompose(this::readNextMessageIfAvailable);
currentReadHandle = newReadHandle;

newReadHandle.exceptionally(___ -> {
endReadLoop(newReadHandle);
return null;
});

return newReadHandle.thenApply((__) -> {
endReadLoop(newReadHandle);
return null;
Expand All @@ -132,7 +196,12 @@ public CompletableFuture<Void> write(ProducerStateManagerSnapshot snapshot) {
// cannot serialise, skip
return CompletableFuture.completedFuture(null);
}
return ensureProducerHandle().thenCompose(opProducer -> {
CompletableFuture<Producer<ByteBuffer>> producerFuture = ensureProducerHandle();
if (producerFuture == null) {
return CompletableFuture.failedFuture(
new KoPTopicException("Failed to create producer handle for " + topic));
}
return producerFuture.thenCompose(opProducer -> {
// nobody can write now to the topic
// wait for local cache to be up-to-date
return ensureLatestData(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,26 @@
*/
package io.streamnative.pulsar.handlers.kop.storage;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -69,4 +78,48 @@ public void testSerializeAndDeserialize() {
}
}

@Test(timeOut = 5_000)
public void ensureReaderHandleCaughtExceptionTest() {
SystemTopicClient sysTopicClient = spy(new SystemTopicClient(pulsar, conf));
ReaderBuilder<ByteBuffer> readerBuilder = spy(sysTopicClient.newReaderBuilder());
when(readerBuilder.createAsync()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("inject")));
when(sysTopicClient.newReaderBuilder()).thenReturn(readerBuilder);

PulsarTopicProducerStateManagerSnapshotBuffer snapshotBuffer =
new PulsarTopicProducerStateManagerSnapshotBuffer("snapshot-test-topic", sysTopicClient);
CompletableFuture<Reader<ByteBuffer>> readerFuture = snapshotBuffer.ensureReaderHandle();
if (readerFuture != null) {
try {
readerFuture.get();
fail("should fail");
} catch (Exception e) {
assertEquals(e.getCause().getMessage(), "inject");
}
} else {
log.info("This is expected behavior.");
}
}

@Test(timeOut = 5_000)
public void ensureProducerCaughtExceptionTest() {
SystemTopicClient sysTopicClient = spy(new SystemTopicClient(pulsar, conf));
ProducerBuilder<ByteBuffer> producerBuilder = spy(sysTopicClient.newProducerBuilder());
when(producerBuilder.createAsync()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("inject")));
when(sysTopicClient.newProducerBuilder()).thenReturn(producerBuilder);

PulsarTopicProducerStateManagerSnapshotBuffer snapshotBuffer =
new PulsarTopicProducerStateManagerSnapshotBuffer("snapshot-test-topic", sysTopicClient);
CompletableFuture<Producer<ByteBuffer>> producerFuture = snapshotBuffer.ensureProducerHandle();
if (producerFuture != null) {
try {
producerFuture.get();
fail("should fail");
} catch (Exception e) {
assertEquals(e.getCause().getMessage(), "inject");
}
} else {
log.info("This is expected behavior.");
}
}

}