diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java index aa5004539f92..a3af2b84bed6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java @@ -6,10 +6,12 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.OperationCancelledException; import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.rx.TestSuiteBase; @@ -258,6 +260,41 @@ public void queryItemWithEndToEndTimeoutPolicyInOptionsShouldTimeout() { } } + @Test(groups = {"fast"}, timeOut = 10000L, retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void queryChangeFeedWithEndToEndTimeoutPolicyInOptionsShouldTimeout() { + if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) { + throw new SkipException("Failure injection only supported for DIRECT mode"); + } + + CosmosAsyncClient cosmosClient = initializeClient(endToEndOperationLatencyPolicyConfig); + FaultInjectionRule faultInjectionRule = null; + try { + CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(1)) + .build(); + + CosmosChangeFeedRequestOptions options = + CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange()); + options.setCosmosEndToEndOperationLatencyPolicyConfig(endToEndOperationLatencyPolicyConfig); + + faultInjectionRule = injectFailure(createdContainer, FaultInjectionOperationType.READ_FEED_ITEM, null); + CosmosPagedFlux changeFeedPagedFlux = + createdContainer.queryChangeFeed(options, TestObject.class); + + StepVerifier.create(changeFeedPagedFlux) + .expectErrorMatches(throwable -> throwable instanceof OperationCancelledException + && ((OperationCancelledException) throwable).getSubStatusCode() + == HttpConstants.SubStatusCodes.CLIENT_OPERATION_TIMEOUT) + .verify(); + } finally { + if (faultInjectionRule != null) { + faultInjectionRule.disable(); + } + + safeClose(cosmosClient); + } + } + @Test(groups = {"fast"}, timeOut = 10000L, retryAnalyzer = FlakyTestRetryAnalyzer.class) public void queryItemWithEndToEndTimeoutPolicyInOptionsShouldTimeoutWithClientConfig() { if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java index 2ef17d397b5f..d7f4faf29955 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java @@ -53,6 +53,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ private boolean completeAfterAllCurrentChangesRetrieved; private Long endLSN; private ReadConsistencyStrategy readConsistencyStrategy; + private CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig; public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) { if (toBeCloned.continuationState != null) { @@ -80,6 +81,7 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB this.keywordIdentifiers = toBeCloned.keywordIdentifiers; this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved; this.endLSN = toBeCloned.endLSN; + this.endToEndOperationLatencyPolicyConfig = toBeCloned.endToEndOperationLatencyPolicyConfig; } public CosmosChangeFeedRequestOptionsImpl( @@ -296,8 +298,11 @@ public CosmosChangeFeedRequestOptionsImpl setExcludedRegions(List exclud @Override public CosmosEndToEndOperationLatencyPolicyConfig getCosmosEndToEndLatencyPolicyConfig() { - // @TODO: Implement this and some of the others below - return null; + return this.endToEndOperationLatencyPolicyConfig; + } + + public void setCosmosEndToEndLatencyPolicyConfig(CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig) { + this.endToEndOperationLatencyPolicyConfig = endToEndOperationLatencyPolicyConfig; } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 192f8175978f..c8660e6280cb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -1520,6 +1520,50 @@ private static Flux> getFeedResponseFluxWithTimeout( }); } + private static Flux> getChangeFeedResponseFluxWithTimeout( + Flux> feedResponseFlux, + CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig, + DiagnosticsClientContext diagnosticsClientContext) { + + Duration endToEndTimeout = endToEndPolicyConfig.getEndToEndOperationTimeout(); + + if (endToEndTimeout.isNegative()) { + return feedResponseFlux + .timeout(endToEndTimeout) + .onErrorMap(throwable -> { + if (throwable instanceof TimeoutException) { + CosmosException cancellationException = getNegativeTimeoutException(null, endToEndTimeout); + cancellationException.setStackTrace(throwable.getStackTrace()); + + CosmosDiagnostics mostRecentDiagnostics = diagnosticsClientContext.getMostRecentlyCreatedDiagnostics(); + if (mostRecentDiagnostics != null) { + BridgeInternal.setCosmosDiagnostics(cancellationException, mostRecentDiagnostics); + } + + return cancellationException; + } + return throwable; + }); + } + + return feedResponseFlux + .timeout(endToEndTimeout) + .onErrorMap(throwable -> { + if (throwable instanceof TimeoutException) { + CosmosException exception = new OperationCancelledException(); + exception.setStackTrace(throwable.getStackTrace()); + + CosmosDiagnostics mostRecentDiagnostics = diagnosticsClientContext.getMostRecentlyCreatedDiagnostics(); + if (mostRecentDiagnostics != null) { + BridgeInternal.setCosmosDiagnostics(exception, mostRecentDiagnostics); + } + + return exception; + } + return throwable; + }); + } + private void addUserAgentSuffix(UserAgentContainer userAgentContainer, Set userAgentFeatureFlags) { if (!this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()) { @@ -4775,7 +4819,25 @@ public Flux> queryDocumentChangeFeed( diagnosticsClientContext, crossRegionAvailabilityContextForRequest); - return changeFeedQueryImpl.executeAsync(); + CosmosChangeFeedRequestOptionsImpl implOptions = + ImplementationBridgeHelpers + .CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .getImpl(requestOptions); + + CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig = + this.getEffectiveEndToEndOperationLatencyPolicyConfig( + implOptions.getCosmosEndToEndLatencyPolicyConfig(), + ResourceType.Document, + OperationType.ReadFeed); + + Flux> feedResponseFlux = changeFeedQueryImpl.executeAsync(); + + if (endToEndPolicyConfig != null && endToEndPolicyConfig.isEnabled()) { + return getChangeFeedResponseFluxWithTimeout(feedResponseFlux, endToEndPolicyConfig, diagnosticsClientContext); + } + + return feedResponseFlux; } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java index 3ac526de6d63..2464d59df75d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java @@ -5,6 +5,7 @@ import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosDiagnosticsThresholds; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; import com.azure.cosmos.CosmosItemSerializer; import com.azure.cosmos.ReadConsistencyStrategy; import com.azure.cosmos.implementation.CosmosChangeFeedRequestOptionsImpl; @@ -116,6 +117,20 @@ public CosmosChangeFeedRequestOptions setReadConsistencyStrategy(ReadConsistency return this; } + /** + * Sets the {@link CosmosEndToEndOperationLatencyPolicyConfig} to be used for the request. If the config is already + * set on the client, then this will override the client level config for this request. + * + * @param cosmosEndToEndOperationLatencyPolicyConfig the {@link CosmosEndToEndOperationLatencyPolicyConfig} + * @return the CosmosChangeFeedRequestOptions + */ + public CosmosChangeFeedRequestOptions setCosmosEndToEndOperationLatencyPolicyConfig( + CosmosEndToEndOperationLatencyPolicyConfig cosmosEndToEndOperationLatencyPolicyConfig) { + + this.actualRequestOptions.setCosmosEndToEndLatencyPolicyConfig(cosmosEndToEndOperationLatencyPolicyConfig); + return this; + } + /** * Gets the maximum number of pages that will be prefetched from the backend asynchronously * in the background. By pre-fetching these changes the throughput of processing the