diff --git a/lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java b/lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java index 17697fb4..22180a59 100644 --- a/lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java +++ b/lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java @@ -11,6 +11,7 @@ import com.launchdarkly.sdk.server.FlagsStateOption; import com.launchdarkly.sdk.server.LDClient; import com.launchdarkly.sdk.server.LDConfig; +import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints; import com.launchdarkly.sdk.server.migrations.Migration; import com.launchdarkly.sdk.server.migrations.MigrationBuilder; import com.launchdarkly.sdk.server.migrations.MigrationExecution; @@ -25,12 +26,15 @@ import com.launchdarkly.sdk.server.integrations.HooksConfigurationBuilder; import com.launchdarkly.sdk.server.integrations.ServiceEndpointsBuilder; import com.launchdarkly.sdk.server.integrations.StreamingDataSourceBuilder; +import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder; import com.launchdarkly.sdk.server.integrations.DataSystemBuilder; import com.launchdarkly.sdk.server.DataSystemComponents; import com.launchdarkly.sdk.server.integrations.FDv2PollingInitializerBuilder; import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder; import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder; import com.launchdarkly.sdk.server.interfaces.BigSegmentStoreStatusProvider; +import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer; +import com.launchdarkly.sdk.server.subsystems.DataSource; import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder; import com.launchdarkly.sdk.server.datasources.Initializer; import com.launchdarkly.sdk.server.datasources.Synchronizer; @@ -563,6 +567,22 @@ private LDConfig buildSdkConfig(SdkConfigParams params, String tag) { } } + // Configure FDv1 fallback synchronizer + SdkConfigSynchronizerParams fallbackSynchronizer = + selectFallbackSynchronizer(params.dataSystem); + if (fallbackSynchronizer != null) { + // Set global polling endpoints if the fallback synchronizer has polling with custom base URI + if (fallbackSynchronizer.polling != null && + fallbackSynchronizer.polling.baseUri != null) { + endpoints.polling(fallbackSynchronizer.polling.baseUri); + } + + // Create and configure FDv1 fallback + ComponentConfigurer fdv1Fallback = + createFDv1FallbackSynchronizer(fallbackSynchronizer); + dataSystemBuilder.fDv1FallbackSynchronizer(fdv1Fallback); + } + builder.dataSystem(dataSystemBuilder); } @@ -601,4 +621,59 @@ private DataSourceBuilder createSynchronizer( } return null; } + + /** + * Selects the best synchronizer configuration to use for FDv1 fallback. + * Prefers polling synchronizers, falls back to primary synchronizer. + */ + private static SdkConfigSynchronizerParams selectFallbackSynchronizer( + SdkConfigDataSystemParams dataSystemParams) { + + // Prefer secondary polling synchronizer + if (dataSystemParams.synchronizers != null && + dataSystemParams.synchronizers.secondary != null && + dataSystemParams.synchronizers.secondary.polling != null) { + return dataSystemParams.synchronizers.secondary; + } + + // Fall back to primary polling synchronizer + if (dataSystemParams.synchronizers != null && + dataSystemParams.synchronizers.primary != null && + dataSystemParams.synchronizers.primary.polling != null) { + return dataSystemParams.synchronizers.primary; + } + + // Fall back to primary synchronizer (even if streaming) + if (dataSystemParams.synchronizers != null && + dataSystemParams.synchronizers.primary != null) { + return dataSystemParams.synchronizers.primary; + } + + return null; + } + + /** + * Creates the FDv1 fallback synchronizer based on the selected synchronizer config. + * FDv1 fallback is always polling-based and uses the global service endpoints configuration. + */ + private static ComponentConfigurer createFDv1FallbackSynchronizer( + SdkConfigSynchronizerParams synchronizer) { + + // FDv1 fallback is always polling-based + PollingDataSourceBuilder fdv1Polling = Components.pollingDataSource(); + + // Configure polling interval if the synchronizer has polling configuration + if (synchronizer.polling != null) { + if (synchronizer.polling.pollIntervalMs != null) { + fdv1Polling.pollInterval(Duration.ofMillis(synchronizer.polling.pollIntervalMs)); + } + // Note: FDv1 polling doesn't support per-source service endpoints override, + // so it will use the global service endpoints configuration (which is set + // by the caller before this method is invoked) + } + // If streaming synchronizer, use default polling interval + // (no additional configuration needed) + + return fdv1Polling; + } } diff --git a/lib/sdk/server/contract-tests/test-suppressions-fdv2.txt b/lib/sdk/server/contract-tests/test-suppressions-fdv2.txt index 637ace30..e69de29b 100644 --- a/lib/sdk/server/contract-tests/test-suppressions-fdv2.txt +++ b/lib/sdk/server/contract-tests/test-suppressions-fdv2.txt @@ -1,4 +0,0 @@ -streaming/validation/unrecognized data that can be safely ignored/unknown event name with JSON body -streaming/validation/unrecognized data that can be safely ignored/unknown event name with non-JSON body -streaming/validation/unrecognized data that can be safely ignored/patch event with unrecognized path kind -streaming/fdv2/fallback to FDv1 handling diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapter.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapter.java new file mode 100644 index 00000000..0786de7f --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapter.java @@ -0,0 +1,183 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.Synchronizer; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataSource; +import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; + +import java.io.IOException; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Adapter that wraps a DataSource (FDv1 protocol) and exposes it as a Synchronizer (FDv2 protocol). + *

+ * This adapter bridges the push-based DataSource interface with the pull-based Synchronizer interface + * by listening to updates through a custom DataSourceUpdateSink and queueing them as FDv2SourceResult objects. + *

+ * The adapter is constructed with a factory function that receives the listening update sink and + * creates the DataSource. This ensures the DataSource uses the adapter's internal sink without exposing it. + */ +class DataSourceSynchronizerAdapter implements Synchronizer { + private final DataSource dataSource; + private final IterableAsyncQueue resultQueue = new IterableAsyncQueue<>(); + private final CompletableFuture shutdownFuture = new CompletableFuture<>(); + private final Object startLock = new Object(); + private boolean started = false; + private boolean closed = false; + private Future startFuture; + + /** + * Functional interface for creating a DataSource with a given update sink. + */ + @FunctionalInterface + public interface DataSourceFactory { + DataSource create(DataSourceUpdateSink updateSink); + } + + /** + * Creates a new adapter that wraps a DataSource. + * + * @param dataSourceFactory factory that creates the DataSource with the provided update sink + */ + public DataSourceSynchronizerAdapter(DataSourceFactory dataSourceFactory) { + ConvertingUpdateSink convertingSink = new ConvertingUpdateSink(resultQueue); + this.dataSource = dataSourceFactory.create(convertingSink); + } + + @Override + public CompletableFuture next() { + synchronized (startLock) { + if (!started && !closed) { + started = true; + startFuture = dataSource.start(); + + // Monitor the start future for errors + // The data source will emit updates through the listening sink + CompletableFuture.runAsync(() -> { + try { + startFuture.get(); + } catch (ExecutionException e) { + // Initialization failed - emit an interrupted status + DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + e.getCause() != null ? e.getCause().toString() : e.toString(), + Instant.now() + ); + resultQueue.put(FDv2SourceResult.interrupted(errorInfo, false)); + } catch (CancellationException e) { + // Start future was cancelled (during close) - exit cleanly + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + } + + return CompletableFuture.anyOf(shutdownFuture, resultQueue.take()) + .thenApply(result -> (FDv2SourceResult) result); + } + + @Override + public void close() { + synchronized (startLock) { + if (closed) { + return; + } + closed = true; + } + + try { + dataSource.close(); + } catch (IOException e) { + // Ignore as we are shutting down. + } + shutdownFuture.complete(FDv2SourceResult.shutdown()); + if(startFuture != null) { + // If the start future is done, this has no effect. + // If it is not, then this will unblock the code waiting on start. + startFuture.cancel(true); + } + } + + /** + * A DataSourceUpdateSink that converts DataSource updates into FDv2SourceResult objects. + * This sink doesn't delegate to any other sink - it exists solely to convert FDv1 updates to FDv2 results. + */ + private static class ConvertingUpdateSink implements DataSourceUpdateSink { + private final IterableAsyncQueue resultQueue; + + public ConvertingUpdateSink(IterableAsyncQueue resultQueue) { + this.resultQueue = resultQueue; + } + + @Override + public boolean init(DataStoreTypes.FullDataSet allData) { + // Convert the full data set into a ChangeSet and emit it + DataStoreTypes.ChangeSet changeSet = + new DataStoreTypes.ChangeSet<>( + DataStoreTypes.ChangeSetType.Full, + Selector.EMPTY, + allData.getData(), + null); + resultQueue.put(FDv2SourceResult.changeSet(changeSet, false)); + return true; + } + + @Override + public boolean upsert(DataStoreTypes.DataKind kind, String key, DataStoreTypes.ItemDescriptor item) { + // Convert the upsert into a ChangeSet with a single item and emit it + DataStoreTypes.KeyedItems items = + new DataStoreTypes.KeyedItems<>(Collections.>singletonList( + new AbstractMap.SimpleEntry<>(key, item))); + Iterable>> data = + Collections.>>singletonList( + new AbstractMap.SimpleEntry<>(kind, items)); + + DataStoreTypes.ChangeSet changeSet = + new DataStoreTypes.ChangeSet<>( + DataStoreTypes.ChangeSetType.Partial, + Selector.EMPTY, + data, + null); + resultQueue.put(FDv2SourceResult.changeSet(changeSet, false)); + return true; + } + + @Override + public DataStoreStatusProvider getDataStoreStatusProvider() { + // This adapter doesn't use a data store + return null; + } + + @Override + public void updateStatus(DataSourceStatusProvider.State newState, DataSourceStatusProvider.ErrorInfo newError) { + // Convert state changes to FDv2SourceResult status events + switch (newState) { + case INTERRUPTED: + resultQueue.put(FDv2SourceResult.interrupted(newError, false)); + break; + case OFF: + if (newError != null) { + resultQueue.put(FDv2SourceResult.terminalError(newError, false)); + } + break; + case VALID: + case INITIALIZING: + // These states don't map to FDv2SourceResult status events + break; + } + } + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index bedf0eb6..12b55d1d 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,8 +33,7 @@ class FDv2DataSource implements DataSource { */ private static final long defaultRecoveryTimeout = 5 * 60; - private final List> initializers; - private final SynchronizerStateManager synchronizerStateManager; + private final SourceManager sourceManager; private final List conditionFactories; @@ -46,6 +46,8 @@ class FDv2DataSource implements DataSource { private final LDLogger logger; + private volatile boolean closed = false; + public interface DataSourceFactory { T build(); } @@ -53,6 +55,7 @@ public interface DataSourceFactory { public FDv2DataSource( ImmutableList> initializers, ImmutableList> synchronizers, + DataSourceFactory fdv1DataSourceFactory, DataSourceUpdateSinkV2 dataSourceUpdates, int threadPriority, LDLogger logger, @@ -60,6 +63,7 @@ public FDv2DataSource( ) { this(initializers, synchronizers, + fdv1DataSourceFactory, dataSourceUpdates, threadPriority, logger, @@ -73,6 +77,7 @@ public FDv2DataSource( public FDv2DataSource( ImmutableList> initializers, ImmutableList> synchronizers, + DataSourceFactory fdv1DataSourceFactory, DataSourceUpdateSinkV2 dataSourceUpdates, int threadPriority, LDLogger logger, @@ -80,12 +85,24 @@ public FDv2DataSource( long fallbackTimeout, long recoveryTimeout ) { - this.initializers = initializers; List synchronizerFactories = synchronizers .stream() .map(SynchronizerFactoryWithState::new) .collect(Collectors.toList()); - this.synchronizerStateManager = new SynchronizerStateManager(synchronizerFactories); + + // If we have a fdv1 data source factory, then add that to the synchronizer factories in a blocked state. + // If we receive a request to fallback, then we will unblock it and block all other synchronizers. + if (fdv1DataSourceFactory != null) { + SynchronizerFactoryWithState wrapped = new SynchronizerFactoryWithState(fdv1DataSourceFactory, + true); + wrapped.block(); + synchronizerFactories.add(wrapped); + + // Currently, we only support 1 fdv1 fallback synchronizer, but that limitation is introduced by the + // configuration. + } + + this.sourceManager = new SourceManager(synchronizerFactories, initializers); this.dataSourceUpdates = dataSourceUpdates; this.threadPriority = threadPriority; this.logger = logger; @@ -96,30 +113,50 @@ public FDv2DataSource( private void run() { Thread runThread = new Thread(() -> { - if (!initializers.isEmpty()) { + if (!sourceManager.hasAvailableSources()) { + // There are not any initializer or synchronizers, so we are at the best state that + // can be achieved. + dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); + startFuture.complete(true); + return; + } + + if (sourceManager.hasInitializers()) { runInitializers(); } - boolean fdv1Fallback = runSynchronizers(); - if (fdv1Fallback) { - // TODO: Run FDv1 fallback. + + if(!sourceManager.hasAvailableSynchronizers()) { + // If already completed by the initializers, then this will have no effect. + if (!isInitialized()) { + // If we have no synchronizers, and we didn't manage to initialize, and we aren't shutting down, + // then that was unexpected, and we will report it. + maybeReportUnexpectedExhaustion("All initializers exhausted and there are no available synchronizers."); + } + // If already completed has no effect. + startFuture.complete(false); + return; } - // TODO: Handle. We have ran out of sources or we are shutting down. + + runSynchronizers(); + + // If we had synchronizers, and we ran out of them, and we aren't shutting down, then that was unexpected, + // and we will report it. + maybeReportUnexpectedExhaustion("All data source acquisition methods have been exhausted."); // If we had initialized at some point, then the future will already be complete and this will be ignored. startFuture.complete(false); }); + runThread.setName("LaunchDarkly-SDK-Server-FDv2DataSource"); runThread.setDaemon(true); runThread.setPriority(threadPriority); runThread.start(); } - private void runInitializers() { boolean anyDataReceived = false; - for (DataSourceFactory factory : initializers) { + Initializer initializer = sourceManager.getNextInitializerAndSetActive(); + while(initializer != null) { try { - Initializer initializer = factory.build(); - if (synchronizerStateManager.setActiveSource(initializer)) return; FDv2SourceResult result = initializer.run().get(); switch (result.getResultType()) { case CHANGE_SET: @@ -133,14 +170,40 @@ private void runInitializers() { } break; case STATUS: - // TODO: Implement. + FDv2SourceResult.Status status = result.getStatus(); + switch(status.getState()) { + case INTERRUPTED: + case TERMINAL_ERROR: + // The data source updates handler will filter the state during initializing, but this + // will make the error information available. + dataSourceUpdates.updateStatus( + // While the error was terminal to the individual initializer, it isn't terminal + // to the data source as a whole. + DataSourceStatusProvider.State.INTERRUPTED, + status.getErrorInfo()); + break; + case SHUTDOWN: + case GOODBYE: + // We don't need to inform anyone of these statuses. + logger.debug("Ignoring status {} from initializer", result.getStatus().getState()); + break; + } break; } } catch (ExecutionException | InterruptedException | CancellationException e) { - // TODO: Better messaging? - // TODO: Data source status? + // We don't expect these conditions to happen in practice. + + // The data source updates handler will filter the state during initializing, but this + // will make the error information available. + dataSourceUpdates.updateStatus( + DataSourceStatusProvider.State.INTERRUPTED, + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + e.toString(), + new Date().toInstant())); logger.warn("Error running initializer: {}", e.toString()); } + initializer = sourceManager.getNextInitializerAndSetActive(); } // We received data without a selector, and we have exhausted initializers, so we are going to // consider ourselves initialized. @@ -148,6 +211,8 @@ private void runInitializers() { dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); startFuture.complete(true); } + // If no data was received, then it is possible initialization will complete from synchronizers, so we give + // them an opportunity to run before reporting any issues. } /** @@ -157,8 +222,8 @@ private void runInitializers() { * @return a list of conditions to apply to the synchronizer */ private List getConditions() { - int availableSynchronizers = synchronizerStateManager.getAvailableSynchronizerCount(); - boolean isPrimeSynchronizer = synchronizerStateManager.isPrimeSynchronizer(); + int availableSynchronizers = sourceManager.getAvailableSynchronizerCount(); + boolean isPrimeSynchronizer = sourceManager.isPrimeSynchronizer(); if (availableSynchronizers == 1) { // If there is only 1 synchronizer, then we cannot fall back or recover, so we don't need any conditions. @@ -174,18 +239,13 @@ private List getConditions() { return conditionFactories.stream().map(ConditionFactory::build).collect(Collectors.toList()); } - private boolean runSynchronizers() { + private void runSynchronizers() { // When runSynchronizers exists, no matter how it exits, the synchronizerStateManager will be closed. try { - SynchronizerFactoryWithState availableSynchronizer = synchronizerStateManager.getNextAvailableSynchronizer(); + Synchronizer synchronizer = sourceManager.getNextAvailableSynchronizerAndSetActive(); // We want to continue running synchronizers for as long as any are available. - while (availableSynchronizer != null) { - Synchronizer synchronizer = availableSynchronizer.build(); - - // Returns true if shutdown. - if (synchronizerStateManager.setActiveSource(synchronizer)) return false; - + while (synchronizer != null) { try { boolean running = true; @@ -204,11 +264,13 @@ private boolean runSynchronizers() { // For fallback, we will move to the next available synchronizer, which may loop. // This is the default behavior of exiting the run loop, so we don't need to take // any action. + logger.debug("A synchronizer has experienced an interruption and we are falling back."); break; case RECOVERY: // For recovery, we will start at the first available synchronizer. // So we reset the source index, and finding the source will start at the beginning. - synchronizerStateManager.resetSourceIndex(); + sourceManager.resetSourceIndex(); + logger.debug("The data source is attempting to recover to a higher priority synchronizer."); break; } // A running synchronizer will only have fallback and recovery conditions that it can act on. @@ -217,7 +279,7 @@ private boolean runSynchronizers() { break; } - if(!(res instanceof FDv2SourceResult)) { + if (!(res instanceof FDv2SourceResult)) { logger.error("Unexpected result type from synchronizer: {}", res.getClass().getName()); continue; } @@ -228,6 +290,7 @@ private boolean runSynchronizers() { switch (result.getResultType()) { case CHANGE_SET: dataSourceUpdates.apply(result.getChangeSet()); + dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); // This could have been completed by any data source. But if it has not been completed before // now, then we complete it. startFuture.complete(true); @@ -237,15 +300,20 @@ private boolean runSynchronizers() { switch (status.getState()) { case INTERRUPTED: // Handled by conditions. - // TODO: Data source status. + dataSourceUpdates.updateStatus( + DataSourceStatusProvider.State.INTERRUPTED, + status.getErrorInfo()); break; case SHUTDOWN: // We should be overall shutting down. - // TODO: We may need logging or to do a little more. - return false; + logger.debug("Synchronizer shutdown."); + return; case TERMINAL_ERROR: - availableSynchronizer.block(); + sourceManager.blockCurrentSynchronizer(); running = false; + dataSourceUpdates.updateStatus( + DataSourceStatusProvider.State.INTERRUPTED, + status.getErrorInfo()); break; case GOODBYE: // We let the synchronizer handle this internally. @@ -255,20 +323,38 @@ private boolean runSynchronizers() { } // We have been requested to fall back to FDv1. We handle whatever message was associated, // close the synchronizer, and then fallback. - if (result.isFdv1Fallback()) { - return true; + // Only trigger fallback if we're not already running the FDv1 fallback synchronizer. + if ( + result.isFdv1Fallback() && + sourceManager.hasFDv1Fallback() && + // This shouldn't happen in practice, an FDv1 source shouldn't request fallback + // to FDv1. But if it does, then we will discard its request. + !sourceManager.isCurrentSynchronizerFDv1Fallback() + ) { + sourceManager.fdv1Fallback(); + running = false; } } } } catch (ExecutionException | InterruptedException | CancellationException e) { - // TODO: Log. - // Move to next synchronizer. + dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, + new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + e.toString(), + new Date().toInstant() + )); + logger.warn("Error running synchronizer: {}, will try next synchronizer, or retry.", e.toString()); + // Move to the next synchronizer. } - availableSynchronizer = synchronizerStateManager.getNextAvailableSynchronizer(); + // Get the next available synchronizer and set it active + synchronizer = sourceManager.getNextAvailableSynchronizerAndSetActive(); } - return false; - } finally { - synchronizerStateManager.close(); + } catch(Exception e) { + // We are not expecting to encounter this situation, but if we do, then we should log it. + logger.error("Unexpected error in DataSource: {}", e.toString()); + }finally { + sourceManager.close(); } } @@ -291,16 +377,34 @@ public boolean isInitialized() { @Override public void close() { + closed = true; // If there is an active source, we will shut it down, and that will result in the loop handling that source // exiting. // If we do not have an active source, then the loop will check isShutdown when attempting to set one. When // it detects shutdown, it will exit the loop. - synchronizerStateManager.close(); + sourceManager.close(); + + dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.OFF, null); // If this is already set, then this has no impact. startFuture.complete(false); } + private void maybeReportUnexpectedExhaustion(String message) { + if(!closed) { + dataSourceUpdates.updateStatus( + DataSourceStatusProvider.State.OFF, + // If the data source was closed, then we just report we are OFF without an + // associated error. + new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + message, + new Date().toInstant()) + ); + } + } + /** * Helper class to manage the lifecycle of conditions with automatic cleanup. */ diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java index 77aeb4e8..5c63a654 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java @@ -154,9 +154,23 @@ static FDv2DataSystem create( .map(synchronizer -> new FactoryWrapper<>(synchronizer, builderContext)) .collect(ImmutableList.toImmutableList()); + // Create FDv1 fallback synchronizer factory if configured + FDv2DataSource.DataSourceFactory fdv1FallbackFactory = null; + if (dataSystemConfiguration.getFDv1FallbackSynchronizer() != null) { + fdv1FallbackFactory = () -> { + // Wrap the FDv1 DataSource as a Synchronizer using the adapter + return new DataSourceSynchronizerAdapter( + updateSink -> dataSystemConfiguration + .getFDv1FallbackSynchronizer() + .build(clientContext.withDataSourceUpdateSink(updateSink)) + ); + }; + } + DataSource dataSource = new FDv2DataSource( initializerFactories, synchronizerFactories, + fdv1FallbackFactory, dataSourceUpdates, config.threadPriority, clientContext.getBaseLogger().subLogger(Loggers.DATA_SOURCE_LOGGER_NAME), diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SourceManager.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SourceManager.java new file mode 100644 index 00000000..dd093d9a --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SourceManager.java @@ -0,0 +1,297 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.datasources.Initializer; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Manages the state of synchronizers including tracking which synchronizer is active, + * managing the list of available synchronizers, and handling source transitions. + *

+ * Package-private for internal use. + */ +class SourceManager implements Closeable { + private final List synchronizers; + + private final List> initializers; + + /** + * Lock for active sources and shutdown state. + */ + private final Object activeSourceLock = new Object(); + private Closeable activeSource; + private boolean isShutdown = false; + + /** + * We start at -1, so finding the next synchronizer can non-conditionally increment the index. + */ + private int synchronizerIndex = -1; + + private int initializerIndex = -1; + + /** + * The current synchronizer factory (for checking FDv1 fallback status and blocking) + */ + private SynchronizerFactoryWithState currentSynchronizerFactory; + + public SourceManager(List synchronizers, List> initializers) { + this.synchronizers = synchronizers; + this.initializers = initializers; + } + + /** + * Reset the source index to -1, indicating that we should start from the first synchronizer when looking for + * the next one to use. This is used when recovering from a non-primary synchronizer. + */ + public void resetSourceIndex() { + synchronized (activeSourceLock) { + synchronizerIndex = -1; + } + } + + public boolean hasFDv1Fallback() { + for (SynchronizerFactoryWithState factory : synchronizers) { + if (factory.isFDv1Fallback()) { + return true; + } + } + return false; + } + + /** + * Block all synchronizers aside from the fdv1 fallback and unblock the fdv1 fallback. + */ + public void fdv1Fallback() { + for (SynchronizerFactoryWithState factory : synchronizers) { + if(factory.isFDv1Fallback()) { + factory.unblock(); + } else { + factory.block(); + } + } + } + + /** + * Get the next synchronizer to use. This operates based on tracking the index of the currently active synchronizer, + * which will loop through all available synchronizers handling interruptions. Then a non-prime synchronizer recovers + * the source index will be reset, and we start at the beginning. + *

+ * Any given synchronizer can be marked as blocked, in which case that synchronizer is not eligible to be used again. + * Synchronizers that are not blocked are available, and this function will only return available synchronizers. + *

+ * Note: This is an internal method that must be called while holding activeSourceLock. + * It does not check shutdown status or handle locking - that's done by the caller. + * + * @return the next synchronizer factory to use, or null if there are no more available synchronizers. + */ + private SynchronizerFactoryWithState getNextAvailableSynchronizer() { + SynchronizerFactoryWithState factory = null; + + int visited = 0; + while(visited < synchronizers.size()) { + // Look for the next synchronizer starting at the position after the current one. (avoiding just re-using the same synchronizer.) + synchronizerIndex++; + + // We aren't using module here because we want to keep the stored index within range instead + // of increasing indefinitely. + if(synchronizerIndex >= synchronizers.size()) { + synchronizerIndex = 0; + } + + SynchronizerFactoryWithState candidate = synchronizers.get(synchronizerIndex); + if (candidate.getState() == SynchronizerFactoryWithState.State.Available) { + factory = candidate; + break; + } + visited++; + } + return factory; + } + + /** + * Get the next available synchronizer, build it, and set it as the active source in one atomic operation. + * This combines the two-step process of getting the next synchronizer and setting it active. + *

+ * If shutdown has been initiated, returns null without building or setting a source. + * Any previously active source will be closed before setting the new one. + *

+ * The current synchronizer factory can be retrieved with {@link #blockCurrentSynchronizer()} + * or {@link #isCurrentSynchronizerFDv1Fallback()} to interact with it. + * + * @return the built synchronizer that is now active, or null if no more synchronizers are available or shutdown has been initiated + */ + public com.launchdarkly.sdk.server.datasources.Synchronizer getNextAvailableSynchronizerAndSetActive() { + synchronized (activeSourceLock) { + // Handle shutdown first - if shutdown, don't do any work + if (isShutdown) { + currentSynchronizerFactory = null; + return null; + } + + SynchronizerFactoryWithState factory = getNextAvailableSynchronizer(); + if (factory == null) { + currentSynchronizerFactory = null; + return null; + } + + currentSynchronizerFactory = factory; + com.launchdarkly.sdk.server.datasources.Synchronizer synchronizer = factory.build(); + + // Close any previously active source + if (activeSource != null) { + safeClose(activeSource); + } + + activeSource = synchronizer; + return synchronizer; + } + } + + + public boolean hasAvailableSources() { + return hasInitializers() || getAvailableSynchronizerCount() > 0; + } + + public boolean hasInitializers() { + return !initializers.isEmpty(); + } + + public boolean hasAvailableSynchronizers() { + return getAvailableSynchronizerCount() > 0; + } + + /** + * Get the next initializer factory. This is an internal method that must be called while holding activeSourceLock. + * It does not check shutdown status or handle locking - that's done by the caller. + * + * @return the next initializer factory, or null if no more initializers are available + */ + private FDv2DataSource.DataSourceFactory getNextInitializer() { + initializerIndex++; + if (initializerIndex >= initializers.size()) { + return null; + } + return initializers.get(initializerIndex); + } + + public void blockCurrentSynchronizer() { + synchronized (activeSourceLock) { + if (currentSynchronizerFactory != null) { + currentSynchronizerFactory.block(); + } + } + } + + public boolean isCurrentSynchronizerFDv1Fallback() { + synchronized (activeSourceLock) { + return currentSynchronizerFactory != null && currentSynchronizerFactory.isFDv1Fallback(); + } + } + + /** + * Get the next initializer, build it, and set it as the active source in one atomic operation. + * This combines the two-step process of getting the next initializer and setting it active. + *

+ * If shutdown has been initiated, returns null without building or setting a source. + * Any previously active source will be closed before setting the new one. + * + * @return the built initializer that is now active, or null if no more initializers are available or shutdown has been initiated + */ + public Initializer getNextInitializerAndSetActive() { + synchronized (activeSourceLock) { + // Handle shutdown first - if shutdown, don't do any work + if (isShutdown) { + return null; + } + + FDv2DataSource.DataSourceFactory factory = getNextInitializer(); + if (factory == null) { + return null; + } + + Initializer initializer = factory.build(); + + // Close any previously active source + if (activeSource != null) { + safeClose(activeSource); + } + + activeSource = initializer; + return initializer; + } + } + + /** + * Determine if the currently active synchronizer is the prime (first available) synchronizer. + * @return true if the current synchronizer is the prime synchronizer, false otherwise + */ + public boolean isPrimeSynchronizer() { + synchronized (activeSourceLock) { + for (int index = 0; index < synchronizers.size(); index++) { + if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { + if (synchronizerIndex == index) { + // This is the first synchronizer that is available, and it also is the current one. + return true; + } + break; + // Subsequently encountered synchronizers that are available are not the first one. + } + } + } + return false; + } + + /** + * Get the count of available synchronizers. + * @return the number of available synchronizers + */ + public int getAvailableSynchronizerCount() { + synchronized (activeSourceLock) { + int count = 0; + for (int index = 0; index < synchronizers.size(); index++) { + if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { + count++; + } + } + return count; + } + } + + + /** + * Close the state manager and shut down any active source. + * Implements AutoCloseable to enable try-with-resources usage. + */ + @Override + public void close() { + synchronized (activeSourceLock) { + isShutdown = true; + if (activeSource != null) { + try { + activeSource.close(); + } catch (IOException e) { + // We are done with this synchronizer, so we don't care if it encounters + // an error condition. + } + activeSource = null; + } + } + } + + /** + * Safely close a closeable, ignoring any exceptions. + * @param closeable the closeable to close + */ + private void safeClose(Closeable closeable) { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (IOException e) { + // Ignore close exceptions. + } + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java index 0145396a..69c0cd93 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -303,7 +303,9 @@ private void handleMessage(MessageEvent event) { Instant.now() ); result = FDv2SourceResult.interrupted(internalError, getFallback(event)); - restartStream(); + if(kind == DataSourceStatusProvider.ErrorKind.INVALID_DATA) { + restartStream(); + } break; case NONE: diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java index c0afa642..31f02cc1 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java @@ -19,11 +19,19 @@ public enum State { private State state = State.Available; + private boolean isFDv1Fallback = false; + public SynchronizerFactoryWithState(FDv2DataSource.DataSourceFactory factory) { this.factory = factory; } + public SynchronizerFactoryWithState(FDv2DataSource.DataSourceFactory factory, boolean isFDv1Fallback) { + this.factory = factory; + this.isFDv1Fallback = isFDv1Fallback; + } + + public State getState() { return state; } @@ -35,4 +43,12 @@ public void block() { public Synchronizer build() { return factory.build(); } + + public void unblock() { + state = State.Available; + } + + public boolean isFDv1Fallback() { + return isFDv1Fallback; + } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java deleted file mode 100644 index 5ff754ed..00000000 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java +++ /dev/null @@ -1,164 +0,0 @@ -package com.launchdarkly.sdk.server; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/** - * Manages the state of synchronizers including tracking which synchronizer is active, - * managing the list of available synchronizers, and handling source transitions. - *

- * Package-private for internal use. - */ -class SynchronizerStateManager implements Closeable { - private final List synchronizers; - - /** - * Lock for active sources and shutdown state. - */ - private final Object activeSourceLock = new Object(); - private Closeable activeSource; - private boolean isShutdown = false; - - /** - * We start at -1, so finding the next synchronizer can non-conditionally increment the index. - */ - private int sourceIndex = -1; - - public SynchronizerStateManager(List synchronizers) { - this.synchronizers = synchronizers; - } - - /** - * Reset the source index to -1, indicating that we should start from the first synchronizer when looking for - * the next one to use. This is used when recovering from a non-primary synchronizer. - */ - public void resetSourceIndex() { - synchronized (activeSourceLock) { - sourceIndex = -1; - } - } - - /** - * Get the next synchronizer to use. This operates based on tracking the index of the currently active synchronizer, - * which will loop through all available synchronizers handling interruptions. Then a non-prime synchronizer recovers - * the source index will be reset, and we start at the beginning. - *

- * Any given synchronizer can be marked as blocked, in which case that synchronizer is not eligible to be used again. - * Synchronizers that are not blocked are available, and this function will only return available synchronizers. - * @return the next synchronizer factory to use, or null if there are no more available synchronizers. - */ - public SynchronizerFactoryWithState getNextAvailableSynchronizer() { - synchronized (activeSourceLock) { - SynchronizerFactoryWithState factory = null; - - int visited = 0; - while(visited < synchronizers.size()) { - // Look for the next synchronizer starting at the position after the current one. (avoiding just re-using the same synchronizer.) - sourceIndex++; - - // We aren't using module here because we want to keep the stored index within range instead - // of increasing indefinitely. - if(sourceIndex >= synchronizers.size()) { - sourceIndex = 0; - } - - SynchronizerFactoryWithState candidate = synchronizers.get(sourceIndex); - if (candidate.getState() == SynchronizerFactoryWithState.State.Available) { - factory = candidate; - break; - } - visited++; - } - return factory; - } - } - - /** - * Determine if the currently active synchronizer is the prime (first available) synchronizer. - * @return true if the current synchronizer is the prime synchronizer, false otherwise - */ - public boolean isPrimeSynchronizer() { - synchronized (activeSourceLock) { - for (int index = 0; index < synchronizers.size(); index++) { - if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { - if (sourceIndex == index) { - // This is the first synchronizer that is available, and it also is the current one. - return true; - } - break; - // Subsequently encountered synchronizers that are available are not the first one. - } - } - } - return false; - } - - /** - * Get the count of available synchronizers. - * @return the number of available synchronizers - */ - public int getAvailableSynchronizerCount() { - synchronized (activeSourceLock) { - int count = 0; - for (int index = 0; index < synchronizers.size(); index++) { - if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { - count++; - } - } - return count; - } - } - - /** - * Set the active source. If shutdown has been initiated, the source will be closed immediately. - * Any previously active source will be closed. - * @param source the source to set as active - * @return true if shutdown has been initiated, false otherwise - */ - public boolean setActiveSource(Closeable source) { - synchronized (activeSourceLock) { - if (activeSource != null) { - safeClose(activeSource); - } - if (isShutdown) { - safeClose(source); - return true; - } - activeSource = source; - } - return false; - } - - /** - * Close the state manager and shut down any active source. - * Implements AutoCloseable to enable try-with-resources usage. - */ - @Override - public void close() { - synchronized (activeSourceLock) { - isShutdown = true; - if (activeSource != null) { - try { - activeSource.close(); - } catch (IOException e) { - // We are done with this synchronizer, so we don't care if it encounters - // an error condition. - } - activeSource = null; - } - } - } - - /** - * Safely close a closeable, ignoring any exceptions. - * @param closeable the closeable to close - */ - private void safeClose(Closeable closeable) { - try { - closeable.close(); - } catch (IOException e) { - // Ignore close exceptions. - } - } -} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapterTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapterTest.java new file mode 100644 index 00000000..8f8f7824 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapterTest.java @@ -0,0 +1,296 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.subsystems.DataSource; +import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; + +import org.junit.After; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.*; + +@SuppressWarnings("javadoc") +public class DataSourceSynchronizerAdapterTest extends BaseTest { + + private final List resourcesToClose = new ArrayList<>(); + + @After + public void tearDown() { + for (AutoCloseable resource : resourcesToClose) { + try { + resource.close(); + } catch (Exception e) { + // Ignore cleanup exceptions + } + } + resourcesToClose.clear(); + } + + /** + * Test that closing the adapter before initialization completes does not leak threads. + * This is the main test for the bug fix - verifies that cancelling startFuture unblocks the monitoring task. + */ + @Test + public void closeBeforeInitializationDoesNotLeakThread() throws Exception { + CountDownLatch blockInitLatch = new CountDownLatch(1); + CountDownLatch futureGetCalledLatch = new CountDownLatch(1); + + // Create an adapter with a data source that blocks during initialization + // The MockDataSource will signal futureGetCalledLatch when get() is called on the returned future + DataSourceSynchronizerAdapter adapter = new DataSourceSynchronizerAdapter(sink -> + new MockDataSource(blockInitLatch, null, null, futureGetCalledLatch) + ); + resourcesToClose.add(adapter); + + // Start the adapter (launches monitoring task) + CompletableFuture nextFuture = adapter.next(); + + // Wait for the monitoring task to actually call get() on the startFuture and block + // This ensures we're testing the exact scenario: monitoring task is blocked when cancel() is called + assertTrue("Future.get() should have been called", + futureGetCalledLatch.await(1, TimeUnit.SECONDS)); + + // Close before initialization completes - this should cancel startFuture and unblock the monitoring task + adapter.close(); + + // Verify next() completes with shutdown result (should be nearly immediate) + FDv2SourceResult result = nextFuture.get(2, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.SHUTDOWN, result.getStatus().getState()); + + // Signal the blocked initialization (should already be cancelled/irrelevant) + blockInitLatch.countDown(); + + // Test passes if we reach here without hanging. + } + + /** + * Test that normal initialization (without premature close) still works correctly. + * This ensures the fix doesn't break the happy path. + */ + @Test + public void normalInitializationCompletes() throws Exception { + CountDownLatch allowInitLatch = new CountDownLatch(1); + + DataSourceSynchronizerAdapter adapter = new DataSourceSynchronizerAdapter(sink -> + new MockDataSource(allowInitLatch, null) + ); + resourcesToClose.add(adapter); + + CompletableFuture nextFuture = adapter.next(); + + // Allow initialization to complete + allowInitLatch.countDown(); + + // Wait briefly for the monitoring task to process completion + Thread.sleep(200); + + // Close normally + adapter.close(); + + // Verify shutdown result is received + FDv2SourceResult result = nextFuture.get(1, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.SHUTDOWN, result.getStatus().getState()); + } + + /** + * Test that initialization errors are properly reported. + * Ensures the exception handling in the monitoring task still works correctly. + */ + @Test + public void initializationErrorIsReported() throws Exception { + // Create an adapter with a data source that fails during initialization + DataSourceSynchronizerAdapter adapter = new DataSourceSynchronizerAdapter(sink -> + new MockDataSource(new RuntimeException("Init failed")) + ); + resourcesToClose.add(adapter); + + CompletableFuture nextFuture = adapter.next(); + + // Wait for the error to be reported + FDv2SourceResult result = nextFuture.get(2, TimeUnit.SECONDS); + + // Should receive an interrupted status with error info + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.INTERRUPTED, result.getStatus().getState()); + assertNotNull(result.getStatus().getErrorInfo()); + assertTrue(result.getStatus().getErrorInfo().getMessage().contains("Init failed")); + + adapter.close(); + } + + /** + * Test that close() can be called before start()/next() without issues. + */ + @Test + public void closeBeforeStartDoesNotFail() throws Exception { + DataSourceSynchronizerAdapter adapter = new DataSourceSynchronizerAdapter(sink -> + new MockDataSource(new CountDownLatch(1), null) + ); + + // Close before calling next() + adapter.close(); + + // next() should still work and return shutdown immediately + CompletableFuture nextFuture = adapter.next(); + FDv2SourceResult result = nextFuture.get(1, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.SHUTDOWN, result.getStatus().getState()); + } + + /** + * Test multiple rapid close/next cycles to ensure no race conditions. + */ + @Test + public void rapidCloseDoesNotCauseIssues() throws Exception { + for (int i = 0; i < 10; i++) { + CountDownLatch blockLatch = new CountDownLatch(1); + DataSourceSynchronizerAdapter adapter = new DataSourceSynchronizerAdapter(sink -> + new MockDataSource(blockLatch, null) + ); + + CompletableFuture nextFuture = adapter.next(); + Thread.sleep(10); // Brief delay to let init start + adapter.close(); + + FDv2SourceResult result = nextFuture.get(1, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.SHUTDOWN, result.getStatus().getState()); + + blockLatch.countDown(); + } + } + + /** + * Mock DataSource implementation for testing. + * Allows controlling when initialization completes or fails. + */ + private static class MockDataSource implements DataSource { + private final CountDownLatch blockLatch; + private final CountDownLatch signalLatch; + private final Exception initException; + private final CountDownLatch futureGetCalledLatch; + private final CompletableFuture startFuture = new CompletableFuture<>(); + private volatile boolean closed = false; + + // Constructor for blocking init + public MockDataSource(CountDownLatch blockLatch, CountDownLatch signalLatch) { + this(blockLatch, signalLatch, null, null); + } + + // Constructor for init that fails + public MockDataSource(Exception initException) { + this(null, null, initException, null); + } + + public MockDataSource(CountDownLatch blockLatch, CountDownLatch signalLatch, Exception initException) { + this(blockLatch, signalLatch, initException, null); + } + + public MockDataSource(CountDownLatch blockLatch, CountDownLatch signalLatch, Exception initException, CountDownLatch futureGetCalledLatch) { + this.blockLatch = blockLatch; + this.signalLatch = signalLatch; + this.initException = initException; + this.futureGetCalledLatch = futureGetCalledLatch; + } + + @Override + public Future start() { + // Start initialization in background thread + CompletableFuture.runAsync(() -> { + try { + // Signal that init has started + if (signalLatch != null) { + signalLatch.countDown(); + } + + // If there's an exception to throw, throw it + if (initException != null) { + startFuture.completeExceptionally(initException); + return; + } + + // If there's a latch, wait for it (simulating slow initialization) + if (blockLatch != null) { + blockLatch.await(); + } + + // Complete successfully + startFuture.complete(null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + startFuture.completeExceptionally(e); + } + }); + + // If we need to signal when get() is called, wrap the future + if (futureGetCalledLatch != null) { + return new SignalingFuture<>(startFuture, futureGetCalledLatch); + } + return startFuture; + } + + @Override + public void close() { + closed = true; + // Note: Like PollingProcessor and StreamProcessor, we do NOT complete the startFuture here + // This is what originally caused the thread leak that we're fixing in the adapter + } + + @Override + public boolean isInitialized() { + return startFuture.isDone() && !startFuture.isCompletedExceptionally(); + } + } + + /** + * Wrapper around a Future that signals a latch when get() is called. + * Used to precisely detect when the monitoring task calls get() and blocks. + */ + private static class SignalingFuture implements Future { + private final Future delegate; + private final CountDownLatch getCalledLatch; + + public SignalingFuture(Future delegate, CountDownLatch getCalledLatch) { + this.delegate = delegate; + this.getCalledLatch = getCalledLatch; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + getCalledLatch.countDown(); + return delegate.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + getCalledLatch.countDown(); + return delegate.get(timeout, unit); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java index 757c3374..5dea1f24 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java @@ -17,7 +17,6 @@ import java.time.Instant; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -105,15 +104,23 @@ public void firstInitializerFailsSecondInitializerSucceedsWithSelector() throws ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); + // In practice this intermediate status will be supressed by the data source updates sink. + + // Should receive INTERRUPTED from the first failed initializer, then VALID from second successful initializer + List statuses = sink.awaitStatuses(2, 2, TimeUnit.SECONDS); + assertEquals("Should receive 2 status updates", 2, statuses.size()); + assertEquals(DataSourceStatusProvider.State.INTERRUPTED, statuses.get(0)); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(1)); + assertTrue(dataSource.isInitialized()); assertEquals(1, sink.getApplyCount()); - // TODO: Verify status updated to VALID when data source status is implemented + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); } @Test @@ -145,7 +152,7 @@ public void firstInitializerFailsSecondInitializerSucceedsWithoutSelector() thro } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -157,10 +164,18 @@ public void firstInitializerFailsSecondInitializerSucceedsWithoutSelector() thro Boolean synchronizerCalled = synchronizerCalledQueue.poll(2, TimeUnit.SECONDS); assertNotNull("Synchronizer should be called", synchronizerCalled); + // Expected status sequence: + // 1. INTERRUPTED when first initializer fails + // 2. VALID after synchronizer completes (second initializer has no selector, so must wait for synchronizer) + List statuses = sink.awaitStatuses(2, 2, TimeUnit.SECONDS); + assertEquals("Should receive 2 status updates", 2, statuses.size()); + assertEquals(DataSourceStatusProvider.State.INTERRUPTED, statuses.get(0)); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(1)); + // Wait for apply to be processed sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); assertEquals(2, sink.getApplyCount()); // One from initializer, one from synchronizer - // TODO: Verify status updated to VALID when data source status is implemented + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); } @Test @@ -186,16 +201,21 @@ public void firstInitializerSucceedsWithSelectorSecondInitializerNotInvoked() th ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); + // Expected status: VALID (first initializer succeeds with selector, second not called) + List statuses = sink.awaitStatuses(1, 2, TimeUnit.SECONDS); + assertEquals("Should receive 1 status update", 1, statuses.size()); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); + assertTrue(dataSource.isInitialized()); assertFalse(secondInitializerCalled.get()); assertEquals(1, sink.getApplyCount()); - // TODO: Verify status updated to VALID when data source status is implemented + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); } @Test @@ -226,7 +246,7 @@ public void allInitializersFailSwitchesToSynchronizers() throws Exception { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -268,15 +288,29 @@ public void allThreeInitializersFailWithNoSynchronizers() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); + // Should receive INTERRUPTED for each failing initializer, then OFF when all sources exhausted + // Expected: 3 INTERRUPTED statuses + 1 OFF status = 4 total + List statuses = sink.awaitStatuses(4, 2, TimeUnit.SECONDS); + assertEquals("Should receive 4 status updates", 4, statuses.size()); + + // First 3 should be INTERRUPTED (one per failed initializer) + assertEquals(DataSourceStatusProvider.State.INTERRUPTED, statuses.get(0)); + assertEquals(DataSourceStatusProvider.State.INTERRUPTED, statuses.get(1)); + assertEquals(DataSourceStatusProvider.State.INTERRUPTED, statuses.get(2)); + + // Final status should be OFF (all sources exhausted, no synchronizers) + assertEquals(DataSourceStatusProvider.State.OFF, statuses.get(3)); + assertFalse(dataSource.isInitialized()); assertEquals(0, sink.getApplyCount()); - // TODO: Verify status reflects exhausted sources when data source status is implemented + assertEquals(DataSourceStatusProvider.State.OFF, sink.getLastState()); + assertNotNull(sink.getLastError()); } @Test @@ -294,15 +328,20 @@ public void oneInitializerNoSynchronizerIsWellBehaved() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); + // Expected status: VALID (initializer succeeds with selector, no need to wait for synchronizer) + List statuses = sink.awaitStatuses(1, 2, TimeUnit.SECONDS); + assertEquals("Should receive 1 status update", 1, statuses.size()); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); + assertTrue(dataSource.isInitialized()); assertEquals(1, sink.getApplyCount()); - // TODO: Verify status updated to VALID when data source status is implemented + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); } // ============================================================================ @@ -324,7 +363,7 @@ public void noInitializersOneSynchronizerIsWellBehaved() throws Exception { () -> new MockSynchronizer(synchronizerFuture) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -358,7 +397,7 @@ public void oneInitializerOneSynchronizerIsWellBehaved() throws Exception { () -> new MockSynchronizer(synchronizerFuture) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -379,15 +418,20 @@ public void noInitializersAndNoSynchronizersIsWellBehaved() throws Exception { ImmutableList> initializers = ImmutableList.of(); ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); - assertFalse(dataSource.isInitialized()); + // Expected status: VALID (no sources but data source initializes immediately) + List statuses = sink.awaitStatuses(1, 2, TimeUnit.SECONDS); + assertEquals("Should receive 1 status update", 1, statuses.size()); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); + + assertTrue(dataSource.isInitialized()); assertEquals(0, sink.getApplyCount()); - // TODO: Verify status reflects exhausted sources when data source status is implemented + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); } // ============================================================================ @@ -413,7 +457,7 @@ public void errorWithFDv1FallbackTriggersFallback() throws Exception { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -457,7 +501,7 @@ public void fallbackAndRecoveryTasksWellBehaved() throws Exception { ); // Use short timeouts for testing - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -473,13 +517,25 @@ public void fallbackAndRecoveryTasksWellBehaved() throws Exception { // 5. After recovery, first sync sends apply again (3) // Total time: ~3-4 seconds (1s fallback + 2s recovery + processing) + // Expected status sequence: + // 1. VALID when first sync sends initial changeset + // 2. INTERRUPTED when first sync sends interrupted result + // 3. VALID when second sync (fallback) sends changeset + // 4. VALID when first sync recovers and sends changeset again + // Wait for at least the first 3 statuses that happen during initialization + List statuses = sink.awaitStatuses(3, 6, TimeUnit.SECONDS); + assertTrue("Should receive at least 3 status updates", statuses.size() >= 3); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); + assertEquals(DataSourceStatusProvider.State.INTERRUPTED, statuses.get(1)); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(2)); + // Wait for 3 applies with enough time for fallback (1s) + recovery (2s) + overhead sink.awaitApplyCount(3, 5, TimeUnit.SECONDS); // Both synchronizers should have been called due to fallback and recovery assertTrue(firstSyncCallCount.get() >= 2); // Called initially and after recovery assertTrue(secondSyncCallCount.get() >= 1); // Called after fallback - // TODO: Verify status transitions when data source status is implemented + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); } @Test @@ -498,7 +554,7 @@ public void canDisposeWhenSynchronizersFallingBack() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -545,7 +601,7 @@ public void terminalErrorBlocksSynchronizer() throws Exception { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -562,9 +618,19 @@ public void terminalErrorBlocksSynchronizer() throws Exception { assertEquals(Integer.valueOf(1), firstCall); assertEquals(Integer.valueOf(2), secondCall); + // Expected status sequence: + // 1. VALID when first synchronizer sends initial changeset + // 2. Terminal error from first synchronizer (blocks it) + // 3. VALID when second synchronizer sends changeset + List statuses = sink.awaitStatuses(3, 2, TimeUnit.SECONDS); + assertEquals("Should receive 3 status updates", 3, statuses.size()); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); + // Note: terminal error might be suppressed or show as INTERRUPTED + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(2)); + // Wait for applies from both sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); - // TODO: Verify status transitions when data source status is implemented + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); } @Test @@ -593,14 +659,22 @@ public void allThreeSynchronizersFailReportsExhaustion() throws Exception { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); + // Expected status sequence: 3 terminal errors (one per synchronizer) → OFF when all exhausted + // Terminal errors might show as INTERRUPTED status + List statuses = sink.awaitStatuses(4, 2, TimeUnit.SECONDS); + assertEquals("Should receive 4 status updates", 4, statuses.size()); + // Last status should be OFF + assertEquals(DataSourceStatusProvider.State.OFF, statuses.get(3)); + assertFalse(dataSource.isInitialized()); - // TODO: Verify status reflects exhausted sources when data source status is implemented + assertEquals(DataSourceStatusProvider.State.OFF, sink.getLastState()); + assertNotNull(sink.getLastError()); } // ============================================================================ @@ -633,7 +707,7 @@ public void disabledDataSourceCannotTriggerActions() throws Exception { () -> new MockQueuedSynchronizer(secondSyncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -679,33 +753,46 @@ public void disposeCompletesStartFuture() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); + // Expected status: VALID when synchronizer sends initial changeset + List statuses = sink.awaitStatuses(1, 2, TimeUnit.SECONDS); + assertEquals("Should receive 1 status update", 1, statuses.size()); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); + dataSource.close(); assertTrue(startFuture.isDone()); - // TODO: Verify status updated to OFF when data source status is implemented + + statuses = sink.awaitStatuses(1, 2, TimeUnit.SECONDS); + assertEquals("Should receive 1 status update", 1, statuses.size()); + assertEquals(DataSourceStatusProvider.State.OFF, statuses.get(0)); } @Test - public void noSourcesProvidedCompletesImmediately() throws Exception { + public void noSourcesProvidedCompletesImmediately() throws Exception{ executor = Executors.newScheduledThreadPool(2); MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); ImmutableList> initializers = ImmutableList.of(); ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); - assertFalse(dataSource.isInitialized()); - // TODO: Verify status reflects exhausted sources when data source status is implemented + // Expected status: VALID (no sources but data source initializes immediately) + List statuses = sink.awaitStatuses(1, 2, TimeUnit.SECONDS); + assertEquals("Should receive 1 status update", 1, statuses.size()); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); + + assertTrue(dataSource.isInitialized()); + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); } // ============================================================================ @@ -733,7 +820,7 @@ public void startFutureCompletesExactlyOnce() throws Exception { () -> new MockSynchronizer(synchronizerFuture) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -756,7 +843,7 @@ public void concurrentCloseAndStartHandledSafely() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); Future startFuture = dataSource.start(); @@ -780,7 +867,7 @@ public void multipleStartCallsEventuallyComplete() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture1 = dataSource.start(); @@ -808,7 +895,7 @@ public void isInitializedThreadSafe() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); dataSource.start(); @@ -843,7 +930,7 @@ public void dataSourceUpdatesApplyThreadSafe() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -879,7 +966,7 @@ public void initializerThrowsExecutionException() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -911,7 +998,7 @@ public void initializerThrowsInterruptedException() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -941,7 +1028,7 @@ public void initializerThrowsCancellationException() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -970,7 +1057,7 @@ public void synchronizerNextThrowsExecutionException() throws Exception { () -> new MockSynchronizer(goodFuture) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1002,7 +1089,7 @@ public void synchronizerNextThrowsInterruptedException() throws Exception { () -> new MockSynchronizer(goodFuture) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1032,7 +1119,7 @@ public void synchronizerNextThrowsCancellationException() throws Exception { () -> new MockSynchronizer(goodFuture) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1054,7 +1141,7 @@ public void closeWithoutStartDoesNotHang() { ImmutableList> initializers = ImmutableList.of(); ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); dataSource.close(); @@ -1076,7 +1163,7 @@ public void closeAfterInitializersCompletesImmediately() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); @@ -1107,7 +1194,7 @@ public void close() { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); @@ -1132,7 +1219,7 @@ public void multipleCloseCallsAreIdempotent() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); @@ -1144,6 +1231,81 @@ public void multipleCloseCallsAreIdempotent() throws Exception { // Test passes if we reach here without throwing } + @Test + public void closingDataSourceDuringInitializationReportsOffWithoutErrors() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = new CompletableFuture<>(); + + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + + // Close the data source - this sets closed=true + dataSource.close(); + // Result shouldn't be used. +// initializerFuture.complete(FDv2SourceResult.changeSet(makeChangeSet(true), false)); + + // Wait for start future (completes when exhaustion happens) + startFuture.get(2, TimeUnit.SECONDS); + System.out.println("Start future completed"); + + // Wait for the OFF status to be reported + DataSourceStatusProvider.State status = sink.awaitStatus(2, TimeUnit.SECONDS); + assertNotNull("Should receive status update", status); + assertEquals(DataSourceStatusProvider.State.OFF, status); + + // The data source should report OFF with null error because it was closed + assertNull("Error should be null when closed data source exhausts initializers", sink.getLastError()); + assertFalse(dataSource.isInitialized()); + } + + @Test + public void dataSourceClosedDuringSynchronizationReportsOffWithoutError() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // Synchronizer that emits a changeset to initialize, then waits. + ImmutableList> synchronizers = ImmutableList.of( + () -> { + BlockingQueue results = new LinkedBlockingQueue<>(); + results.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + return new MockQueuedSynchronizer(results); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Close the data source - this sets closed=true + dataSource.close(); + + // Expected status sequence: + // 1. VALID (from first synchronizer's changeset) + // 4. OFF (from exhaustion, with null error because closed=true) + List statuses = sink.awaitStatuses(2, 2, TimeUnit.SECONDS); + assertEquals("Should receive 4 status updates", 2, statuses.size()); + + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); + assertEquals(DataSourceStatusProvider.State.OFF, statuses.get(1)); + + assertNull("Error should be null when closed data source exhausts synchronizers", sink.getLastError()); + assertTrue(dataSource.isInitialized()); // Was initialized before close + } + @Test public void closeInterruptsConditionWaiting() throws Exception { executor = Executors.newScheduledThreadPool(2); @@ -1160,7 +1322,7 @@ public void closeInterruptsConditionWaiting() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); @@ -1198,7 +1360,7 @@ public CompletableFuture run() { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); dataSource.close(); Future startFuture = dataSource.start(); @@ -1235,7 +1397,7 @@ public void close() { () -> new MockQueuedSynchronizer(secondSyncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1269,7 +1431,7 @@ public void close() { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); @@ -1301,7 +1463,7 @@ public void setActiveSourceOnInitializerChecksShutdown() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); Future startFuture = dataSource.start(); @@ -1362,7 +1524,7 @@ public void blockedSynchronizerSkippedInRotation() throws Exception { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1392,7 +1554,7 @@ public void allSynchronizersBlockedReturnsNullAndExits() throws Exception { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1432,7 +1594,7 @@ public void recoveryResetsToFirstAvailableSynchronizer() throws Exception { ); // Short recovery timeout - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1472,7 +1634,7 @@ public void fallbackMovesToNextSynchronizer() throws Exception { ); // Short fallback timeout - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1502,7 +1664,7 @@ public void conditionsClosedAfterSynchronizerLoop() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1529,7 +1691,7 @@ public void conditionsInformedOfAllResults() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 10, 20); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 10, 20); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1557,7 +1719,7 @@ public void conditionsClosedOnException() throws Exception { )) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1582,7 +1744,7 @@ public void primeSynchronizerHasNoRecoveryCondition() throws Exception { () -> new MockQueuedSynchronizer(new LinkedBlockingQueue<>()) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1613,7 +1775,7 @@ public void nonPrimeSynchronizerHasBothConditions() throws Exception { () -> new MockQueuedSynchronizer(secondSyncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1637,7 +1799,7 @@ public void singleSynchronizerHasNoConditions() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1662,7 +1824,7 @@ public void conditionFutureNeverCompletesWhenNoConditions() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1692,7 +1854,7 @@ public void changeSetAppliedToDataSourceUpdates() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1718,7 +1880,7 @@ public void multipleChangeSetsAppliedInOrder() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1751,7 +1913,7 @@ public void selectorNonEmptyCompletesInitialization() throws Exception { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, ImmutableList.of(), sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, ImmutableList.of(), null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1780,15 +1942,20 @@ public void initializerChangeSetWithoutSelectorCompletesIfLastInitializer() thro ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); + // Expected status: VALID (single initializer without selector completes when it's the last initializer) + List statuses = sink.awaitStatuses(1, 2, TimeUnit.SECONDS); + assertEquals("Should receive 1 status update", 1, statuses.size()); + assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); + assertTrue(dataSource.isInitialized()); assertEquals(1, sink.getApplyCount()); - // TODO: Verify status updated to VALID when data source status is implemented + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); } @Test @@ -1806,7 +1973,7 @@ public void synchronizerChangeSetAlwaysCompletesStartFuture() throws Exception { () -> new MockSynchronizer(synchronizerFuture) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1835,7 +2002,7 @@ public void goodbyeStatusHandledGracefully() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1869,7 +2036,7 @@ public void shutdownStatusExitsImmediately() throws Exception { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1896,7 +2063,7 @@ public void fdv1FallbackFlagHonored() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1929,7 +2096,7 @@ public void emptyInitializerListSkipsToSynchronizers() throws Exception { } ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture = dataSource.start(); @@ -1957,7 +2124,7 @@ public void startedFlagPreventsMultipleRuns() throws Exception { ImmutableList> synchronizers = ImmutableList.of(); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); Future startFuture1 = dataSource.start(); @@ -1987,7 +2154,7 @@ public void startBeforeRunCompletesAllComplete() throws Exception { () -> new MockQueuedSynchronizer(syncResults) ); - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, null, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); resourcesToClose.add(dataSource); // Call start multiple times before completion @@ -2001,6 +2168,506 @@ public void startBeforeRunCompletesAllComplete() throws Exception { assertTrue(dataSource.isInitialized()); } + // ============================================================================ + // Data Source Status Tests + // ============================================================================ + + @Test + public void statusTransitionsToValidAfterInitialization() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) + ); + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + ImmutableList.of(), + null, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // After initializers complete with data (no selector), VALID status is emitted + // Since we initialized successfully and there are no synchronizers, we stay VALID + DataSourceStatusProvider.State status = sink.awaitStatus(2, TimeUnit.SECONDS); + assertNotNull("Should receive status update", status); + assertEquals(DataSourceStatusProvider.State.VALID, status); + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); + assertNull("Should not have error when VALID", sink.getLastError()); + } + + @Test + public void statusIncludesErrorInfoOnFailure() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // Synchronizer that sends terminal error + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.terminalError( + new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, + 401, + "Unauthorized", + Instant.now() + ), + false + )); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + null, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Should receive INTERRUPTED first (from terminal error), then OFF (from exhausted synchronizers) + DataSourceStatusProvider.State firstStatus = sink.awaitStatus(2, TimeUnit.SECONDS); + assertNotNull("Should receive first status update", firstStatus); + assertEquals(DataSourceStatusProvider.State.INTERRUPTED, firstStatus); + + DataSourceStatusProvider.State secondStatus = sink.awaitStatus(2, TimeUnit.SECONDS); + assertNotNull("Should receive second status update", secondStatus); + assertEquals(DataSourceStatusProvider.State.OFF, secondStatus); + + // Final state should be OFF with error info from the last status update + assertEquals(DataSourceStatusProvider.State.OFF, sink.getLastState()); + assertNotNull("Should have error info", sink.getLastError()); + assertEquals(DataSourceStatusProvider.ErrorKind.UNKNOWN, sink.getLastError().getKind()); + } + + @Test + public void statusRemainsValidDuringSynchronizerOperation() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // Synchronizer that sends multiple changesets + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + null, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for all changesets to be applied + sink.awaitApplyCount(3, 2, TimeUnit.SECONDS); + + // Status should be VALID throughout + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); + assertEquals(3, sink.getApplyCount()); + } + + @Test + public void statusTransitionsFromValidToOffWhenAllSynchronizersFail() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First synchronizer sends a changeset then terminal error + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(FDv2SourceResult.terminalError( + new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, + 500, + "Server error", + Instant.now() + ), + false + )); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + null, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Should transition: VALID (from changeset) → INTERRUPTED (from terminal error) → OFF (from exhausted sources) + DataSourceStatusProvider.State firstStatus = sink.awaitStatus(2, TimeUnit.SECONDS); + assertEquals(DataSourceStatusProvider.State.VALID, firstStatus); + + DataSourceStatusProvider.State secondStatus = sink.awaitStatus(2, TimeUnit.SECONDS); + assertEquals(DataSourceStatusProvider.State.INTERRUPTED, secondStatus); + + DataSourceStatusProvider.State thirdStatus = sink.awaitStatus(2, TimeUnit.SECONDS); + assertEquals(DataSourceStatusProvider.State.OFF, thirdStatus); + + assertEquals(DataSourceStatusProvider.State.OFF, sink.getLastState()); + assertNotNull("Should have error info when OFF", sink.getLastError()); + } + + // ============================================================================ + // FDv1 Fallback Tests + // ============================================================================ + + @Test + public void fdv1FallbackActivatesWhenFlagSet() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First synchronizer sends a result with FDv1 fallback flag set + BlockingQueue fdv2SyncResults = new LinkedBlockingQueue<>(); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), true)); // FDv1 fallback triggered + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(fdv2SyncResults) + ); + + // Create FDv1 fallback synchronizer + BlockingQueue fdv1SyncResults = new LinkedBlockingQueue<>(); + fdv1SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + BlockingQueue fdv1CalledQueue = new LinkedBlockingQueue<>(); + FDv2DataSource.DataSourceFactory fdv1Fallback = () -> { + fdv1CalledQueue.offer(true); + return new MockQueuedSynchronizer(fdv1SyncResults); + }; + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + fdv1Fallback, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for FDv1 to be called + Boolean fdv1Called = fdv1CalledQueue.poll(2, TimeUnit.SECONDS); + assertNotNull("FDv1 fallback synchronizer should be activated", fdv1Called); + + // Wait for changesets from both FDv2 and FDv1 + sink.awaitApplyCount(3, 2, TimeUnit.SECONDS); + assertTrue("Should have at least 2 changesets", sink.getApplyCount() >= 2); + } + + @Test + public void fdv1FallbackNotCalledWithoutFlag() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // Synchronizer sends normal results without FDv1 fallback flag + BlockingQueue fdv2SyncResults = new LinkedBlockingQueue<>(); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(fdv2SyncResults) + ); + + // Create FDv1 fallback synchronizer + BlockingQueue fdv1CalledQueue = new LinkedBlockingQueue<>(); + FDv2DataSource.DataSourceFactory fdv1Fallback = () -> { + fdv1CalledQueue.offer(true); + return new MockQueuedSynchronizer(new LinkedBlockingQueue<>()); + }; + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + fdv1Fallback, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait to see if FDv1 gets called (it shouldn't) + Boolean fdv1Called = fdv1CalledQueue.poll(500, TimeUnit.MILLISECONDS); + assertNull("FDv1 fallback should not be activated without flag", fdv1Called); + + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + assertEquals(2, sink.getApplyCount()); + } + + @Test + public void fdv1FallbackWorksAfterInterruption() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First synchronizer sends data, then INTERRUPTED with fallback flag + BlockingQueue fdv2SyncResults = new LinkedBlockingQueue<>(); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + fdv2SyncResults.add(FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, + 500, + "Network error", + Instant.now() + ), + true // FDv1 fallback flag + )); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(fdv2SyncResults) + ); + + // Create FDv1 fallback synchronizer that sends data + BlockingQueue fdv1SyncResults = new LinkedBlockingQueue<>(); + fdv1SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + BlockingQueue fdv1CalledQueue = new LinkedBlockingQueue<>(); + FDv2DataSource.DataSourceFactory fdv1Fallback = () -> { + fdv1CalledQueue.offer(true); + return new MockQueuedSynchronizer(fdv1SyncResults); + }; + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + fdv1Fallback, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for FDv1 to be called + Boolean fdv1Called = fdv1CalledQueue.poll(2, TimeUnit.SECONDS); + assertNotNull("FDv1 fallback should be activated after interruption with flag", fdv1Called); + + // Wait for changesets from both FDv2 and FDv1 + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + assertTrue("Should have at least 2 changesets", sink.getApplyCount() >= 2); + } + + @Test + public void fdv1FallbackWithoutConfiguredFallbackIgnoresFlag() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // Synchronizer sends result with FDv1 fallback flag + BlockingQueue fdv2SyncResults = new LinkedBlockingQueue<>(); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), true)); // FDv1 fallback flag + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(fdv2SyncResults) + ); + + // No FDv1 fallback configured (null) + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + null, // No FDv1 fallback + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Should receive both changesets even though fallback flag was set + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + assertEquals(2, sink.getApplyCount()); + } + + @Test + public void fdv1FallbackBlocksOtherSynchronizers() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First synchronizer sends result with FDv1 fallback flag + BlockingQueue fdv2SyncResults = new LinkedBlockingQueue<>(); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), true)); // FDv1 fallback + + // Second synchronizer that should not be called after fallback + BlockingQueue secondSyncCalledQueue = new LinkedBlockingQueue<>(); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(fdv2SyncResults), + () -> { + secondSyncCalledQueue.offer(true); + return new MockQueuedSynchronizer(new LinkedBlockingQueue<>()); + } + ); + + // Create FDv1 fallback synchronizer + BlockingQueue fdv1SyncResults = new LinkedBlockingQueue<>(); + fdv1SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + BlockingQueue fdv1CalledQueue = new LinkedBlockingQueue<>(); + FDv2DataSource.DataSourceFactory fdv1Fallback = () -> { + fdv1CalledQueue.offer(true); + return new MockQueuedSynchronizer(fdv1SyncResults); + }; + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + fdv1Fallback, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for FDv1 to be called + Boolean fdv1Called = fdv1CalledQueue.poll(2, TimeUnit.SECONDS); + assertNotNull("FDv1 fallback should be activated", fdv1Called); + + // Second synchronizer should not be called + Boolean secondSyncCalled = secondSyncCalledQueue.poll(500, TimeUnit.MILLISECONDS); + assertNull("Second synchronizer should not be called after FDv1 fallback", secondSyncCalled); + } + + @Test + public void fdv1FallbackOnlyCalledOncePerDataSource() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // Synchronizer sends multiple results with FDv1 fallback flags + BlockingQueue fdv2SyncResults = new LinkedBlockingQueue<>(); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), true)); // First fallback + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(fdv2SyncResults) + ); + + // Create FDv1 fallback synchronizer that also sends a fallback flag + BlockingQueue fdv1SyncResults = new LinkedBlockingQueue<>(); + fdv1SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + fdv1SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), true)); // Second fallback attempt + + BlockingQueue fdv1CalledQueue = new LinkedBlockingQueue<>(); + FDv2DataSource.DataSourceFactory fdv1Fallback = () -> { + fdv1CalledQueue.offer(true); + return new MockQueuedSynchronizer(fdv1SyncResults); + }; + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + fdv1Fallback, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for first FDv1 call + Boolean firstCall = fdv1CalledQueue.poll(2, TimeUnit.SECONDS); + assertNotNull("FDv1 fallback should be called once", firstCall); + + // Should not be called again even if FDv1 sends fallback flag + Boolean secondCall = fdv1CalledQueue.poll(500, TimeUnit.MILLISECONDS); + assertNull("FDv1 fallback should only be called once", secondCall); + } + // ============================================================================ // Mock Implementations // ============================================================================ @@ -2009,6 +2676,9 @@ private static class MockDataSourceUpdateSink implements DataSourceUpdateSinkV2 private final AtomicInteger applyCount = new AtomicInteger(0); private final AtomicReference> lastChangeSet = new AtomicReference<>(); private final BlockingQueue applySignals = new LinkedBlockingQueue<>(); + private final AtomicReference lastState = new AtomicReference<>(); + private final AtomicReference lastError = new AtomicReference<>(); + private final BlockingQueue statusUpdates = new LinkedBlockingQueue<>(); @Override public boolean apply(DataStoreTypes.ChangeSet changeSet) { @@ -2020,7 +2690,9 @@ public boolean apply(DataStoreTypes.ChangeSet cha @Override public void updateStatus(DataSourceStatusProvider.State newState, DataSourceStatusProvider.ErrorInfo errorInfo) { - // TODO: Track status updates when data source status is fully implemented + lastState.set(newState); + lastError.set(errorInfo); + statusUpdates.offer(newState); } @Override @@ -2036,6 +2708,35 @@ public DataStoreTypes.ChangeSet getLastChangeSet( return lastChangeSet.get(); } + public DataSourceStatusProvider.State getLastState() { + return lastState.get(); + } + + public DataSourceStatusProvider.ErrorInfo getLastError() { + return lastError.get(); + } + + public DataSourceStatusProvider.State awaitStatus(long timeout, TimeUnit unit) throws InterruptedException { + return statusUpdates.poll(timeout, unit); + } + + public List awaitStatuses(int count, long timeout, TimeUnit unit) throws InterruptedException { + List statuses = new ArrayList<>(); + long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + for (int i = 0; i < count; i++) { + long remaining = deadline - System.currentTimeMillis(); + if (remaining <= 0) { + break; + } + DataSourceStatusProvider.State status = statusUpdates.poll(remaining, TimeUnit.MILLISECONDS); + if (status == null) { + break; + } + statuses.add(status); + } + return statuses; + } + public void awaitApplyCount(int expectedCount, long timeout, TimeUnit unit) throws InterruptedException { long deadline = System.currentTimeMillis() + unit.toMillis(timeout); while (applyCount.get() < expectedCount && System.currentTimeMillis() < deadline) { @@ -2127,38 +2828,39 @@ public void close() { } private static class MockQueuedSynchronizer implements Synchronizer { - private final BlockingQueue results; + private final IterableAsyncQueue results; private volatile boolean closed = false; public MockQueuedSynchronizer(BlockingQueue results) { + // Convert BlockingQueue to IterableAsyncQueue by draining it + this.results = new IterableAsyncQueue<>(); + java.util.ArrayList temp = new java.util.ArrayList<>(); + results.drainTo(temp); + temp.forEach(this.results::put); + } + + public MockQueuedSynchronizer(IterableAsyncQueue results) { this.results = results; } public void addResult(FDv2SourceResult result) { if (!closed) { - results.add(result); + results.put(result); } } @Override public CompletableFuture next() { - if (closed) { - return CompletableFuture.completedFuture(FDv2SourceResult.shutdown()); - } - - // Try to get immediately, don't wait - FDv2SourceResult result = results.poll(); - if (result != null) { - return CompletableFuture.completedFuture(result); - } else { - // Queue is empty - return a never-completing future to simulate waiting for more data - return new CompletableFuture<>(); - } + return results.take(); } @Override public void close() { - closed = true; + if (!closed) { + closed = true; + // Emit shutdown result - this will complete any pending take() or queue it for next take() + results.put(FDv2SourceResult.shutdown()); + } } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/SourceManagerTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/SourceManagerTest.java new file mode 100644 index 00000000..d0a20e66 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/SourceManagerTest.java @@ -0,0 +1,409 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.datasources.Synchronizer; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("javadoc") +public class SourceManagerTest extends BaseTest { + + private static class TestSynchronizerFactory extends SynchronizerFactoryWithState { + private final FDv2DataSource.DataSourceFactory mockFactory; + + public TestSynchronizerFactory(FDv2DataSource.DataSourceFactory mockFactory) { + super(mockFactory); + this.mockFactory = mockFactory; + } + + public FDv2DataSource.DataSourceFactory getFactory() { + return mockFactory; + } + } + + private TestSynchronizerFactory createMockFactory() { + FDv2DataSource.DataSourceFactory factory = mock(FDv2DataSource.DataSourceFactory.class); + // Return a new mock each time build() is called to avoid reusing the same instance + when(factory.build()).thenAnswer(invocation -> mock(Synchronizer.class)); + return new TestSynchronizerFactory(factory); + } + + @Test + public void getNextAvailableSynchronizerReturnsNullWhenEmpty() { + List synchronizers = new ArrayList<>(); + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + Synchronizer result = manager.getNextAvailableSynchronizerAndSetActive(); + + assertNull(result); + } + + @Test + public void getNextAvailableSynchronizerReturnsFirstOnFirstCall() { + List synchronizers = new ArrayList<>(); + TestSynchronizerFactory sync1 = createMockFactory(); + synchronizers.add(sync1); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + Synchronizer result = manager.getNextAvailableSynchronizerAndSetActive(); + + assertNotNull(result); + // Verify it was built from sync1 + verify(sync1.getFactory(), times(1)).build(); + } + + @Test + public void getNextAvailableSynchronizerLoopsThroughAvailable() { + List synchronizers = new ArrayList<>(); + TestSynchronizerFactory sync1 = createMockFactory(); + TestSynchronizerFactory sync2 = createMockFactory(); + TestSynchronizerFactory sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // First call builds from sync1 + manager.getNextAvailableSynchronizerAndSetActive(); + verify(sync1.getFactory(), times(1)).build(); + + // Second call builds from sync2 + manager.getNextAvailableSynchronizerAndSetActive(); + verify(sync2.getFactory(), times(1)).build(); + + // Third call builds from sync3 + manager.getNextAvailableSynchronizerAndSetActive(); + verify(sync3.getFactory(), times(1)).build(); + } + + @Test + public void getNextAvailableSynchronizerWrapsAroundToBeginning() { + List synchronizers = new ArrayList<>(); + TestSynchronizerFactory sync1 = createMockFactory(); + TestSynchronizerFactory sync2 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Get all synchronizers + manager.getNextAvailableSynchronizerAndSetActive(); // sync1 + manager.getNextAvailableSynchronizerAndSetActive(); // sync2 + + // Should wrap around to sync1 + manager.getNextAvailableSynchronizerAndSetActive(); + + // sync1 should have been built twice + verify(sync1.getFactory(), times(2)).build(); + } + + @Test + public void getNextAvailableSynchronizerSkipsBlockedSynchronizers() { + List synchronizers = new ArrayList<>(); + TestSynchronizerFactory sync1 = createMockFactory(); + TestSynchronizerFactory sync2 = createMockFactory(); + TestSynchronizerFactory sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Block sync2 + sync2.block(); + + // First call builds from sync1 + manager.getNextAvailableSynchronizerAndSetActive(); + verify(sync1.getFactory(), times(1)).build(); + + // Second call skips sync2 and builds from sync3 + manager.getNextAvailableSynchronizerAndSetActive(); + verify(sync3.getFactory(), times(1)).build(); + verify(sync2.getFactory(), times(0)).build(); + + // Third call wraps and builds from sync1 (skips sync2) + manager.getNextAvailableSynchronizerAndSetActive(); + verify(sync1.getFactory(), times(2)).build(); + } + + @Test + public void getNextAvailableSynchronizerReturnsNullWhenAllBlocked() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Block all synchronizers + sync1.block(); + sync2.block(); + + Synchronizer result = manager.getNextAvailableSynchronizerAndSetActive(); + + assertNull(result); + } + + @Test + public void resetSourceIndexResetsToFirstSynchronizer() { + List synchronizers = new ArrayList<>(); + TestSynchronizerFactory sync1 = createMockFactory(); + TestSynchronizerFactory sync2 = createMockFactory(); + TestSynchronizerFactory sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Advance to sync3 + manager.getNextAvailableSynchronizerAndSetActive(); // sync1 + manager.getNextAvailableSynchronizerAndSetActive(); // sync2 + manager.getNextAvailableSynchronizerAndSetActive(); // sync3 + + // Reset + manager.resetSourceIndex(); + + // Next call should build from sync1 again + manager.getNextAvailableSynchronizerAndSetActive(); + verify(sync1.getFactory(), times(2)).build(); + } + + @Test + public void isPrimeSynchronizerReturnsTrueForFirst() { + List synchronizers = new ArrayList<>(); + TestSynchronizerFactory sync1 = createMockFactory(); + TestSynchronizerFactory sync2 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Get first synchronizer + manager.getNextAvailableSynchronizerAndSetActive(); + + assertTrue(manager.isPrimeSynchronizer()); + } + + @Test + public void isPrimeSynchronizerReturnsFalseForNonFirst() { + List synchronizers = new ArrayList<>(); + TestSynchronizerFactory sync1 = createMockFactory(); + TestSynchronizerFactory sync2 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Get first then second synchronizer + manager.getNextAvailableSynchronizerAndSetActive(); + manager.getNextAvailableSynchronizerAndSetActive(); + + assertFalse(manager.isPrimeSynchronizer()); + } + + @Test + public void isPrimeSynchronizerReturnsFalseWhenNoSynchronizerSelected() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + synchronizers.add(sync1); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Haven't called getNext yet + assertFalse(manager.isPrimeSynchronizer()); + } + + @Test + public void isPrimeSynchronizerHandlesBlockedFirstSynchronizer() { + List synchronizers = new ArrayList<>(); + TestSynchronizerFactory sync1 = createMockFactory(); + TestSynchronizerFactory sync2 = createMockFactory(); + TestSynchronizerFactory sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Block first synchronizer + sync1.block(); + + // Get second synchronizer (which is now the prime) + manager.getNextAvailableSynchronizerAndSetActive(); + + assertTrue(manager.isPrimeSynchronizer()); + } + + @Test + public void getAvailableSynchronizerCountReturnsCorrectCount() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + SynchronizerFactoryWithState sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + assertEquals(3, manager.getAvailableSynchronizerCount()); + } + + @Test + public void getAvailableSynchronizerCountUpdatesWhenBlocked() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + SynchronizerFactoryWithState sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + assertEquals(3, manager.getAvailableSynchronizerCount()); + + sync2.block(); + assertEquals(2, manager.getAvailableSynchronizerCount()); + + sync1.block(); + assertEquals(1, manager.getAvailableSynchronizerCount()); + + sync3.block(); + assertEquals(0, manager.getAvailableSynchronizerCount()); + } + + + @Test + public void shutdownClosesActiveSource() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync = createMockFactory(); + synchronizers.add(sync); + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + Synchronizer source = manager.getNextAvailableSynchronizerAndSetActive(); + assertNotNull(source); + + manager.close(); + + verify(source, times(1)).close(); + } + + @Test + public void shutdownCanBeCalledMultipleTimes() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync = createMockFactory(); + synchronizers.add(sync); + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + Synchronizer source = manager.getNextAvailableSynchronizerAndSetActive(); + assertNotNull(source); + + manager.close(); + manager.close(); + manager.close(); + + // Should only close once + verify(source, times(1)).close(); + } + + @Test + public void shutdownIgnoresCloseException() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync = createMockFactory(); + synchronizers.add(sync); + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + Synchronizer source = manager.getNextAvailableSynchronizerAndSetActive(); + assertNotNull(source); + doThrow(new IOException("test")).when(source).close(); + + // Should not throw + manager.close(); + } + + @Test + public void shutdownWithoutActiveSourceDoesNotFail() { + List synchronizers = new ArrayList<>(); + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Should not throw + manager.close(); + } + + @Test + public void integrationTestFullCycle() throws IOException { + List synchronizers = new ArrayList<>(); + TestSynchronizerFactory sync1 = createMockFactory(); + TestSynchronizerFactory sync2 = createMockFactory(); + TestSynchronizerFactory sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SourceManager manager = new SourceManager(synchronizers, new ArrayList<>()); + + // Initial state + assertEquals(3, manager.getAvailableSynchronizerCount()); + assertFalse(manager.isPrimeSynchronizer()); + + // Get first synchronizer + Synchronizer first = manager.getNextAvailableSynchronizerAndSetActive(); + assertNotNull(first); + verify(sync1.getFactory(), times(1)).build(); + assertTrue(manager.isPrimeSynchronizer()); + + // Get second synchronizer + Synchronizer second = manager.getNextAvailableSynchronizerAndSetActive(); + assertNotNull(second); + verify(sync2.getFactory(), times(1)).build(); + assertFalse(manager.isPrimeSynchronizer()); + + // Block second + sync2.block(); + assertEquals(2, manager.getAvailableSynchronizerCount()); + + // Get third synchronizer + Synchronizer third = manager.getNextAvailableSynchronizerAndSetActive(); + assertNotNull(third); + verify(sync3.getFactory(), times(1)).build(); + assertFalse(manager.isPrimeSynchronizer()); + + // Reset and get first again + manager.resetSourceIndex(); + Synchronizer firstAgain = manager.getNextAvailableSynchronizerAndSetActive(); + assertNotNull(firstAgain); + verify(sync1.getFactory(), times(2)).build(); + assertTrue(manager.isPrimeSynchronizer()); + + // Verify latest active source is set + Synchronizer source = manager.getNextAvailableSynchronizerAndSetActive(); + assertNotNull(source); + + // Shutdown + manager.close(); + verify(source, times(1)).close(); + + // After shutdown, new sources are immediately closed + Synchronizer newSource = manager.getNextAvailableSynchronizerAndSetActive(); + assertNull(newSource); // Returns null after shutdown + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/SynchronizerStateManagerTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/SynchronizerStateManagerTest.java deleted file mode 100644 index ca19d351..00000000 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/SynchronizerStateManagerTest.java +++ /dev/null @@ -1,425 +0,0 @@ -package com.launchdarkly.sdk.server; - -import com.launchdarkly.sdk.server.datasources.Synchronizer; - -import org.junit.Test; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@SuppressWarnings("javadoc") -public class SynchronizerStateManagerTest extends BaseTest { - - private SynchronizerFactoryWithState createMockFactory() { - FDv2DataSource.DataSourceFactory factory = mock(FDv2DataSource.DataSourceFactory.class); - when(factory.build()).thenReturn(mock(Synchronizer.class)); - return new SynchronizerFactoryWithState(factory); - } - - @Test - public void getNextAvailableSynchronizerReturnsNullWhenEmpty() { - List synchronizers = new ArrayList<>(); - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - SynchronizerFactoryWithState result = manager.getNextAvailableSynchronizer(); - - assertNull(result); - } - - @Test - public void getNextAvailableSynchronizerReturnsFirstOnFirstCall() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - synchronizers.add(sync1); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - SynchronizerFactoryWithState result = manager.getNextAvailableSynchronizer(); - - assertSame(sync1, result); - } - - @Test - public void getNextAvailableSynchronizerLoopsThroughAvailable() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - SynchronizerFactoryWithState sync3 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - synchronizers.add(sync3); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // First call returns sync1 - assertSame(sync1, manager.getNextAvailableSynchronizer()); - // Second call returns sync2 - assertSame(sync2, manager.getNextAvailableSynchronizer()); - // Third call returns sync3 - assertSame(sync3, manager.getNextAvailableSynchronizer()); - } - - @Test - public void getNextAvailableSynchronizerWrapsAroundToBeginning() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Get all synchronizers - manager.getNextAvailableSynchronizer(); // sync1 - manager.getNextAvailableSynchronizer(); // sync2 - - // Should wrap around to sync1 - assertSame(sync1, manager.getNextAvailableSynchronizer()); - } - - @Test - public void getNextAvailableSynchronizerSkipsBlockedSynchronizers() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - SynchronizerFactoryWithState sync3 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - synchronizers.add(sync3); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Block sync2 - sync2.block(); - - // First call returns sync1 - assertSame(sync1, manager.getNextAvailableSynchronizer()); - // Second call skips sync2 and returns sync3 - assertSame(sync3, manager.getNextAvailableSynchronizer()); - // Third call wraps and returns sync1 (skips sync2) - assertSame(sync1, manager.getNextAvailableSynchronizer()); - } - - @Test - public void getNextAvailableSynchronizerReturnsNullWhenAllBlocked() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Block all synchronizers - sync1.block(); - sync2.block(); - - SynchronizerFactoryWithState result = manager.getNextAvailableSynchronizer(); - - assertNull(result); - } - - @Test - public void resetSourceIndexResetsToFirstSynchronizer() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - SynchronizerFactoryWithState sync3 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - synchronizers.add(sync3); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Advance to sync3 - manager.getNextAvailableSynchronizer(); // sync1 - manager.getNextAvailableSynchronizer(); // sync2 - manager.getNextAvailableSynchronizer(); // sync3 - - // Reset - manager.resetSourceIndex(); - - // Next call should return sync1 again - assertSame(sync1, manager.getNextAvailableSynchronizer()); - } - - @Test - public void isPrimeSynchronizerReturnsTrueForFirst() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Get first synchronizer - manager.getNextAvailableSynchronizer(); - - assertTrue(manager.isPrimeSynchronizer()); - } - - @Test - public void isPrimeSynchronizerReturnsFalseForNonFirst() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Get first then second synchronizer - manager.getNextAvailableSynchronizer(); - manager.getNextAvailableSynchronizer(); - - assertFalse(manager.isPrimeSynchronizer()); - } - - @Test - public void isPrimeSynchronizerReturnsFalseWhenNoSynchronizerSelected() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - synchronizers.add(sync1); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Haven't called getNext yet - assertFalse(manager.isPrimeSynchronizer()); - } - - @Test - public void isPrimeSynchronizerHandlesBlockedFirstSynchronizer() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - SynchronizerFactoryWithState sync3 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - synchronizers.add(sync3); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Block first synchronizer - sync1.block(); - - // Get second synchronizer (which is now the prime) - manager.getNextAvailableSynchronizer(); - - assertTrue(manager.isPrimeSynchronizer()); - } - - @Test - public void getAvailableSynchronizerCountReturnsCorrectCount() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - SynchronizerFactoryWithState sync3 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - synchronizers.add(sync3); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - assertEquals(3, manager.getAvailableSynchronizerCount()); - } - - @Test - public void getAvailableSynchronizerCountUpdatesWhenBlocked() { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - SynchronizerFactoryWithState sync3 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - synchronizers.add(sync3); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - assertEquals(3, manager.getAvailableSynchronizerCount()); - - sync2.block(); - assertEquals(2, manager.getAvailableSynchronizerCount()); - - sync1.block(); - assertEquals(1, manager.getAvailableSynchronizerCount()); - - sync3.block(); - assertEquals(0, manager.getAvailableSynchronizerCount()); - } - - @Test - public void setActiveSourceSetsNewSource() throws IOException { - List synchronizers = new ArrayList<>(); - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - Closeable source = mock(Closeable.class); - boolean shutdown = manager.setActiveSource(source); - - assertFalse(shutdown); - } - - @Test - public void setActiveSourceClosesPreviousSource() throws IOException { - List synchronizers = new ArrayList<>(); - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - Closeable firstSource = mock(Closeable.class); - Closeable secondSource = mock(Closeable.class); - - manager.setActiveSource(firstSource); - manager.setActiveSource(secondSource); - - verify(firstSource, times(1)).close(); - } - - @Test - public void setActiveSourceReturnsTrueAfterShutdown() throws IOException { - List synchronizers = new ArrayList<>(); - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - manager.close(); - - Closeable source = mock(Closeable.class); - boolean shutdown = manager.setActiveSource(source); - - assertTrue(shutdown); - verify(source, times(1)).close(); - } - - @Test - public void setActiveSourceIgnoresCloseExceptionFromPreviousSource() throws IOException { - List synchronizers = new ArrayList<>(); - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - Closeable firstSource = mock(Closeable.class); - doThrow(new IOException("test")).when(firstSource).close(); - - Closeable secondSource = mock(Closeable.class); - - manager.setActiveSource(firstSource); - // Should not throw - manager.setActiveSource(secondSource); - } - - @Test - public void shutdownClosesActiveSource() throws IOException { - List synchronizers = new ArrayList<>(); - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - Closeable source = mock(Closeable.class); - manager.setActiveSource(source); - - manager.close(); - - verify(source, times(1)).close(); - } - - @Test - public void shutdownCanBeCalledMultipleTimes() throws IOException { - List synchronizers = new ArrayList<>(); - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - Closeable source = mock(Closeable.class); - manager.setActiveSource(source); - - manager.close(); - manager.close(); - manager.close(); - - // Should only close once - verify(source, times(1)).close(); - } - - @Test - public void shutdownIgnoresCloseException() throws IOException { - List synchronizers = new ArrayList<>(); - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - Closeable source = mock(Closeable.class); - doThrow(new IOException("test")).when(source).close(); - - manager.setActiveSource(source); - - // Should not throw - manager.close(); - } - - @Test - public void shutdownWithoutActiveSourceDoesNotFail() { - List synchronizers = new ArrayList<>(); - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Should not throw - manager.close(); - } - - @Test - public void integrationTestFullCycle() throws IOException { - List synchronizers = new ArrayList<>(); - SynchronizerFactoryWithState sync1 = createMockFactory(); - SynchronizerFactoryWithState sync2 = createMockFactory(); - SynchronizerFactoryWithState sync3 = createMockFactory(); - synchronizers.add(sync1); - synchronizers.add(sync2); - synchronizers.add(sync3); - - SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); - - // Initial state - assertEquals(3, manager.getAvailableSynchronizerCount()); - assertFalse(manager.isPrimeSynchronizer()); - - // Get first synchronizer - SynchronizerFactoryWithState first = manager.getNextAvailableSynchronizer(); - assertSame(sync1, first); - assertTrue(manager.isPrimeSynchronizer()); - - // Get second synchronizer - SynchronizerFactoryWithState second = manager.getNextAvailableSynchronizer(); - assertSame(sync2, second); - assertFalse(manager.isPrimeSynchronizer()); - - // Block second - sync2.block(); - assertEquals(2, manager.getAvailableSynchronizerCount()); - - // Get third synchronizer - SynchronizerFactoryWithState third = manager.getNextAvailableSynchronizer(); - assertSame(sync3, third); - assertFalse(manager.isPrimeSynchronizer()); - - // Reset and get first again - manager.resetSourceIndex(); - SynchronizerFactoryWithState firstAgain = manager.getNextAvailableSynchronizer(); - assertSame(sync1, firstAgain); - assertTrue(manager.isPrimeSynchronizer()); - - // Set active source - Closeable source = mock(Closeable.class); - assertFalse(manager.setActiveSource(source)); - - // Shutdown - manager.close(); - verify(source, times(1)).close(); - - // After shutdown, new sources are immediately closed - Closeable newSource = mock(Closeable.class); - assertTrue(manager.setActiveSource(newSource)); - verify(newSource, times(1)).close(); - } -}