Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationCancelledException;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.rx.TestSuiteBase;
Expand Down Expand Up @@ -258,6 +260,41 @@ public void queryItemWithEndToEndTimeoutPolicyInOptionsShouldTimeout() {
}
}

@Test(groups = {"fast"}, timeOut = 10000L, retryAnalyzer = FlakyTestRetryAnalyzer.class)
public void queryChangeFeedWithEndToEndTimeoutPolicyInOptionsShouldTimeout() {
if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) {
throw new SkipException("Failure injection only supported for DIRECT mode");
}

CosmosAsyncClient cosmosClient = initializeClient(endToEndOperationLatencyPolicyConfig);
FaultInjectionRule faultInjectionRule = null;
try {
CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig =
new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(1))
.build();

CosmosChangeFeedRequestOptions options =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
options.setCosmosEndToEndOperationLatencyPolicyConfig(endToEndOperationLatencyPolicyConfig);

faultInjectionRule = injectFailure(createdContainer, FaultInjectionOperationType.READ_FEED_ITEM, null);
CosmosPagedFlux<TestObject> changeFeedPagedFlux =
createdContainer.queryChangeFeed(options, TestObject.class);

StepVerifier.create(changeFeedPagedFlux)
.expectErrorMatches(throwable -> throwable instanceof OperationCancelledException
&& ((OperationCancelledException) throwable).getSubStatusCode()
== HttpConstants.SubStatusCodes.CLIENT_OPERATION_TIMEOUT)
.verify();
} finally {
if (faultInjectionRule != null) {
faultInjectionRule.disable();
}

safeClose(cosmosClient);
}
}

@Test(groups = {"fast"}, timeOut = 10000L, retryAnalyzer = FlakyTestRetryAnalyzer.class)
public void queryItemWithEndToEndTimeoutPolicyInOptionsShouldTimeoutWithClientConfig() {
if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ
private boolean completeAfterAllCurrentChangesRetrieved;
private Long endLSN;
private ReadConsistencyStrategy readConsistencyStrategy;
private CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig;

public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) {
if (toBeCloned.continuationState != null) {
Expand Down Expand Up @@ -80,6 +81,7 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB
this.keywordIdentifiers = toBeCloned.keywordIdentifiers;
this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved;
this.endLSN = toBeCloned.endLSN;
this.endToEndOperationLatencyPolicyConfig = toBeCloned.endToEndOperationLatencyPolicyConfig;
}

public CosmosChangeFeedRequestOptionsImpl(
Expand Down Expand Up @@ -296,8 +298,11 @@ public CosmosChangeFeedRequestOptionsImpl setExcludedRegions(List<String> exclud

@Override
public CosmosEndToEndOperationLatencyPolicyConfig getCosmosEndToEndLatencyPolicyConfig() {
// @TODO: Implement this and some of the others below
return null;
return this.endToEndOperationLatencyPolicyConfig;
}

public void setCosmosEndToEndLatencyPolicyConfig(CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig) {
this.endToEndOperationLatencyPolicyConfig = endToEndOperationLatencyPolicyConfig;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,50 @@ private static <T> Flux<FeedResponse<T>> getFeedResponseFluxWithTimeout(
});
}

private static <T> Flux<FeedResponse<T>> getChangeFeedResponseFluxWithTimeout(
Flux<FeedResponse<T>> feedResponseFlux,
CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig,
DiagnosticsClientContext diagnosticsClientContext) {

Duration endToEndTimeout = endToEndPolicyConfig.getEndToEndOperationTimeout();

if (endToEndTimeout.isNegative()) {
return feedResponseFlux
.timeout(endToEndTimeout)
.onErrorMap(throwable -> {
if (throwable instanceof TimeoutException) {
CosmosException cancellationException = getNegativeTimeoutException(null, endToEndTimeout);
cancellationException.setStackTrace(throwable.getStackTrace());

CosmosDiagnostics mostRecentDiagnostics = diagnosticsClientContext.getMostRecentlyCreatedDiagnostics();
if (mostRecentDiagnostics != null) {
BridgeInternal.setCosmosDiagnostics(cancellationException, mostRecentDiagnostics);
}

return cancellationException;
}
return throwable;
});
}

return feedResponseFlux
.timeout(endToEndTimeout)
.onErrorMap(throwable -> {
if (throwable instanceof TimeoutException) {
CosmosException exception = new OperationCancelledException();
exception.setStackTrace(throwable.getStackTrace());

CosmosDiagnostics mostRecentDiagnostics = diagnosticsClientContext.getMostRecentlyCreatedDiagnostics();
if (mostRecentDiagnostics != null) {
BridgeInternal.setCosmosDiagnostics(exception, mostRecentDiagnostics);
}

return exception;
}
return throwable;
});
}

private void addUserAgentSuffix(UserAgentContainer userAgentContainer, Set<UserAgentFeatureFlags> userAgentFeatureFlags) {

if (!this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()) {
Expand Down Expand Up @@ -4775,7 +4819,25 @@ public <T> Flux<FeedResponse<T>> queryDocumentChangeFeed(
diagnosticsClientContext,
crossRegionAvailabilityContextForRequest);

return changeFeedQueryImpl.executeAsync();
CosmosChangeFeedRequestOptionsImpl implOptions =
ImplementationBridgeHelpers
.CosmosChangeFeedRequestOptionsHelper
.getCosmosChangeFeedRequestOptionsAccessor()
.getImpl(requestOptions);

CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig =
this.getEffectiveEndToEndOperationLatencyPolicyConfig(
implOptions.getCosmosEndToEndLatencyPolicyConfig(),
ResourceType.Document,
OperationType.ReadFeed);

Flux<FeedResponse<T>> feedResponseFlux = changeFeedQueryImpl.executeAsync();

if (endToEndPolicyConfig != null && endToEndPolicyConfig.isEnabled()) {
return getChangeFeedResponseFluxWithTimeout(feedResponseFlux, endToEndPolicyConfig, diagnosticsClientContext);
}

return feedResponseFlux;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosDiagnosticsThresholds;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.CosmosItemSerializer;
import com.azure.cosmos.ReadConsistencyStrategy;
import com.azure.cosmos.implementation.CosmosChangeFeedRequestOptionsImpl;
Expand Down Expand Up @@ -116,6 +117,20 @@ public CosmosChangeFeedRequestOptions setReadConsistencyStrategy(ReadConsistency
return this;
}

/**
* Sets the {@link CosmosEndToEndOperationLatencyPolicyConfig} to be used for the request. If the config is already
* set on the client, then this will override the client level config for this request.
*
* @param cosmosEndToEndOperationLatencyPolicyConfig the {@link CosmosEndToEndOperationLatencyPolicyConfig}
* @return the CosmosChangeFeedRequestOptions
*/
public CosmosChangeFeedRequestOptions setCosmosEndToEndOperationLatencyPolicyConfig(
CosmosEndToEndOperationLatencyPolicyConfig cosmosEndToEndOperationLatencyPolicyConfig) {

this.actualRequestOptions.setCosmosEndToEndLatencyPolicyConfig(cosmosEndToEndOperationLatencyPolicyConfig);
return this;
}

/**
* Gets the maximum number of pages that will be prefetched from the backend asynchronously
* in the background. By pre-fetching these changes the throughput of processing the
Expand Down
Loading