-
Notifications
You must be signed in to change notification settings - Fork 11
feat: Add support for FDv1 fallback. #116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
kinyoklion
wants to merge
33
commits into
main
Choose a base branch
from
rlamb/sdk-1625/FDv1-fallback
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+1,612
−163
Draft
Changes from all commits
Commits
Show all changes
33 commits
Select commit
Hold shift + click to select a range
08abc7e
chore: Pipe headers through data sources.
kinyoklion bd15015
Merge branch 'main' into rlamb/connect-headers-to-data-sources
kinyoklion b20c621
Cleanup
kinyoklion b46ac5e
chore: Add fallback and recovery support for FDv2.
kinyoklion fba5aa2
Extract conditions from FDv2DataSource.
kinyoklion 472a1d2
Extract synchronizer state management from the FDv2DataSource.
kinyoklion 0c150a1
Add recovery condition tests.
kinyoklion 5888761
Merge branch 'main' into rlamb/fallback-and-recovery
kinyoklion 9e79c00
Merge branch 'rlamb/fallback-and-recovery' of github.com:launchdarkly…
kinyoklion 33fa6c5
Closeable synchronizer state manager.
kinyoklion ec0d0ec
More clean shutdown model.
kinyoklion 5b62238
SynchronizerStateManager tests.
kinyoklion 53ea319
FDv2DataSource tests.
kinyoklion 147556d
Add documentation to long-running test.
kinyoklion 321fb4f
Fix test expectations.
kinyoklion 1234bc9
Remove un-needed synchronizer.
kinyoklion 10cab33
Correct lock on getNextAvailableSynchronizer.
kinyoklion 1ade2f7
chore: Add support for FDv1 fallback.
kinyoklion 382246d
PR Feedback.
kinyoklion e6b032e
Correct prime synchronizer logic.
kinyoklion 615ce13
WIP
kinyoklion 9c2c592
Merge branch 'rlamb/fallback-and-recovery' into rlamb/sdk-1625/FDv1-f…
kinyoklion cce4e8b
Fallback and data source status.
kinyoklion 37482b1
Merge branch 'main' into rlamb/sdk-1625/FDv1-fallback
kinyoklion 5b4ed39
More nuanced status.
kinyoklion d9e3182
Data source status tests.
kinyoklion 0f063be
Fix remaining contract tests.
kinyoklion fb19847
More robust data source status testing.
kinyoklion cb8d9ad
Remove debug code and make tests more robust.
kinyoklion 6673954
PR feedback first pass.
kinyoklion 5b3c669
More robust DataSourceSynchronizerAdapter.
kinyoklion 86aa10e
More graceful shutdown.
kinyoklion 16ecc8f
Rename sourceManager.
kinyoklion File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +0,0 @@ | ||
| streaming/validation/unrecognized data that can be safely ignored/unknown event name with JSON body | ||
| streaming/validation/unrecognized data that can be safely ignored/unknown event name with non-JSON body | ||
| streaming/validation/unrecognized data that can be safely ignored/patch event with unrecognized path kind | ||
| streaming/fdv2/fallback to FDv1 handling | ||
179 changes: 179 additions & 0 deletions
179
lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,179 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.sdk.internal.fdv2.sources.Selector; | ||
| import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; | ||
| import com.launchdarkly.sdk.server.datasources.Synchronizer; | ||
| import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; | ||
| import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSource; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; | ||
| import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; | ||
|
|
||
| import java.io.IOException; | ||
| import java.time.Instant; | ||
| import java.util.AbstractMap; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CancellationException; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.Future; | ||
|
|
||
| /** | ||
| * Adapter that wraps a DataSource (FDv1 protocol) and exposes it as a Synchronizer (FDv2 protocol). | ||
| * <p> | ||
| * This adapter bridges the push-based DataSource interface with the pull-based Synchronizer interface | ||
| * by listening to updates through a custom DataSourceUpdateSink and queueing them as FDv2SourceResult objects. | ||
| * <p> | ||
| * The adapter is constructed with a factory function that receives the listening update sink and | ||
| * creates the DataSource. This ensures the DataSource uses the adapter's internal sink without exposing it. | ||
| */ | ||
| class DataSourceSynchronizerAdapter implements Synchronizer { | ||
| private final DataSource dataSource; | ||
| private final IterableAsyncQueue<FDv2SourceResult> resultQueue = new IterableAsyncQueue<>(); | ||
| private final CompletableFuture<FDv2SourceResult> shutdownFuture = new CompletableFuture<>(); | ||
| private final Object startLock = new Object(); | ||
| private boolean started = false; | ||
| private boolean closed = false; | ||
| private Future<Void> startFuture; | ||
|
|
||
| /** | ||
| * Functional interface for creating a DataSource with a given update sink. | ||
| */ | ||
| @FunctionalInterface | ||
| public interface DataSourceFactory { | ||
| DataSource create(DataSourceUpdateSink updateSink); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a new adapter that wraps a DataSource. | ||
| * | ||
| * @param dataSourceFactory factory that creates the DataSource with the provided update sink | ||
| */ | ||
| public DataSourceSynchronizerAdapter(DataSourceFactory dataSourceFactory) { | ||
| ConvertingUpdateSink convertingSink = new ConvertingUpdateSink(resultQueue); | ||
| this.dataSource = dataSourceFactory.create(convertingSink); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<FDv2SourceResult> next() { | ||
| synchronized (startLock) { | ||
| if (!started && !closed) { | ||
| started = true; | ||
| startFuture = dataSource.start(); | ||
|
|
||
| // Monitor the start future for errors | ||
| // The data source will emit updates through the listening sink | ||
| CompletableFuture.runAsync(() -> { | ||
| try { | ||
| startFuture.get(); | ||
| } catch (ExecutionException e) { | ||
| // Initialization failed - emit an interrupted status | ||
| DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo( | ||
| DataSourceStatusProvider.ErrorKind.UNKNOWN, | ||
| 0, | ||
| e.getCause() != null ? e.getCause().toString() : e.toString(), | ||
| Instant.now() | ||
| ); | ||
| resultQueue.put(FDv2SourceResult.interrupted(errorInfo, false)); | ||
| } catch (CancellationException e) { | ||
| // Start future was cancelled (during close) - exit cleanly | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| }); | ||
kinyoklion marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| return CompletableFuture.anyOf(shutdownFuture, resultQueue.take()) | ||
| .thenApply(result -> (FDv2SourceResult) result); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| synchronized (startLock) { | ||
| if (closed) { | ||
| return; | ||
| } | ||
| closed = true; | ||
| } | ||
|
|
||
| dataSource.close(); | ||
| shutdownFuture.complete(FDv2SourceResult.shutdown()); | ||
| if(startFuture != null) { | ||
| // If the start future is done, this has no effect. | ||
| // If it is not, then this will unblock the code waiting on start. | ||
| startFuture.cancel(true); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * A DataSourceUpdateSink that converts DataSource updates into FDv2SourceResult objects. | ||
| * This sink doesn't delegate to any other sink - it exists solely to convert FDv1 updates to FDv2 results. | ||
| */ | ||
| private static class ConvertingUpdateSink implements DataSourceUpdateSink { | ||
| private final IterableAsyncQueue<FDv2SourceResult> resultQueue; | ||
|
|
||
| public ConvertingUpdateSink(IterableAsyncQueue<FDv2SourceResult> resultQueue) { | ||
| this.resultQueue = resultQueue; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean init(DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor> allData) { | ||
| // Convert the full data set into a ChangeSet and emit it | ||
| DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet = | ||
| new DataStoreTypes.ChangeSet<>( | ||
| DataStoreTypes.ChangeSetType.Full, | ||
| Selector.EMPTY, | ||
| allData.getData(), | ||
| null); | ||
| resultQueue.put(FDv2SourceResult.changeSet(changeSet, false)); | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean upsert(DataStoreTypes.DataKind kind, String key, DataStoreTypes.ItemDescriptor item) { | ||
| // Convert the upsert into a ChangeSet with a single item and emit it | ||
| DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor> items = | ||
| new DataStoreTypes.KeyedItems<>(Collections.<Map.Entry<String, DataStoreTypes.ItemDescriptor>>singletonList( | ||
| new AbstractMap.SimpleEntry<>(key, item))); | ||
| Iterable<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>> data = | ||
| Collections.<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>>singletonList( | ||
| new AbstractMap.SimpleEntry<>(kind, items)); | ||
|
|
||
| DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet = | ||
| new DataStoreTypes.ChangeSet<>( | ||
| DataStoreTypes.ChangeSetType.Partial, | ||
| Selector.EMPTY, | ||
| data, | ||
| null); | ||
| resultQueue.put(FDv2SourceResult.changeSet(changeSet, false)); | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public DataStoreStatusProvider getDataStoreStatusProvider() { | ||
| // This adapter doesn't use a data store | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public void updateStatus(DataSourceStatusProvider.State newState, DataSourceStatusProvider.ErrorInfo newError) { | ||
| // Convert state changes to FDv2SourceResult status events | ||
| switch (newState) { | ||
| case INTERRUPTED: | ||
| resultQueue.put(FDv2SourceResult.interrupted(newError, false)); | ||
| break; | ||
| case OFF: | ||
| if (newError != null) { | ||
| resultQueue.put(FDv2SourceResult.terminalError(newError, false)); | ||
| } | ||
| break; | ||
| case VALID: | ||
| case INITIALIZING: | ||
| // These states don't map to FDv2SourceResult status events | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused
endpointsparameter in helper functionLow Severity
The
endpointsparameter increateFDv1FallbackSynchronizeris passed but never used inside the function body. The function creates and configures aPollingDataSourceBuilderusing onlysynchronizer.pollingproperties, while theendpointsparameter is completely ignored. This is dead code that may confuse future developers about whether endpoint configuration is actually applied to the FDv1 fallback.