Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
08abc7e
chore: Pipe headers through data sources.
kinyoklion Jan 21, 2026
bd15015
Merge branch 'main' into rlamb/connect-headers-to-data-sources
kinyoklion Jan 21, 2026
b20c621
Cleanup
kinyoklion Jan 21, 2026
b46ac5e
chore: Add fallback and recovery support for FDv2.
kinyoklion Jan 23, 2026
fba5aa2
Extract conditions from FDv2DataSource.
kinyoklion Jan 23, 2026
472a1d2
Extract synchronizer state management from the FDv2DataSource.
kinyoklion Jan 23, 2026
0c150a1
Add recovery condition tests.
kinyoklion Jan 23, 2026
5888761
Merge branch 'main' into rlamb/fallback-and-recovery
kinyoklion Jan 26, 2026
9e79c00
Merge branch 'rlamb/fallback-and-recovery' of github.com:launchdarkly…
kinyoklion Jan 26, 2026
33fa6c5
Closeable synchronizer state manager.
kinyoklion Jan 26, 2026
ec0d0ec
More clean shutdown model.
kinyoklion Jan 26, 2026
5b62238
SynchronizerStateManager tests.
kinyoklion Jan 26, 2026
53ea319
FDv2DataSource tests.
kinyoklion Jan 26, 2026
147556d
Add documentation to long-running test.
kinyoklion Jan 26, 2026
321fb4f
Fix test expectations.
kinyoklion Jan 26, 2026
1234bc9
Remove un-needed synchronizer.
kinyoklion Jan 27, 2026
10cab33
Correct lock on getNextAvailableSynchronizer.
kinyoklion Jan 27, 2026
1ade2f7
chore: Add support for FDv1 fallback.
kinyoklion Jan 27, 2026
382246d
PR Feedback.
kinyoklion Jan 27, 2026
e6b032e
Correct prime synchronizer logic.
kinyoklion Jan 27, 2026
615ce13
WIP
kinyoklion Jan 27, 2026
9c2c592
Merge branch 'rlamb/fallback-and-recovery' into rlamb/sdk-1625/FDv1-f…
kinyoklion Jan 27, 2026
cce4e8b
Fallback and data source status.
kinyoklion Jan 27, 2026
37482b1
Merge branch 'main' into rlamb/sdk-1625/FDv1-fallback
kinyoklion Jan 28, 2026
5b4ed39
More nuanced status.
kinyoklion Jan 28, 2026
d9e3182
Data source status tests.
kinyoklion Jan 28, 2026
0f063be
Fix remaining contract tests.
kinyoklion Jan 28, 2026
fb19847
More robust data source status testing.
kinyoklion Jan 28, 2026
cb8d9ad
Remove debug code and make tests more robust.
kinyoklion Jan 28, 2026
6673954
PR feedback first pass.
kinyoklion Jan 28, 2026
5b3c669
More robust DataSourceSynchronizerAdapter.
kinyoklion Jan 28, 2026
86aa10e
More graceful shutdown.
kinyoklion Jan 28, 2026
16ecc8f
Rename sourceManager.
kinyoklion Jan 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.launchdarkly.sdk.server.FlagsStateOption;
import com.launchdarkly.sdk.server.LDClient;
import com.launchdarkly.sdk.server.LDConfig;
import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints;
import com.launchdarkly.sdk.server.migrations.Migration;
import com.launchdarkly.sdk.server.migrations.MigrationBuilder;
import com.launchdarkly.sdk.server.migrations.MigrationExecution;
Expand All @@ -25,12 +26,15 @@
import com.launchdarkly.sdk.server.integrations.HooksConfigurationBuilder;
import com.launchdarkly.sdk.server.integrations.ServiceEndpointsBuilder;
import com.launchdarkly.sdk.server.integrations.StreamingDataSourceBuilder;
import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder;
import com.launchdarkly.sdk.server.integrations.DataSystemBuilder;
import com.launchdarkly.sdk.server.DataSystemComponents;
import com.launchdarkly.sdk.server.integrations.FDv2PollingInitializerBuilder;
import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder;
import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder;
import com.launchdarkly.sdk.server.interfaces.BigSegmentStoreStatusProvider;
import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;
import com.launchdarkly.sdk.server.subsystems.DataSource;
import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder;
import com.launchdarkly.sdk.server.datasources.Initializer;
import com.launchdarkly.sdk.server.datasources.Synchronizer;
Expand Down Expand Up @@ -563,6 +567,22 @@ private LDConfig buildSdkConfig(SdkConfigParams params, String tag) {
}
}

// Configure FDv1 fallback synchronizer
SdkConfigSynchronizerParams fallbackSynchronizer =
selectFallbackSynchronizer(params.dataSystem);
if (fallbackSynchronizer != null) {
// Set global polling endpoints if the fallback synchronizer has polling with custom base URI
if (fallbackSynchronizer.polling != null &&
fallbackSynchronizer.polling.baseUri != null) {
endpoints.polling(fallbackSynchronizer.polling.baseUri);
}

// Create and configure FDv1 fallback
ComponentConfigurer<DataSource> fdv1Fallback =
createFDv1FallbackSynchronizer(fallbackSynchronizer);
dataSystemBuilder.fDv1FallbackSynchronizer(fdv1Fallback);
}

builder.dataSystem(dataSystemBuilder);
}

Expand Down Expand Up @@ -601,4 +621,59 @@ private DataSourceBuilder<Synchronizer> createSynchronizer(
}
return null;
}

/**
* Selects the best synchronizer configuration to use for FDv1 fallback.
* Prefers polling synchronizers, falls back to primary synchronizer.
*/
private static SdkConfigSynchronizerParams selectFallbackSynchronizer(
SdkConfigDataSystemParams dataSystemParams) {

// Prefer secondary polling synchronizer
if (dataSystemParams.synchronizers != null &&
dataSystemParams.synchronizers.secondary != null &&
dataSystemParams.synchronizers.secondary.polling != null) {
return dataSystemParams.synchronizers.secondary;
}

// Fall back to primary polling synchronizer
if (dataSystemParams.synchronizers != null &&
dataSystemParams.synchronizers.primary != null &&
dataSystemParams.synchronizers.primary.polling != null) {
return dataSystemParams.synchronizers.primary;
}

// Fall back to primary synchronizer (even if streaming)
if (dataSystemParams.synchronizers != null &&
dataSystemParams.synchronizers.primary != null) {
return dataSystemParams.synchronizers.primary;
}

return null;
}

/**
* Creates the FDv1 fallback synchronizer based on the selected synchronizer config.
* FDv1 fallback is always polling-based and uses the global service endpoints configuration.
*/
private static ComponentConfigurer<DataSource> createFDv1FallbackSynchronizer(
SdkConfigSynchronizerParams synchronizer) {

// FDv1 fallback is always polling-based
PollingDataSourceBuilder fdv1Polling = Components.pollingDataSource();

// Configure polling interval if the synchronizer has polling configuration
if (synchronizer.polling != null) {
if (synchronizer.polling.pollIntervalMs != null) {
fdv1Polling.pollInterval(Duration.ofMillis(synchronizer.polling.pollIntervalMs));
}
// Note: FDv1 polling doesn't support per-source service endpoints override,
// so it will use the global service endpoints configuration (which is set
// by the caller before this method is invoked)
}
// If streaming synchronizer, use default polling interval
// (no additional configuration needed)

return fdv1Polling;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused endpoints parameter in helper function

Low Severity

The endpoints parameter in createFDv1FallbackSynchronizer is passed but never used inside the function body. The function creates and configures a PollingDataSourceBuilder using only synchronizer.polling properties, while the endpoints parameter is completely ignored. This is dead code that may confuse future developers about whether endpoint configuration is actually applied to the FDv1 fallback.

Fix in Cursor Fix in Web

}
4 changes: 0 additions & 4 deletions lib/sdk/server/contract-tests/test-suppressions-fdv2.txt
Original file line number Diff line number Diff line change
@@ -1,4 +0,0 @@
streaming/validation/unrecognized data that can be safely ignored/unknown event name with JSON body
streaming/validation/unrecognized data that can be safely ignored/unknown event name with non-JSON body
streaming/validation/unrecognized data that can be safely ignored/patch event with unrecognized path kind
streaming/fdv2/fallback to FDv1 handling
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.datasources.Synchronizer;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataSource;
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;

import java.io.IOException;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
* Adapter that wraps a DataSource (FDv1 protocol) and exposes it as a Synchronizer (FDv2 protocol).
* <p>
* This adapter bridges the push-based DataSource interface with the pull-based Synchronizer interface
* by listening to updates through a custom DataSourceUpdateSink and queueing them as FDv2SourceResult objects.
* <p>
* The adapter is constructed with a factory function that receives the listening update sink and
* creates the DataSource. This ensures the DataSource uses the adapter's internal sink without exposing it.
*/
class DataSourceSynchronizerAdapter implements Synchronizer {
private final DataSource dataSource;
private final IterableAsyncQueue<FDv2SourceResult> resultQueue = new IterableAsyncQueue<>();
private final CompletableFuture<FDv2SourceResult> shutdownFuture = new CompletableFuture<>();
private final Object startLock = new Object();
private boolean started = false;
private boolean closed = false;
private Future<Void> startFuture;

/**
* Functional interface for creating a DataSource with a given update sink.
*/
@FunctionalInterface
public interface DataSourceFactory {
DataSource create(DataSourceUpdateSink updateSink);
}

/**
* Creates a new adapter that wraps a DataSource.
*
* @param dataSourceFactory factory that creates the DataSource with the provided update sink
*/
public DataSourceSynchronizerAdapter(DataSourceFactory dataSourceFactory) {
ConvertingUpdateSink convertingSink = new ConvertingUpdateSink(resultQueue);
this.dataSource = dataSourceFactory.create(convertingSink);
}

@Override
public CompletableFuture<FDv2SourceResult> next() {
synchronized (startLock) {
if (!started && !closed) {
started = true;
startFuture = dataSource.start();

// Monitor the start future for errors
// The data source will emit updates through the listening sink
CompletableFuture.runAsync(() -> {
try {
startFuture.get();
} catch (ExecutionException e) {
// Initialization failed - emit an interrupted status
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.UNKNOWN,
0,
e.getCause() != null ? e.getCause().toString() : e.toString(),
Instant.now()
);
resultQueue.put(FDv2SourceResult.interrupted(errorInfo, false));
} catch (CancellationException e) {
// Start future was cancelled (during close) - exit cleanly
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}

return CompletableFuture.anyOf(shutdownFuture, resultQueue.take())
.thenApply(result -> (FDv2SourceResult) result);
}

@Override
public void close() throws IOException {
synchronized (startLock) {
if (closed) {
return;
}
closed = true;
}

dataSource.close();
shutdownFuture.complete(FDv2SourceResult.shutdown());
if(startFuture != null) {
// If the start future is done, this has no effect.
// If it is not, then this will unblock the code waiting on start.
startFuture.cancel(true);
}
}

/**
* A DataSourceUpdateSink that converts DataSource updates into FDv2SourceResult objects.
* This sink doesn't delegate to any other sink - it exists solely to convert FDv1 updates to FDv2 results.
*/
private static class ConvertingUpdateSink implements DataSourceUpdateSink {
private final IterableAsyncQueue<FDv2SourceResult> resultQueue;

public ConvertingUpdateSink(IterableAsyncQueue<FDv2SourceResult> resultQueue) {
this.resultQueue = resultQueue;
}

@Override
public boolean init(DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor> allData) {
// Convert the full data set into a ChangeSet and emit it
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
new DataStoreTypes.ChangeSet<>(
DataStoreTypes.ChangeSetType.Full,
Selector.EMPTY,
allData.getData(),
null);
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
return true;
}

@Override
public boolean upsert(DataStoreTypes.DataKind kind, String key, DataStoreTypes.ItemDescriptor item) {
// Convert the upsert into a ChangeSet with a single item and emit it
DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor> items =
new DataStoreTypes.KeyedItems<>(Collections.<Map.Entry<String, DataStoreTypes.ItemDescriptor>>singletonList(
new AbstractMap.SimpleEntry<>(key, item)));
Iterable<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>> data =
Collections.<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>>singletonList(
new AbstractMap.SimpleEntry<>(kind, items));

DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
new DataStoreTypes.ChangeSet<>(
DataStoreTypes.ChangeSetType.Partial,
Selector.EMPTY,
data,
null);
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
return true;
}

@Override
public DataStoreStatusProvider getDataStoreStatusProvider() {
// This adapter doesn't use a data store
return null;
}

@Override
public void updateStatus(DataSourceStatusProvider.State newState, DataSourceStatusProvider.ErrorInfo newError) {
// Convert state changes to FDv2SourceResult status events
switch (newState) {
case INTERRUPTED:
resultQueue.put(FDv2SourceResult.interrupted(newError, false));
break;
case OFF:
if (newError != null) {
resultQueue.put(FDv2SourceResult.terminalError(newError, false));
}
break;
case VALID:
case INITIALIZING:
// These states don't map to FDv2SourceResult status events
break;
}
}
}
}
Loading
Loading