diff --git a/.licenserc.yaml b/.licenserc.yaml index fa146036fdc529..8bdc7d8113bc4c 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -44,6 +44,7 @@ header: - "**/*.parquet" - "docs/.markdownlintignore" - "fe/fe-core/src/test/resources/data/net_snmp_normal" + - "fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java" - "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaLexer.g4" - "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaParser.g4" - "be/dict/ik/*" diff --git a/LICENSE.txt b/LICENSE.txt index ce0e98bc85111c..1d1ecffc1de8d5 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -202,6 +202,13 @@ -------------------------------------------------------------------------------- +The following components are provided under the Apache License. See project link for details. +The text of each license is the standard Apache 2.0 license. + +software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder from AWS SDK v2 (sdk-core 2.29.52) + +-------------------------------------------------------------------------------- + be/src/common/status.* : BSD-style license Copyright (c) 2011 The LevelDB Authors. All rights reserved. diff --git a/NOTICE.txt b/NOTICE.txt index 5f1ae973c4f23a..62fc72efbadca3 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -52,3 +52,33 @@ its NOTICE file: This product includes cryptographic software written by Eric Young (eay@cryptsoft.com). This product includes software written by Tim Hudson (tjh@cryptsoft.com). + +-------------------------------------------------------------------------------- +This product includes code from AWS SDK, which includes the following in +its NOTICE file: + +AWS SDK for Java 2.0 +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +This product includes software developed by +Amazon Technologies, Inc (http://www.amazon.com/). + +********************** +THIRD PARTY COMPONENTS +********************** +This software includes third party software subject to the following copyrights: +- XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty. +- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc. +- Apache Commons Lang - https://github.com/apache/commons-lang +- Netty Reactive Streams - https://github.com/playframework/netty-reactive-streams +- Jackson-core - https://github.com/FasterXML/jackson-core +- Jackson-dataformat-cbor - https://github.com/FasterXML/jackson-dataformats-binary + +The licenses for these third party components are included in LICENSE.txt + +- For Apache Commons Lang see also this required NOTICE: + Apache Commons Lang + Copyright 2001-2020 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (https://www.apache.org/). diff --git a/fe/check/checkstyle/suppressions.xml b/fe/check/checkstyle/suppressions.xml index 7340c4c5bd5fe9..5ac4e39f408953 100644 --- a/fe/check/checkstyle/suppressions.xml +++ b/fe/check/checkstyle/suppressions.xml @@ -74,4 +74,6 @@ under the License. + + diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 029420e5f19019..58e0562daf37ab 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3771,6 +3771,13 @@ public static int metaServiceRpcRetryTimes() { @ConfField(mutable = true) public static String aws_credentials_provider_version = "v2"; + @ConfField(description = { + "AWS SDK 用于调度异步重试、超时任务以及其他后台操作的线程池大小,全局共享", + "The thread pool size used by the AWS SDK to schedule asynchronous retries, timeout tasks, " + + "and other background operations, shared globally" + }) + public static int aws_sdk_async_scheduler_thread_pool_size = 20; + @ConfField(description = { "agent tasks 健康检查的时间间隔,默认五分钟,小于等于 0 时不做健康检查", "agent tasks health check interval, default is five minutes, no health check when less than or equal to 0" diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/UncloseableScheduledExecutorService.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/UncloseableScheduledExecutorService.java new file mode 100644 index 00000000000000..7cb50b98d239f7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/UncloseableScheduledExecutorService.java @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public final class UncloseableScheduledExecutorService + implements ScheduledExecutorService { + + private final ScheduledExecutorService delegate; + + public UncloseableScheduledExecutorService( + ScheduledExecutorService delegate) { + this.delegate = Objects.requireNonNull(delegate); + } + + // ================= Lifecycle methods (NO-OP) ================= + @Override + public void shutdown() { + // NO-OP + } + + @Override + public List shutdownNow() { + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + return false; + } + + // ================= Scheduled methods ================= + @Override + public ScheduledFuture schedule( + Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule( + Callable callable, long delay, TimeUnit unit) { + return delegate.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, + long initialDelay, + long period, + TimeUnit unit) { + return delegate.scheduleAtFixedRate( + command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, + long initialDelay, + long delay, + TimeUnit unit) { + return delegate.scheduleWithFixedDelay( + command, initialDelay, delay, unit); + } + + // ================= Executor methods ================= + @Override + public void execute(Runnable command) { + delegate.execute(command); + } + + @Override + public Future submit(Callable task) { + return delegate.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public List> invokeAll( + Collection> tasks) + throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll( + Collection> tasks, + long timeout, + TimeUnit unit) + throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny( + Collection> tasks) + throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny( + Collection> tasks, + long timeout, + TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 8d08e3e8eae934..13fd5ad69f3921 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -34,6 +34,8 @@ import org.apache.commons.lang3.math.NumberUtils; import org.apache.iceberg.catalog.Catalog; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; @@ -41,6 +43,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { + private static final Logger LOG = LogManager.getLogger(IcebergExternalCatalog.class); public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type"; public static final String ICEBERG_REST = "rest"; public static final String ICEBERG_HMS = "hms"; @@ -209,7 +212,14 @@ public List listTableNames(SessionContext ctx, String dbName) { public void onClose() { super.onClose(); if (null != catalog) { - catalog = null; + try { + if (catalog instanceof AutoCloseable) { + ((AutoCloseable) catalog).close(); + } + catalog = null; + } catch (Exception e) { + LOG.warn("Failed to close iceberg catalog: {}", getName(), e); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java index 09ec08e904dc7b..2dd3c0c8c6b6c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -190,4 +190,16 @@ public void checkProperties() throws DdlException { super.checkProperties(); catalogProperty.checkMetaStoreAndStorageProperties(AbstractPaimonProperties.class); } + + @Override + public void onClose() { + super.onClose(); + if (null != catalog) { + try { + catalog.close(); + } catch (Exception e) { + LOG.warn("Failed to close paimon catalog: {}", getName(), e); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java index 88def12d2a599c..cb2a3e1581688e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java @@ -111,7 +111,8 @@ protected AbstractIcebergProperties(Map props) { * This field is used to perform metadata operations like creating, querying, * and deleting Iceberg tables. */ - public final Catalog initializeCatalog(String catalogName, List storagePropertiesList) { + public final Catalog initializeCatalog(String catalogName, + List storagePropertiesList) { Map catalogProps = new HashMap<>(getOrigProps()); if (StringUtils.isNotBlank(warehouse)) { catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); diff --git a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java new file mode 100644 index 00000000000000..d92e0fcbf1e2bf --- /dev/null +++ b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java @@ -0,0 +1,724 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.client.builder; + +import org.apache.doris.common.Config; +import org.apache.doris.common.util.UncloseableScheduledExecutorService; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.annotations.SdkPreviewApi; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.annotations.SdkTestInternalApi; +import software.amazon.awssdk.core.ClientEndpointProvider; +import software.amazon.awssdk.core.ClientType; +import software.amazon.awssdk.core.CompressionConfiguration; +import software.amazon.awssdk.core.SdkPlugin; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkClientConfiguration; +import software.amazon.awssdk.core.client.config.SdkClientOption; +import software.amazon.awssdk.core.interceptor.ClasspathInterceptorChainFactory; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.core.internal.http.loader.DefaultSdkAsyncHttpClientBuilder; +import software.amazon.awssdk.core.internal.http.loader.DefaultSdkHttpClientBuilder; +import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage; +import software.amazon.awssdk.core.internal.interceptor.HttpChecksumValidationInterceptor; +import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy; +import software.amazon.awssdk.core.internal.useragent.AppIdResolver; +import software.amazon.awssdk.core.internal.useragent.SdkClientUserAgentProperties; +import software.amazon.awssdk.core.internal.useragent.SdkUserAgentBuilder; +import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.core.util.SystemUserAgent; +import software.amazon.awssdk.http.ExecutableHttpRequest; +import software.amazon.awssdk.http.HttpExecuteRequest; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.async.AsyncExecuteRequest; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.identity.spi.IdentityProviders; +import software.amazon.awssdk.metrics.MetricPublisher; +import software.amazon.awssdk.profiles.ProfileFile; +import software.amazon.awssdk.profiles.ProfileFileSystemSetting; +import software.amazon.awssdk.profiles.ProfileProperty; +import software.amazon.awssdk.retries.api.RetryStrategy; +import software.amazon.awssdk.utils.AttributeMap; +import software.amazon.awssdk.utils.AttributeMap.LazyValueSource; +import software.amazon.awssdk.utils.Either; +import software.amazon.awssdk.utils.Lazy; +import software.amazon.awssdk.utils.OptionalUtils; +import software.amazon.awssdk.utils.StringUtils; +import software.amazon.awssdk.utils.ThreadFactoryBuilder; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.http.SdkHttpUtils; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import static software.amazon.awssdk.core.ClientType.ASYNC; +import static software.amazon.awssdk.core.ClientType.SYNC; +import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR; +import static software.amazon.awssdk.core.client.config.SdkAdvancedClientOption.USER_AGENT_PREFIX; +import static software.amazon.awssdk.core.client.config.SdkAdvancedClientOption.USER_AGENT_SUFFIX; +import static software.amazon.awssdk.core.client.config.SdkClientOption.ADDITIONAL_HTTP_HEADERS; +import static software.amazon.awssdk.core.client.config.SdkClientOption.ASYNC_HTTP_CLIENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CLIENT_TYPE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CLIENT_USER_AGENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.COMPRESSION_CONFIGURATION; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_ASYNC_HTTP_CLIENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_ASYNC_HTTP_CLIENT_BUILDER; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_COMPRESSION_CONFIGURATION; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_RETRY_CONFIGURATOR; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_RETRY_MODE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_RETRY_STRATEGY; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_SCHEDULED_EXECUTOR_SERVICE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_SYNC_HTTP_CLIENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_SYNC_HTTP_CLIENT_BUILDER; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CRC32_FROM_COMPRESSED_DATA_ENABLED; +import static software.amazon.awssdk.core.client.config.SdkClientOption.DEFAULT_RETRY_MODE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.EXECUTION_INTERCEPTORS; +import static software.amazon.awssdk.core.client.config.SdkClientOption.HTTP_CLIENT_CONFIG; +import static software.amazon.awssdk.core.client.config.SdkClientOption.IDENTITY_PROVIDERS; +import static software.amazon.awssdk.core.client.config.SdkClientOption.INTERNAL_USER_AGENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.METRIC_PUBLISHERS; +import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_FILE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_FILE_SUPPLIER; +import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_NAME; +import static software.amazon.awssdk.core.client.config.SdkClientOption.RETRY_STRATEGY; +import static software.amazon.awssdk.core.client.config.SdkClientOption.SCHEDULED_EXECUTOR_SERVICE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.SYNC_HTTP_CLIENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.USER_AGENT_APP_ID; +import static software.amazon.awssdk.core.internal.useragent.UserAgentConstant.APP_ID; +import static software.amazon.awssdk.core.internal.useragent.UserAgentConstant.HTTP; +import static software.amazon.awssdk.core.internal.useragent.UserAgentConstant.INTERNAL_METADATA_MARKER; +import static software.amazon.awssdk.core.internal.useragent.UserAgentConstant.IO; +import static software.amazon.awssdk.utils.CollectionUtils.mergeLists; +import static software.amazon.awssdk.utils.Validate.paramNotNull; + +/** + * An SDK-internal implementation of the methods in {@link SdkClientBuilder}, {@link SdkAsyncClientBuilder} and + * {@link SdkSyncClientBuilder}. This implements all methods required by those interfaces, allowing service-specific builders to + * just implement the configuration they wish to add. + * + *

By implementing both the sync and async interface's methods, service-specific builders can share code between their sync + * and + * async variants without needing one to extend the other. Note: This only defines the methods in the sync and async builder + * interfaces. It does not implement the interfaces themselves. This is because the sync and async client builder interfaces both + * require a type-constrained parameter for use in fluent chaining, and a generic type parameter conflict is introduced into the + * class hierarchy by this interface extending the builder interfaces themselves.

+ * + *

Like all {@link SdkClientBuilder}s, this class is not thread safe.

+ * + * @param The type of builder, for chaining. + * @param The type of client generated by this builder. + */ +/** + * This class(software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder) is copied from AWS SDK v2 (sdk-core 2.29.52), + * with minor modifications. + */ +@SdkProtectedApi +public abstract class SdkDefaultClientBuilder, C> implements SdkClientBuilder { + private static final Logger LOG = LogManager.getLogger(SdkDefaultClientBuilder.class); + + private static final SdkHttpClient.Builder DEFAULT_HTTP_CLIENT_BUILDER = new DefaultSdkHttpClientBuilder(); + private static final SdkAsyncHttpClient.Builder DEFAULT_ASYNC_HTTP_CLIENT_BUILDER = new DefaultSdkAsyncHttpClientBuilder(); + + protected final SdkClientConfiguration.Builder clientConfiguration = SdkClientConfiguration.builder(); + + protected final AttributeMap.Builder clientContextParams = AttributeMap.builder(); + protected ClientOverrideConfiguration overrideConfig; + private final SdkHttpClient.Builder defaultHttpClientBuilder; + private final SdkAsyncHttpClient.Builder defaultAsyncHttpClientBuilder; + private final List plugins = new ArrayList<>(); + private static final ScheduledExecutorService awsSdkScheduler; + + static { + ScheduledExecutorService realScheduler = + Executors.newScheduledThreadPool( + Config.aws_sdk_async_scheduler_thread_pool_size, + r -> { + Thread t = new Thread(r, "aws-sdk-scheduler"); + t.setDaemon(true); + return t; + } + ); + + awsSdkScheduler = new UncloseableScheduledExecutorService(realScheduler); + } + + + protected SdkDefaultClientBuilder() { + this(DEFAULT_HTTP_CLIENT_BUILDER, DEFAULT_ASYNC_HTTP_CLIENT_BUILDER); + } + + @SdkTestInternalApi + protected SdkDefaultClientBuilder(SdkHttpClient.Builder defaultHttpClientBuilder, + SdkAsyncHttpClient.Builder defaultAsyncHttpClientBuilder) { + this.defaultHttpClientBuilder = defaultHttpClientBuilder; + this.defaultAsyncHttpClientBuilder = defaultAsyncHttpClientBuilder; + } + + /** + * Build a client using the current state of this builder. This is marked final in order to allow this class to add standard + * "build" logic between all service clients. Service clients are expected to implement the {@link #buildClient} method, that + * accepts the immutable client configuration generated by this build method. + */ + @Override + public final C build() { + return buildClient(); + } + + /** + * Implemented by child classes to create a client using the provided immutable configuration objects. The async and sync + * configurations are not yet immutable. Child classes will need to make them immutable in order to validate them and pass + * them to the client's constructor. + * + * @return A client based on the provided configuration. + */ + protected abstract C buildClient(); + + /** + * Return a client configuration object, populated with the following chain of priorities. + *
    + *
  1. Client Configuration Overrides
  2. + *
  3. Customer Configuration
  4. + *
  5. Service-Specific Defaults
  6. + *
  7. Global Defaults
  8. + *
+ */ + protected final SdkClientConfiguration syncClientConfiguration() { + clientConfiguration.option(SdkClientOption.CLIENT_CONTEXT_PARAMS, clientContextParams.build()); + SdkClientConfiguration configuration = clientConfiguration.build(); + + // Apply overrides + configuration = setOverrides(configuration); + + // Apply defaults + configuration = mergeChildDefaults(configuration); + configuration = mergeGlobalDefaults(configuration); + + // Create additional configuration from the default-applied configuration + configuration = finalizeChildConfiguration(configuration); + configuration = finalizeSyncConfiguration(configuration); + configuration = finalizeConfiguration(configuration); + + // Invoke the plugins + configuration = invokePlugins(configuration); + + return configuration; + } + + /** + * Return a client configuration object, populated with the following chain of priorities. + *
    + *
  1. Client Configuration Overrides
  2. + *
  3. Customer Configuration
  4. + *
  5. Implementation/Service-Specific Configuration
  6. + *
  7. Global Default Configuration
  8. + *
+ */ + protected final SdkClientConfiguration asyncClientConfiguration() { + clientConfiguration.option(SdkClientOption.CLIENT_CONTEXT_PARAMS, clientContextParams.build()); + SdkClientConfiguration configuration = clientConfiguration.build(); + + // Apply overrides + configuration = setOverrides(configuration); + + // Apply defaults + configuration = mergeChildDefaults(configuration); + configuration = mergeGlobalDefaults(configuration); + + // Create additional configuration from the default-applied configuration + configuration = finalizeChildConfiguration(configuration); + configuration = finalizeAsyncConfiguration(configuration); + configuration = finalizeConfiguration(configuration); + + // Invoke the plugins + configuration = invokePlugins(configuration); + + return configuration; + } + + /** + * Apply the client override configuration to the provided configuration. This generally does not need to be overridden by + * child classes, but some previous client versions override it. + */ + protected SdkClientConfiguration setOverrides(SdkClientConfiguration configuration) { + if (overrideConfig == null) { + return configuration; + } + SdkClientConfiguration.Builder builder = configuration.toBuilder(); + overrideConfig.retryStrategy().ifPresent(retryStrategy -> builder.option(RETRY_STRATEGY, retryStrategy)); + overrideConfig.retryMode().ifPresent(retryMode -> builder.option(RETRY_STRATEGY, + SdkDefaultRetryStrategy.forRetryMode(retryMode))); + overrideConfig.retryStrategyConfigurator().ifPresent(configurator -> { + RetryStrategy.Builder defaultBuilder = SdkDefaultRetryStrategy.defaultRetryStrategy().toBuilder(); + configurator.accept(defaultBuilder); + builder.option(RETRY_STRATEGY, defaultBuilder.build()); + }); + builder.putAll(overrideConfig); + // Forget anything we configured in the override configuration else it might be re-applied. + builder.option(CONFIGURED_RETRY_MODE, null); + builder.option(CONFIGURED_RETRY_STRATEGY, null); + builder.option(CONFIGURED_RETRY_CONFIGURATOR, null); + return builder.build(); + } + + + /** + * Optionally overridden by child implementations to apply implementation-specific default configuration. + * (eg. AWS's default credentials providers) + */ + protected SdkClientConfiguration mergeChildDefaults(SdkClientConfiguration configuration) { + return configuration; + } + + /** + * Apply global default configuration + */ + private SdkClientConfiguration mergeGlobalDefaults(SdkClientConfiguration configuration) { + Supplier defaultProfileFileSupplier = new Lazy<>(ProfileFile::defaultProfileFile)::getValue; + + configuration = configuration.merge(c -> c.option(EXECUTION_INTERCEPTORS, new ArrayList<>()) + .option(METRIC_PUBLISHERS, new ArrayList<>()) + .option(ADDITIONAL_HTTP_HEADERS, new LinkedHashMap<>()) + .option(PROFILE_FILE_SUPPLIER, defaultProfileFileSupplier) + .lazyOption(PROFILE_FILE, conf -> conf.get(PROFILE_FILE_SUPPLIER).get()) + .option(PROFILE_NAME, + ProfileFileSystemSetting.AWS_PROFILE.getStringValueOrThrow()) + .option(USER_AGENT_PREFIX, "") + .option(USER_AGENT_SUFFIX, "") + .option(CRC32_FROM_COMPRESSED_DATA_ENABLED, false) + .option(CONFIGURED_COMPRESSION_CONFIGURATION, + CompressionConfiguration.builder().build())); + return configuration; + } + + /** + * Optionally overridden by child implementations to derive implementation-specific configuration from the + * default-applied configuration. (eg. AWS's endpoint, derived from the region). + */ + protected SdkClientConfiguration finalizeChildConfiguration(SdkClientConfiguration configuration) { + return configuration; + } + + /** + * Finalize sync-specific configuration from the default-applied configuration. + */ + private SdkClientConfiguration finalizeSyncConfiguration(SdkClientConfiguration config) { + return config.toBuilder() + .lazyOption(SdkClientOption.SYNC_HTTP_CLIENT, c -> resolveSyncHttpClient(c, config)) + .option(SdkClientOption.CLIENT_TYPE, SYNC) + .build(); + } + + /** + * Finalize async-specific configuration from the default-applied configuration. + */ + private SdkClientConfiguration finalizeAsyncConfiguration(SdkClientConfiguration config) { + return config.toBuilder() + .lazyOptionIfAbsent(FUTURE_COMPLETION_EXECUTOR, this::resolveAsyncFutureCompletionExecutor) + .lazyOption(ASYNC_HTTP_CLIENT, c -> resolveAsyncHttpClient(c, config)) + .option(SdkClientOption.CLIENT_TYPE, ASYNC) + .build(); + } + + /** + * Finalize global configuration from the default-applied configuration. + */ + private SdkClientConfiguration finalizeConfiguration(SdkClientConfiguration config) { + return config.toBuilder() + .lazyOption(SCHEDULED_EXECUTOR_SERVICE, this::resolveScheduledExecutorService) + .lazyOptionIfAbsent(RETRY_STRATEGY, this::resolveRetryStrategy) + .option(EXECUTION_INTERCEPTORS, resolveExecutionInterceptors(config)) + .lazyOption(CLIENT_USER_AGENT, this::resolveClientUserAgent) + .lazyOption(COMPRESSION_CONFIGURATION, this::resolveCompressionConfiguration) + .lazyOptionIfAbsent(IDENTITY_PROVIDERS, c -> IdentityProviders.builder().build()) + .build(); + } + + private CompressionConfiguration resolveCompressionConfiguration(LazyValueSource config) { + CompressionConfiguration compressionConfig = config.get(CONFIGURED_COMPRESSION_CONFIGURATION); + return compressionConfig.toBuilder() + .requestCompressionEnabled(resolveCompressionEnabled(config, compressionConfig)) + .minimumCompressionThresholdInBytes(resolveMinCompressionThreshold(config, compressionConfig)) + .build(); + } + + private Boolean resolveCompressionEnabled(LazyValueSource config, CompressionConfiguration compressionConfig) { + Supplier> systemSettingConfiguration = + () -> SdkSystemSetting.AWS_DISABLE_REQUEST_COMPRESSION.getBooleanValue() + .map(v -> !v); + + Supplier> profileFileConfiguration = + () -> config.get(PROFILE_FILE_SUPPLIER).get() + .profile(config.get(PROFILE_NAME)) + .flatMap(p -> p.booleanProperty(ProfileProperty.DISABLE_REQUEST_COMPRESSION)) + .map(v -> !v); + + return OptionalUtils.firstPresent(Optional.ofNullable(compressionConfig.requestCompressionEnabled()), + systemSettingConfiguration, + profileFileConfiguration) + .orElse(true); + } + + private Integer resolveMinCompressionThreshold(LazyValueSource config, CompressionConfiguration compressionConfig) { + Supplier> systemSettingConfiguration = + SdkSystemSetting.AWS_REQUEST_MIN_COMPRESSION_SIZE_BYTES::getIntegerValue; + + Supplier> profileFileConfiguration = + () -> config.get(PROFILE_FILE_SUPPLIER).get() + .profile(config.get(PROFILE_NAME)) + .flatMap(p -> p.property(ProfileProperty.REQUEST_MIN_COMPRESSION_SIZE_BYTES)) + .map(Integer::parseInt); + + return OptionalUtils.firstPresent(Optional.ofNullable(compressionConfig.minimumCompressionThresholdInBytes()), + systemSettingConfiguration, + profileFileConfiguration) + .orElse(CompressRequestStage.DEFAULT_MIN_COMPRESSION_SIZE); + } + + /** + * By default, returns the configuration as-is. Classes extending this method will take care of running the plugins and + * return the updated configuration if plugins are supported. + */ + @SdkPreviewApi + protected SdkClientConfiguration invokePlugins(SdkClientConfiguration config) { + return config; + } + + private String resolveClientUserAgent(LazyValueSource config) { + SdkClientUserAgentProperties clientProperties = new SdkClientUserAgentProperties(); + + ClientType clientType = config.get(CLIENT_TYPE); + ClientType resolvedClientType = clientType == null ? ClientType.UNKNOWN : clientType; + + clientProperties.putProperty(INTERNAL_METADATA_MARKER, StringUtils.trimToEmpty(config.get(INTERNAL_USER_AGENT))); + clientProperties.putProperty(IO, StringUtils.lowerCase(resolvedClientType.name())); + clientProperties.putProperty(HTTP, SdkHttpUtils.urlEncode(clientName(resolvedClientType, + config.get(SYNC_HTTP_CLIENT), + config.get(ASYNC_HTTP_CLIENT)))); + String appId = config.get(USER_AGENT_APP_ID); + String resolvedAppId = appId == null ? resolveAppId(config) : appId; + clientProperties.putProperty(APP_ID, resolvedAppId); + return SdkUserAgentBuilder.buildClientUserAgentString(SystemUserAgent.getOrCreate(), clientProperties); + } + + private String resolveAppId(LazyValueSource config) { + Optional appIdFromConfig = AppIdResolver.create() + .profileFile(config.get(PROFILE_FILE_SUPPLIER)) + .profileName(config.get(PROFILE_NAME)) + .resolve(); + return appIdFromConfig.orElse(null); + } + + private static String clientName(ClientType clientType, SdkHttpClient syncHttpClient, SdkAsyncHttpClient asyncHttpClient) { + if (clientType == SYNC) { + return syncHttpClient == null ? "null" : syncHttpClient.clientName(); + } + + if (clientType == ASYNC) { + return asyncHttpClient == null ? "null" : asyncHttpClient.clientName(); + } + + return ClientType.UNKNOWN.name(); + } + + private RetryStrategy resolveRetryStrategy(LazyValueSource config) { + RetryMode retryMode = RetryMode.resolver() + .profileFile(config.get(PROFILE_FILE_SUPPLIER)) + .profileName(config.get(PROFILE_NAME)) + .defaultRetryMode(config.get(DEFAULT_RETRY_MODE)) + .resolve(); + return SdkDefaultRetryStrategy.forRetryMode(retryMode); + } + + /** + * Finalize which sync HTTP client will be used for the created client. + */ + private SdkHttpClient resolveSyncHttpClient(LazyValueSource config, + SdkClientConfiguration deprecatedConfigDoNotUseThis) { + SdkHttpClient httpClient = config.get(CONFIGURED_SYNC_HTTP_CLIENT); + SdkHttpClient.Builder httpClientBuilder = config.get(CONFIGURED_SYNC_HTTP_CLIENT_BUILDER); + Validate.isTrue(httpClient == null || + httpClientBuilder == null, + "The httpClient and the httpClientBuilder can't both be configured."); + + AttributeMap httpClientConfig = getHttpClientConfig(config, deprecatedConfigDoNotUseThis); + + return Either.fromNullable(httpClient, httpClientBuilder) + .map(e -> e.map(Function.identity(), b -> b.buildWithDefaults(httpClientConfig))) + .orElseGet(() -> defaultHttpClientBuilder.buildWithDefaults(httpClientConfig)); + } + + /** + * Finalize which async HTTP client will be used for the created client. + */ + private SdkAsyncHttpClient resolveAsyncHttpClient(LazyValueSource config, + SdkClientConfiguration deprecatedConfigDoNotUseThis) { + Validate.isTrue(config.get(CONFIGURED_ASYNC_HTTP_CLIENT) == null || + config.get(CONFIGURED_ASYNC_HTTP_CLIENT_BUILDER) == null, + "The asyncHttpClient and the asyncHttpClientBuilder can't both be configured."); + + AttributeMap httpClientConfig = getHttpClientConfig(config, deprecatedConfigDoNotUseThis); + + return Either.fromNullable(config.get(CONFIGURED_ASYNC_HTTP_CLIENT), config.get(CONFIGURED_ASYNC_HTTP_CLIENT_BUILDER)) + .map(e -> e.map(Function.identity(), b -> b.buildWithDefaults(httpClientConfig))) + .orElseGet(() -> defaultAsyncHttpClientBuilder.buildWithDefaults(httpClientConfig)); + } + + private AttributeMap getHttpClientConfig(LazyValueSource config, SdkClientConfiguration deprecatedConfigDoNotUseThis) { + AttributeMap httpClientConfig = config.get(HTTP_CLIENT_CONFIG); + if (httpClientConfig == null) { + // We must be using an old client, use the deprecated way of loading HTTP_CLIENT_CONFIG, instead. This won't take + // into account any configuration changes (e.g. defaults mode) from plugins, but this is the best we can do without + // breaking protected APIs. TODO: if we ever break protected APIs, remove these "childHttpConfig" hooks. + httpClientConfig = childHttpConfig(deprecatedConfigDoNotUseThis); + } + return httpClientConfig; + } + + /** + * @deprecated Configure {@link SdkClientOption#HTTP_CLIENT_CONFIG} from {@link #finalizeChildConfiguration} instead. + */ + @Deprecated + protected AttributeMap childHttpConfig(SdkClientConfiguration configuration) { + return childHttpConfig(); + } + + /** + * @deprecated Configure {@link SdkClientOption#HTTP_CLIENT_CONFIG} from {@link #finalizeChildConfiguration} instead. + */ + @Deprecated + protected AttributeMap childHttpConfig() { + return AttributeMap.empty(); + } + + /** + * Finalize which async executor service will be used for the created client. The default async executor + * service has at least 8 core threads and can scale up to at least 64 threads when needed depending + * on the number of processors available. + */ + private Executor resolveAsyncFutureCompletionExecutor(LazyValueSource config) { + int processors = Runtime.getRuntime().availableProcessors(); + int corePoolSize = Math.max(8, processors); + int maxPoolSize = Math.max(64, processors * 2); + ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, + 10, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1_000), + new ThreadFactoryBuilder() + .threadNamePrefix("sdk-async-response").build()); + // Allow idle core threads to time out + executor.allowCoreThreadTimeOut(true); + return executor; + } + + /** + * Finalize the internal SDK scheduled executor service that is used for scheduling tasks such as async retry attempts and + * timeout task. + */ + private ScheduledExecutorService resolveScheduledExecutorService(LazyValueSource c) { + ScheduledExecutorService executor = c.get(CONFIGURED_SCHEDULED_EXECUTOR_SERVICE); + if (executor != null) { + return executor; + } + + return awsSdkScheduler; + } + + /** + * Finalize which execution interceptors will be used for the created client. + */ + private List resolveExecutionInterceptors(SdkClientConfiguration config) { + List globalInterceptors = new ArrayList<>(); + globalInterceptors.addAll(sdkInterceptors()); + globalInterceptors.addAll(new ClasspathInterceptorChainFactory().getGlobalInterceptors()); + return mergeLists(globalInterceptors, config.option(EXECUTION_INTERCEPTORS)); + } + + + /** + * The set of interceptors that should be included with all services. + */ + private List sdkInterceptors() { + return Collections.unmodifiableList(Arrays.asList( + new HttpChecksumValidationInterceptor() + )); + } + + @Override + public final B endpointOverride(URI endpointOverride) { + if (endpointOverride == null) { + clientConfiguration.option(SdkClientOption.CLIENT_ENDPOINT_PROVIDER, null); + } else { + clientConfiguration.option(SdkClientOption.CLIENT_ENDPOINT_PROVIDER, + ClientEndpointProvider.forEndpointOverride(endpointOverride)); + } + return thisBuilder(); + } + + public final void setEndpointOverride(URI endpointOverride) { + endpointOverride(endpointOverride); + } + + public final B asyncConfiguration(ClientAsyncConfiguration asyncConfiguration) { + clientConfiguration.option(FUTURE_COMPLETION_EXECUTOR, asyncConfiguration.advancedOption(FUTURE_COMPLETION_EXECUTOR)); + return thisBuilder(); + } + + public final void setAsyncConfiguration(ClientAsyncConfiguration asyncConfiguration) { + asyncConfiguration(asyncConfiguration); + } + + @Override + public final B overrideConfiguration(ClientOverrideConfiguration overrideConfig) { + this.overrideConfig = overrideConfig; + return thisBuilder(); + } + + public final void setOverrideConfiguration(ClientOverrideConfiguration overrideConfiguration) { + overrideConfiguration(overrideConfiguration); + } + + @Override + public final ClientOverrideConfiguration overrideConfiguration() { + if (overrideConfig == null) { + return ClientOverrideConfiguration.builder().build(); + } + return overrideConfig; + } + + public final B httpClient(SdkHttpClient httpClient) { + if (httpClient != null) { + httpClient = new NonManagedSdkHttpClient(httpClient); + } + clientConfiguration.option(CONFIGURED_SYNC_HTTP_CLIENT, httpClient); + return thisBuilder(); + } + + public final B httpClientBuilder(SdkHttpClient.Builder httpClientBuilder) { + clientConfiguration.option(CONFIGURED_SYNC_HTTP_CLIENT_BUILDER, httpClientBuilder); + return thisBuilder(); + } + + public final B httpClient(SdkAsyncHttpClient httpClient) { + if (httpClient != null) { + httpClient = new NonManagedSdkAsyncHttpClient(httpClient); + } + clientConfiguration.option(CONFIGURED_ASYNC_HTTP_CLIENT, httpClient); + return thisBuilder(); + } + + public final B httpClientBuilder(SdkAsyncHttpClient.Builder httpClientBuilder) { + clientConfiguration.option(CONFIGURED_ASYNC_HTTP_CLIENT_BUILDER, httpClientBuilder); + return thisBuilder(); + } + + public final B metricPublishers(List metricPublishers) { + clientConfiguration.option(METRIC_PUBLISHERS, metricPublishers); + return thisBuilder(); + } + + @Override + public final B addPlugin(SdkPlugin plugin) { + plugins.add(paramNotNull(plugin, "plugin")); + return thisBuilder(); + } + + @Override + public final List plugins() { + return Collections.unmodifiableList(plugins); + } + + /** + * Return "this" for method chaining. + */ + @SuppressWarnings("unchecked") + protected B thisBuilder() { + return (B) this; + } + + /** + * Wrapper around {@link SdkHttpClient} to prevent it from being closed. Used when the customer provides + * an already built client in which case they are responsible for the lifecycle of it. + */ + @SdkTestInternalApi + public static final class NonManagedSdkHttpClient implements SdkHttpClient { + + private final SdkHttpClient delegate; + + private NonManagedSdkHttpClient(SdkHttpClient delegate) { + this.delegate = paramNotNull(delegate, "SdkHttpClient"); + } + + @Override + public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) { + return delegate.prepareRequest(request); + } + + @Override + public void close() { + // Do nothing, this client is managed by the customer. + } + + @Override + public String clientName() { + return delegate.clientName(); + } + } + + /** + * Wrapper around {@link SdkAsyncHttpClient} to prevent it from being closed. Used when the customer provides + * an already built client in which case they are responsible for the lifecycle of it. + */ + @SdkTestInternalApi + public static final class NonManagedSdkAsyncHttpClient implements SdkAsyncHttpClient { + + private final SdkAsyncHttpClient delegate; + + NonManagedSdkAsyncHttpClient(SdkAsyncHttpClient delegate) { + this.delegate = paramNotNull(delegate, "SdkAsyncHttpClient"); + } + + @Override + public CompletableFuture execute(AsyncExecuteRequest request) { + return delegate.execute(request); + } + + @Override + public String clientName() { + return delegate.clientName(); + } + + @Override + public void close() { + // Do nothing, this client is managed by the customer. + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java index 275180c387f197..228afe85b618f4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java @@ -48,8 +48,8 @@ public String getIcebergCatalogType() { @Override protected Catalog initCatalog(String catalogName, - Map catalogProps, - List storagePropertiesList) { + Map catalogProps, + List storagePropertiesList) { // Capture the catalogProps for verification this.capturedCatalogProps = new HashMap<>(catalogProps); return catalogToReturn;