diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/FileStoreException.java b/ambry-api/src/main/java/com/github/ambry/clustermap/FileStoreException.java new file mode 100644 index 0000000000..7f50e96c38 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/FileStoreException.java @@ -0,0 +1,21 @@ +package com.github.ambry.clustermap; + +public class FileStoreException extends RuntimeException{ + + private static final long serialVersionUID = 1L; + private final FileStoreErrorCode error; + + public FileStoreException(String s, FileStoreErrorCode error) { + super(s); + this.error = error; + } + + public FileStoreException(String s, FileStoreErrorCode error, Throwable throwable) { + super(s, throwable); + this.error = error; + } + + public enum FileStoreErrorCode{ + FileStoreRunningFailure + } +} diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaSyncUpManager.java b/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaSyncUpManager.java index dc9531ccd7..c0ae6ae445 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaSyncUpManager.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaSyncUpManager.java @@ -35,6 +35,8 @@ public interface ReplicaSyncUpManager { * @throws InterruptedException */ void waitBootstrapCompleted(String partitionName) throws InterruptedException; + void initiateFileCopy(ReplicaId replicaId); + void waitForFileCopyCompleted(String partitionName) throws InterruptedException; /** * Update replica lag (in byte) between two replicas (local and peer replica) and check sync-up status. @@ -64,6 +66,8 @@ boolean updateReplicaLagAndCheckSyncStatus(ReplicaId localReplica, ReplicaId pee */ void onBootstrapComplete(ReplicaId replicaId); + void onFileCopyComplete(ReplicaId replicaId); + /** * Deactivation on given replica is complete. * @param replicaId the replica which completes deactivation. 
diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java b/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java index 83c5f43618..c43bccd5e1 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java @@ -41,5 +41,13 @@ public enum StateModelListenerType { * leadership hand-off occurs. For example, if any replica becomes LEADER from STANDBY, it is supposed to replicate * data from VCR nodes. This is part of two-way replication between Ambry and cloud. */ - CloudToStoreReplicationManagerListener + CloudToStoreReplicationManagerListener, + + /** + * The partition state change listener owned by the file copy manager. It takes actions when a partition state transition + * occurs. + */ + FileCopyManagerListener + + } diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java b/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java index 7df1ee7a94..6f89632e40 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java @@ -73,6 +73,8 @@ public enum TransitionErrorCode { /** * If the resource name is not a numeric number. 
*/ - InvalidResourceName + InvalidResourceName, + + FileCopyFailure } } diff --git a/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java b/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java new file mode 100644 index 0000000000..d4fdb1b156 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java @@ -0,0 +1,37 @@ +package com.github.ambry.config; + +public class FileCopyConfig { + + public static final String PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK = "parallel.partition.hydration.count.per.disk"; + @Config(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK) + public final int parallelPartitionHydrationCountPerDisk; + + public static final String NUMBER_OF_FILE_COPY_THREADS = "number.of.file.copy.threads"; + @Config(NUMBER_OF_FILE_COPY_THREADS) + public final int numberOfFileCopyThreads; + + public static final String FILE_CHUNK_TIMEOUT_IN_MINUTES = "file.chunk.timeout.in.minutes"; + @Config(FILE_CHUNK_TIMEOUT_IN_MINUTES) + public final long fileChunkTimeoutInMins; + + /** + * The frequency at which the data gets flushed to disk + */ + public static final String STORE_DATA_FLUSH_INTERVAL_IN_MBS = "store.data.flush.interval.In.MBs"; + @Config(STORE_DATA_FLUSH_INTERVAL_IN_MBS) + @Default("1000") + public final long storeDataFlushIntervalInMbs; + + public static final String File_COPY_META_DATA_FILE_NAME = "file.copy.meta.data.file.name"; + @Config(File_COPY_META_DATA_FILE_NAME) + @Default("sealed_logs_metadata_file") + public final String fileCopyMetaDataFileName; + + public FileCopyConfig(VerifiableProperties verifiableProperties) { + fileCopyMetaDataFileName = verifiableProperties.getString(File_COPY_META_DATA_FILE_NAME, "sealed_logs_metadata_file"); + parallelPartitionHydrationCountPerDisk = verifiableProperties.getInt(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1); + numberOfFileCopyThreads = verifiableProperties.getInt(NUMBER_OF_FILE_COPY_THREADS, 4); + fileChunkTimeoutInMins = 
verifiableProperties.getInt(FILE_CHUNK_TIMEOUT_IN_MINUTES, 5); + storeDataFlushIntervalInMbs = verifiableProperties.getLong(STORE_DATA_FLUSH_INTERVAL_IN_MBS, 1000); + } +} diff --git a/ambry-api/src/main/java/com/github/ambry/config/ServerReplicationMode.java b/ambry-api/src/main/java/com/github/ambry/config/ServerReplicationMode.java new file mode 100644 index 0000000000..5d3b6a2ae1 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/config/ServerReplicationMode.java @@ -0,0 +1,6 @@ +package com.github.ambry.config; + +public enum ServerReplicationMode { + BLOB_BASED, + FILE_BASED; +} diff --git a/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java b/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java index 32b48e5dab..42199ebed8 100644 --- a/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java +++ b/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java @@ -674,15 +674,38 @@ public class StoreConfig { public final boolean storeBlockStaleBlobStoreToStart; public final static String storeBlockStaleBlobStoreToStartName = "store.block.stale.blob.store.to.start"; + /** + * Config to Decide Replication Protocol For Hydration Of Newly Added Replicas + */ + public static final String SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION = "server.replication.protocol.for.hydration"; + @Config(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION) + public final ServerReplicationMode serverReplicationProtocolForHydration; + + /** * Whether to attempt reshuffling of reordered disks and subsequent process termination. 
*/ @Config("store.reshuffle.disks.on.reorder") @Default("false") public final boolean storeReshuffleDisksOnReorder; + + public static final String FILE_COPY_IN_PROGRESS_FILE_NAME = "file.copy.in.progress.file.name"; + @Config(FILE_COPY_IN_PROGRESS_FILE_NAME) + @Default("file_copy_in_progress") + public final String fileCopyInProgressFileName; + + public static final String BOOTSTRAP_IN_PROGRESS_FILE = "bootstrap.in.progress.file.name"; + @Config(BOOTSTRAP_IN_PROGRESS_FILE) + @Default("bootstrap_in_progress") + public final String bootstrapInProgressFile; + public final static String storeReshuffleDisksOnReorderName = "store.reshuffle.disks.on.reorder"; public StoreConfig(VerifiableProperties verifiableProperties) { + bootstrapInProgressFile = verifiableProperties.getString(BOOTSTRAP_IN_PROGRESS_FILE, "bootstrap_in_progress"); + fileCopyInProgressFileName = verifiableProperties.getString(FILE_COPY_IN_PROGRESS_FILE_NAME, "file_copy_in_progress"); + serverReplicationProtocolForHydration = verifiableProperties.getEnum(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION, + ServerReplicationMode.class, ServerReplicationMode.BLOB_BASED); storeKeyFactory = verifiableProperties.getString("store.key.factory", "com.github.ambry.commons.BlobIdFactory"); storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.interval.seconds", 60); storeIndexMaxMemorySizeBytes = verifiableProperties.getInt("store.index.max.memory.size.bytes", 20 * 1024 * 1024); diff --git a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java index 1c268fd9eb..b393a53f9f 100644 --- a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java +++ b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java @@ -89,6 +89,13 @@ public interface RequestAPI { */ void handleReplicaMetadataRequest(NetworkRequest request) throws IOException, InterruptedException; + /** + * + * @param request + * @throws 
IOException + * @throws InterruptedException + */ + /** * Replicate one specific Blob from a remote host to the local store. * @param request The request that contains the remote host information and the blob id to be replicated. @@ -116,4 +123,12 @@ default void handleAdminRequest(NetworkRequest request) throws InterruptedExcept default void handleUndeleteRequest(NetworkRequest request) throws InterruptedException, IOException { throw new UnsupportedOperationException("Undelete request not supported on this node"); } + + default void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException{ + throw new UnsupportedOperationException("File Meta Data request not supported on this node"); + } + + default void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException{ + throw new UnsupportedOperationException("File Chunk request not supported on this node"); + } } diff --git a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java index c25fb82092..418305b8ad 100644 --- a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java +++ b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import com.github.ambry.store.FileStore; + /** @@ -34,6 +36,17 @@ public interface StoreManager { */ boolean addBlobStore(ReplicaId replica); + boolean addFileStore(ReplicaId replicaId); + + void setUpReplica(String partitionName); + + + /** + * Build state after file copy is completed. + * @param partitionName the name of the partition for which state is to be built. + */ + void buildStateForFileCopy(String partitionName); + /** * Remove store from storage manager. 
* @param id the {@link PartitionId} associated with store @@ -62,6 +75,8 @@ public interface StoreManager { */ Store getStore(PartitionId id); + FileStore getFileStore(PartitionId id); + /** * Get replicaId on current node by partition name. (There should be at most one replica belonging to specific * partition on single node) diff --git a/ambry-api/src/main/java/com/github/ambry/store/FileMetaData.java b/ambry-api/src/main/java/com/github/ambry/store/FileMetaData.java new file mode 100644 index 0000000000..7ccafd11e8 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/FileMetaData.java @@ -0,0 +1,34 @@ +package com.github.ambry.store; + +import java.util.List; + + +public class FileMetaData { + SealedFileInfo sealedSegments; + List indexSegments; + List bloomFilters; + + public SealedFileInfo getSealedSegments() { + return sealedSegments; + } + + public void setSealedSegments(SealedFileInfo sealedSegments) { + this.sealedSegments = sealedSegments; + } + + public List getIndexSegments() { + return indexSegments; + } + + public void setIndexSegments(List indexSegments) { + this.indexSegments = indexSegments; + } + + public List getBloomFilters() { + return bloomFilters; + } + + public void setBloomFilters(List bloomFilters) { + this.bloomFilters = bloomFilters; + } +} diff --git a/ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java b/ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java new file mode 100644 index 0000000000..058d184952 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java @@ -0,0 +1,19 @@ +package com.github.ambry.store; + +public class SealedFileInfo { + private String fileName; + private final long fileSize; + + public SealedFileInfo(String fileName, Long fileSize) { + this.fileName = fileName; + this.fileSize = fileSize; + } + public String getFileName() { + return fileName; + } + + public Long getFileSize() { + return fileSize; + } + +} diff --git 
a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java index a33cf80c16..bcc9fa87af 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java @@ -56,6 +56,10 @@ public CloudStorageManager(VerifiableProperties properties, VcrMetrics vcrMetric public boolean addBlobStore(ReplicaId replica) { return createAndStartBlobStoreIfAbsent(replica.getPartitionId()) != null; } + @Override + public void buildStateForFileCopy(String partitionName){ + // no-op + } @Override public boolean shutdownBlobStore(PartitionId id) { diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java index bda9b7af0b..6e6e7cedb1 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java @@ -41,6 +41,9 @@ public class AmbryReplicaSyncUpManager implements ReplicaSyncUpManager { private static final Logger logger = LoggerFactory.getLogger(AmbryReplicaSyncUpManager.class); private final ConcurrentHashMap partitionToBootstrapLatch = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap partitionToFileCopyLatch = new ConcurrentHashMap<>(); + private final ConcurrentHashMap partitionToFileCopySuccessLatch = new ConcurrentHashMap<>(); private final ConcurrentHashMap partitionToDeactivationLatch = new ConcurrentHashMap<>(); private final ConcurrentHashMap partitionToDisconnectionLatch = new ConcurrentHashMap<>(); private final ConcurrentHashMap partitionToBootstrapSuccess = new ConcurrentHashMap<>(); @@ -63,6 +66,12 @@ public void initiateBootstrap(ReplicaId replicaId) { ReplicaState.BOOTSTRAP)); } + @Override + public void 
initiateFileCopy(ReplicaId replicaId){ + partitionToFileCopyLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1)); + partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), false); + } + @Override public void initiateDeactivation(ReplicaId replicaId) { partitionToDeactivationLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1)); @@ -101,6 +110,22 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedExcep } } + @Override + public void waitForFileCopyCompleted(String partitionName) throws InterruptedException { + CountDownLatch latch = partitionToFileCopyLatch.get(partitionName); + if(latch == null) { + logger.info("Skipping file copy for existing partition {}", partitionName); + } else{ + logger.info("Waiting for new partition to {} to comeplete FileCopy", partitionName); + latch.await(); + partitionToFileCopyLatch.remove(partitionName); + if(!partitionToFileCopySuccessLatch.remove(partitionName)){ + throw new StateTransitionException("Partition " + partitionName + " failed to copy files.", FileCopyFailure); + } + logger.info("File Copy is complete on partition {}", partitionName); + } + } + @Override public void waitDeactivationCompleted(String partitionName) throws InterruptedException { CountDownLatch latch = partitionToDeactivationLatch.get(partitionName); @@ -192,6 +217,12 @@ public void onBootstrapComplete(ReplicaId replicaId) { countDownLatch(partitionToBootstrapLatch, replicaId.getPartitionId().toPathString()); } + @Override + public void onFileCopyComplete(ReplicaId replicaId){ + partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), true); + countDownLatch(partitionToFileCopyLatch, replicaId.getPartitionId().toPathString()); + } + @Override public void onDeactivationComplete(ReplicaId replicaId) { partitionToDeactivationSuccess.put(replicaId.getPartitionId().toPathString(), true); diff --git 
a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java index e94b5b9260..eeab9e76bc 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java @@ -870,6 +870,18 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) { if (storageManagerListener != null) { storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); } + + /** + * Should be invoked after storage manager listener to ensure that the replica is added to the store. + * Conditional execution based on requirement for File Copy. + */ + PartitionStateChangeListener fileCopyManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.FileCopyManagerListener); + if(fileCopyManagerListener != null){ + fileCopyManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + replicaSyncUpManager.waitForFileCopyCompleted(partitionName); + } + // 2. take actions in replication manager (add new replica if necessary) PartitionStateChangeListener replicationManagerListener = partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener); @@ -882,6 +894,11 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) { if (statsManagerListener != null) { statsManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); } + } catch (InterruptedException e) { + //TODO: Handle the exception more gracefully. 
+ logger.error("Bootstrap was interrupted on partition {}", partitionName); + localPartitionAndState.put(partitionName, ReplicaState.ERROR); + throw new StateTransitionException("Bootstrap failed or was interrupted", BootstrapFailure); } catch (Exception e) { localPartitionAndState.put(partitionName, ReplicaState.ERROR); throw e; diff --git a/ambry-file-transfer/build.gradle b/ambry-file-transfer/build.gradle new file mode 100644 index 0000000000..3f5b11c5d3 --- /dev/null +++ b/ambry-file-transfer/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'java' +} + +group = 'com.github.ambry' +version = '0.4.512' + +repositories { + mavenCentral() +} + +dependencies { + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/AmbryFileCopyThread.java b/ambry-file-transfer/src/main/java/com/github/ambry/AmbryFileCopyThread.java new file mode 100644 index 0000000000..44234690d3 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/AmbryFileCopyThread.java @@ -0,0 +1,113 @@ +package com.github.ambry; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.clustermap.ReplicaSyncUpManager; +import com.github.ambry.config.FileCopyConfig; +import com.github.ambry.network.NetworkClient; +import com.github.ambry.network.RequestInfo; +import com.github.ambry.network.ResponseInfo; +import com.github.ambry.protocol.RequestOrResponse; +import com.github.ambry.protocol.RequestOrResponseType; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + + +public class 
AmbryFileCopyThread implements Runnable{ + private final ReentrantLock lock = new ReentrantLock(); + + private volatile boolean running; + private final Map replicaToReplicaOperationTrackerMap; + private final String threadName; + private final NetworkClient networkClient; + private final ClusterMap clusterMap; + private final DataNodeId dataNodeId; + private final FileCopyConfig fileCopyConfig; + private final FileCopyMetrics fileCopyMetrics; + private final ReplicaSyncUpManager replicaSyncUpManager; + private final AtomicInteger correlationIdGenerator; + + public AmbryFileCopyThread(String threadName, NetworkClient networkClient, ClusterMap clusterMap, + AtomicInteger correlationIdGenerator, DataNodeId dataNodeId, FileCopyConfig fileCopyConfig, + FileCopyMetrics fileCopyMetrics, ReplicaSyncUpManager replicaSyncUpManager) { + this.threadName = threadName; + this.networkClient = networkClient; + this.clusterMap = clusterMap; + this.dataNodeId = dataNodeId; + this.fileCopyConfig = fileCopyConfig; + this.fileCopyMetrics = fileCopyMetrics; + this.replicaSyncUpManager = replicaSyncUpManager; + this.correlationIdGenerator = correlationIdGenerator; + this.replicaToReplicaOperationTrackerMap = new HashMap<>(); + if(!running) + running = true; + } + @Override + public void run() { + while (running) { + doChunkBasedReplication(); + } + // This is a dummy implementation. You can replace this with your own implementation. 
+ System.out.println("Copying file from source to destination"); + } + //write thread safe code for add replica method + public void addReplica(ReplicaId replicaId, ReplicaOperationTracker replicaOperationTracker) { + lock.lock(); + try { + if(replicaToReplicaOperationTrackerMap.containsKey(replicaId)){ + throw new IllegalArgumentException("Replica already exists: " + replicaId); + }else{ + replicaToReplicaOperationTrackerMap.put(replicaId, replicaOperationTracker); + } + } finally { + System.out.println("Adding replica: " + replicaId); + lock.unlock(); + } + } + + + + private void doChunkBasedReplication(){ + + List requestInfos = pollForRequests(); + if(requestInfos.isEmpty()){ + return; + } + } + + private void onResponses(List responseInfos, Map correlationIdToRequest) { + lock.lock(); + try { + for (ResponseInfo responseInfo : responseInfos) { +// RequestInfo requestInfo = correlationIdToRequest.remove(responseInfo.getRequestInfo().getRequest().getCorrelationId()); +// if (requestInfo == null) { +// throw new IllegalStateException("Received a response that does not have a corresponding request"); +// } +// ReplicaOperationTracker replicaOperationTracker = replicaToReplicaOperationTrackerMap.get(requestInfo.getReplicaId()); +// if (replicaOperationTracker == null) { +// throw new IllegalStateException("ReplicaOperationTracker not found for replica " + requestInfo.getReplicaId()); +// } +// replicaOperationTracker.onResponse(responseInfo); + } + } finally { + lock.unlock(); + } + } + + private List pollForRequests(){ + lock.lock(); + try { + return replicaToReplicaOperationTrackerMap.values().stream().map(ReplicaOperationTracker::pollForRequests) + .collect(Collectors.toList()); + } finally { + lock.unlock(); + } + } +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyController.java b/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyController.java new file mode 100644 index 0000000000..5185d2dfe2 --- /dev/null +++ 
b/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyController.java @@ -0,0 +1,77 @@ +package com.github.ambry; + +import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.commons.ResponseHandler; +import com.github.ambry.server.StoreManager; +import com.github.ambry.store.StoreKeyConverter; +import com.github.ambry.store.Transformer; +import com.github.ambry.utils.Utils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + + +public class FileCopyController { + private Map diskToInFlightPartitionsMap; + + private Map replicaToReplicaOperationTracker; + + private final List listOfDisksToBeHydrated; + + private boolean isRunning = false; + PrioritisationManager _prioritisationManager; + + public FileCopyController(PrioritisationManager prioritisationManager, StoreManager storeManager) { + this._prioritisationManager = prioritisationManager; + diskToInFlightPartitionsMap = new HashMap<>(); + if (!_prioritisationManager.isRunning()) { + //TODO: throw Exception. + } + listOfDisksToBeHydrated = new ArrayList<>(); + } + + public void start() throws InterruptedException { + if (!isRunning) isRunning = true; + + while (true) { + Thread.sleep(60000); + /** + * Check if a new disk is to be added to the list of disks to be hydrated. 
+ */ + if (_prioritisationManager.getListOfDisks().size() > 0) { + for (String disk : _prioritisationManager.getListOfDisks()) { + if (!listOfDisksToBeHydrated.contains(disk)) { + listOfDisksToBeHydrated.add(disk); + } + } + } + + //iterate over each item in diskToInFlightPartitionsMap + for (Map.Entry entry : diskToInFlightPartitionsMap.entrySet()) { + ReplicaId replicaId = entry.getValue(); + + if (listOfDisksToBeHydrated.contains(replicaId)) { + + } + } + } + } + + public void fileCopyAndStateBuild(String partitionName){ + //complete file copy For partitionName + //build state for partitionName + } + + protected List createThreadPool(String datacenter, int numberOfThreads, boolean startThread) { + + return new ArrayList<>(); + } + public void shutdown() { + for(Map.Entry entry : replicaToReplicaOperationTracker.entrySet()) { + entry.getValue().shutdown(); + } + isRunning = false; + } +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyManager.java b/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyManager.java new file mode 100644 index 0000000000..83dc14efad --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyManager.java @@ -0,0 +1,91 @@ +package com.github.ambry; + +import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.ClusterParticipant; +import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.clustermap.PartitionStateChangeListener; +import com.github.ambry.clustermap.StateModelListenerType; +import com.github.ambry.config.ClusterMapConfig; +import com.github.ambry.config.FileCopyConfig; +import com.github.ambry.config.StoreConfig; +import com.github.ambry.network.NetworkClientFactory; +import com.github.ambry.server.StoreManager; +import com.github.ambry.store.Store; +import com.github.ambry.store.StoreKeyFactory; +import java.io.IOException; +import java.util.Map; +import 
java.util.concurrent.ScheduledExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +public class FileCopyManager { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + protected final PrioritisationManager prioritisationManager; + + private final FileCopyController fileCopyController; + + private final StoreManager storeManager; + + public FileCopyManager(PrioritisationManager prioritisationManager, FileCopyConfig fileCopyConfig, ClusterMapConfig clusterMapConfig, + StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, + ScheduledExecutorService scheduler, DataNodeId dataNode, NetworkClientFactory networkClientFactory, + MetricRegistry metricRegistry, ClusterParticipant clusterParticipant) throws InterruptedException { + if (clusterParticipant != null) { + clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.FileCopyManagerListener, + new PartitionStateChangeListenerImpl()); + logger.info("File Copy Manager's state change listener registered!"); + } + this.prioritisationManager = prioritisationManager; + if(!prioritisationManager.isRunning()) { + prioritisationManager.start(); + } + + fileCopyController = new FileCopyController(prioritisationManager, storeManager); + fileCopyController.start(); + this.storeManager = storeManager; + } + public void start() throws InterruptedException, IOException { + + } + class PartitionStateChangeListenerImpl implements PartitionStateChangeListener { + + @Override + public void onPartitionBecomeBootstrapFromOffline(String partitionName) { + if(storeManager.getReplica(partitionName) == null){ + storeManager.setUpReplica(partitionName); + } + //prioritisationManager.addReplica(partitionName); + } + + @Override + public void onPartitionBecomeStandbyFromBootstrap(String partitionName) { + + } + + @Override + public void onPartitionBecomeLeaderFromStandby(String partitionName) { + + } + + @Override + public void 
onPartitionBecomeStandbyFromLeader(String partitionName) { + + } + + @Override + public void onPartitionBecomeInactiveFromStandby(String partitionName) { + + } + + @Override + public void onPartitionBecomeOfflineFromInactive(String partitionName) { + + } + + @Override + public void onPartitionBecomeDroppedFromOffline(String partitionName) { + + } + } +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyMetrics.java b/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyMetrics.java new file mode 100644 index 0000000000..33614356ab --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyMetrics.java @@ -0,0 +1,4 @@ +package com.github.ambry; + +public class FileCopyMetrics { +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/ReplicaOperationTracker.java b/ambry-file-transfer/src/main/java/com/github/ambry/ReplicaOperationTracker.java new file mode 100644 index 0000000000..b99b26e09a --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/ReplicaOperationTracker.java @@ -0,0 +1,131 @@ +package com.github.ambry; + +import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.network.NetworkClientErrorCode; +import com.github.ambry.network.RequestInfo; +import com.github.ambry.network.ResponseInfo; +import com.github.ambry.protocol.FileMetaDataResponse; +import com.github.ambry.protocol.ReplicaMetadataResponse; +import com.github.ambry.protocol.RequestOrResponse; +import com.github.ambry.protocol.RequestOrResponseType; +import com.github.ambry.replication.ReplicationException; +import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.server.StoreManager; +import com.github.ambry.store.FileMetaData; +import com.github.ambry.store.FileStore; +import com.github.ambry.utils.FileLock; +import com.github.ambry.utils.NettyByteBufDataInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; 
+import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ReplicaOperationTracker { + private final ReplicaId replicaId; + private boolean isRunning =false; + private final HashMap fileToSegmentOperationTrackerMap; + private RequestInfo requestsToSend; + + + + private String dataNode; + private static final Logger logger = LoggerFactory.getLogger(ReplicaOperationTracker.class); + private FileCopyBasedReplicationState state; + + private final StoreManager storeManager; + + public ReplicaOperationTracker(ReplicaId replicaId, StoreManager storeManager, FileCopyMetrics fileCopyMetrics) { + this.replicaId = replicaId; + requestsToSend = new RequestInfo(); + fileToSegmentOperationTrackerMap = new HashMap<>(); + this.storeManager = storeManager; + } + + public void start(){ + isRunning = true; + + } + + public boolean isCompleted(){ + return false; + } + + public RequestInfo pollForRequests(){ + return requestsToSend; + } + + public void handleMetaDataResponse(ResponseInfo responseInfo) { + if (state != FileCopyBasedReplicationState.META_DATA_REQUESTED) { +// logger.error("Remote node: {} Thread name: {} ReplicaMetadataResponse comes back after wrong state {}", +// remoteDataNode, threadName, state); + return; + } + + NetworkClientErrorCode networkClientErrorCode = responseInfo.getError(); + //Todo: Log with remoteDataNode + /** + * logger.trace( + * "Remote node: {} Thread name: {} RemoteReplicaGroup {} ReplicaMetadataResponse come back for correlation id {}, NetworkClientError {}", + * responseInfo.getRequestInfo().getRequest()., responseInfo.getRequestInfo().getRequest().getCorrelationId(), + * networkClientErrorCode); + */ + if(networkClientErrorCode == null){ + //update metric + try{ + DataInputStream dis = new NettyByteBufDataInputStream(responseInfo.content()); + FileMetaDataResponse response = FileMetaDataResponse.readFrom(dis); + FileStore fileStore = 
storeManager.getFileStore(replicaId.getPartitionId()); + //fileStore.persistMetaDataToFile(x.getFileMetaData()); + /*if(!fileStore.persistMetaDataToFile()){ + throw new IOException("Failed to persist metadata to file"); + state = FileCopyBasedReplicationState.META_DATA_REQUESTED_RESPONSE_HANDLED; + }*/ + + + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public void handleChunkResponse(ResponseInfo responseInfo){ + SegmentOperationTracker segmentOperationTracker = fileToSegmentOperationTrackerMap.get(responseInfo.getRequestInfo().getRequest().getCorrelationId()); + if(segmentOperationTracker == null){ + throw new IllegalStateException("SegmentOperationTracker not found for correlationId " + responseInfo.getRequestInfo().getRequest().getCorrelationId()); + } + segmentOperationTracker.onResponse(responseInfo); + } + + + public void onResponse(ResponseInfo responseInfo){ + RequestOrResponseType type = ((RequestOrResponse) responseInfo.getRequestInfo().getRequest()).getRequestType(); + switch (type) { + case FileMetaDataRequest: + handleMetaDataResponse(responseInfo); + break; + case FileChunkRequest: + //handleChunkResponse(responseInfo); + break; + } + } + public void shutdown(){ + isRunning = false; + } + + enum FileCopyBasedReplicationState{ + STARTED, + META_DATA_REQUESTED, + META_DATA_REQUESTED_RESPONSE_HANDLED, + META_DATA_REQUESTED_RESPONSE_ERROR, + CHUNK_REQUESTED, + CHUNK_REQUESTED_RESPONSE_HANDLED, + DONE + } +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/SegmentController.java b/ambry-file-transfer/src/main/java/com/github/ambry/SegmentController.java new file mode 100644 index 0000000000..8a34bf1592 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/SegmentController.java @@ -0,0 +1,45 @@ +package com.github.ambry; + +import com.github.ambry.network.RequestInfo; +import java.io.File; + + +public class SegmentController { + private final SegmentOperationTracker segmentOperationTracker; + 
private final ReplicaOperationTracker replicaOperationTracker; + + private final String dataDir; + + public SegmentController(SegmentOperationTracker segmentOperationTracker, ReplicaOperationTracker replicaOperationTracker) { + this.segmentOperationTracker = segmentOperationTracker; + this.replicaOperationTracker = replicaOperationTracker; + this.dataDir = ""; + } + + public void start() { + replicaOperationTracker.start(); + } + + public boolean isCompleted() { + return replicaOperationTracker.isCompleted(); + } + + //public boolean write + + public boolean createNewFile(String dataDir, String fileName){ + File newFile= new File(dataDir, fileName); + try { + return newFile.createNewFile(); + } catch (Exception e) { + return false; + } + } + + public RequestInfo pollForRequests() { + return replicaOperationTracker.pollForRequests(); + } + + public void handleMetaDataResponse(ResponseInfo responseInfo) { + replicaOperationTracker.handleMetaDataResponse(responseInfo); + } +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/SegmentOperationTracker.java b/ambry-file-transfer/src/main/java/com/github/ambry/SegmentOperationTracker.java new file mode 100644 index 0000000000..c1ddb7322a --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/SegmentOperationTracker.java @@ -0,0 +1,49 @@ +package com.github.ambry; + +import com.github.ambry.network.ResponseInfo; +import com.sun.org.apache.xpath.internal.operations.Bool; + + +public class SegmentOperationTracker { + private String fileName; + private String partitionName; + private boolean isFileHydrationComplete = false; + + private long currentOffset; + + public SegmentOperationTracker(String fileName, String partitionName){ + this.fileName = fileName; + this.partitionName = partitionName; + + } + + public String getFileName(){ + return fileName; + } + + public void setResponse(){} + + public void onResponse(ResponseInfo responseInfo){ + responseInfo.getResponse(); + + } + public String 
getPartitionName(){ + return partitionName; + } + + public void setFileHydrationComplete(){ + isFileHydrationComplete = true; + } + + public boolean isFileHydrationComplete(){ + return isFileHydrationComplete; + } + + public void writeToFile(){ + + } + + + + +} diff --git a/ambry-prioritisation/build.gradle b/ambry-prioritisation/build.gradle new file mode 100644 index 0000000000..48a562dddd --- /dev/null +++ b/ambry-prioritisation/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'java' +} + +group = 'com.github.ambry' +version = '0.4.514' + +repositories { + mavenCentral() +} + +dependencies { + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/ambry-prioritisation/src/main/java/com/github/ambry/PrioritisationManager.java b/ambry-prioritisation/src/main/java/com/github/ambry/PrioritisationManager.java new file mode 100644 index 0000000000..6b08d4f291 --- /dev/null +++ b/ambry-prioritisation/src/main/java/com/github/ambry/PrioritisationManager.java @@ -0,0 +1,69 @@ +package com.github.ambry; + +import com.github.ambry.clustermap.AmbryPartition; +import com.github.ambry.clustermap.DiskId; +import com.github.ambry.clustermap.ReplicaId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class PrioritisationManager { + private Map diskToReplicaQueue; + + private final List listOfDisks; + private boolean running; + public PrioritisationManager() { + diskToReplicaQueue = new HashMap<>(); + running = false; + listOfDisks = new ArrayList<>(); + } + + public void start() { + running = true; + } + + public boolean isRunning(){ + return running; + // Start the PrioritisationManager + } + + public void shutdown() { + // Shutdown the PrioritisationManager + } + + public void addReplica(String partitionName) { + // Add a replica to the PrioritisationManager + } + + public void 
removeReplica(String partitionName) { + // Remove a task from the PrioritisationManager + } + + public void updatePartitionState(String partitionName) { + // Update the state of a task in the PrioritisationManager + } + + public void updatePartitionProgress(String partitionName) { + // Update the progress of a task in the PrioritisationManager + } + + public void updatePartitionResult() { + // Update the result of a task in the PrioritisationManager + } + + public List getListOfDisks(){ + return listOfDisks; + } + + public String getPartitionForDisk(DiskId diskId){ + // Get a partition from the PrioritisationManager + return null; + } + + public String getReplica(String partitionName) { + // Get a replica from the PrioritisationManager + return null; + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java new file mode 100644 index 0000000000..a0483174a5 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java @@ -0,0 +1,93 @@ +package com.github.ambry.protocol; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + + +public class FileChunkRequest extends RequestOrResponse{ + + private String hostName; + private PartitionId partitionId; + private String fileName; + private long startOffset; + private long size; + + private static final short File_Chunk_Request_Version_V1 = 1; + + private static final int HostName_Field_Size_In_Bytes = 4; + private static final int FileName_Field_Size_In_Bytes = 4; + + public FileChunkRequest(short versionId, int correlationId, String clientId,PartitionId partitionId, String hostName, String fileName, long startOffset, long size) { + super(RequestOrResponseType.FileChunkRequest, versionId, 
correlationId, clientId); + this.hostName = hostName; + this.partitionId = partitionId; + this.fileName = fileName; + this.startOffset = startOffset; + this.size = size; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileChunkRequest[").append(", HostName=").append(hostName).append("PartitionId=").append(partitionId) + .append(", FileName=").append(fileName).append(", StartOffset=").append(startOffset).append(", Size=").append(size).append("]"); + return sb.toString(); + } + + public String getHostName() { + return hostName; + } + + public PartitionId getPartitionId() { + return partitionId; + } + + public String getFileName() { + return fileName; + } + + public long getStartOffset() { + return startOffset; + } + + public long getSize() { + return size; + } + + public long sizeInBytes() { + return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length + + FileName_Field_Size_In_Bytes + fileName.length() + Long.BYTES + Long.BYTES; + } + + protected void prepareBuffer() { + super.prepareBuffer(); + + Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset()); + Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset()); + bufferToSend.writeBytes(partitionId.getBytes()); + bufferToSend.writeLong(startOffset); + bufferToSend.writeLong(size); + } + + protected static FileChunkRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException { + Short versionId = stream.readShort(); + validateVersion(versionId); + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + String hostName = Utils.readIntString(stream); + PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + String fileName = Utils.readIntString(stream); + long startOffset = stream.readLong(); + long size = stream.readLong(); + return new FileChunkRequest(versionId, correlationId, clientId, partitionId, hostName, fileName, 
startOffset, size); + } + + static void validateVersion(short version) { + if (version != File_Chunk_Request_Version_V1) { + throw new IllegalArgumentException("Unknown version for FileChunkRequest: " + version); + } + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java new file mode 100644 index 0000000000..ff9ec85ee7 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java @@ -0,0 +1,5 @@ +package com.github.ambry.protocol; + +public class FileChunkResponse { + //to be Filled After confirmation +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java new file mode 100644 index 0000000000..6574b18d68 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java @@ -0,0 +1,51 @@ +package com.github.ambry.protocol; + +import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + + +public class FileInfo{ + private String fileName; + private long fileSizeInBytes; + + private static final int FileName_Field_Size_In_Bytes = 4; + + private static final int FileSize_Field_Size_In_Bytes = 8; + + + public FileInfo(String fileName, long fileSize) { + this.fileName = fileName; + this.fileSizeInBytes = fileSize; + } + + public long sizeInBytes() { + return FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes; + } + public static FileInfo readFrom(DataInputStream stream) throws IOException { + String fileName = Utils.readIntString(stream); + long fileSize = stream.readLong(); + return new FileInfo(fileName, fileSize); + } + public void writeTo(ByteBuf buf) { + Utils.serializeString(buf, fileName, Charset.defaultCharset()); + buf.writeLong(fileSizeInBytes); + } 
+ + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileInfo[").append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes) + .append("]"); + return sb.toString(); + } + + public String getFileName() { + return fileName; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } +} \ No newline at end of file diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java new file mode 100644 index 0000000000..703ebbd1a4 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java @@ -0,0 +1,68 @@ +package com.github.ambry.protocol; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + + +public class FileMetaDataRequest extends RequestOrResponse{ + private PartitionId partitionId; + private String hostName; + + private static final short File_Metadata_Request_Version_V1 = 1; + private static final int HostName_Field_Size_In_Bytes = 4; + + public FileMetaDataRequest(short versionId, int correlationId, String clientId, + PartitionId partitionId, String hostName) { + super(RequestOrResponseType.FileMetaDataRequest, versionId, correlationId, clientId); + if (partitionId == null || hostName.isEmpty()) { + throw new IllegalArgumentException("Partition and Host Name cannot be null"); + } + this.partitionId = partitionId; + this.hostName = hostName; + } + + public String getHostName() { + return hostName; + } + + public PartitionId getPartitionId() { + return partitionId; + } + + protected static FileMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException { + Short versionId = stream.readShort(); + validateVersion(versionId); + int 
correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + String hostName = Utils.readIntString(stream); + PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + return new FileMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName) + .append("]"); + return sb.toString(); + } + + public long sizeInBytes() { + return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length; + } + + protected void prepareBuffer() { + super.prepareBuffer(); + Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset()); + bufferToSend.writeBytes(partitionId.getBytes()); + } + + static void validateVersion(short version) { + if (version != File_Metadata_Request_Version_V1) { + throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version); + } + } +} \ No newline at end of file diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java new file mode 100644 index 0000000000..c9d8deb791 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java @@ -0,0 +1,64 @@ +package com.github.ambry.protocol; +import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + + +public class FileMetaDataResponse extends Response { + private int numberOfLogfiles; + private List logInfoList; + + public FileMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles, + List logInfoList, ServerErrorCode errorCode) { + 
super(RequestOrResponseType.FileMetaDataResponse, versionId, correlationId, clientId, errorCode); + this.numberOfLogfiles = numberOfLogfiles; + this.logInfoList = logInfoList; + } + + public static FileMetaDataResponse readFrom(DataInputStream stream) throws IOException { + RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; + if (type != RequestOrResponseType.FileMetaDataResponse) { + throw new IllegalArgumentException("The type of request response is not compatible"); + } + short versionId = stream.readShort(); + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + ServerErrorCode errorCode = ServerErrorCode.values()[stream.readShort()]; + int numberOfLogfiles = stream.readInt(); + int logInfoListSize = stream.readInt(); + List logInfoList = new ArrayList<>(); + for (int i = 0; i < logInfoListSize; i++) { + logInfoList.add(LogInfo.readFrom(stream)); + } + return new FileMetaDataResponse(versionId, correlationId, clientId, numberOfLogfiles, logInfoList, errorCode); + } + protected void prepareBuffer() { + super.prepareBuffer(); + bufferToSend.writeInt(numberOfLogfiles); + bufferToSend.writeInt(logInfoList.size()); + for (LogInfo logInfo : logInfoList) { + logInfo.writeTo(bufferToSend); + } + } + + public long sizeInBytes() { + return super.sizeInBytes() + Integer.BYTES + Integer.BYTES + logInfoList.stream().mapToLong(LogInfo::sizeInBytes).sum(); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileMetaDataResponse[NumberOfLogfiles=").append(numberOfLogfiles).append(", logInfoList").append(logInfoList.toString()).append("]"); + return sb.toString(); + } + + public int getNumberOfLogfiles() { + return numberOfLogfiles; + } + + public List getLogInfoList() { + return logInfoList; + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java new file mode 100644 index 
0000000000..54a53045a5 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java @@ -0,0 +1,103 @@ +package com.github.ambry.protocol; + +import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + + +public class LogInfo { + private String fileName; + private long fileSizeInBytes; + List listOfIndexFiles; + List listOfBloomFilters; + + private static final int FileName_Field_Size_In_Bytes = 4; + private static final int FileSize_Field_Size_In_Bytes = 8; + + private static final int ListSize_In_Bytes = 4; + public LogInfo(String fileName, long fileSizeInBytes, List listOfIndexFiles, List listOfBloomFilters) { + this.fileName = fileName; + this.fileSizeInBytes = fileSizeInBytes; + this.listOfIndexFiles = listOfIndexFiles; + this.listOfBloomFilters = listOfBloomFilters; + } + + public String getFileName() { + return fileName; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public List getListOfBloomFilters() { + return listOfBloomFilters; + } + + public List getListOfIndexFiles() { + return listOfIndexFiles; + } + + public long sizeInBytes() { + long size = FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes + ListSize_In_Bytes; + for (FileInfo fileInfo : listOfIndexFiles) { + size += fileInfo.sizeInBytes(); + } + size += ListSize_In_Bytes; + for (FileInfo fileInfo : listOfBloomFilters) { + size += fileInfo.sizeInBytes(); + } + return size; + } + + public static LogInfo readFrom(DataInputStream stream) throws IOException { + String fileName = Utils.readIntString(stream ); + long fileSize = stream.readLong(); + List listOfIndexFiles = new ArrayList<>(); + List listOfBloomFilters = new ArrayList<>(); + + int indexFilesCount = stream.readInt(); + for (int i = 0; i < indexFilesCount; i++) { + 
listOfIndexFiles.add(FileInfo.readFrom(stream)); + } + + int bloomFiltersCount = stream.readInt(); + for(int i= 0;i< bloomFiltersCount; i++){ + listOfBloomFilters.add(FileInfo.readFrom(stream)); + } + return new LogInfo(fileName, fileSize, listOfIndexFiles, listOfBloomFilters); + } + + public void writeTo(ByteBuf buf){ + Utils.serializeString(buf, fileName, Charset.defaultCharset()); + buf.writeLong(fileSizeInBytes); + buf.writeInt(listOfIndexFiles.size()); + for(FileInfo fileInfo : listOfIndexFiles){ + fileInfo.writeTo(buf); + } + buf.writeInt(listOfBloomFilters.size()); + for(FileInfo fileInfo: listOfBloomFilters){ + fileInfo.writeTo(buf); + } + } + + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("LogInfo["); + sb.append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes).append(","); + for(FileInfo fileInfo : listOfIndexFiles) { + sb.append(fileInfo.toString()); + } + for(FileInfo fileInfo: listOfBloomFilters){ + sb.append(fileInfo.toString()); + } + sb.append("]"); + return sb.toString(); + } + + +} \ No newline at end of file diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java index 115562bd9a..89078977ec 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java @@ -37,5 +37,11 @@ public enum RequestOrResponseType { PurgeRequest, PurgeResponse, BatchDeleteRequest, - BatchDeleteResponse + BatchDeleteResponse, + + FileMetaDataRequest, + FileMetaDataResponse, + FileChunkRequest, + FileChunkResponse, + } diff --git a/ambry-replication/src/test/java/com/github/ambry/replication/InMemoryStore.java b/ambry-replication/src/test/java/com/github/ambry/replication/InMemoryStore.java index bf939b9d15..5521788b3a 100644 --- 
  /**
   * Handler invoked for requests of type {@code FileMetaDataRequest}.
   * Currently a no-op: the request is neither read nor answered.
   * @param request the incoming network request.
   */
  @Override
  public void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException {
    // TODO: get the store and call the Blob Store APIs here.
  }

  /**
   * Handler invoked for requests of type {@code FileChunkRequest}.
   * Currently a no-op: the request is neither read nor answered.
   * @param request the incoming network request.
   */
  @Override
  public void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException {

  }
integrity monitor here because vcr does not have code to store to disk. Only server does. * DataNodeId -> AmbryDataNode -> AmbryServerDataNode : for helix @@ -327,6 +333,13 @@ public void startup() throws InstantiationException { new BlobStoreHardDelete(), clusterParticipants, time, new BlobStoreRecovery(), accountService); storageManager.start(); + PrioritisationManager prioritisationManager= new PrioritisationManager(); + FileCopyManager + fileCopyManager = new FileCopyManager(prioritisationManager, fileCopyConfig, clusterMapConfig, storeConfig, storageManager, storeKeyFactory, + clusterMap, scheduler, nodeId, networkClientFactory, registry, clusterParticipant); + fileCopyManager.start(); + + // if there are more than one participant on local node, we create a consistency checker to monitor and alert any // mismatch in sealed/stopped replica lists that maintained by each participant. if (clusterParticipants != null && clusterParticipants.size() > 1 diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServerRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServerRequests.java index b9b1331ec2..d185e52aa4 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServerRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServerRequests.java @@ -128,7 +128,9 @@ public class AmbryServerRequests extends AmbryRequests { for (RequestOrResponseType requestType : EnumSet.of(RequestOrResponseType.PutRequest, RequestOrResponseType.GetRequest, RequestOrResponseType.DeleteRequest, RequestOrResponseType.BatchDeleteRequest, RequestOrResponseType.UndeleteRequest, RequestOrResponseType.ReplicaMetadataRequest, - RequestOrResponseType.TtlUpdateRequest)) { + RequestOrResponseType.TtlUpdateRequest, RequestOrResponseType.FileMetaDataRequest, + RequestOrResponseType.FileMetaDataResponse, RequestOrResponseType.FileChunkRequest, + RequestOrResponseType.FileChunkResponse)) { requestsDisableInfo.put(requestType, 
Collections.newSetFromMap(new ConcurrentHashMap<>())); } StoreKeyJacksonConfig.setupObjectMapper(objectMapper, new BlobIdFactory(clusterMap)); diff --git a/ambry-server/src/test/java/com/github/ambry/server/StatsManagerTest.java b/ambry-server/src/test/java/com/github/ambry/server/StatsManagerTest.java index 8b5ffd4925..e1b7e8ef42 100644 --- a/ambry-server/src/test/java/com/github/ambry/server/StatsManagerTest.java +++ b/ambry-server/src/test/java/com/github/ambry/server/StatsManagerTest.java @@ -669,6 +669,11 @@ public boolean isBootstrapInProgress() { throw new IllegalStateException("Not implemented"); } + @Override + public boolean isFileCopyInProgress() { + return false; + } + @Override public boolean isDecommissionInProgress() { throw new IllegalStateException("Not implemented"); diff --git a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java index 3a72a26d31..0ec368df00 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java @@ -40,6 +40,7 @@ import java.text.SimpleDateFormat; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -56,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1318,6 +1320,24 @@ public void shutdown() throws StoreException { shutdown(false); } + public List getSealedLogsAndMetaDataFiles(){ + List logSegments = log.getAllLogSegmentNames().stream().filter(segment -> log.getActiveSegment().getName() != segment) + .map(segment -> log.getSegment(segment)) + .map(segment -> new SealedFileInfo(segment.getName().toString(), 
segment.getView().getFirst().length())).collect(Collectors.toList()); + return logSegments; + } + + public List getAllIndexSegmentsForALogSegment(String dataDir, LogSegmentName logSegmentName){ + return Arrays.stream(PersistentIndex.getIndexSegmentFilesForLogSegment(dataDir, logSegmentName)) + .map(file -> new SealedFileInfo(file.getName(), file.length())).collect( + Collectors.toList()); + } + + public List getAllBloomFiltersForALogSegment(String dataDir, LogSegmentName logSegmentName){ + return Arrays.stream(PersistentIndex.getIndexAndBloomFilterFiles(dataDir, logSegmentName)) + .map(file -> new SealedFileInfo(file.getName(), file.length())).collect(Collectors.toList()); + } + /** * Update the sealed status of the replica. */ diff --git a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java index 5dc34c3412..b5362a695d 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java @@ -64,6 +64,7 @@ public class DiskManager { private final ConcurrentHashMap stores = new ConcurrentHashMap<>(); + private final ConcurrentHashMap fileStores = new ConcurrentHashMap<>(); private final ConcurrentHashMap partitionToReplicaMap = new ConcurrentHashMap<>(); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final DiskId disk; @@ -310,7 +311,9 @@ void shutdown() throws InterruptedException { running = false; compactionManager.disable(); diskIOScheduler.disable(); - final AtomicInteger numFailures = new AtomicInteger(0); + final AtomicInteger numStoreFailures = new AtomicInteger(0); + final AtomicInteger numFileStoreFailures = new AtomicInteger(0); + List shutdownThreads = new ArrayList<>(); for (final Map.Entry partitionAndStore : stores.entrySet()) { if (!partitionAndStore.getValue().isStarted()) { @@ -320,7 +323,7 @@ void shutdown() throws InterruptedException { try { 
  /**
   * @param partitionId the partition to look up.
   * @return the {@link FileStore} registered for {@code partitionId}, or {@code null} if none was added.
   */
  public FileStore getFileStore(PartitionId partitionId){
    return fileStores.get(partitionId);
  }

  /**
   * Creates, starts and registers a {@link FileStore} for the given replica while holding the write lock.
   * Any existing directory at the replica path is deleted first. Every failure is logged and reported through the
   * return value; nothing is thrown to the caller.
   * @param replica the replica to add a file store for.
   * @return {@code true} if the file store was started and registered, {@code false} otherwise.
   */
  boolean addFileStore(ReplicaId replica){
    rwLock.writeLock().lock();
    boolean succeed = false;
    try{
      if(!running){
        logger.error("Failed to add {} because disk manager is not running", replica.getPartitionId());
      }else{
        File storeDir = new File(replica.getReplicaPath());
        if (storeDir.exists()) {
          logger.info("Deleting previous store directory associated with {}", replica);
          try {
            Utils.deleteFileOrDirectory(storeDir);
          } catch (Exception e) {
            // Rethrown only to reach the outer catch, which logs it and leaves succeed == false.
            throw new IOException("Couldn't delete store directory " + replica.getReplicaPath(), e);
          }
          logger.info("Old store directory is deleted for {}", replica);
        }
        // NOTE(review): the store is built with an empty data dir rather than the replica path -- looks like a
        // placeholder; confirm the intended argument.
        FileStore fileStore = new FileStore("");
        fileStore.start();
        fileStores.put(replica.getPartitionId(), fileStore);
        //createFileCopyProgressFileIfAbsent(replica);
        logger.info("New File Store is successfully added into DiskManager.");
        succeed = true;
      }
    } catch (Exception e) {
      // NOTE(review): the message mentions the disk allocator, which this method never touches -- likely copied
      // from the blob store path.
      logger.error("Failed to start new added store {} or add requirements to disk allocator", replica.getPartitionId(),
          e);
    } finally {
      rwLock.writeLock().unlock();
    }
    return succeed;
  }
diff --git a/ambry-store/src/main/java/com/github/ambry/store/FileStore.java b/ambry-store/src/main/java/com/github/ambry/store/FileStore.java
new file mode 100644
index 0000000000..7183600c7a
--- /dev/null
+++ b/ambry-store/src/main/java/com/github/ambry/store/FileStore.java
@@ -0,0 +1,115 @@
+package com.github.ambry.store;
+
+import com.github.ambry.clustermap.FileStoreException;
+import com.github.ambry.clustermap.FileStoreException.FileStoreErrorCode;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * A store that tracks {@link FileChannel}s by file name and only accepts chunk and metadata requests while it is
+ * running.
+ */
+public class FileStore {
+  // Per-instance rather than static so that each FileStore tracks its own lifecycle. Volatile because the flag is
+  // written by start()/stop() and may be read from other threads.
+  private volatile boolean isRunning = false;
+
+  private final String dataDir;
+
+  // Created eagerly so that lookups never dereference a null map.
+  public ConcurrentHashMap<String, FileChannel> fileNameToFileChannelMap = new ConcurrentHashMap<>();
+
+  /**
+   * @param dataDir the data directory this store is created for.
+   */
+  public FileStore(String dataDir) {
+    this.dataDir = dataDir;
+  }
+
+  /**
+   * Mark this store as running.
+   * @throws StoreException declared for callers; the current implementation never throws it.
+   */
+  public void start() throws StoreException {
+    isRunning = true;
+  }
+
+  /**
+   * @return {@code true} if this store has been started and not stopped or shut down since.
+   */
+  public boolean isRunning() {
+    return isRunning;
+  }
+
+  /**
+   * Mark this store as not running.
+   */
+  public void stop() {
+    isRunning = false;
+  }
+
+  /**
+   * Validate a request to write a chunk into a file. The write itself is not implemented yet.
+   * @param mountPath currently unused.
+   * @param fileName the key of the target {@link FileChannel} in {@link #fileNameToFileChannelMap}.
+   * @param byteBuffer the chunk data; must not be {@code null}.
+   * @param offset currently unused.
+   * @param size currently unused.
+   * @throws FileStoreException if this store is not running.
+   * @throws IllegalArgumentException if the file name or buffer is {@code null}, or no channel is registered for it.
+   */
+  public void putChunkToFile(String mountPath, String fileName, ByteBuffer byteBuffer, long offset, long size) {
+    getRegisteredChannel(fileName, byteBuffer);
+    // TODO: write byteBuffer to the channel.
+  }
+
+  /**
+   * Validate a request to persist metadata into a file. The write itself is not implemented yet.
+   * @param mountPath currently unused.
+   * @param fileName the key of the target {@link FileChannel} in {@link #fileNameToFileChannelMap}.
+   * @param byteBuffer the metadata; must not be {@code null}.
+   * @param offset currently unused.
+   * @param size currently unused.
+   * @throws FileStoreException if this store is not running.
+   * @throws IllegalArgumentException if the file name or buffer is {@code null}, or no channel is registered for it.
+   */
+  public void persistMetaDataToFile(String mountPath, String fileName, ByteBuffer byteBuffer, long offset, long size) {
+    getRegisteredChannel(fileName, byteBuffer);
+    // TODO: write byteBuffer to the channel.
+  }
+
+  /**
+   * Shut down this store. Clears the running flag so that {@link #isRunning()} no longer reports a shut down store.
+   */
+  public void shutdown() {
+    stop();
+  }
+
+  /**
+   * Check that this store is running and the arguments are usable, then look up the channel of the file.
+   * @param fileName the key to look up in {@link #fileNameToFileChannelMap}.
+   * @param byteBuffer the payload of the request; only checked for {@code null}.
+   * @return the {@link FileChannel} registered for {@code fileName}.
+   * @throws FileStoreException if this store is not running.
+   * @throws IllegalArgumentException if the file name or buffer is {@code null}, or no channel is registered for it.
+   */
+  private FileChannel getRegisteredChannel(String fileName, ByteBuffer byteBuffer) {
+    if (!isRunning) {
+      throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
+    }
+    if (byteBuffer == null) {
+      throw new IllegalArgumentException("ByteBuffer is null");
+    }
+    // ConcurrentHashMap.get rejects null keys with a NullPointerException, so fail with a clear message instead.
+    if (fileName == null) {
+      throw new IllegalArgumentException("File name is null");
+    }
+    FileChannel fileChannel = fileNameToFileChannelMap.get(fileName);
+    if (fileChannel == null) {
+      throw new IllegalArgumentException("File not found");
+    }
+    return fileChannel;
+  }
+}
diff --git a/ambry-store/src/main/java/com/github/ambry/store/Log.java b/ambry-store/src/main/java/com/github/ambry/store/Log.java
index 4d76814d1d..c5a8fe9c81 100644
--- a/ambry-store/src/main/java/com/github/ambry/store/Log.java
+++ b/ambry-store/src/main/java/com/github/ambry/store/Log.java
@@ -298,6 +298,10 @@ LogSegment getSegment(LogSegmentName name) {
     return segmentsByName.get(name);
   }
 
+
+  LogSegment getActiveSegment() {
+    return activeSegment;
+  }
   /**
    * @return the end offset of the log abstraction.
    */
diff --git a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java
index 0290013cf1..445420f71f 100644
--- a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java
+++ b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java
@@ -2677,6 +2677,24 @@ public boolean accept(File dir, String name) {
    */
   static void cleanupIndexSegmentFilesForLogSegment(String dataDir, final LogSegmentName logSegmentName)
       throws StoreException {
+    File[] filesToCleanup = getIndexAndBloomFilterFiles(dataDir, logSegmentName);
+    if (filesToCleanup == null) {
+      throw new StoreException("Failed to list index segment files", StoreErrorCodes.IOError);
+    }
+    for (File file : filesToCleanup) {
+      if (!file.delete()) {
+        throw new StoreException("Could not delete file named " + file, StoreErrorCodes.Unknown_Error);
+      }
+    }
+  }
+
+  /**
+   * List the index segment and bloom filter files in {@code dataDir} that match the given log segment name.
+   * @param dataDir the directory to scan.
+   * @param logSegmentName the name of the log segment whose index segment and bloom filter files are wanted.
+   * @return the matching files, or {@code null} if the directory could not be listed.
+   */
+  static File[] getIndexAndBloomFilterFiles(String dataDir, LogSegmentName logSegmentName) {
     File[] filesToCleanup = new File(dataDir).listFiles(new FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
@@ -2684,20 +2702,12 @@ public boolean accept(File dir, String name) {
           return (name.endsWith(IndexSegment.INDEX_SEGMENT_FILE_NAME_SUFFIX) || name.endsWith(
               IndexSegment.BLOOM_FILE_NAME_SUFFIX));
         } else {
-          return 
name.startsWith(logSegmentName.toString() + BlobStore.SEPARATOR) && (
-              name.endsWith(IndexSegment.INDEX_SEGMENT_FILE_NAME_SUFFIX) || name.endsWith(
-              IndexSegment.BLOOM_FILE_NAME_SUFFIX));
+          return name.startsWith(logSegmentName.toString() + BlobStore.SEPARATOR) && (name.endsWith(IndexSegment.INDEX_SEGMENT_FILE_NAME_SUFFIX) || name.endsWith(
+              IndexSegment.BLOOM_FILE_NAME_SUFFIX));
         }
       }
     });
-    if (filesToCleanup == null) {
-      throw new StoreException("Failed to list index segment files", StoreErrorCodes.IOError);
-    }
-    for (File file : filesToCleanup) {
-      if (!file.delete()) {
-        throw new StoreException("Could not delete file named " + file, StoreErrorCodes.Unknown_Error);
-      }
-    }
+    return filesToCleanup;
   }
 
   class IndexPersistor implements Runnable {
diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java
index d3b279e3a1..98b6705b81 100644
--- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java
+++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java
@@ -361,6 +361,17 @@ public Store getStore(PartitionId id, boolean skipStateCheck) {
     return diskManager != null ? diskManager.getStore(id, skipStateCheck) : null;
   }
 
+
+  /**
+   * Get the {@link FileStore} of the given partition.
+   * @param id the {@link PartitionId} to look up.
+   * @return the {@link FileStore} of the partition, or {@code null} if none is registered for it.
+   */
+  public FileStore getFileStore(PartitionId id) {
+    DiskManager diskManager = partitionToDiskManager.get(id);
+    return diskManager != null ? diskManager.getFileStore(id) : null;
+  }
+
   /**
    * True is the replica is on a failed disk
    * @param replicaId
@@ -514,6 +525,7 @@ DiskManager addDisk(DiskId diskId) {
     });
   }
 
+
   @Override
   public boolean addBlobStore(ReplicaId replica) {
     if (partitionToDiskManager.containsKey(replica.getPartitionId())) {
@@ -531,6 +543,77 @@ public boolean addBlobStore(ReplicaId replica) {
     return true;
   }
 
+  /**
+   * Add a new {@link FileStore} for the given replica and remember which {@link DiskManager} hosts it.
+   * @param replicaId the {@link ReplicaId} whose {@link FileStore} should be added.
+   * @return {@code true} if the file store was added; {@code false} if the partition is already registered or the
+   *         {@link DiskManager} could not add it.
+   */
+  @Override
+  public boolean addFileStore(ReplicaId replicaId) {
+    if (partitionToDiskManager.containsKey(replicaId.getPartitionId())) {
+      logger.info("{} already exists in storage manager, rejecting adding file store request",
+          replicaId.getPartitionId());
+      return false;
+    }
+    // NOTE(review): addDisk presumably returns the DiskManager for the disk, creating it if needed - confirm.
+    DiskManager diskManager = addDisk(replicaId.getDiskId());
+    if (diskManager == null || !diskManager.addFileStore(replicaId)) {
+      logger.error("Failed to add new file store into DiskManager");
+      return false;
+    }
+    // Register the partition so that getFileStore(PartitionId) can resolve the hosting DiskManager.
+    partitionToDiskManager.put(replicaId.getPartitionId(), diskManager);
+    return true;
+  }
+
+  /**
+   * Make sure a replica of the given partition exists on the current node, adding a file store for a new bootstrap
+   * replica if it does not.
+   * @param partitionName the name of the partition to set up.
+   * @throws StateTransitionException if the cluster map has no bootstrap replica for the partition on this node.
+   */
+  @Override
+  public void setUpReplica(String partitionName) {
+    // Nothing to set up if the partition already has a replica on the current node.
+    if (partitionNameToReplicaId.get(partitionName) != null) {
+      return;
+    }
+    boolean replicaAdded = false;
+    do {
+      // there can be two scenarios:
+      // 1. this is the first time to add new replica onto current node;
+      // 2. last replica addition failed at some point before updating InstanceConfig in Helix
+      // In either case, we should add replica to current node by calling "addFileStore(ReplicaId replica)"
+      ReplicaId replicaToAdd = clusterMap.getBootstrapReplica(partitionName, currentNode);
+      if (replicaToAdd == null) {
+        logger.error("No new replica found for partition {} in cluster map", partitionName);
+        throw new StateTransitionException(
+            "New replica " + partitionName + " is not found in clustermap for " + currentNode, ReplicaNotFound);
+      }
+      // Attempt to add store into storage manager. If store already exists on disk (but not in clustermap), make
+      // sure old store of this replica is deleted (this store may be created in previous replica addition but failed
+      // at some point). Then a brand new store associated with this replica should be created and started.
+      if (addFileStore(replicaToAdd)) {
+        // NOTE(review): recorded so that a repeated call finds the replica and returns early - confirm this mapping.
+        partitionNameToReplicaId.put(partitionName, replicaToAdd);
+        replicaAdded = true;
+      } else {
+        // We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it
+        // back since addition of store failed.
+        replicaToAdd.getDiskId().increaseAvailableSpaceInBytes(replicaToAdd.getCapacityInBytes());
+        logger.info("Failed to add store {} at location {}. Retrying bootstrapping replica at different location",
+            partitionName, replicaToAdd.getReplicaPath());
+        tryRemoveFailedBootstrapBlobStore(replicaToAdd);
+      }
+    } while (!replicaAdded);
+  }
+
+  @Override
+  public void buildStateForFileCopy(String partitionName) {
+    // no-op
+  }
+
   /**
    * If a bootstrap replica fails, try to remove all the files and directories associated with it.
    * @param replica The failed bootstrap {@link ReplicaId}.
diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManagerMetrics.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManagerMetrics.java
index 9ff05ebe6d..04fc577193 100644
--- a/ambry-store/src/main/java/com/github/ambry/store/StorageManagerMetrics.java
+++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManagerMetrics.java
@@ -34,6 +34,7 @@ public class StorageManagerMetrics {
   public final Histogram diskShutdownTimeMs;
   public final Counter totalStoreStartFailures;
   public final Counter totalStoreShutdownFailures;
+  public final Counter totalFileStoreShutDownFailures;
   public final Counter diskMountPathFailures;
   public final Counter diskDownCount;
   public final Counter resumeDecommissionErrorCount;
@@ -76,6 +77,7 @@ public StorageManagerMetrics(MetricRegistry registry) {
     diskShutdownTimeMs = registry.histogram(MetricRegistry.name(DiskManager.class, "DiskShutdownTimeMs"));
     totalStoreStartFailures = registry.counter(MetricRegistry.name(DiskManager.class, "TotalStoreStartFailures"));
     totalStoreShutdownFailures = registry.counter(MetricRegistry.name(DiskManager.class, "TotalStoreShutdownFailures"));
+    totalFileStoreShutDownFailures = registry.counter(MetricRegistry.name(DiskManager.class, "TotalFileStoreShutDownFailures"));
     diskMountPathFailures = registry.counter(MetricRegistry.name(DiskManager.class, "DiskMountPathFailures"));
     diskDownCount = 
registry.counter(MetricRegistry.name(DiskManager.class, "DiskDownCount")); diff --git a/build.gradle b/build.gradle index d790cf130b..419ddea95e 100644 --- a/build.gradle +++ b/build.gradle @@ -334,7 +334,9 @@ project(':ambry-server') { project(':ambry-rest'), project(':ambry-account'), project(':ambry-replication'), - project(':ambry-mysql') + project(':ambry-mysql'), + project(':ambry-prioritisation'), + project(':ambry-file-transfer') compile "io.dropwizard.metrics:metrics-core:$metricsVersion" compile "io.dropwizard.metrics:metrics-jmx:$metricsVersion" runtimeOnly "org.apache.logging.log4j:log4j-core:$log4jVersion" @@ -532,6 +534,28 @@ project(':ambry-vcr') { } } +project (':ambry-prioritisation') { + dependencies{ + compile project(':ambry-api') + compile project(':ambry-mysql') + compile project(':ambry-commons') + compile project(':ambry-store') + compile project(':ambry-clustermap') + + } +} + +project (':ambry-file-transfer') { + dependencies{ + compile project(':ambry-api') + compile project(':ambry-mysql') + compile project(':ambry-commons') + compile project(':ambry-store') + compile project(':ambry-prioritisation') + } +} + + project(':ambry-named-mysql') { dependencies { compile project(':ambry-api') diff --git a/settings.gradle b/settings.gradle index d19582e993..3887cdca08 100644 --- a/settings.gradle +++ b/settings.gradle @@ -31,4 +31,6 @@ include 'ambry-api', 'ambry-mysql', 'ambry-filesystem', 'ambry-vcr' +include 'ambry-file-transfer' +include 'ambry-prioritisation'