Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,19 +242,22 @@ public long forceWrite(boolean forceMetadata) throws IOException {
}

@Override
public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
public synchronized int read(ByteBuf dest, long pos, int length, int readExtraBytes) throws IOException {
if (dest.writableBytes() < length) {
throw new IllegalArgumentException("dest buffer remaining capacity is not enough"
+ "(must be at least as \"length\"=" + length + ")");
}

long prevPos = pos;
while (length > 0) {
// check if it is in the write buffer
if (writeBuffer != null && writeBufferStartPosition.get() <= pos) {
int positionInBuffer = (int) (pos - writeBufferStartPosition.get());
int bytesToCopy = Math.min(writeBuffer.writerIndex() - positionInBuffer, dest.writableBytes());

if (bytesToCopy == 0 && length <= readExtraBytes) {
// try to read next entry position, but we have reached the last entry
break;
}
if (bytesToCopy == 0) {
throw new IOException("Read past EOF");
}
Expand All @@ -278,6 +281,10 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept

int readBytes = fileChannel.read(readBuffer.internalNioBuffer(0, readCapacity),
readBufferStartPosition);
if (readBytes <= 0 && length <= readExtraBytes) {
// we have reached the last entry
break;
}
if (readBytes <= 0) {
throw new IOException("Reading from filechannel returned a non-positive value. Short read.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,8 @@ public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean se
this.readBuffer = Unpooled.buffer(readCapacity);
}

/**
* Read as many bytes into dest as dest.capacity() starting at position pos in the
* FileChannel. This function can read from the buffer or the file channel
* depending on the implementation..
* @param dest
* @param pos
* @return The total number of bytes read.
* -1 if the given position is greater than or equal to the file's current size.
* @throws IOException if I/O error occurs
*/
public int read(ByteBuf dest, long pos) throws IOException {
return read(dest, pos, dest.writableBytes());
return read(dest, pos, dest.writableBytes(), 0);
}

@Override
Expand All @@ -88,6 +78,22 @@ public long size() throws IOException {
}

public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
return read(dest, pos, length, 0);
}

/**
* Read as many bytes into dest as dest.capacity() starting at position pos in the
* FileChannel. This function can read from the buffer or the file channel
* depending on the implementation..
* @param dest
* @param pos
* @param length
* @param readExtraBytes
* @return The total number of bytes read.
* -1 if the given position is greater than or equal to the file's current size.
* @throws IOException if I/O error occurs
*/
public synchronized int read(ByteBuf dest, long pos, int length, int readExtraBytes) throws IOException {
invocationCount++;
long currentPosition = pos;
long eof = size();
Expand All @@ -111,9 +117,12 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept
} else {
// We don't have it in the buffer, so put necessary data in the buffer
readBufferStartPosition = currentPosition;
int readBytes = 0;
if ((readBytes = validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity),
currentPosition)) <= 0) {
int readBytes = validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity),
currentPosition);
if (readBytes <= 0 && length <= readExtraBytes) {
break;
}
if (readBytes <= 0) {
throw new IOException("Reading from filechannel returned a non-positive value. Short read.");
}
readBuffer.writerIndex(readBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,21 @@ void addListener(EntryLogListener listener) {
*/
private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos)
throws IOException {
return readFromLogChannel(entryLogId, channel, buff, pos, 0);
}

private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos,
int readExtraBytes)
throws IOException {
BufferedLogChannel bc = entryLogManager.getCurrentLogIfPresent(entryLogId);
if (null != bc) {
synchronized (bc) {
if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
return bc.read(buff, pos);
return bc.read(buff, pos, buff.writableBytes(), readExtraBytes);
}
}
}
return channel.read(buff, pos);
return channel.read(buff, pos, buff.writableBytes(), readExtraBytes);
}

/**
Expand Down Expand Up @@ -828,16 +834,27 @@ private void validateEntry(long ledgerId, long entryId, long entryLogId, long po
@Override
public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
throws IOException, Bookie.NoEntryException {
return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */);
return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */,
0).getRight();
}

@Override
public Pair<Integer, ByteBuf> readEntryAndExtraBytes(long ledgerId, long entryId, long entryLocation,
int extraBytes)
throws IOException, Bookie.NoEntryException {
return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */,
extraBytes);
}

@Override
public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryException {
return internalReadEntry(-1L, -1L, location, false /* validateEntry */);
return internalReadEntry(-1L, -1L, location, false /* validateEntry */,
0).getRight();
}


private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry)
private Pair<Integer, ByteBuf> internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry,
int readExtraBytes)
throws IOException, Bookie.NoEntryException {
long entryLogId = logIdForOffset(location);
long pos = posForOffset(location);
Expand All @@ -857,18 +874,20 @@ private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, bo
throw new IOException("Bad entry read from log file id: " + entryLogId, e);
}

ByteBuf data = allocator.buffer(entrySize, entrySize);
int rc = readFromLogChannel(entryLogId, fc, data, pos);
if (rc != entrySize) {
int readSize = entrySize + readExtraBytes;
ByteBuf data = allocator.buffer(readSize, readSize);
int rc = readFromLogChannel(entryLogId, fc, data, pos, readExtraBytes);
if (rc < entrySize) {
ReferenceCountUtil.release(data);
throw new IOException("Bad entry read from log file id: " + entryLogId,
new EntryLookupException("Short read for " + ledgerId + "@"
+ entryId + " in " + entryLogId + "@"
+ pos + "(" + rc + "!=" + entrySize + ")"));
+ pos + "(" + rc + "<" + entrySize + "@"
+ rc + "!=" + readSize + ")"));
}
data.writerIndex(entrySize);
data.writerIndex(rc);

return data;
return Pair.of(entrySize, data);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ public LedgerDescriptor getReadOnlyHandle(final long ledgerId) throws IOExceptio
LedgerDescriptor handle = readOnlyLedgers.get(ledgerId);

if (handle == null) {
handle = LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage);
readOnlyLedgers.putIfAbsent(ledgerId, handle);
synchronized (this) {
if ((handle = readOnlyLedgers.get(ledgerId)) == null) {
handle = LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage);
readOnlyLedgers.putIfAbsent(ledgerId, handle);
}
}
}

return handle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.io.IOException;
import java.util.Collection;
import org.apache.bookkeeper.bookie.AbstractLogCompactor;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.EntryLogMetadata;
import org.apache.commons.lang3.tuple.Pair;


/**
Expand Down Expand Up @@ -56,6 +58,10 @@ public interface EntryLogger extends AutoCloseable {
*/
ByteBuf readEntry(long entryLocation)
throws IOException, NoEntryException;
Pair<Integer, ByteBuf> readEntryAndExtraBytes(long ledgerId, long entryId, long entryLocation,
int extraBytes)
throws IOException, Bookie.NoEntryException;

/**
* Read an entry from an entrylog location, and verify that is matches the
* expected ledger and entry ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.AbstractLogCompactor;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.EntryLogMetadata;
import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
Expand All @@ -59,6 +60,7 @@
import org.apache.bookkeeper.slogger.Slogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.LedgerDirUtil;
import org.apache.commons.lang3.tuple.Pair;

/**
* DirectEntryLogger.
Expand Down Expand Up @@ -211,6 +213,13 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
return internalReadEntry(ledgerId, entryId, entryLocation, true);
}

@Override
public Pair<Integer, ByteBuf> readEntryAndExtraBytes(long ledgerId, long entryId, long entryLocation,
int readBufferSize)
throws IOException, Bookie.NoEntryException {
throw new UnsupportedOperationException("readEntryAndExtraBytes is not supported in DirectEntryLogger");
}

private LogReader getReader(int logId) throws IOException {
Cache<Integer, LogReader> cache = caches.get();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class DbLedgerStorage implements LedgerStorage {

public static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
public static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
public static final String DISABLE_READ_CACHE = "dbStorage_disableReadCache";
public static final String ENABLE_LOCATION_CACHE = "dbStorage_enableLocationCache";
public static final String DIRECT_IO_ENTRYLOGGER = "dbStorage_directIOEntryLogger";
public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB =
"dbStorage_directIOEntryLoggerTotalWriteBufferSizeMB";
Expand All @@ -101,6 +103,8 @@ public class DbLedgerStorage implements LedgerStorage {
private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB =
(long) (0.25 * PlatformDependent.estimateMaxDirectMemory()) / MB;

private static final boolean DEFAULT_DISABLE_READ_CACHE = false;
private static final boolean DEFAULT_ENABLE_LOCATION_CACHE = false;
static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
static final String READ_AHEAD_CACHE_BATCH_BYTES_SIZE = "dbStorage_readAheadCacheBatchBytesSize";
private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
Expand Down Expand Up @@ -155,13 +159,20 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
boolean directIOEntryLogger = getBooleanVariableOrDefault(conf, DIRECT_IO_ENTRYLOGGER, false);

boolean disableReadCache = getBooleanVariableOrDefault(conf, DISABLE_READ_CACHE, DEFAULT_DISABLE_READ_CACHE);
boolean enableLocationCache = getBooleanVariableOrDefault(conf, ENABLE_LOCATION_CACHE,
DEFAULT_ENABLE_LOCATION_CACHE);
this.allocator = allocator;
this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();

log.info("Started Db Ledger Storage");
log.info(" - Number of directories: {}", numberOfDirs);
log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
if (disableReadCache) {
log.info(" - Read Cache: DISABLED");
} else {
log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
}

if (readCacheMaxSize + writeCacheMaxSize > PlatformDependent.estimateMaxDirectMemory()) {
throw new IOException("Read and write cache sizes exceed the configured max direct memory size");
Expand Down Expand Up @@ -242,7 +253,7 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
idm, entrylogger,
statsLogger, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize,
readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
readAheadCacheBatchSize, readAheadCacheBatchBytesSize, disableReadCache, enableLocationCache));
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
Expand Down Expand Up @@ -281,11 +292,13 @@ public Long getSample() {
protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
EntryLogger entryLogger, StatsLogger statsLogger, long writeCacheSize, long readCacheSize,
int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize)
int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize, boolean disableReadCache,
boolean enableLocationCache)
throws IOException {
return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger,
statsLogger, allocator, writeCacheSize, readCacheSize,
readAheadCacheBatchSize, readAheadCacheBatchBytesSize);
readAheadCacheBatchSize, readAheadCacheBatchBytesSize, disableReadCache,
enableLocationCache);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ class DbLedgerStorageStats {
help = "time spent reading entries from the locations index of the db ledger storage engine",
parent = READ_ENTRY
)
private final Counter readFromLocationIndexTime;
private final OpStatsLogger readFromLocationIndexTime;
@StatsDoc(
name = READ_ENTRYLOG_TIME,
help = "time spent reading entries from the entry log files of the db ledger storage engine",
parent = READ_ENTRY
)
private final Counter readFromEntryLogTime;
private final OpStatsLogger readFromEntryLogTime;
@StatsDoc(
name = WRITE_CACHE_HITS,
help = "number of write cache hits (on reads)",
Expand Down Expand Up @@ -133,7 +133,7 @@ class DbLedgerStorageStats {
name = READAHEAD_TIME,
help = "Time spent on readahead operations"
)
private final Counter readAheadTime;
private final OpStatsLogger readAheadTime;
@StatsDoc(
name = FLUSH,
help = "operation stats of flushing write cache to entry log files"
Expand Down Expand Up @@ -203,15 +203,15 @@ class DbLedgerStorageStats {
Supplier<Long> readCacheCountSupplier) {
addEntryStats = stats.getThreadScopedOpStatsLogger(ADD_ENTRY);
readEntryStats = stats.getThreadScopedOpStatsLogger(READ_ENTRY);
readFromLocationIndexTime = stats.getThreadScopedCounter(READ_ENTRY_LOCATIONS_INDEX_TIME);
readFromEntryLogTime = stats.getThreadScopedCounter(READ_ENTRYLOG_TIME);
readFromLocationIndexTime = stats.getThreadScopedOpStatsLogger(READ_ENTRY_LOCATIONS_INDEX_TIME);
readFromEntryLogTime = stats.getThreadScopedOpStatsLogger(READ_ENTRYLOG_TIME);
readCacheHitCounter = stats.getCounter(READ_CACHE_HITS);
readCacheMissCounter = stats.getCounter(READ_CACHE_MISSES);
writeCacheHitCounter = stats.getCounter(WRITE_CACHE_HITS);
writeCacheMissCounter = stats.getCounter(WRITE_CACHE_MISSES);
readAheadBatchCountStats = stats.getOpStatsLogger(READAHEAD_BATCH_COUNT);
readAheadBatchSizeStats = stats.getOpStatsLogger(READAHEAD_BATCH_SIZE);
readAheadTime = stats.getThreadScopedCounter(READAHEAD_TIME);
readAheadTime = stats.getOpStatsLogger(READAHEAD_TIME);
flushStats = stats.getOpStatsLogger(FLUSH);
flushEntryLogStats = stats.getOpStatsLogger(FLUSH_ENTRYLOG);
flushLocationIndexStats = stats.getOpStatsLogger(FLUSH_LOCATIONS_INDEX);
Expand Down
Loading
Loading