Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static FullDataSet<ItemDescriptor> sortAllCollections(FullDataSet<ItemDes
DataKind kind = entry.getKey();
builder.put(kind, sortCollection(kind, entry.getValue()));
}
return new FullDataSet<>(builder.build().entrySet());
return new FullDataSet<>(builder.build().entrySet(), allData.shouldPersist());
}

/**
Expand All @@ -148,7 +148,7 @@ public static ChangeSet<ItemDescriptor> sortChangeset(ChangeSet<ItemDescriptor>
DataKind kind = entry.getKey();
builder.put(kind, sortCollection(kind, entry.getValue()));
}
return new ChangeSet<>(inSet.getType(), inSet.getSelector(), builder.build().entrySet(), inSet.getEnvironmentId());
return new ChangeSet<>(inSet.getType(), inSet.getSelector(), builder.build().entrySet(), inSet.getEnvironmentId(), inSet.shouldPersist());
}

private static KeyedItems<ItemDescriptor> sortCollection(DataKind kind, KeyedItems<ItemDescriptor> input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.launchdarkly.sdk.server.DataModel.VersionedData;
import com.launchdarkly.sdk.server.DataModel.WeightedVariation;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
import com.launchdarkly.sdk.server.subsystems.SerializationException;
Expand Down Expand Up @@ -106,7 +105,7 @@ static VersionedData deserializeFromJsonReader(DataKind kind, JsonReader jr) thr
* @param jr the JSON reader
* @return the deserialized data
*/
static FullDataSet<ItemDescriptor> parseFullDataSet(JsonReader jr) throws SerializationException {
static Iterable<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> parseFullDataSet(JsonReader jr) throws SerializationException {
ImmutableList.Builder<Map.Entry<String, ItemDescriptor>> flags = ImmutableList.builder();
ImmutableList.Builder<Map.Entry<String, ItemDescriptor>> segments = ImmutableList.builder();

Expand Down Expand Up @@ -141,10 +140,10 @@ static FullDataSet<ItemDescriptor> parseFullDataSet(JsonReader jr) throws Serial
}
jr.endObject();

return new FullDataSet<ItemDescriptor>(ImmutableMap.of(
return ImmutableMap.of(
FEATURES, new KeyedItems<>(flags.build()),
SEGMENTS, new KeyedItems<>(segments.build())
).entrySet());
).entrySet();
} catch (IOException e) {
throw new SerializationException(e);
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ private boolean applyToLegacyStore(ChangeSet<ItemDescriptor> sortedChangeSet) {
}

private boolean applyFullChangeSetToLegacyStore(ChangeSet<ItemDescriptor> unsortedChangeset) {
// Convert ChangeSet to FullDataSet for legacy init path
return init(new FullDataSet<>(unsortedChangeset.getData()));
// Convert ChangeSet to FullDataSet for legacy init path, preserving shouldPersist flag
return init(new FullDataSet<>(unsortedChangeset.getData(), unsortedChangeset.shouldPersist()));
}

private boolean applyPartialChangeSetToLegacyStore(ChangeSet<ItemDescriptor> changeSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public FullDataSet<ItemDescriptor> getAllData(boolean returnDataEvenIfCached)
}

JsonReader jr = new JsonReader(response.body().charStream());
return parseFullDataSet(jr);
// Polling data from LaunchDarkly should be persisted
return new FullDataSet<>(parseFullDataSet(jr), true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ private FDv2ChangeSetTranslator() {
* @param changeset the FDv2 changeset to convert
* @param logger logger for diagnostic messages
* @param environmentId the environment ID to include in the changeset (may be null)
* @param shouldPersist true if the data should be persisted to persistent stores, false otherwise
* @return a DataStoreTypes.ChangeSet containing the converted data
* @throws IllegalArgumentException if the changeset type is unknown
*/
public static DataStoreTypes.ChangeSet<ItemDescriptor> toChangeSet(
FDv2ChangeSet changeset,
LDLogger logger,
String environmentId) {
String environmentId,
boolean shouldPersist) {
ChangeSetType changeSetType;
switch (changeset.getType()) {
case FULL:
Expand Down Expand Up @@ -103,7 +105,8 @@ public static DataStoreTypes.ChangeSet<ItemDescriptor> toChangeSet(
changeSetType,
changeset.getSelector(),
dataBuilder.build(),
environmentId);
environmentId,
shouldPersist);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ class InMemoryDataStore implements DataStore, TransactionalDataStore, CacheExpor
private Object writeLock = new Object();
private final Object selectorLock = new Object();
private volatile Selector selector = Selector.EMPTY;
private volatile boolean shouldPersist = false;

@Override
public void init(FullDataSet<ItemDescriptor> allData) {
applyFullPayload(allData.getData(), null, Selector.EMPTY);
applyFullPayload(allData.getData(), null, Selector.EMPTY, allData.shouldPersist());
}

@Override
public ItemDescriptor get(DataKind kind, String key) {
Map<String, ItemDescriptor> items = allData.get(kind);
Map<String, ItemDescriptor> items = this.allData.get(kind);
if (items == null) {
return null;
}
Expand All @@ -49,7 +50,7 @@ public ItemDescriptor get(DataKind kind, String key) {

@Override
public KeyedItems<ItemDescriptor> getAll(DataKind kind) {
Map<String, ItemDescriptor> items = allData.get(kind);
Map<String, ItemDescriptor> items = this.allData.get(kind);
if (items == null) {
return new KeyedItems<>(null);
}
Expand All @@ -58,7 +59,7 @@ public KeyedItems<ItemDescriptor> getAll(DataKind kind) {

@Override
public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
synchronized (writeLock) {
synchronized (this.writeLock) {
Map<String, ItemDescriptor> existingItems = this.allData.get(kind);
ItemDescriptor oldItem = null;
if (existingItems != null) {
Expand Down Expand Up @@ -97,7 +98,7 @@ public boolean upsert(DataKind kind, String key, ItemDescriptor item) {

@Override
public boolean isInitialized() {
return initialized;
return this.initialized;
}

@Override
Expand All @@ -124,10 +125,10 @@ public void close() throws IOException {
public void apply(ChangeSet<ItemDescriptor> changeSet) {
switch (changeSet.getType()) {
case Full:
applyFullPayload(changeSet.getData(), changeSet.getEnvironmentId(), changeSet.getSelector());
applyFullPayload(changeSet.getData(), changeSet.getEnvironmentId(), changeSet.getSelector(), changeSet.shouldPersist());
break;
case Partial:
applyPartialData(changeSet.getData(), changeSet.getSelector());
applyPartialData(changeSet.getData(), changeSet.getSelector(), changeSet.shouldPersist());
break;
case None:
break;
Expand All @@ -140,20 +141,20 @@ public void apply(ChangeSet<ItemDescriptor> changeSet) {

@Override
public Selector getSelector() {
synchronized (selectorLock) {
return selector;
synchronized (this.selectorLock) {
return this.selector;
}
}

private void setSelector(Selector newSelector) {
synchronized (selectorLock) {
selector = newSelector;
synchronized (this.selectorLock) {
this.selector = newSelector;
}
}

private void applyPartialData(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> data,
Selector selector) {
synchronized (writeLock) {
Selector selector, boolean shouldPersist) {
synchronized (this.writeLock) {
// Build the complete updated dictionary before assigning to Items for transactional update
ImmutableMap.Builder<DataKind, Map<String, ItemDescriptor>> itemsBuilder = ImmutableMap.builder();

Expand All @@ -164,7 +165,7 @@ private void applyPartialData(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescri
}

// Add all existing kinds that are NOT being updated
for (Map.Entry<DataKind, Map<String, ItemDescriptor>> existingEntry : allData.entrySet()) {
for (Map.Entry<DataKind, Map<String, ItemDescriptor>> existingEntry : this.allData.entrySet()) {
if (!updatedKinds.contains(existingEntry.getKey())) {
itemsBuilder.put(existingEntry.getKey(), existingEntry.getValue());
}
Expand All @@ -176,7 +177,7 @@ private void applyPartialData(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescri
// Use HashMap to allow overwriting, then convert to ImmutableMap
Map<String, ItemDescriptor> kindMap = new HashMap<>();

Map<String, ItemDescriptor> itemsOfKind = allData.get(kind);
Map<String, ItemDescriptor> itemsOfKind = this.allData.get(kind);
if (itemsOfKind != null) {
kindMap.putAll(itemsOfKind);
}
Expand All @@ -189,13 +190,14 @@ private void applyPartialData(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescri
itemsBuilder.put(kind, ImmutableMap.copyOf(kindMap));
}

allData = itemsBuilder.build();
this.allData = itemsBuilder.build();
this.shouldPersist = shouldPersist;
setSelector(selector);
}
}

private void applyFullPayload(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> data,
String environmentId, Selector selector) {
String environmentId, Selector selector, boolean shouldPersist) {
ImmutableMap.Builder<DataKind, Map<String, ItemDescriptor>> itemsBuilder = ImmutableMap.builder();

for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry : data) {
Expand All @@ -208,26 +210,28 @@ private void applyFullPayload(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescri

ImmutableMap<DataKind, Map<String, ItemDescriptor>> newItems = itemsBuilder.build();

synchronized (writeLock) {
allData = newItems;
initialized = true;
synchronized (this.writeLock) {
this.allData = newItems;
this.initialized = true;
this.shouldPersist = shouldPersist;
setSelector(selector);
}
}

@Override
public FullDataSet<ItemDescriptor> exportAll() {
synchronized (writeLock) {
synchronized (this.writeLock) {
ImmutableList.Builder<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> builder = ImmutableList.builder();

for (Map.Entry<DataKind, Map<String, ItemDescriptor>> kindEntry : allData.entrySet()) {
for (Map.Entry<DataKind, Map<String, ItemDescriptor>> kindEntry : this.allData.entrySet()) {
builder.add(new AbstractMap.SimpleEntry<>(
kindEntry.getKey(),
new KeyedItems<>(ImmutableList.copyOf(kindEntry.getValue().entrySet()))
));
}

return new FullDataSet<>(builder.build());
// Preserve the shouldPersist value that was set when data was provided to this store
return new FullDataSet<>(builder.build(), this.shouldPersist);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static FullDataSet<SerializedItemDescriptor> toSerializedFormat(
));
}

return new FullDataSet<>(builder.build());
// Preserve shouldPersist flag when converting formats
return new FullDataSet<>(builder.build(), inMemoryData.shouldPersist());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void init(FullDataSet<ItemDescriptor> allData) {
KeyedItems<SerializedItemDescriptor> items = PersistentDataStoreConverter.serializeAll(kind, e0.getValue());
allBuilder.add(new AbstractMap.SimpleEntry<>(kind, items));
}
RuntimeException failure = initCore(new FullDataSet<>(allBuilder.build()));
RuntimeException failure = initCore(new FullDataSet<>(allBuilder.build(), allData.shouldPersist()));
if (itemCache != null && allCache != null) {
itemCache.invalidateAll();
allCache.invalidateAll();
Expand Down Expand Up @@ -410,6 +410,12 @@ private boolean pollAvailabilityAfterOutage() {
if (externalCacheSnapshot.isInitialized()) {
try {
FullDataSet<ItemDescriptor> externalData = externalCacheSnapshot.exportAll();

if (!externalData.shouldPersist()) {
logger.debug("Skipping persistence of non-authoritative data (shouldPersist=false) during recovery");
return true; // Recovery succeeded, but we didn't persist
}

FullDataSet<SerializedItemDescriptor> serializedData =
PersistentDataStoreConverter.toSerializedFormat(externalData);
RuntimeException e = initCore(serializedData);
Expand Down Expand Up @@ -453,7 +459,8 @@ private boolean pollAvailabilityAfterOutage() {
builder.add(new AbstractMap.SimpleEntry<>(kind, PersistentDataStoreConverter.serializeAll(kind, items)));
}
}
RuntimeException e = initCore(new FullDataSet<>(builder.build()));
// any data that this PersistentDataStoreWrapper contains has already passed the shouldPersist check
RuntimeException e = initCore(new FullDataSet<>(builder.build(), true));
if (e == null) {
logger.warn("Successfully updated persistent store from cached data");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
new DataStoreTypes.ChangeSet<>(DataStoreTypes.ChangeSetType.None,
Selector.EMPTY,
null,
null // Header derived values will have been handled on initial response.
null, // Header derived values will have been handled on initial response.
true // Polling data from LaunchDarkly should be persisted
),
// Headers would have been processed from the initial response.
false);
Expand All @@ -113,7 +114,8 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> converted = FDv2ChangeSetTranslator.toChangeSet(
((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset(),
logger,
environmentId
environmentId,
true // Polling data from LaunchDarkly should be persisted
);
return FDv2SourceResult.changeSet(converted, fdv1Fallback);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ static PutData parsePutData(JsonReader jr) {
path = jr.nextString();
break;
case "data":
data = parseFullDataSet(jr);
// Streaming data from LaunchDarkly should be persisted
data = new FullDataSet<>(parseFullDataSet(jr), true);
break;
default:
jr.skipValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ private void handleMessage(MessageEvent event) {
FDv2ChangeSetTranslator.toChangeSet(
changeset.getChangeset(),
logger,
event.getHeaders().value(HeaderConstants.ENVIRONMENT_ID.getHeaderName()));
event.getHeaders().value(HeaderConstants.ENVIRONMENT_ID.getHeaderName()),
true);
result = FDv2SourceResult.changeSet(converted, getFallback(event));
} catch (Exception e) {
logger.error("Failed to convert FDv2 changeset: {}", LogValues.exceptionSummary(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void init(FullDataSet<ItemDescriptor> allData) {
memoryStore.init(allData);
maybeSwitchStore();

if (persistenceMode == DataStoreMode.READ_WRITE) {
// Only write to persistent store if shouldPersist is true and store is in READ_WRITE mode
if (persistenceMode == DataStoreMode.READ_WRITE && allData.shouldPersist()) {
if (persistentStore != null) {
persistentStore.init(allData);
}
Expand All @@ -77,6 +78,8 @@ public KeyedItems<ItemDescriptor> getAll(DataKind kind) {
@Override
public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
boolean result = memoryStore.upsert(kind, key, item);
// Note: upsert() doesn't have persist information. For legacy paths (FDv1), we always persist.
// For FDv2 paths, this method shouldn't be called - use apply() instead which has persist info.
if (hasPersistence && persistenceMode == DataStoreMode.READ_WRITE) {
result = result && persistentStore.upsert(kind, key, item);
}
Expand Down Expand Up @@ -107,7 +110,8 @@ public void apply(ChangeSet<ItemDescriptor> changeSet) {
txMemoryStore.apply(changeSet);
maybeSwitchStore();

if (!hasPersistence || persistenceMode != DataStoreMode.READ_WRITE) {
// Only write to persistent store if shouldPersist is true and store is in READ_WRITE mode
if (!hasPersistence || persistenceMode != DataStoreMode.READ_WRITE || !changeSet.shouldPersist()) {
return;
}

Expand Down Expand Up @@ -175,7 +179,8 @@ private boolean applyToLegacyPersistence(ChangeSet<ItemDescriptor> sortedChangeS
* Applies a full change set to a legacy persistent store.
*/
private void applyFullChangeSetToLegacyStore(ChangeSet<ItemDescriptor> sortedChangeSet) {
persistentStore.init(new FullDataSet<>(sortedChangeSet.getData()));
// Preserve shouldPersist flag when converting ChangeSet to FullDataSet
persistentStore.init(new FullDataSet<>(sortedChangeSet.getData(), sortedChangeSet.shouldPersist()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ public FullDataSet<ItemDescriptor> build() {
for (Map.Entry<DataKind, Map<String, ItemDescriptor>> e0: allData.entrySet()) {
allBuilder.add(new AbstractMap.SimpleEntry<>(e0.getKey(), new KeyedItems<>(e0.getValue().entrySet())));
}
return new FullDataSet<>(allBuilder.build());
// File data source data is not authoritative and should not be persisted
return new FullDataSet<>(allBuilder.build(), false);
}

public void add(DataKind kind, String key, ItemDescriptor item) throws FileDataException {
Expand Down
Loading