diff --git a/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md b/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md new file mode 100644 index 000000000000..7895b856b331 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md @@ -0,0 +1,114 @@ +# Http2ConnectTimeoutBifurcationTests — Connect Timeout Testing + +## What This Tests + +`Http2ConnectTimeoutBifurcationTests` validates that the TCP connect timeout (`CONNECT_TIMEOUT_MILLIS`) is +correctly bifurcated between Gateway V1 metadata (45s) and Gateway V2 thin client data plane (5s default). +Uses Linux `iptables` to DROP SYN packets and `tc netem` with `iptables mangle` for per-port delay. + +**Key invariant proven:** Thin client data plane requests fail fast (5s connect timeout) while +metadata requests on port 443 remain unaffected (45s timeout). + +## Why Not SDK Fault Injection? + +SDK `CONNECTION_DELAY` delays at the HTTP/Mono layer — the TCP handshake already completed. +`CONNECT_TIMEOUT_MILLIS` fires at the TCP SYN→SYN-ACK layer, which requires blocking at the kernel. +Only `iptables DROP SYN` prevents the TCP handshake, triggering the real netty `ConnectTimeoutException`. + +## Prerequisites + +- Docker Desktop with Linux containers +- Docker memory: **8 GB+** +- A Cosmos DB account with thin client enabled +- System properties: `COSMOS.THINCLIENT_ENABLED=true`, `COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS=5` (default; override to 1 for fast-fail local tests) +- Credentials in `sdk/cosmos/cosmos-v4.properties` + +## Build & Run + +Same Docker setup as `NETWORK_DELAY_TESTING_README.md`, with additional system properties: + +```bash +# See NETWORK_DELAY_TESTING_README.md for full Docker run setup +# (credential env vars, volume mounts, image build). +# Additional system properties for connect timeout tests: +docker run --rm --cap-add=NET_ADMIN --memory 8g \ + ... \ + cosmos-netem-test bash -c ' + cd /workspace && \ + java -DCOSMOS.THINCLIENT_ENABLED=true \ + -DCOSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS=1 \ + -DCOSMOS.HTTP2_ENABLED=true \ + org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-thinclient-network-delay-testng.xml \ + -verbose 2 + ' +``` + +## Network Commands Used + +### iptables DROP SYN — Block New TCP Connections + +```bash +# Block TCP handshake to thin client port +iptables -A OUTPUT -p tcp --dport 10250 --tcp-flags SYN,ACK,FIN,RST SYN -j DROP + +# Remove rule +iptables -D OUTPUT -p tcp --dport 10250 --tcp-flags SYN,ACK,FIN,RST SYN -j DROP +``` + +Drops only the initial SYN packet. Server never sees the connection. Client's TCP stack +retransmits with exponential backoff. Netty's `CONNECT_TIMEOUT_MILLIS` fires after 1s → +`ConnectTimeoutException`. Existing connections are unaffected. + +### Per-Port Delay — tc prio + iptables mangle + +```bash +# 1. Create 3-band priority qdisc +tc qdisc add dev eth0 root handle 1: prio bands 3 + +# 2. Attach delays to bands +tc qdisc add dev eth0 parent 1:1 handle 10: netem delay 5000ms # port 443 SYN +tc qdisc add dev eth0 parent 1:2 handle 20: netem delay 5000ms # port 10250 SYN +tc qdisc add dev eth0 parent 1:3 handle 30: pfifo_fast # everything else + +# 3. Mark SYN-ONLY packets by port +iptables -t mangle -A OUTPUT -p tcp --dport 443 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 1 +iptables -t mangle -A OUTPUT -p tcp --dport 10250 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 2 + +# 4. Route marks to bands +tc filter add dev eth0 parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1 +tc filter add dev eth0 parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2 + +# Cleanup +tc qdisc del dev eth0 root +iptables -t mangle -F OUTPUT +``` + +Port 443 gets 5s SYN delay (< 45s connect timeout → succeeds; +5s > 1s thin client timeout → proves metadata uses 45s, not 1s). +Port 10250 gets 5s SYN delay (> 1s thin client connect timeout → fails). +**Same delay, different outcomes** — the only variable is the CONNECT_TIMEOUT_MILLIS value. + +**Why SYN-only delay?** tc netem delays every packet it matches. Delaying ALL packets causes +TLS handshake timeout (sslHandshakeTimeout=10s), HTTP response timeout, and premature +connection close — all unrelated to CONNECT_TIMEOUT_MILLIS. SYN-only delay isolates the +TCP connect phase, which is exactly what CONNECT_TIMEOUT_MILLIS controls. + +**Critical tc detail:** The `prio` qdisc's default priomap sends unmarked traffic to +band 1 (the first delay band). A catch-all filter (`u32 match u32 0 0 flowid 1:3`) +is required to route non-SYN traffic to band 3 (no delay). + +## Tests + +| Test | Technique | What It Proves | +|------|-----------|---------------| +| `connectTimeout_GwV2_DataPlane_1sFiresOnDroppedSyn` | iptables DROP SYN on 10250 | Data plane fails in ~1s, not 45s | +| `connectTimeout_GwV1_Metadata_UnaffectedByGwV2Drop` | iptables DROP SYN on 10250 only | Metadata on 443 unaffected | +| `connectTimeout_GwV2_PreciseTiming` | iptables DROP SYN, 3s e2e | ≥2 connect attempts in 3s budget (proving 1s each) | +| `connectTimeout_Bifurcation_DelayBased_...` | tc prio + SYN-only mangle | Same 5s SYN delay on both ports: 443 succeeds (5s < 45s), 10250 fails (5s > 1s) | + +## Important Notes + +- Tests run **sequentially** — tc/iptables are interface-global +- `--cap-add=NET_ADMIN` required for both `tc` and `iptables` +- `@AfterClass` removes all iptables rules (`alwaysRun=true`) +- System property `COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS=1` sets the 1s bifurcated timeout diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java new file mode 100644 index 000000000000..e93b4cf56353 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java @@ -0,0 +1,571 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.faultinjection; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnostics; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.assertj.core.api.AssertionsForClassTypes; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.time.Duration; +import java.time.Instant; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.AssertJUnit.fail; + +/** + * Tests for connect timeout bifurcation using Linux iptables to DROP SYN packets. + * + * Unlike SDK-level fault injection (which operates above the TCP layer), iptables DROP + * on SYN packets prevents the TCP handshake from completing, causing the real + * CONNECT_TIMEOUT_MILLIS to fire at the netty ChannelOption level. + * + * The bifurcation under test: + * - Data plane requests → GW V2 endpoint (port 10250) → CONNECT_TIMEOUT_MILLIS = 5s (default, configurable) + * - Metadata requests → GW V1 endpoint (port 443) → CONNECT_TIMEOUT_MILLIS = 45s (unchanged) + * + * HOW TO RUN: + * 1. Group "manual-thinclient-network-delay" — NOT included in CI. + * 2. Docker container with --cap-add=NET_ADMIN, JDK 21, .m2 mounted. + * 3. Tests self-manage iptables rules (add/remove) — no manual intervention. + * 4. See CONNECT_TIMEOUT_TESTING_README.md for full setup and run instructions. + * + * DESIGN: + * - Each test: warm-up (establish connection) → close client (force new TCP) → + * iptables DROP SYN to port 10250 → time the connect failure → remove rule → verify. + * - Key assertion: failure latency should be ~1s (CONNECT_TIMEOUT_MILLIS), NOT 45s. + */ +public class Http2ConnectTimeoutBifurcationTests extends FaultInjectionTestBase { + + private CosmosAsyncClient client; + private CosmosAsyncContainer cosmosAsyncContainer; + private TestObject seedItem; + + private static final String TEST_GROUP = "manual-thinclient-network-delay"; + private static final long TEST_TIMEOUT = 180_000; + + @Factory(dataProvider = "clientBuildersWithGatewayAndHttp2") + public Http2ConnectTimeoutBifurcationTests(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + this.subscriberValidationTimeout = TIMEOUT; + } + + @BeforeClass(groups = {TEST_GROUP}, timeOut = TIMEOUT) + public void beforeClass() { + System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); + // Use the default THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS (5s) — no override. + // Tests are designed around the 5s default to match production behavior. + + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + + this.seedItem = TestObject.create(); + this.cosmosAsyncContainer.createItem(this.seedItem).block(); + logger.info("Seeded test item: id={}, pk={}", seedItem.getId(), seedItem.getId()); + + // Verify connectivity is healthy + this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), TestObject.class).block(); + logger.info("Seed item read verified — connection is healthy."); + } + + @AfterClass(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + // Safety: remove any leftover iptables rules + removeIptablesDropOnPort(10250); + System.clearProperty("COSMOS.THINCLIENT_ENABLED"); + safeClose(this.client); + } + + // ======================================================================== + // tc netem + iptables mangle helpers — per-port delay injection + // ======================================================================== + + /** + * Sets up per-port network delay using tc netem with iptables mangle marks. + * + * tc netem operates at the interface level (not port-specific), so we use: + * 1. tc qdisc with handles to create two delay classes + * 2. iptables -t mangle to MARK packets by destination port + * 3. tc filter to route marked packets to the appropriate delay class + * + * @param port443DelayMs delay for port 443 traffic (metadata) + * @param port10250DelayMs delay for port 10250 traffic (thin client data plane) + */ + private void addPerPortDelay(int port443DelayMs, int port10250DelayMs) { + String[] cmds = { + // Create root prio qdisc with 3 bands + "tc qdisc add dev eth0 root handle 1: prio bands 3", + // Band 1 (handle 1:1): delay for port 443 + String.format("tc qdisc add dev eth0 parent 1:1 handle 10: netem delay %dms", port443DelayMs), + // Band 2 (handle 1:2): delay for port 10250 + String.format("tc qdisc add dev eth0 parent 1:2 handle 20: netem delay %dms", port10250DelayMs), + // Band 3 (handle 1:3): no delay (default for all other traffic) + "tc qdisc add dev eth0 parent 1:3 handle 30: pfifo_fast", + // Mark port 443 packets with mark 1 + "iptables -t mangle -A OUTPUT -p tcp --dport 443 -j MARK --set-mark 1", + // Mark port 10250 packets with mark 2 + "iptables -t mangle -A OUTPUT -p tcp --dport 10250 -j MARK --set-mark 2", + // Route mark 1 → band 1 (port 443 delay) + "tc filter add dev eth0 parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1", + // Route mark 2 → band 2 (port 10250 delay) + "tc filter add dev eth0 parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2", + }; + + for (String cmd : cmds) { + logger.info(">>> Executing: {}", cmd); + executeShellCommand(cmd); + } + logger.info(">>> Per-port delay active: port 443={}ms, port 10250={}ms", port443DelayMs, port10250DelayMs); + } + + /** + * Sets up per-port SYN-ONLY delay using tc netem with iptables mangle marks. + * + * Unlike {@link #addPerPortDelay} which delays ALL packets, this only delays + * the initial TCP SYN packet (--tcp-flags SYN,ACK,FIN,RST SYN). This means: + * - TCP connect phase is delayed (SYN held in kernel → CONNECT_TIMEOUT_MILLIS fires) + * - TLS handshake, HTTP request/response, and TCP ACKs flow normally (no delay) + * + * This is the correct technique for testing CONNECT_TIMEOUT_MILLIS bifurcation: + * the connect timeout fires during SYN→SYN-ACK, so only the SYN needs delaying. + * Delaying all packets causes secondary failures (TLS handshake timeout, HTTP + * response timeout, premature connection close) that are unrelated to connect timeout. + * + * @param port443SynDelayMs SYN delay for port 443 (metadata) + * @param port10250SynDelayMs SYN delay for port 10250 (thin client data plane) + */ + private void addPerPortSynDelay(int port443SynDelayMs, int port10250SynDelayMs) { + String[] cmds = { + // Create root prio qdisc with 3 bands + "tc qdisc add dev eth0 root handle 1: prio bands 3", + // Band 1 (handle 1:1): delay for port 443 SYN + String.format("tc qdisc add dev eth0 parent 1:1 handle 10: netem delay %dms", port443SynDelayMs), + // Band 2 (handle 1:2): delay for port 10250 SYN + String.format("tc qdisc add dev eth0 parent 1:2 handle 20: netem delay %dms", port10250SynDelayMs), + // Band 3 (handle 1:3): no delay (default for all other traffic including non-SYN) + "tc qdisc add dev eth0 parent 1:3 handle 30: pfifo_fast", + // Mark ONLY SYN packets (initial TCP connect) to port 443 with mark 1 + "iptables -t mangle -A OUTPUT -p tcp --dport 443 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 1", + // Mark ONLY SYN packets to port 10250 with mark 2 + "iptables -t mangle -A OUTPUT -p tcp --dport 10250 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 2", + // Route mark 1 → band 1 (port 443 SYN delay) + "tc filter add dev eth0 parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1", + // Route mark 2 → band 2 (port 10250 SYN delay) + "tc filter add dev eth0 parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2", + // CRITICAL: Catch-all filter → band 3 (no delay) for ALL unmarked traffic. + // Without this, prio qdisc's default priomap sends unmarked packets to band 1 + // (the delay band), which delays TLS/HTTP/ACK traffic and causes spurious failures. + "tc filter add dev eth0 parent 1:0 protocol ip prio 99 u32 match u32 0 0 flowid 1:3", + }; + + for (String cmd : cmds) { + logger.info(">>> Executing: {}", cmd); + executeShellCommand(cmd); + } + logger.info(">>> Per-port SYN-only delay active: port 443={}ms, port 10250={}ms", + port443SynDelayMs, port10250SynDelayMs); + } + + /** + * Removes all per-port delay rules (tc qdisc + iptables mangle marks). + */ + private void removePerPortDelay() { + String[] cmds = { + "tc qdisc del dev eth0 root 2>/dev/null", + "iptables -t mangle -F OUTPUT 2>/dev/null", + }; + + for (String cmd : cmds) { + logger.info(">>> Cleanup: {}", cmd); + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); + p.waitFor(); + } catch (Exception e) { + logger.warn("Cleanup command failed (may be expected): {}", e.getMessage()); + } + } + logger.info(">>> Per-port delay rules removed"); + } + + /** + * Executes a shell command, failing the test on non-zero exit. + */ + private void executeShellCommand(String cmd) { + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); + int exit = p.waitFor(); + if (exit != 0) { + try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { + String errMsg = err.readLine(); + logger.warn("Command failed (exit={}): {} — {}", exit, cmd, errMsg); + } + } + } catch (Exception e) { + logger.error("Failed to execute: {}", cmd, e); + fail("Shell command failed: " + cmd + " — " + e.getMessage()); + } + } + + // ======================================================================== + // iptables helpers — DROP SYN to specific destination port + // ======================================================================== + + /** + * Adds an iptables rule to DROP all outgoing TCP SYN packets (new connections) + * to the specified destination port. This prevents the TCP handshake from completing, + * causing the client's CONNECT_TIMEOUT_MILLIS to fire. + */ + private void addIptablesDropOnPort(int port) { + String cmd = String.format( + "iptables -A OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", port); + logger.info(">>> Adding iptables DROP SYN rule: {}", cmd); + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); + int exit = p.waitFor(); + if (exit != 0) { + try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { + String errMsg = err.readLine(); + logger.warn("iptables add failed (exit={}): {}", exit, errMsg); + } + } else { + logger.info(">>> iptables DROP SYN rule active on port {}", port); + } + } catch (Exception e) { + logger.error("Failed to add iptables rule", e); + fail("Could not add iptables rule: " + e.getMessage()); + } + } + + /** + * Removes the iptables DROP SYN rule for the specified port. + */ + private void removeIptablesDropOnPort(int port) { + String cmd = String.format( + "iptables -D OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", port); + logger.info(">>> Removing iptables DROP SYN rule: {}", cmd); + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); + int exit = p.waitFor(); + if (exit == 0) { + logger.info(">>> iptables DROP SYN rule removed on port {}", port); + } else { + logger.warn("iptables remove returned exit={} (may already be removed)", exit); + } + } catch (Exception e) { + logger.warn("Failed to remove iptables rule: {}", e.getMessage()); + } + } + + // ======================================================================== + // Tests + // ======================================================================== + + /** + * Proves that the GW V2 data plane connect timeout (5s default) fires when TCP SYN is dropped. + * + * Flow: + * 1. Close + recreate client to force a fresh TCP connection on next request + * 2. Add iptables DROP SYN on port 10250 (GW V2 data plane) + * 3. Attempt a read → CONNECT_TIMEOUT_MILLIS (5s) fires, request fails + * 4. Assert failure latency is within e2e budget (30s), NOT 45s per attempt + * 5. Remove iptables rule + * 6. Verify recovery read succeeds + * + * The 30s e2e timeout allows multiple 5s connect attempts + SDK retry overhead. + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void connectTimeout_GwV2_DataPlane_1sFiresOnDroppedSyn() throws Exception { + // Close and recreate client to ensure no pooled connections exist — + // we need to force a NEW TCP connection which will hit the iptables DROP. + safeClose(this.client); + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + + // e2e timeout caps total wait so the test doesn't hang for 45s on retry. + // With 5s connect timeout, 30s budget allows ~5-6 connect attempts. + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(30)).enable(true).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + addIptablesDropOnPort(10250); + Instant start = Instant.now(); + try { + this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + fail("Should have failed — SYN to port 10250 is dropped, connect timeout should fire"); + } catch (CosmosException e) { + Duration failureLatency = Duration.between(start, Instant.now()); + logger.info("Connect timeout fired: statusCode={}, subStatusCode={}, latency={}ms", + e.getStatusCode(), e.getSubStatusCode(), failureLatency.toMillis()); + logger.info("Full diagnostics: {}", + e.getDiagnostics() != null ? e.getDiagnostics().toString() : "null"); + + // The key assertion: failure should happen within ~30s (e2e budget), + // and each individual connect attempt should be bounded to ~5s. + // If the per-request CONNECT_TIMEOUT_MILLIS is NOT applied (i.e., falls back to 45s), + // the first connect attempt alone would take 45s, exceeding the 30s e2e budget. + assertThat(failureLatency) + .as("Failure latency should be within e2e budget (30s), proving connect timeout " + + "is NOT 45s. With 5s connect timeout, the SDK can attempt multiple retries " + + "before the 30s e2e budget expires.") + .isLessThan(Duration.ofSeconds(35)); // 30s e2e + 5s buffer + + assertThat(e.getStatusCode()) + .as("Should be 408 or 503 due to connect timeout / e2e timeout") + .isIn(408, 503); + } finally { + removeIptablesDropOnPort(10250); + } + + // Recovery: after removing the DROP rule, the next read should succeed + Thread.sleep(1000); // brief settling + CosmosDiagnostics recoveryDiag = this.performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, seedItem, false); + logger.info("Recovery read succeeded. Diagnostics: {}", recoveryDiag.toString()); + assertThat(recoveryDiag).isNotNull(); + } + + /** + * Proves that GW V1 metadata (port 443) is UNAFFECTED by the iptables drop on port 10250. + * + * Flow: + * 1. Add iptables DROP SYN on port 10250 only + * 2. The SDK's initial metadata requests (account read) go through port 443 — should succeed + * 3. Remove the DROP rule + * 4. Data plane request succeeds + * + * This proves the bifurcation: port 10250 is blocked but port 443 is untouched. + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void connectTimeout_GwV1_Metadata_UnaffectedByGwV2Drop() throws Exception { + addIptablesDropOnPort(10250); + try { + // Create a new client — its initialization contacts port 443 for account metadata. + // This should succeed because only port 10250 is blocked. + CosmosAsyncClient metadataClient = null; + try { + metadataClient = getClientBuilder().buildAsyncClient(); + logger.info("Client created successfully while port 10250 is blocked — " + + "metadata on port 443 is unaffected."); + + // The client should be able to resolve the account and database — + // all metadata requests go through GW V1 (port 443). + // However, any data plane request (port 10250) WILL fail. + CosmosAsyncContainer container = + getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(metadataClient); + logger.info("Container reference obtained via metadata (port 443) — success."); + + } finally { + safeClose(metadataClient); + } + } finally { + removeIptablesDropOnPort(10250); + } + + // After removing the DROP rule, verify full data plane works + CosmosDiagnostics recoveryDiag = this.performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, seedItem, false); + logger.info("Recovery read after iptables removal. Diagnostics: {}", recoveryDiag.toString()); + assertThat(recoveryDiag).isNotNull(); + } + + /** + * Measures the precise connect timeout boundary. + * + * With THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS=5 (default), a single TCP connect attempt + * to a blackholed port 10250 should fail in ~5s. We measure multiple individual + * attempts by using an e2e timeout of 12s (enough for 2 connect attempts at 5s each). + * + * If CONNECT_TIMEOUT_MILLIS were 45s (the gateway default), a single connect attempt would + * consume the full 12s e2e budget — and the diagnostics would show 0 completed retries. + * With 5s CONNECT_TIMEOUT_MILLIS, we expect at least 2 retries within the 12s budget. + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void connectTimeout_GwV2_PreciseTiming() throws Exception { + safeClose(this.client); + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + + // e2e: 12s. With 5s connect timeout, expect 2 connect attempts. + // With 45s connect timeout, only 1 attempt (which wouldn't even complete). + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(12)).enable(true).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + addIptablesDropOnPort(10250); + Instant start = Instant.now(); + CosmosDiagnostics failedDiagnostics = null; + try { + this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + fail("Should have failed — SYN to port 10250 is dropped"); + } catch (CosmosException e) { + Duration failureLatency = Duration.between(start, Instant.now()); + failedDiagnostics = e.getDiagnostics(); + logger.info("Precise timing: latency={}ms, statusCode={}, subStatusCode={}", + failureLatency.toMillis(), e.getStatusCode(), e.getSubStatusCode()); + logger.info("Full diagnostics: {}", + failedDiagnostics != null ? failedDiagnostics.toString() : "null"); + + // Should complete within ~12-15s (12s e2e + buffer) + assertThat(failureLatency) + .as("Should complete within e2e budget + small buffer") + .isLessThan(Duration.ofSeconds(16)); + + // Parse diagnostics to count gatewayStatisticsList entries + // Each entry = one network attempt. With 5s connect timeout + 12s e2e, + // expect at least 2 entries (2 connect attempts that timed out at 5s each). + if (failedDiagnostics != null) { + ObjectNode diagNode = (ObjectNode) Utils.getSimpleObjectMapper() + .readTree(failedDiagnostics.toString()); + JsonNode gwStats = diagNode.get("gatewayStatisticsList"); + if (gwStats != null && gwStats.isArray()) { + logger.info("gatewayStatisticsList entries: {} (with 5s connect timeout, " + + "expect >= 2 in 12s budget)", gwStats.size()); + assertThat(gwStats.size()) + .as("With 5s CONNECT_TIMEOUT_MILLIS and 12s e2e budget, should have " + + ">= 2 gateway stats entries (each = one connect attempt). " + + "If only 1 entry, the connect timeout may still be 45s.") + .isGreaterThanOrEqualTo(2); + } + } + } finally { + removeIptablesDropOnPort(10250); + } + + // Recovery + Thread.sleep(1000); + CosmosDiagnostics recoveryDiag = this.performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, seedItem, false); + logger.info("Recovery read succeeded. Diagnostics: {}", recoveryDiag.toString()); + } + + /** + * Proves connect timeout bifurcation using per-port SYN-only delay injection. + * + * This is the PUREST bifurcation test — same network condition on both ports, + * different outcomes because of different CONNECT_TIMEOUT_MILLIS values: + * + * - Port 443 (metadata): 5s SYN delay → 5s < 45s CONNECT_TIMEOUT → connect SUCCEEDS + * - Port 10250 (data plane): 5s SYN delay → 5s > CONNECT_TIMEOUT (default 5s) → connect FAILS + * + * SAME delay + different outcomes = the ONLY variable is the timeout configuration. + * This eliminates "different delays cause different outcomes" as an alternative explanation. + * + * Technique: tc netem delays only SYN packets (--tcp-flags SYN,ACK,FIN,RST SYN), + * not all traffic. This isolates the TCP connect phase — TLS handshake, HTTP request/ + * response, and TCP ACKs flow at normal speed. Unlike delaying all packets (which causes + * TLS handshake timeout, HTTP response timeout, premature connection close), SYN-only + * delay ONLY affects CONNECT_TIMEOUT_MILLIS, which is exactly what we're testing. + * + * Flow: + * 1. Apply 5s SYN-only delay on BOTH ports + * 2. Create a new client → metadata on port 443 succeeds (5s < 45s timeout) + * 3. Attempt a document read → data plane on port 10250 fails (5s ≥ 5s timeout) + * 4. Remove delays, verify full recovery + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void connectTimeout_Bifurcation_DelayBased_MetadataSucceeds_DataPlaneFails() throws Exception { + // Close existing client to force new TCP connections on next use + safeClose(this.client); + + // Apply SYN-only delay: 7s on BOTH ports. + // Only the initial TCP SYN packet is delayed — all other traffic flows normally. + // Port 443: CONNECT_TIMEOUT = 45s → 7s delay < 45s → connect SUCCEEDS + // Port 10250: CONNECT_TIMEOUT = 5s → 7s delay > 5s → connect FAILS (ConnectTimeoutException at 5s) + addPerPortSynDelay(7000, 7000); + + try { + // Step 1: Create a new client — metadata requests go to port 443. + // The 7s SYN delay means the TCP handshake takes ~7s. + // Since the gateway CONNECT_TIMEOUT is 45s, the connect SUCCEEDS. + // If the thin client timeout (5s) were applied, the connect would FAIL at 5s + // (before the SYN-ACK arrives at 7s). This is the decisive proof. + Instant metadataStart = Instant.now(); + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + Duration metadataLatency = Duration.between(metadataStart, Instant.now()); + + logger.info("Client + container setup succeeded with 7s SYN delay on port 443. " + + "Metadata latency: {}ms (includes 7s SYN delay + TLS + HTTP at normal speed)", + metadataLatency.toMillis()); + + // Metadata latency should be >= 6s (7s SYN delay - jitter) but < 30s + // (7s for SYN + normal speed TLS/HTTP should complete quickly) + assertThat(metadataLatency) + .as("Metadata should succeed despite 7s SYN delay (7s < 45s gateway CONNECT_TIMEOUT). " + + "If thin client timeout (5s) were applied, connect would fail at 5s.") + .isGreaterThanOrEqualTo(Duration.ofSeconds(6)) + .isLessThan(Duration.ofSeconds(30)); + + // Step 2: Attempt a document read — this goes to port 10250. + // The SAME 7s SYN delay is applied, but CONNECT_TIMEOUT_MILLIS is 5s (default). + // The connect timeout fires at 5s (before the delayed SYN-ACK arrives at 7s). + // This is the bifurcation: same delay, port 443 succeeded, port 10250 fails. + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(30)) + .enable(true).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + Instant dataPlaneStart = Instant.now(); + try { + this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + fail("Data plane request should have failed — 7s SYN delay exceeds 5s connect timeout"); + } catch (CosmosException e) { + Duration dataPlaneLatency = Duration.between(dataPlaneStart, Instant.now()); + logger.info("Data plane failed as expected: statusCode={}, latency={}ms", + e.getStatusCode(), dataPlaneLatency.toMillis()); + logger.info("Full diagnostics: {}", + e.getDiagnostics() != null ? e.getDiagnostics().toString() : "null"); + + // Data plane should fail within the e2e budget + assertThat(dataPlaneLatency) + .as("Data plane should fail within e2e budget (30s). " + + "Each connect attempt times out at 5s (not 45s), proving 5s CONNECT_TIMEOUT.") + .isLessThan(Duration.ofSeconds(35)); + + assertThat(e.getStatusCode()) + .as("Should be 408 or 503 due to connect timeout") + .isIn(408, 503); + } + + } finally { + removePerPortDelay(); + } + + // Recovery: after removing delays, everything should work + Thread.sleep(2000); + safeClose(this.client); + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + CosmosDiagnostics recoveryDiag = this.performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, seedItem, false); + logger.info("Recovery read succeeded. Diagnostics: {}", recoveryDiag.toString()); + assertThat(recoveryDiag).isNotNull(); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java index dd5f3c479003..7fe8d7363b5e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java @@ -204,7 +204,7 @@ public void rntbd() throws Exception { assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("(cto:PT5S, nrto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:true)"); String http2Enabled = Configs.isHttp2Enabled() ? "true" : "false"; - assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:1000, nrto:PT1M, icto:PT1M, cto:PT45S, p:false, http2:(enabled:"+ http2Enabled + ", maxc:1000, minc:" + Math.max(8, Runtime.getRuntime().availableProcessors()) + ", maxs:30))"); + assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:1000, nrto:PT1M, icto:PT1M, cto:PT45S, gwV2Cto:n/a, p:false, http2:(enabled:"+ http2Enabled + ", maxc:1000, minc:" + Math.max(8, Runtime.getRuntime().availableProcessors()) + ", maxs:30))"); assertThat(objectNode.get("connCfg").get("other").asText()).isEqualTo("(ed: false, cs: false, rv: true)"); } @@ -241,7 +241,7 @@ public void gw() throws Exception { assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null"); String http2Enabled = Configs.isHttp2Enabled() ? "true" : "false"; - assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:500, nrto:PT18S, icto:PT17S, cto:PT45S, p:false, http2:(enabled:" + http2Enabled + ", maxc:1000, minc:" + Math.max(8, Runtime.getRuntime().availableProcessors()) + ", maxs:30))"); + assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:500, nrto:PT18S, icto:PT17S, cto:PT45S, gwV2Cto:n/a, p:false, http2:(enabled:" + http2Enabled + ", maxc:1000, minc:" + Math.max(8, Runtime.getRuntime().availableProcessors()) + ", maxs:30))"); assertThat(objectNode.get("connCfg").get("other").asText()).isEqualTo("(ed: false, cs: false, rv: true)"); } @@ -315,7 +315,7 @@ public void full( assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null"); String http2Enabled = Configs.isHttp2Enabled() ? "true" : "false"; - assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:500, nrto:PT18S, icto:PT17S, cto:PT45S, p:false, http2:(enabled:" + http2Enabled + ", maxc:1000, minc:" + Math.max(8, Runtime.getRuntime().availableProcessors()) + ", maxs:30))"); + assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:500, nrto:PT18S, icto:PT17S, cto:PT45S, gwV2Cto:n/a, p:false, http2:(enabled:" + http2Enabled + ", maxc:1000, minc:" + Math.max(8, Runtime.getRuntime().availableProcessors()) + ", maxs:30))"); assertThat(objectNode.get("connCfg").get("other").asText()).isEqualTo("(ed: true, cs: true, rv: false)"); assertThat(objectNode.get("excrgns").asText()).isEqualTo("[westus2]"); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConfigsTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConfigsTests.java index 89885ae320f6..9eb43cea33bb 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConfigsTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConfigsTests.java @@ -180,6 +180,72 @@ public void thinClientEnabledTest() { } } + @Test(groups = { "unit" }) + public void thinClientConnectionTimeoutDefaultTest() { + // Default thin client connection timeout should be 5 seconds + System.clearProperty("COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS"); + try { + assertThat(Configs.getThinClientConnectionTimeoutInSeconds()).isEqualTo(5); + } finally { + System.clearProperty("COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS"); + } + } + + @Test(groups = { "unit" }) + public void thinClientConnectionTimeoutOverrideTest() { + System.clearProperty("COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS"); + System.setProperty("COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS", "3"); + try { + assertThat(Configs.getThinClientConnectionTimeoutInSeconds()).isEqualTo(3); + } finally { + System.clearProperty("COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS"); + } + } + + @Test(groups = { "unit" }) + public void thinClientConnectionTimeoutRejectsZeroAndNegative() { + // Zero should fall back to default (5s) + System.setProperty("COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS", "0"); + try { + assertThat(Configs.getThinClientConnectionTimeoutInSeconds()).isEqualTo(5); + } finally { + System.clearProperty("COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS"); + } + + // Negative should fall back to default (5s) + System.setProperty("COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS", "-1"); + try { + assertThat(Configs.getThinClientConnectionTimeoutInSeconds()).isEqualTo(5); + } finally { + System.clearProperty("COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS"); + } + } + + @Test(groups = { "unit" }) + public void httpRequestThinClientFlagDefaultFalse() throws Exception { + // HttpRequest should default to isThinClientRequest=false + com.azure.cosmos.implementation.http.HttpRequest httpRequest = + new com.azure.cosmos.implementation.http.HttpRequest( + io.netty.handler.codec.http.HttpMethod.GET, + new java.net.URI("https://test.documents.azure.com:443/"), + 443, + new com.azure.cosmos.implementation.http.HttpHeaders()); + assertThat(httpRequest.isThinClientRequest()).isFalse(); + } + + @Test(groups = { "unit" }) + public void httpRequestThinClientFlagSetTrue() throws Exception { + // ThinClientStoreModel sets isThinClientRequest=true via withThinClientRequest() + com.azure.cosmos.implementation.http.HttpRequest httpRequest = + new com.azure.cosmos.implementation.http.HttpRequest( + io.netty.handler.codec.http.HttpMethod.POST, + new java.net.URI("https://test.documents.azure.com:10250/"), + 10250, + new com.azure.cosmos.implementation.http.HttpHeaders()) + .withThinClientRequest(true); + assertThat(httpRequest.isThinClientRequest()).isTrue(); + } + @Test(groups = { "emulator" }) public void thinClientEndpointTest() { Configs config = new Configs(); diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 1d3cf27ecc66..dd19b05f9ed9 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -12,6 +12,7 @@ #### Other Changes * Added aggressive HTTP timeout policies for document operations routed to Gateway V2. - [PR 47879](https://github.com/Azure/azure-sdk-for-java/pull/47879) +* Added a default connect timeout of 5s for Gateway V2 (thin client) data-plane endpoints. - See [PR 48174](https://github.com/Azure/azure-sdk-for-java/pull/48174) ### 4.78.0 (2026-02-10) @@ -23,7 +24,7 @@ * Fixed an issue where operation failed with `400` when configured with pre-trigger or post-trigger with non-ascii character. Only impact for gateway mode. See [PR 47881](https://github.com/Azure/azure-sdk-for-java/pull/47881) #### Other Changes -* Added `x-ms-hub-region-processing-only` header to allow hub-region stickiness when 404 `READ SESSION NOT AVAIALBLE` is hit for Single-Writer accounts. - [PR 47631](https://github.com/Azure/azure-sdk-for-java/pull/47631) +* Added `x-ms-hub-region-processing-only` header to allow hub-region stickiness when 404 `READ SESSION NOT AVAILABLE` is hit for Single-Writer accounts. - [PR 47631](https://github.com/Azure/azure-sdk-for-java/pull/47631) ### 4.77.0 (2026-01-26) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index e043066fd910..d8380bd94fff 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -58,6 +58,14 @@ public class Configs { private static final String NETTY_HTTP_CLIENT_METRICS_ENABLED = "COSMOS.NETTY_HTTP_CLIENT_METRICS_ENABLED"; private static final String NETTY_HTTP_CLIENT_METRICS_ENABLED_VARIABLE = "COSMOS_NETTY_HTTP_CLIENT_METRICS_ENABLED"; + // Thin client connect/acquire timeout — controls CONNECT_TIMEOUT_MILLIS for Gateway V2 data plane endpoints. + // Data plane requests are routed to the thin client regional endpoint (from RegionalRoutingContext) + // which uses a non-443 port. These get a shorter 5s connect/acquire timeout. + // Metadata requests target Gateway V1 endpoint (port 443) and retain the full 45s/60s timeout (unchanged). + private static final int DEFAULT_THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS = 5; + private static final String THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS = "COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS"; + private static final String THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS_VARIABLE = "COSMOS_THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS"; + private static final String MAX_HTTP_BODY_LENGTH_IN_BYTES = "COSMOS.MAX_HTTP_BODY_LENGTH_IN_BYTES"; private static final String MAX_HTTP_INITIAL_LINE_LENGTH_IN_BYTES = "COSMOS.MAX_HTTP_INITIAL_LINE_LENGTH_IN_BYTES"; private static final String MAX_HTTP_CHUNK_SIZE_IN_BYTES = "COSMOS.MAX_HTTP_CHUNK_SIZE_IN_BYTES"; @@ -574,6 +582,59 @@ public static Duration getConnectionAcquireTimeout() { return CONNECTION_ACQUIRE_TIMEOUT; } + /** + * Returns the TCP connect timeout for thin client data plane endpoints. + * Data plane requests routed via thinclientRegionalEndpoint (from RegionalRoutingContext) + * use this aggressive timeout to fail fast when the proxy is unreachable. + * Metadata requests on port 443 are unaffected and retain the full 45s timeout. + * + * Configurable via system property COSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS + * or environment variable COSMOS_THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS. + * Default: 5 seconds. + */ + public static int getThinClientConnectionTimeoutInSeconds() { + int value = DEFAULT_THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS; + + String valueFromSystemProperty = System.getProperty(THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS); + if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) { + try { + value = Integer.parseInt(valueFromSystemProperty); + } catch (NumberFormatException e) { + logger.warn( + "Invalid non-numeric value '{}' for system property {}. Falling back to environment variable or default.", + valueFromSystemProperty, + THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS); + valueFromSystemProperty = null; + } + } + + if (valueFromSystemProperty == null || valueFromSystemProperty.isEmpty()) { + String valueFromEnvVariable = System.getenv(THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS_VARIABLE); + if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) { + try { + value = Integer.parseInt(valueFromEnvVariable); + } catch (NumberFormatException e) { + logger.warn( + "Invalid non-numeric value '{}' for environment variable {}. Falling back to default: {}s.", + valueFromEnvVariable, + THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS_VARIABLE, + DEFAULT_THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS); + } + } + } + + // Guard against invalid values — timeout must be at least 1 second + if (value <= 0) { + logger.warn( + "Invalid thin client connection timeout: {}s. Must be > 0. Falling back to default: {}s.", + value, + DEFAULT_THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS); + return DEFAULT_THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS; + } + + return value; + } + public static int getHttpResponseTimeoutInSeconds() { return getJVMConfigAsInt(HTTP_RESPONSE_TIMEOUT_IN_SECONDS, DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 691c3abfeb06..42172026ad5b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -1097,7 +1097,7 @@ private static boolean isStoredProcedureMasterOperation(ResourceType resourceTyp return resourceType == ResourceType.StoredProcedure && operationType != OperationType.ExecuteJavaScript; } - private static void safeSilentRelease(Object msg) { + static void safeSilentRelease(Object msg) { try { ReferenceCountUtil.release(msg); } catch (Throwable t) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 348eb0e572f8..92d1c197525e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -18,7 +18,6 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpMethod; -import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakDetector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,7 +114,9 @@ public StoreResponse unwrapToStoreResponse( } if (content.readableBytes() == 0) { - safeSilentRelease(content); + if (content.refCnt() > 0) { + safeSilentRelease(content); + } return super.unwrapToStoreResponse(endpoint, request, statusCode, headers, Unpooled.EMPTY_BUFFER); } @@ -234,9 +235,12 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque requestUri, requestUri.getPort(), headers, - Flux.just(contentAsByteArray)); + Flux.just(contentAsByteArray)) + .withThinClientRequest(true); } finally { - safeSilentRelease(byteBuf); + if (byteBuf.refCnt() > 0) { + safeSilentRelease(byteBuf); + } } } @@ -245,15 +249,6 @@ public Map getDefaultHeaders() { return this.defaultHeaders; } - private static void safeSilentRelease(Object msg) { - try { - ReferenceCountUtil.release(msg); - } catch (Throwable t) { - // ReferenceCountUtil.safeRelease would always log a WARN on double-release. - // In this class we only need this for a rare race condition — swallow silently. - } - } - private HttpHeaders getHttpHeaders() { HttpHeaders httpHeaders = new HttpHeaders(); // todo: select only required headers from defaults diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClientConfig.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClientConfig.java index 2f377bab03af..193abf876afc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClientConfig.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClientConfig.java @@ -34,9 +34,13 @@ public class HttpClientConfig { private boolean serverCertValidationDisabled = false; private Http2ConnectionConfig http2ConnectionConfig; + // Eagerly resolved thin client connect timeout — avoids per-request System.getProperty/getenv calls. + private final int thinClientConnectTimeoutMs; + public HttpClientConfig(Configs configs) { this.configs = configs; this.http2ConnectionConfig = new Http2ConnectionConfig(); + this.thinClientConnectTimeoutMs = Configs.getThinClientConnectionTimeoutInSeconds() * 1000; } public HttpClientConfig withMaxHeaderSize(int maxHeaderSize) { @@ -173,12 +177,27 @@ public Http2ConnectionConfig getHttp2ConnectionConfig() { return this.http2ConnectionConfig; } + /** + * Returns the eagerly resolved thin client connect timeout in milliseconds. + * This avoids per-request System.getProperty/getenv overhead. + * + * @return connect timeout in milliseconds for thin client data-plane requests + */ + public int getThinClientConnectTimeoutMs() { + return this.thinClientConnectTimeoutMs; + } + public String toDiagnosticsString() { - return String.format("(cps:%s, nrto:%s, icto:%s, cto:%s, p:%s, http2:%s)", + String gwV2Cto = Configs.isThinClientEnabled() + ? Duration.ofMillis(this.thinClientConnectTimeoutMs).toString() + : "n/a"; + + return String.format("(cps:%s, nrto:%s, icto:%s, cto:%s, gwV2Cto:%s, p:%s, http2:%s)", maxPoolSize, networkRequestTimeout, maxIdleConnectionTimeout, connectionAcquireTimeout, + gwV2Cto, proxy != null, http2ConnectionConfig == null ? null : httpCfgAccessor.toDiagnosticsString(http2ConnectionConfig)); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpRequest.java index f633eacff36c..2e0f214d1e91 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpRequest.java @@ -20,6 +20,7 @@ public class HttpRequest { private HttpHeaders headers; private Flux body; private ReactorNettyRequestRecord reactorNettyRequestRecord; + private boolean isThinClientRequest; /** * Create a new HttpRequest instance. @@ -215,6 +216,28 @@ public HttpRequest withReactorNettyRequestRecord(ReactorNettyRequestRecord react return this; } + /** + * Gets whether this request targets the thin client proxy. + * Set by {@link com.azure.cosmos.implementation.ThinClientStoreModel} during request construction. + * + * @return true if this is a thin client request, false for standard gateway requests + */ + public boolean isThinClientRequest() { + return this.isThinClientRequest; + } + + /** + * Marks this request as targeting the thin client proxy. + * This is used to apply thin-client-specific transport settings (e.g., connect timeout). + * + * @param isThinClientRequest true if this request targets the thin client proxy + * @return this HttpRequest + */ + public HttpRequest withThinClientRequest(boolean isThinClientRequest) { + this.isThinClientRequest = isThinClientRequest; + return this; + } + /** * Gets ReactorNettyRequestRecord for recording request timeline * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicy.java index 64aa3c025cd7..13a9b65157eb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicy.java @@ -20,8 +20,9 @@ public static final HttpTimeoutPolicy getTimeoutPolicy(RxDocumentServiceRequest if (OperationType.Read.equals(request.getOperationType()) && request.getResourceType() == ResourceType.DatabaseAccount) { return HttpTimeoutPolicyControlPlaneRead.INSTANCE; } - // Use Gateway V2 timeout policies when Thin Client mode is enabled - if (request.useThinClientMode && request.getResourceType() == ResourceType.Document) { + // Use Gateway V2 timeout policies when Thin Client mode is enabled. + // All Document operations route through GwV2 policy — no silent fallback to default. + if (request.useThinClientMode) { OperationType operationType = request.getOperationType(); // Point read operations if (OperationType.Read.equals(operationType)) { @@ -31,6 +32,8 @@ public static final HttpTimeoutPolicy getTimeoutPolicy(RxDocumentServiceRequest if (OperationType.Query.equals(operationType) || request.isChangeFeedRequest()) { return HttpTimeoutPolicyForGatewayV2.INSTANCE_FOR_QUERY_AND_CHANGE_FEED; } + // All other thin client Document operations (Create, Replace, Delete, Patch, Batch, etc.) + return HttpTimeoutPolicyForGatewayV2.DEFAULT; } return HttpTimeoutPolicyDefault.INSTANCE; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicyForGatewayV2.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicyForGatewayV2.java index 18e350e329c6..976562ce9e8d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicyForGatewayV2.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicyForGatewayV2.java @@ -8,13 +8,18 @@ import java.util.List; /** - * Timeout policy for Gateway V2 (Thin Client) requests. - * This policy has separate configurations for point read operations vs query/change feed operations. + * Timeout policy for Gateway V2 (Thin Client) data-plane requests. + * Applies to all thin client Document operations: point reads, queries, change feed, + * creates, replaces, deletes, patches, and batch operations. + * + * Currently uses a single timeout configuration. Point-read vs query/change-feed + * instances are kept separate so they can diverge in the future without breaking changes. */ public class HttpTimeoutPolicyForGatewayV2 extends HttpTimeoutPolicy { public static final HttpTimeoutPolicy INSTANCE_FOR_POINT_READ = new HttpTimeoutPolicyForGatewayV2(true); public static final HttpTimeoutPolicy INSTANCE_FOR_QUERY_AND_CHANGE_FEED = new HttpTimeoutPolicyForGatewayV2(false); + public static final HttpTimeoutPolicy DEFAULT = new HttpTimeoutPolicyForGatewayV2(false); private final boolean isPointRead; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java index 6206a4d64db9..3e7f763caeb8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java @@ -186,9 +186,20 @@ public Mono send(final HttpRequest request, Duration responseTimeo final AtomicReference responseReference = new AtomicReference<>(); + // Per-request CONNECT_TIMEOUT_MILLIS via reactor-netty's immutable HttpClient. + // .option() returns a new config snapshot — does NOT mutate the shared httpClient. + // Thin client requests (isThinClientRequest=true): connect timeout is configured via + // HttpClientConfig.getThinClientConnectTimeoutMs() (default 5s) to fail fast. + // Standard gateway requests: 45s (default). + // Note: CONNECT_TIMEOUT_MILLIS controls TCP SYN→SYN-ACK timeout for NEW connections. + // For H2, once a TCP connection exists, stream acquisition is near-instant (~sub-ms) + // so pendingAcquireTimeout (pool-level 45s) is effectively never hit. + int connectTimeoutMs = this.resolveConnectTimeoutMs(request); + return this.httpClient .keepAlive(this.httpClientConfig.isConnectionKeepAlive()) .port(request.port()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs) .responseTimeout(responseTimeout) .request(HttpMethod.valueOf(request.httpMethod().toString())) .uri(request.uri().toASCIIString()) @@ -242,6 +253,27 @@ public void shutdown() { } } + /** + * Resolves the TCP connect timeout (CONNECT_TIMEOUT_MILLIS) based on the request type. + * + * Thin client requests (identified by {@link HttpRequest#isThinClientRequest()}) use the thin + * client connection timeout configured via {@link Configs#getThinClientConnectionTimeoutInSeconds()} + * (default 5s) to fail fast when the thin client proxy is unreachable. + * Standard gateway requests use the configured connection acquire timeout (default 45s). + * + * The thin client timeout is eagerly cached in {@link HttpClientConfig} at construction time + * to avoid per-request System.getProperty/getenv overhead. + * + * @param request the HTTP request + * @return the connect timeout in milliseconds + */ + private int resolveConnectTimeoutMs(HttpRequest request) { + if (request.isThinClientRequest()) { + return this.httpClientConfig.getThinClientConnectTimeoutMs(); + } + return (int) this.httpClientConfig.getConnectionAcquireTimeout().toMillis(); + } + private static ConnectionObserver getConnectionObserver() { return (conn, state) -> { Instant time = Instant.now();