From 2feb908c3c7d96cfa376a2d38c7a1a53a78c0166 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 31 Dec 2025 18:15:51 +0800 Subject: [PATCH 01/11] [Fix](catalog)Resources should be closed when dropping a Catalog. --- .../iceberg/IcebergExternalCatalog.java | 13 ++++++++++-- .../paimon/PaimonExternalCatalog.java | 12 +++++++++++ .../metastore/AbstractIcebergProperties.java | 9 ++++---- .../IcebergAliyunDLFMetaStoreProperties.java | 6 +++--- .../IcebergFileSystemMetaStoreProperties.java | 6 +++--- .../IcebergGlueMetaStoreProperties.java | 6 +++--- .../IcebergHMSMetaStoreProperties.java | 6 +++--- .../metastore/IcebergRestProperties.java | 8 +++---- .../IcebergS3TablesMetaStoreProperties.java | 6 +++--- .../AbstractIcebergPropertiesTest.java | 21 ++++++++++--------- .../paimon/test_paimon_dlf_catalog.groovy | 2 ++ ...t_paimon_dlf_catalog_miss_dlf_param.groovy | 1 + .../test_paimon_dlf_catalog_new_param.groovy | 2 ++ .../test_paimon_dlf_rest_catalog.groovy | 2 ++ .../paimon/test_paimon_hms_catalog.groovy | 3 +++ .../hive_on_hms_and_dlf.groovy | 16 ++++++++++++++ .../iceberg_and_hive_on_glue.groovy | 17 +++++++++++++++ ...eberg_on_hms_and_filesystem_and_dlf.groovy | 7 +++++++ 18 files changed, 108 insertions(+), 35 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 8d08e3e8eae934..db79432afab5da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -33,14 +33,19 @@ import org.apache.doris.transaction.TransactionManagerFactory; import org.apache.commons.lang3.math.NumberUtils; +import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.catalog.Catalog; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import 
java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; public abstract class IcebergExternalCatalog extends ExternalCatalog { + private static final Logger LOG = LogManager.getLogger(IcebergExternalCatalog.class); public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type"; public static final String ICEBERG_REST = "rest"; public static final String ICEBERG_HMS = "hms"; @@ -58,7 +63,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 1024; public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND = 48 * 60 * 60; protected String icebergCatalogType; - protected Catalog catalog; + protected BaseMetastoreCatalog catalog; private AbstractIcebergProperties msProperties; @@ -209,7 +214,11 @@ public List listTableNames(SessionContext ctx, String dbName) { public void onClose() { super.onClose(); if (null != catalog) { - catalog = null; + try { + catalog.close(); + } catch (IOException e) { + LOG.warn("Failed to close iceberg catalog: {}", getName(), e); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java index 09ec08e904dc7b..2dd3c0c8c6b6c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -190,4 +190,16 @@ public void checkProperties() throws DdlException { super.checkProperties(); catalogProperty.checkMetaStoreAndStorageProperties(AbstractPaimonProperties.class); } + + @Override + public void onClose() { + super.onClose(); + if (null != catalog) { + try { + catalog.close(); + } catch (Exception e) { + LOG.warn("Failed to close paimon catalog: {}", getName(), e); + } + } + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java index 88def12d2a599c..715e7274f60b6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java @@ -23,8 +23,8 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.catalog.Catalog; import java.util.HashMap; import java.util.List; @@ -111,7 +111,8 @@ protected AbstractIcebergProperties(Map props) { * This field is used to perform metadata operations like creating, querying, * and deleting Iceberg tables. */ - public final Catalog initializeCatalog(String catalogName, List storagePropertiesList) { + public final BaseMetastoreCatalog initializeCatalog(String catalogName, + List storagePropertiesList) { Map catalogProps = new HashMap<>(getOrigProps()); if (StringUtils.isNotBlank(warehouse)) { catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); @@ -120,7 +121,7 @@ public final Catalog initializeCatalog(String catalogName, List catalogProps) { /** * Subclasses must implement this to create the concrete Catalog instance. 
*/ - protected abstract Catalog initCatalog( + protected abstract BaseMetastoreCatalog initCatalog( String catalogName, Map catalogProps, List storagePropertiesList diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java index 26b7149bbc7af3..980ef051f956bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java @@ -23,7 +23,7 @@ import com.aliyun.datalake.metastore.common.DataLakeConfig; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.BaseMetastoreCatalog; import java.util.List; import java.util.Map; @@ -45,8 +45,8 @@ public String getIcebergCatalogType() { } @Override - public Catalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { DLFCatalog dlfCatalog = new DLFCatalog(); // @see com.aliyun.datalake.metastore.hive.common.utils.ConfigUtils Configuration conf = new Configuration(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java index d644b7b06e0a51..d65c2999cf71a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java @@ -24,7 +24,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import 
org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.hadoop.HadoopCatalog; import java.util.List; @@ -42,8 +42,8 @@ public IcebergFileSystemMetaStoreProperties(Map props) { } @Override - public Catalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { Configuration configuration = buildConfiguration(storagePropertiesList); HadoopCatalog catalog = new HadoopCatalog(); buildCatalogProps(storagePropertiesList); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java index 3039a96ea9d3f4..9d578bb0bae993 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java @@ -23,11 +23,11 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.aws.glue.GlueCatalog; import org.apache.iceberg.aws.s3.S3FileIOProperties; -import org.apache.iceberg.catalog.Catalog; import java.util.List; import java.util.Map; @@ -60,8 +60,8 @@ public void initNormalizeAndCheckProps() { } @Override - public Catalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { appendS3Props(catalogProps); appendGlueProps(catalogProps); catalogProps.put("client.region", glueProperties.glueRegion); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java index dc6b4b448ae4a7..ffda556e2f8da4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java @@ -25,7 +25,7 @@ import lombok.Getter; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.hive.HiveCatalog; import java.util.HashMap; @@ -64,8 +64,8 @@ public void initNormalizeAndCheckProps() { } @Override - public Catalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { checkInitialized(); Configuration conf = buildHiveConfiguration(storagePropertiesList); HiveCatalog hiveCatalog = new HiveCatalog(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java index 688a268522b2cd..99646b8cd55080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java @@ -27,11 +27,11 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.aws.AwsClientProperties; 
import org.apache.iceberg.aws.s3.S3FileIOProperties; -import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.logging.log4j.util.Strings; @@ -174,8 +174,8 @@ public String getIcebergCatalogType() { } @Override - public Catalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { Map fileIOProperties = Maps.newHashMap(); Configuration conf = new Configuration(); toFileIOProperties(storagePropertiesList, fileIOProperties, conf); @@ -185,7 +185,7 @@ public Catalog initCatalog(String catalogName, Map catalogProps, options.putAll(fileIOProperties); // 4. Build iceberg catalog - return CatalogUtil.buildIcebergCatalog(catalogName, options, conf); + return (BaseMetastoreCatalog) CatalogUtil.buildIcebergCatalog(catalogName, options, conf); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java index 0b101623c2b10c..a68814b6391a87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java @@ -23,7 +23,7 @@ import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.BaseMetastoreCatalog; import software.amazon.s3tables.iceberg.S3TablesCatalog; import java.util.List; @@ -49,8 +49,8 @@ public void initNormalizeAndCheckProps() { } @Override - public Catalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public BaseMetastoreCatalog 
initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { checkInitialized(); buildS3CatalogProperties(catalogProps); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java index 275180c387f197..587fcd4d3bba83 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java @@ -19,6 +19,7 @@ import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Catalog; import org.junit.jupiter.api.Assertions; @@ -33,10 +34,10 @@ public class AbstractIcebergPropertiesTest { private static class TestIcebergProperties extends AbstractIcebergProperties { - private final Catalog catalogToReturn; + private final BaseMetastoreCatalog catalogToReturn; private Map capturedCatalogProps; - TestIcebergProperties(Map props, Catalog catalogToReturn) { + TestIcebergProperties(Map props, BaseMetastoreCatalog catalogToReturn) { super(props); this.catalogToReturn = catalogToReturn; } @@ -47,9 +48,9 @@ public String getIcebergCatalogType() { } @Override - protected Catalog initCatalog(String catalogName, - Map catalogProps, - List storagePropertiesList) { + protected BaseMetastoreCatalog initCatalog(String catalogName, + Map catalogProps, + List storagePropertiesList) { // Capture the catalogProps for verification this.capturedCatalogProps = new HashMap<>(catalogProps); return catalogToReturn; @@ -62,13 +63,13 @@ Map getCapturedCatalogProps() { @Test void testInitializeCatalogWithWarehouse() { - Catalog mockCatalog = Mockito.mock(Catalog.class); + BaseMetastoreCatalog mockCatalog = 
Mockito.mock(BaseMetastoreCatalog.class); Mockito.when(mockCatalog.name()).thenReturn("mocked-catalog"); Map props = new HashMap<>(); props.put("k1", "v1"); TestIcebergProperties properties = new TestIcebergProperties(props, mockCatalog); properties.warehouse = "s3://bucket/warehouse"; - Catalog result = properties.initializeCatalog("testCatalog", Collections.emptyList()); + BaseMetastoreCatalog result = properties.initializeCatalog("testCatalog", Collections.emptyList()); Assertions.assertNotNull(result); Assertions.assertEquals("mocked-catalog", result.name()); // Verify that warehouse is included in catalogProps @@ -81,11 +82,11 @@ void testInitializeCatalogWithWarehouse() { @Test void testInitializeCatalogWithoutWarehouse() { - Catalog mockCatalog = Mockito.mock(Catalog.class); + BaseMetastoreCatalog mockCatalog = Mockito.mock(Catalog.class); Mockito.when(mockCatalog.name()).thenReturn("no-warehouse"); TestIcebergProperties properties = new TestIcebergProperties(new HashMap<>(), mockCatalog); properties.warehouse = null; - Catalog result = properties.initializeCatalog("testCatalog", Collections.emptyList()); + BaseMetastoreCatalog result = properties.initializeCatalog("testCatalog", Collections.emptyList()); Assertions.assertNotNull(result); Assertions.assertEquals("no-warehouse", result.name()); // Verify that warehouse key is not present @@ -102,7 +103,7 @@ public String getIcebergCatalogType() { } @Override - protected Catalog initCatalog(String catalogName, + protected BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, List storagePropertiesList) { return null; // Simulate a failure case diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog.groovy index 5a68844baa8c31..bc1e2c96453808 100644 --- a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog.groovy +++ 
b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog.groovy @@ -64,6 +64,8 @@ suite("test_paimon_dlf_catalog", "p2,external,paimon,external_remote,external_re sql """SELECT * FROM tb_simple\$snapshots;""" } finally { sql """set force_jni_scanner=false""" + + sql """drop catalog if exists ${catalog};""" } } diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_miss_dlf_param.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_miss_dlf_param.groovy index b2deafc316acab..11e331481ebbbd 100644 --- a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_miss_dlf_param.groovy +++ b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_miss_dlf_param.groovy @@ -69,6 +69,7 @@ suite("test_paimon_dlf_catalog_miss_dlf_param", "p2,external,paimon,external_rem } finally { sql """set force_jni_scanner=false""" + sql """drop catalog if exists ${catalog};""" } } diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_new_param.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_new_param.groovy index 1044dc8b778ee4..6594c55dc64065 100644 --- a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_new_param.groovy +++ b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_new_param.groovy @@ -71,6 +71,8 @@ suite("test_paimon_dlf_catalog_new_param", "p2,external,paimon,external_remote,e } finally { sql """set force_jni_scanner=false""" + + sql """drop catalog if exists ${catalog};""" } } diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_rest_catalog.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_rest_catalog.groovy index e195c6dd180296..308f0ca8a97c7f 100644 --- a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_rest_catalog.groovy +++ 
b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_rest_catalog.groovy @@ -44,6 +44,8 @@ suite("test_paimon_dlf_rest_catalog", "p2,external,paimon,external_remote,extern } finally { sql """set force_jni_scanner=false""" + + sql """drop catalog if exists ${catalog};""" } } diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_hms_catalog.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_hms_catalog.groovy index 790f1095d4e602..79292df1c165ab 100644 --- a/regression-test/suites/external_table_p2/paimon/test_paimon_hms_catalog.groovy +++ b/regression-test/suites/external_table_p2/paimon/test_paimon_hms_catalog.groovy @@ -56,6 +56,9 @@ suite("test_paimon_hms_catalog", "p2,external,paimon,new_catalog_property") { // TODO(zgx): add branch/tag // system table sql """SELECT * FROM external_test_table\$snapshots;""" + + + sql """drop catalog if exists ${catalog};""" } String enabled = context.config.otherConfigs.get("enablePaimonTest") if (enabled == null || !enabled.equalsIgnoreCase("true")) { diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/hive_on_hms_and_dlf.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/hive_on_hms_and_dlf.groovy index f0d711c7a8cffa..5926754f88f3f2 100644 --- a/regression-test/suites/external_table_p2/refactor_catalog_param/hive_on_hms_and_dlf.groovy +++ b/regression-test/suites/external_table_p2/refactor_catalog_param/hive_on_hms_and_dlf.groovy @@ -81,6 +81,10 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 + + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } /*--------test partition table insert---------*/ @@ -181,6 +185,10 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 + + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } /*--------test insert 
overwrite---------*/ @@ -278,6 +286,10 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 + + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } /*--------only execute query---------*/ @@ -311,6 +323,10 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") { SELECT count(1) FROM ${table_name}; """ assert queryResult.get(0).get(0) == data_count + + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") String keytab_root_dir = "/keytabs" diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_and_hive_on_glue.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_and_hive_on_glue.groovy index 6212fa8c20898c..208a5557cb2884 100644 --- a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_and_hive_on_glue.groovy +++ b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_and_hive_on_glue.groovy @@ -80,6 +80,9 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } def testQueryAndInsertIcerberg = { String catalogProperties, String prefix -> @@ -141,6 +144,9 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } /*--------test insert overwrite for hive---------*/ @@ -229,6 +235,9 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } /*--------test insert overwrite for iceberg---------*/ @@ -313,6 +322,10 @@ suite("iceberg_and_hive_on_glue", 
"p2,external,hive,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 + + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } /*--------only execute query---------*/ @@ -346,6 +359,10 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") { SELECT count(1) FROM ${table_name}; """ assert queryResult.get(0).get(0) == data_count + + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } /*--------GLUE START-----------*/ diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy index 45b18889767cf4..2fbe9b607c597d 100644 --- a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy +++ b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy @@ -178,6 +178,9 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", "p2,external,new_catalog_property show databases like "${db_name}"; """ assert dropResult.size() == 0 + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } /*--------only execute query---------*/ @@ -211,6 +214,10 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", "p2,external,new_catalog_property SELECT count(1) FROM ${table_name}; """ assert queryResult.get(0).get(0) == data_count + + sql """ + DROP CATALOG IF EXISTS ${catalog_name}; + """ } String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") From 978adb5266dddb305c915bc8577be7f4a2b94bc4 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 31 Dec 2025 19:06:33 +0800 Subject: [PATCH 02/11] [Fix](catalog)Resources should be closed when dropping a Catalog. 
--- .../property/metastore/AbstractIcebergPropertiesTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java index 587fcd4d3bba83..74ff4f60954582 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java @@ -21,7 +21,6 @@ import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.catalog.Catalog; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -82,7 +81,7 @@ void testInitializeCatalogWithWarehouse() { @Test void testInitializeCatalogWithoutWarehouse() { - BaseMetastoreCatalog mockCatalog = Mockito.mock(Catalog.class); + BaseMetastoreCatalog mockCatalog = Mockito.mock(BaseMetastoreCatalog.class); Mockito.when(mockCatalog.name()).thenReturn("no-warehouse"); TestIcebergProperties properties = new TestIcebergProperties(new HashMap<>(), mockCatalog); properties.warehouse = null; @@ -119,7 +118,7 @@ protected BaseMetastoreCatalog initCatalog(String catalogName, @Test void testExecutionAuthenticatorNotNull() { - Catalog mockCatalog = Mockito.mock(Catalog.class); + BaseMetastoreCatalog mockCatalog = Mockito.mock(BaseMetastoreCatalog.class); TestIcebergProperties properties = new TestIcebergProperties(new HashMap<>(), mockCatalog); Assertions.assertNotNull(properties.executionAuthenticator); } From 4b7e889a3074b7df5ccef31082ceb1dc744dd320 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Sun, 4 Jan 2026 13:53:40 +0800 Subject: [PATCH 03/11] [Fix](catalog)Resources should be closed when dropping a Catalog. 
--- .../iceberg/IcebergExternalCatalog.java | 11 +++++----- .../metastore/AbstractIcebergProperties.java | 8 ++++---- .../IcebergAliyunDLFMetaStoreProperties.java | 6 +++--- .../IcebergFileSystemMetaStoreProperties.java | 6 +++--- .../IcebergGlueMetaStoreProperties.java | 6 +++--- .../IcebergHMSMetaStoreProperties.java | 6 +++--- .../metastore/IcebergRestProperties.java | 8 ++++---- .../AbstractIcebergPropertiesTest.java | 20 +++++++++---------- 8 files changed, 36 insertions(+), 35 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index db79432afab5da..13fd5ad69f3921 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -33,12 +33,10 @@ import org.apache.doris.transaction.TransactionManagerFactory; import org.apache.commons.lang3.math.NumberUtils; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.catalog.Catalog; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; @@ -63,7 +61,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 1024; public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND = 48 * 60 * 60; protected String icebergCatalogType; - protected BaseMetastoreCatalog catalog; + protected Catalog catalog; private AbstractIcebergProperties msProperties; @@ -215,8 +213,11 @@ public void onClose() { super.onClose(); if (null != catalog) { try { - catalog.close(); - } catch (IOException e) { + if (catalog instanceof AutoCloseable) { + ((AutoCloseable) catalog).close(); + } + catalog = null; + } catch 
(Exception e) { LOG.warn("Failed to close iceberg catalog: {}", getName(), e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java index 715e7274f60b6b..cb2a3e1581688e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java @@ -23,8 +23,8 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; import java.util.HashMap; import java.util.List; @@ -111,7 +111,7 @@ protected AbstractIcebergProperties(Map props) { * This field is used to perform metadata operations like creating, querying, * and deleting Iceberg tables. */ - public final BaseMetastoreCatalog initializeCatalog(String catalogName, + public final Catalog initializeCatalog(String catalogName, List storagePropertiesList) { Map catalogProps = new HashMap<>(getOrigProps()); if (StringUtils.isNotBlank(warehouse)) { @@ -121,7 +121,7 @@ public final BaseMetastoreCatalog initializeCatalog(String catalogName, // Add manifest cache properties if configured addManifestCacheProperties(catalogProps); - BaseMetastoreCatalog catalog = initCatalog(catalogName, catalogProps, storagePropertiesList); + Catalog catalog = initCatalog(catalogName, catalogProps, storagePropertiesList); if (catalog == null) { throw new IllegalStateException("Catalog must not be null after initialization."); @@ -154,7 +154,7 @@ protected void addManifestCacheProperties(Map catalogProps) { /** * Subclasses must implement this to create the concrete Catalog instance. 
*/ - protected abstract BaseMetastoreCatalog initCatalog( + protected abstract Catalog initCatalog( String catalogName, Map catalogProps, List storagePropertiesList diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java index 980ef051f956bc..26b7149bbc7af3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java @@ -23,7 +23,7 @@ import com.aliyun.datalake.metastore.common.DataLakeConfig; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.catalog.Catalog; import java.util.List; import java.util.Map; @@ -45,8 +45,8 @@ public String getIcebergCatalogType() { } @Override - public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public Catalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { DLFCatalog dlfCatalog = new DLFCatalog(); // @see com.aliyun.datalake.metastore.hive.common.utils.ConfigUtils Configuration conf = new Configuration(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java index d65c2999cf71a0..d644b7b06e0a51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java @@ -24,7 +24,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import 
org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hadoop.HadoopCatalog; import java.util.List; @@ -42,8 +42,8 @@ public IcebergFileSystemMetaStoreProperties(Map props) { } @Override - public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public Catalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { Configuration configuration = buildConfiguration(storagePropertiesList); HadoopCatalog catalog = new HadoopCatalog(); buildCatalogProps(storagePropertiesList); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java index 9d578bb0bae993..3039a96ea9d3f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java @@ -23,11 +23,11 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.aws.glue.GlueCatalog; import org.apache.iceberg.aws.s3.S3FileIOProperties; +import org.apache.iceberg.catalog.Catalog; import java.util.List; import java.util.Map; @@ -60,8 +60,8 @@ public void initNormalizeAndCheckProps() { } @Override - public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public Catalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { appendS3Props(catalogProps); appendGlueProps(catalogProps); catalogProps.put("client.region", glueProperties.glueRegion); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java index ffda556e2f8da4..dc6b4b448ae4a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java @@ -25,7 +25,7 @@ import lombok.Getter; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hive.HiveCatalog; import java.util.HashMap; @@ -64,8 +64,8 @@ public void initNormalizeAndCheckProps() { } @Override - public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public Catalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { checkInitialized(); Configuration conf = buildHiveConfiguration(storagePropertiesList); HiveCatalog hiveCatalog = new HiveCatalog(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java index 99646b8cd55080..29ffc7a9d6b879 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java @@ -27,11 +27,11 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.aws.AwsClientProperties; 
import org.apache.iceberg.aws.s3.S3FileIOProperties; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.logging.log4j.util.Strings; @@ -174,8 +174,8 @@ public String getIcebergCatalogType() { } @Override - public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + public Catalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { Map fileIOProperties = Maps.newHashMap(); Configuration conf = new Configuration(); toFileIOProperties(storagePropertiesList, fileIOProperties, conf); @@ -185,7 +185,7 @@ public BaseMetastoreCatalog initCatalog(String catalogName, Map options.putAll(fileIOProperties); // 4. Build iceberg catalog - return (BaseMetastoreCatalog) CatalogUtil.buildIcebergCatalog(catalogName, options, conf); + return CatalogUtil.buildIcebergCatalog(catalogName, options, conf); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java index 74ff4f60954582..228afe85b618f4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractIcebergPropertiesTest.java @@ -19,8 +19,8 @@ import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -33,10 +33,10 @@ public class AbstractIcebergPropertiesTest { private static class TestIcebergProperties extends AbstractIcebergProperties { - private final BaseMetastoreCatalog catalogToReturn; + private final 
Catalog catalogToReturn; private Map capturedCatalogProps; - TestIcebergProperties(Map props, BaseMetastoreCatalog catalogToReturn) { + TestIcebergProperties(Map props, Catalog catalogToReturn) { super(props); this.catalogToReturn = catalogToReturn; } @@ -47,7 +47,7 @@ public String getIcebergCatalogType() { } @Override - protected BaseMetastoreCatalog initCatalog(String catalogName, + protected Catalog initCatalog(String catalogName, Map catalogProps, List storagePropertiesList) { // Capture the catalogProps for verification @@ -62,13 +62,13 @@ Map getCapturedCatalogProps() { @Test void testInitializeCatalogWithWarehouse() { - BaseMetastoreCatalog mockCatalog = Mockito.mock(BaseMetastoreCatalog.class); + Catalog mockCatalog = Mockito.mock(Catalog.class); Mockito.when(mockCatalog.name()).thenReturn("mocked-catalog"); Map props = new HashMap<>(); props.put("k1", "v1"); TestIcebergProperties properties = new TestIcebergProperties(props, mockCatalog); properties.warehouse = "s3://bucket/warehouse"; - BaseMetastoreCatalog result = properties.initializeCatalog("testCatalog", Collections.emptyList()); + Catalog result = properties.initializeCatalog("testCatalog", Collections.emptyList()); Assertions.assertNotNull(result); Assertions.assertEquals("mocked-catalog", result.name()); // Verify that warehouse is included in catalogProps @@ -81,11 +81,11 @@ void testInitializeCatalogWithWarehouse() { @Test void testInitializeCatalogWithoutWarehouse() { - BaseMetastoreCatalog mockCatalog = Mockito.mock(BaseMetastoreCatalog.class); + Catalog mockCatalog = Mockito.mock(Catalog.class); Mockito.when(mockCatalog.name()).thenReturn("no-warehouse"); TestIcebergProperties properties = new TestIcebergProperties(new HashMap<>(), mockCatalog); properties.warehouse = null; - BaseMetastoreCatalog result = properties.initializeCatalog("testCatalog", Collections.emptyList()); + Catalog result = properties.initializeCatalog("testCatalog", Collections.emptyList()); 
Assertions.assertNotNull(result); Assertions.assertEquals("no-warehouse", result.name()); // Verify that warehouse key is not present @@ -102,7 +102,7 @@ public String getIcebergCatalogType() { } @Override - protected BaseMetastoreCatalog initCatalog(String catalogName, + protected Catalog initCatalog(String catalogName, Map catalogProps, List storagePropertiesList) { return null; // Simulate a failure case @@ -118,7 +118,7 @@ protected BaseMetastoreCatalog initCatalog(String catalogName, @Test void testExecutionAuthenticatorNotNull() { - BaseMetastoreCatalog mockCatalog = Mockito.mock(BaseMetastoreCatalog.class); + Catalog mockCatalog = Mockito.mock(Catalog.class); TestIcebergProperties properties = new TestIcebergProperties(new HashMap<>(), mockCatalog); Assertions.assertNotNull(properties.executionAuthenticator); } From fd76081772a3ae0f38c5f135f5eca76b84853701 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Sun, 4 Jan 2026 13:56:11 +0800 Subject: [PATCH 04/11] [Fix](catalog)Resources should be closed when dropping a Catalog. 
--- .../property/metastore/IcebergRestProperties.java | 2 +- .../metastore/IcebergS3TablesMetaStoreProperties.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java index 29ffc7a9d6b879..688a268522b2cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java @@ -175,7 +175,7 @@ public String getIcebergCatalogType() { @Override public Catalog initCatalog(String catalogName, Map catalogProps, - List storagePropertiesList) { + List storagePropertiesList) { Map fileIOProperties = Maps.newHashMap(); Configuration conf = new Configuration(); toFileIOProperties(storagePropertiesList, fileIOProperties, conf); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java index a68814b6391a87..0b101623c2b10c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java @@ -23,7 +23,7 @@ import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.catalog.Catalog; import software.amazon.s3tables.iceberg.S3TablesCatalog; import java.util.List; @@ -49,8 +49,8 @@ public void initNormalizeAndCheckProps() { } @Override - public BaseMetastoreCatalog initCatalog(String catalogName, Map catalogProps, - List 
storagePropertiesList) { + public Catalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { checkInitialized(); buildS3CatalogProperties(catalogProps); From 3c1ddaa25de454ad6e3eeef5569395d71d3470f4 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 5 Jan 2026 15:39:41 +0800 Subject: [PATCH 05/11] [Fix](catalog)Resources should be closed when dropping a Catalog. --- fe/check/checkstyle/suppressions.xml | 1 + fe/fe-core/pom.xml | 8 + .../hadoop/fs/s3a/DefaultS3ClientFactory.java | 445 ++++++++++++++++++ 3 files changed, 454 insertions(+) create mode 100644 fe/fe-core/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java diff --git a/fe/check/checkstyle/suppressions.xml b/fe/check/checkstyle/suppressions.xml index 7340c4c5bd5fe9..a6ac87dca44dae 100644 --- a/fe/check/checkstyle/suppressions.xml +++ b/fe/check/checkstyle/suppressions.xml @@ -74,4 +74,5 @@ under the License. + diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 287285edd27ca8..9e3dfd6006feb4 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -345,6 +345,14 @@ under the License. software.amazon.awssdk s3 + + software.amazon.awssdk + apache-client + + + software.amazon.awssdk + netty-nio-client + com.tencentcloudapi diff --git a/fe/fe-core/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/fe/fe-core/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java new file mode 100644 index 00000000000000..e164b0a3ad7074 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.s3a.impl.AWSClientConfig; +import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector; +import org.apache.hadoop.fs.store.LogExactlyOnce; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.util.AwsHostNameUtils; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.auth.spi.scheme.AuthScheme; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity; +import software.amazon.awssdk.metrics.LoggingMetricPublisher; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3BaseClientBuilder; +import software.amazon.awssdk.services.s3.S3Client; +import 
software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.utils.ThreadFactoryBuilder; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION; +import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3; +import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS; +import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME; +import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS; +import static org.apache.hadoop.fs.s3a.auth.SignerFactory.createHttpSigner; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AUTH_SCHEME_AWS_SIGV_4; +import static org.apache.hadoop.util.Preconditions.checkArgument; + + +/** + * The default {@link S3ClientFactory} implementation. + * This calls the AWS SDK to configure and create an + * {@code AmazonS3Client} that communicates with the S3 service. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class DefaultS3ClientFactory extends Configured + implements S3ClientFactory { + + private static final String REQUESTER_PAYS_HEADER_VALUE = "requester"; + + private static final String S3_SERVICE_NAME = "s3"; + + private static final Pattern VPC_ENDPOINT_PATTERN = + Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$"); + + /** + * Subclasses refer to this. + */ + protected static final Logger LOG = + LoggerFactory.getLogger(DefaultS3ClientFactory.class); + + /** + * A one-off warning of default region chains in use. + */ + private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN = + new LogExactlyOnce(LOG); + + /** + * Warning message printed when the SDK Region chain is in use. + */ + private static final String SDK_REGION_CHAIN_IN_USE = + "S3A filesystem client is using" + + " the SDK region resolution chain."; + + + /** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */ + private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG); + + /** + * Error message when an endpoint is set with FIPS enabled: {@value}. 
+ */ + @VisibleForTesting + public static final String ERROR_ENDPOINT_WITH_FIPS = + "Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true"; + + @Override + public S3Client createS3Client( + final URI uri, + final S3ClientCreationParameters parameters) throws IOException { + + Configuration conf = getConf(); + String bucket = uri.getHost(); + + ApacheHttpClient.Builder httpClientBuilder = AWSClientConfig + .createHttpClientBuilder(conf) + .proxyConfiguration(AWSClientConfig.createProxyConfiguration(conf, bucket)); + return configureClientBuilder(S3Client.builder(), parameters, conf, bucket) + .httpClientBuilder(httpClientBuilder) + .build(); + } + + @Override + public S3AsyncClient createS3AsyncClient( + final URI uri, + final S3ClientCreationParameters parameters) throws IOException { + + Configuration conf = getConf(); + String bucket = uri.getHost(); + + NettyNioAsyncHttpClient.Builder httpClientBuilder = AWSClientConfig + .createAsyncHttpClientBuilder(conf) + .proxyConfiguration(AWSClientConfig.createAsyncProxyConfiguration(conf, bucket)); + + MultipartConfiguration multipartConfiguration = MultipartConfiguration.builder() + .minimumPartSizeInBytes(parameters.getMinimumPartSize()) + .thresholdInBytes(parameters.getMultiPartThreshold()) + .build(); + + S3AsyncClientBuilder s3AsyncClientBuilder = + configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket) + .httpClientBuilder(httpClientBuilder); + + // multipart upload pending with HADOOP-19326. 
+ if (!parameters.isClientSideEncryptionEnabled() && + !parameters.isAnalyticsAcceleratorEnabled()) { + s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration) + .multipartEnabled(parameters.isMultipartCopy()); + } + + return s3AsyncClientBuilder.build(); + } + + @Override + public S3TransferManager createS3TransferManager(final S3AsyncClient s3AsyncClient) { + return S3TransferManager.builder() + .s3Client(s3AsyncClient) + .build(); + } + + /** + * Configure a sync or async S3 client builder. + * This method handles all shared configuration, including + * path style access, credentials and whether or not to use S3Express + * CreateSession. + * @param builder S3 client builder + * @param parameters parameter object + * @param conf configuration object + * @param bucket bucket name + * @return the builder object + * @param S3 client builder type + * @param S3 client type + */ + private , ClientT> BuilderT configureClientBuilder( + BuilderT builder, S3ClientCreationParameters parameters, Configuration conf, String bucket) + throws IOException { + + configureEndpointAndRegion(builder, parameters, conf); + + S3Configuration serviceConfiguration = S3Configuration.builder() + .pathStyleAccessEnabled(parameters.isPathStyleAccess()) + .checksumValidationEnabled(parameters.isChecksumValidationEnabled()) + .build(); + + final ClientOverrideConfiguration.Builder override = + createClientOverrideConfiguration(parameters, conf); + ScheduledExecutorService awsSdkScheduler=Executors.newScheduledThreadPool(20, new ThreadFactoryBuilder().threadNamePrefix("aws-sdk-scheduler-%d").build()); + S3BaseClientBuilder s3BaseClientBuilder = builder + .overrideConfiguration(override.build()) + .credentialsProvider(parameters.getCredentialSet()) + .disableS3ExpressSessionAuth(!parameters.isExpressCreateSession()) + .serviceConfiguration(serviceConfiguration); + 
s3BaseClientBuilder.overrideConfiguration(ClientOverrideConfiguration.builder().scheduledExecutorService(awsSdkScheduler).build()); + if (LOG.isTraceEnabled()) { + // if this log is set to "trace" then we turn on logging of SDK metrics. + // The metrics itself will log at info; it is just that reflection work + // would be needed to change that setting safely for shaded and unshaded aws artifacts. + s3BaseClientBuilder.overrideConfiguration(o -> + o.addMetricPublisher(LoggingMetricPublisher.create())); + } + + if (conf.getBoolean(HTTP_SIGNER_ENABLED, HTTP_SIGNER_ENABLED_DEFAULT)) { + // use an http signer through an AuthScheme + final AuthScheme signer = + createHttpSigner(conf, AUTH_SCHEME_AWS_SIGV_4, HTTP_SIGNER_CLASS_NAME); + builder.putAuthScheme(signer); + } + return (BuilderT) s3BaseClientBuilder; + } + + /** + * Create an override configuration for an S3 client. + * @param parameters parameter object + * @param conf configuration object + * @throws IOException any IOE raised, or translated exception + * @throws RuntimeException some failures creating an http signer + * @return the override configuration + * @throws IOException any IOE raised, or translated exception + */ + protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration( + S3ClientCreationParameters parameters, Configuration conf) throws IOException { + final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder = + AWSClientConfig.createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3); + + // add any headers + parameters.getHeaders().forEach((h, v) -> clientOverrideConfigBuilder.putHeader(h, v)); + + if (parameters.isRequesterPays()) { + // All calls must acknowledge requester will pay via header. 
+ clientOverrideConfigBuilder.putHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE); + } + + if (!StringUtils.isEmpty(parameters.getUserAgentSuffix())) { + clientOverrideConfigBuilder.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, + parameters.getUserAgentSuffix()); + } + + if (parameters.getExecutionInterceptors() != null) { + for (ExecutionInterceptor interceptor : parameters.getExecutionInterceptors()) { + clientOverrideConfigBuilder.addExecutionInterceptor(interceptor); + } + } + + if (parameters.getMetrics() != null) { + clientOverrideConfigBuilder.addMetricPublisher( + new AwsStatisticsCollector(parameters.getMetrics())); + } + + final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf); + clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build()); + + return clientOverrideConfigBuilder; + } + + /** + * This method configures the endpoint and region for a S3 client. + * The order of configuration is: + * + *
    + *
  1. If region is configured via fs.s3a.endpoint.region, use it.
  2. + *
  3. If endpoint is configured via fs.s3a.endpoint, set it. + * If no region is configured, try to parse region from endpoint.
  4. + *
  5. If no region is configured, and it could not be parsed from the endpoint, + * set the default region as US_EAST_2
  6. + *
  7. If configured region is empty, fallback to SDK resolution chain.
  8. + *
  9. S3 cross region is enabled by default irrespective of whether region or endpoint + * is set.
  10. + *
+ * + * @param builder S3 client builder. + * @param parameters parameter object + * @param conf conf configuration object + * @param S3 client builder type + * @param S3 client type + * @throws IllegalArgumentException if endpoint is set when FIPS is enabled. + */ + private , ClientT> void configureEndpointAndRegion( + BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) { + final String endpointStr = parameters.getEndpoint(); + final URI endpoint = getS3Endpoint(endpointStr, conf); + + final String configuredRegion = parameters.getRegion(); + Region region = null; + String origin = ""; + + // If the region was configured, set it. + if (configuredRegion != null && !configuredRegion.isEmpty()) { + origin = AWS_REGION; + region = Region.of(configuredRegion); + } + + // FIPs? Log it, then reject any attempt to set an endpoint + final boolean fipsEnabled = parameters.isFipsEnabled(); + if (fipsEnabled) { + LOG.debug("Enabling FIPS mode"); + } + // always setting it guarantees the value is non-null, + // which tests expect. + builder.fipsEnabled(fipsEnabled); + + if (endpoint != null) { + boolean endpointEndsWithCentral = + endpointStr.endsWith(CENTRAL_ENDPOINT); + checkArgument(!fipsEnabled || endpointEndsWithCentral, "%s : %s", + ERROR_ENDPOINT_WITH_FIPS, + endpoint); + + // No region was configured, + // determine the region from the endpoint. + if (region == null) { + region = getS3RegionFromEndpoint(endpointStr, + endpointEndsWithCentral); + if (region != null) { + origin = "endpoint"; + } + } + + // No need to override endpoint with "s3.amazonaws.com". + // Let the client take care of endpoint resolution. Overriding + // the endpoint with "s3.amazonaws.com" causes 400 Bad Request + // errors for non-existent buckets and objects. 
+ // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846 + if (!endpointEndsWithCentral) { + builder.endpointOverride(endpoint); + LOG.debug("Setting endpoint to {}", endpoint); + } else { + origin = "central endpoint with cross region access"; + LOG.debug("Enabling cross region access for endpoint {}", + endpointStr); + } + } + + if (region != null) { + builder.region(region); + } else if (configuredRegion == null) { + // no region is configured, and none could be determined from the endpoint. + // Use US_EAST_2 as default. + region = Region.of(AWS_S3_DEFAULT_REGION); + builder.region(region); + origin = "cross region access fallback"; + } else if (configuredRegion.isEmpty()) { + // region configuration was set to empty string. + // allow this if people really want it; it is OK to rely on this + // when deployed in EC2. + WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE); + LOG.debug(SDK_REGION_CHAIN_IN_USE); + origin = "SDK region chain"; + } + boolean isCrossRegionAccessEnabled = conf.getBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, + AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT); + // s3 cross region access + if (isCrossRegionAccessEnabled) { + builder.crossRegionAccessEnabled(true); + } + LOG.debug("Setting region to {} from {} with cross region access {}", + region, origin, isCrossRegionAccessEnabled); + } + + /** + * Given a endpoint string, create the endpoint URI. + * + * @param endpoint possibly null endpoint. + * @param conf config to build the URI from. + * @return an endpoint uri + */ + protected static URI getS3Endpoint(String endpoint, final Configuration conf) { + + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS); + + String protocol = secureConnections ? "https" : "http"; + + if (endpoint == null || endpoint.isEmpty()) { + // don't set an endpoint if none is configured, instead let the SDK figure it out. 
+ return null; + } + + if (!endpoint.contains("://")) { + endpoint = String.format("%s://%s", protocol, endpoint); + } + + try { + return new URI(endpoint); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Parses the endpoint to get the region. + * If endpoint is the central one, use US_EAST_2. + * + * @param endpoint the configure endpoint. + * @param endpointEndsWithCentral true if the endpoint is configured as central. + * @return the S3 region, null if unable to resolve from endpoint. + */ + @VisibleForTesting + static Region getS3RegionFromEndpoint(final String endpoint, + final boolean endpointEndsWithCentral) { + + if (!endpointEndsWithCentral) { + // S3 VPC endpoint parsing + Matcher matcher = VPC_ENDPOINT_PATTERN.matcher(endpoint); + if (matcher.find()) { + LOG.debug("Mapping to VPCE"); + LOG.debug("Endpoint {} is vpc endpoint; parsing region as {}", endpoint, matcher.group(1)); + return Region.of(matcher.group(1)); + } + + LOG.debug("Endpoint {} is not the default; parsing", endpoint); + return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null); + } + + // Select default region here to enable cross-region access. + // If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty, + // Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com". + // This applies to Spark versions with the changes of SPARK-35878. + // ref: + // https://github.com/apache/spark/blob/v3.5.0/core/ + // src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528 + // If we do not allow cross region access, Spark would not be able to + // access any bucket that is not present in the given region. + // Hence, we should use default region us-east-2 to allow cross-region + // access. 
+ return Region.of(AWS_S3_DEFAULT_REGION); + } + +} From 13e9d257ebef08bd7f899d19fc2216b5b939098e Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 5 Jan 2026 17:37:17 +0800 Subject: [PATCH 06/11] [Fix](catalog)Resources should be closed when dropping a Catalog. --- fe/check/checkstyle/suppressions.xml | 2 +- .../hadoop/fs/s3a/DefaultS3ClientFactory.java | 445 ----------- .../builder/SdkDefaultClientBuilder.java | 702 ++++++++++++++++++ 3 files changed, 703 insertions(+), 446 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java create mode 100644 fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java diff --git a/fe/check/checkstyle/suppressions.xml b/fe/check/checkstyle/suppressions.xml index a6ac87dca44dae..8e1c40e31620fe 100644 --- a/fe/check/checkstyle/suppressions.xml +++ b/fe/check/checkstyle/suppressions.xml @@ -74,5 +74,5 @@ under the License. - + diff --git a/fe/fe-core/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/fe/fe-core/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java deleted file mode 100644 index e164b0a3ad7074..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ /dev/null @@ -1,445 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.s3a.impl.AWSClientConfig; -import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector; -import org.apache.hadoop.fs.store.LogExactlyOnce; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.awscore.util.AwsHostNameUtils; -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; -import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; -import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; -import software.amazon.awssdk.core.retry.RetryPolicy; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.auth.spi.scheme.AuthScheme; -import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; -import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity; -import software.amazon.awssdk.metrics.LoggingMetricPublisher; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; -import software.amazon.awssdk.services.s3.S3BaseClientBuilder; -import software.amazon.awssdk.services.s3.S3Client; -import 
software.amazon.awssdk.services.s3.S3Configuration; -import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration; -import software.amazon.awssdk.transfer.s3.S3TransferManager; -import software.amazon.awssdk.utils.ThreadFactoryBuilder; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; -import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED; -import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT; -import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION; -import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3; -import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT; -import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS; -import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; -import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME; -import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED; -import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED_DEFAULT; -import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS; -import static org.apache.hadoop.fs.s3a.auth.SignerFactory.createHttpSigner; -import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER; -import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AUTH_SCHEME_AWS_SIGV_4; -import static org.apache.hadoop.util.Preconditions.checkArgument; - - -/** - * The default {@link S3ClientFactory} implementation. - * This calls the AWS SDK to configure and create an - * {@code AmazonS3Client} that communicates with the S3 service. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class DefaultS3ClientFactory extends Configured - implements S3ClientFactory { - - private static final String REQUESTER_PAYS_HEADER_VALUE = "requester"; - - private static final String S3_SERVICE_NAME = "s3"; - - private static final Pattern VPC_ENDPOINT_PATTERN = - Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$"); - - /** - * Subclasses refer to this. - */ - protected static final Logger LOG = - LoggerFactory.getLogger(DefaultS3ClientFactory.class); - - /** - * A one-off warning of default region chains in use. - */ - private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN = - new LogExactlyOnce(LOG); - - /** - * Warning message printed when the SDK Region chain is in use. - */ - private static final String SDK_REGION_CHAIN_IN_USE = - "S3A filesystem client is using" - + " the SDK region resolution chain."; - - - /** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */ - private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG); - - /** - * Error message when an endpoint is set with FIPS enabled: {@value}. 
- */ - @VisibleForTesting - public static final String ERROR_ENDPOINT_WITH_FIPS = - "Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true"; - - @Override - public S3Client createS3Client( - final URI uri, - final S3ClientCreationParameters parameters) throws IOException { - - Configuration conf = getConf(); - String bucket = uri.getHost(); - - ApacheHttpClient.Builder httpClientBuilder = AWSClientConfig - .createHttpClientBuilder(conf) - .proxyConfiguration(AWSClientConfig.createProxyConfiguration(conf, bucket)); - return configureClientBuilder(S3Client.builder(), parameters, conf, bucket) - .httpClientBuilder(httpClientBuilder) - .build(); - } - - @Override - public S3AsyncClient createS3AsyncClient( - final URI uri, - final S3ClientCreationParameters parameters) throws IOException { - - Configuration conf = getConf(); - String bucket = uri.getHost(); - - NettyNioAsyncHttpClient.Builder httpClientBuilder = AWSClientConfig - .createAsyncHttpClientBuilder(conf) - .proxyConfiguration(AWSClientConfig.createAsyncProxyConfiguration(conf, bucket)); - - MultipartConfiguration multipartConfiguration = MultipartConfiguration.builder() - .minimumPartSizeInBytes(parameters.getMinimumPartSize()) - .thresholdInBytes(parameters.getMultiPartThreshold()) - .build(); - - S3AsyncClientBuilder s3AsyncClientBuilder = - configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket) - .httpClientBuilder(httpClientBuilder); - - // multipart upload pending with HADOOP-19326. 
- if (!parameters.isClientSideEncryptionEnabled() && - !parameters.isAnalyticsAcceleratorEnabled()) { - s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration) - .multipartEnabled(parameters.isMultipartCopy()); - } - - return s3AsyncClientBuilder.build(); - } - - @Override - public S3TransferManager createS3TransferManager(final S3AsyncClient s3AsyncClient) { - return S3TransferManager.builder() - .s3Client(s3AsyncClient) - .build(); - } - - /** - * Configure a sync or async S3 client builder. - * This method handles all shared configuration, including - * path style access, credentials and whether or not to use S3Express - * CreateSession. - * @param builder S3 client builder - * @param parameters parameter object - * @param conf configuration object - * @param bucket bucket name - * @return the builder object - * @param S3 client builder type - * @param S3 client type - */ - private , ClientT> BuilderT configureClientBuilder( - BuilderT builder, S3ClientCreationParameters parameters, Configuration conf, String bucket) - throws IOException { - - configureEndpointAndRegion(builder, parameters, conf); - - S3Configuration serviceConfiguration = S3Configuration.builder() - .pathStyleAccessEnabled(parameters.isPathStyleAccess()) - .checksumValidationEnabled(parameters.isChecksumValidationEnabled()) - .build(); - - final ClientOverrideConfiguration.Builder override = - createClientOverrideConfiguration(parameters, conf); - ScheduledExecutorService awsSdkScheduler=Executors.newScheduledThreadPool(20, new ThreadFactoryBuilder().threadNamePrefix("aws-sdk-scheduler-%d").build()); - S3BaseClientBuilder s3BaseClientBuilder = builder - .overrideConfiguration(override.build()) - .credentialsProvider(parameters.getCredentialSet()) - .disableS3ExpressSessionAuth(!parameters.isExpressCreateSession()) - .serviceConfiguration(serviceConfiguration); - 
s3BaseClientBuilder.overrideConfiguration(ClientOverrideConfiguration.builder().scheduledExecutorService(awsSdkScheduler).build()); - if (LOG.isTraceEnabled()) { - // if this log is set to "trace" then we turn on logging of SDK metrics. - // The metrics itself will log at info; it is just that reflection work - // would be needed to change that setting safely for shaded and unshaded aws artifacts. - s3BaseClientBuilder.overrideConfiguration(o -> - o.addMetricPublisher(LoggingMetricPublisher.create())); - } - - if (conf.getBoolean(HTTP_SIGNER_ENABLED, HTTP_SIGNER_ENABLED_DEFAULT)) { - // use an http signer through an AuthScheme - final AuthScheme signer = - createHttpSigner(conf, AUTH_SCHEME_AWS_SIGV_4, HTTP_SIGNER_CLASS_NAME); - builder.putAuthScheme(signer); - } - return (BuilderT) s3BaseClientBuilder; - } - - /** - * Create an override configuration for an S3 client. - * @param parameters parameter object - * @param conf configuration object - * @throws IOException any IOE raised, or translated exception - * @throws RuntimeException some failures creating an http signer - * @return the override configuration - * @throws IOException any IOE raised, or translated exception - */ - protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration( - S3ClientCreationParameters parameters, Configuration conf) throws IOException { - final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder = - AWSClientConfig.createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3); - - // add any headers - parameters.getHeaders().forEach((h, v) -> clientOverrideConfigBuilder.putHeader(h, v)); - - if (parameters.isRequesterPays()) { - // All calls must acknowledge requester will pay via header. 
- clientOverrideConfigBuilder.putHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE); - } - - if (!StringUtils.isEmpty(parameters.getUserAgentSuffix())) { - clientOverrideConfigBuilder.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, - parameters.getUserAgentSuffix()); - } - - if (parameters.getExecutionInterceptors() != null) { - for (ExecutionInterceptor interceptor : parameters.getExecutionInterceptors()) { - clientOverrideConfigBuilder.addExecutionInterceptor(interceptor); - } - } - - if (parameters.getMetrics() != null) { - clientOverrideConfigBuilder.addMetricPublisher( - new AwsStatisticsCollector(parameters.getMetrics())); - } - - final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf); - clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build()); - - return clientOverrideConfigBuilder; - } - - /** - * This method configures the endpoint and region for a S3 client. - * The order of configuration is: - * - *
    - *
  1. If region is configured via fs.s3a.endpoint.region, use it.
  2. - *
  3. If endpoint is configured via via fs.s3a.endpoint, set it. - * If no region is configured, try to parse region from endpoint.
  4. - *
  5. If no region is configured, and it could not be parsed from the endpoint, - * set the default region as US_EAST_2
  6. - *
  7. If configured region is empty, fallback to SDK resolution chain.
  8. - *
  9. S3 cross region is enabled by default irrespective of region or endpoint - * is set or not.
  10. - *
- * - * @param builder S3 client builder. - * @param parameters parameter object - * @param conf conf configuration object - * @param S3 client builder type - * @param S3 client type - * @throws IllegalArgumentException if endpoint is set when FIPS is enabled. - */ - private , ClientT> void configureEndpointAndRegion( - BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) { - final String endpointStr = parameters.getEndpoint(); - final URI endpoint = getS3Endpoint(endpointStr, conf); - - final String configuredRegion = parameters.getRegion(); - Region region = null; - String origin = ""; - - // If the region was configured, set it. - if (configuredRegion != null && !configuredRegion.isEmpty()) { - origin = AWS_REGION; - region = Region.of(configuredRegion); - } - - // FIPs? Log it, then reject any attempt to set an endpoint - final boolean fipsEnabled = parameters.isFipsEnabled(); - if (fipsEnabled) { - LOG.debug("Enabling FIPS mode"); - } - // always setting it guarantees the value is non-null, - // which tests expect. - builder.fipsEnabled(fipsEnabled); - - if (endpoint != null) { - boolean endpointEndsWithCentral = - endpointStr.endsWith(CENTRAL_ENDPOINT); - checkArgument(!fipsEnabled || endpointEndsWithCentral, "%s : %s", - ERROR_ENDPOINT_WITH_FIPS, - endpoint); - - // No region was configured, - // determine the region from the endpoint. - if (region == null) { - region = getS3RegionFromEndpoint(endpointStr, - endpointEndsWithCentral); - if (region != null) { - origin = "endpoint"; - } - } - - // No need to override endpoint with "s3.amazonaws.com". - // Let the client take care of endpoint resolution. Overriding - // the endpoint with "s3.amazonaws.com" causes 400 Bad Request - // errors for non-existent buckets and objects. 
- // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846 - if (!endpointEndsWithCentral) { - builder.endpointOverride(endpoint); - LOG.debug("Setting endpoint to {}", endpoint); - } else { - origin = "central endpoint with cross region access"; - LOG.debug("Enabling cross region access for endpoint {}", - endpointStr); - } - } - - if (region != null) { - builder.region(region); - } else if (configuredRegion == null) { - // no region is configured, and none could be determined from the endpoint. - // Use US_EAST_2 as default. - region = Region.of(AWS_S3_DEFAULT_REGION); - builder.region(region); - origin = "cross region access fallback"; - } else if (configuredRegion.isEmpty()) { - // region configuration was set to empty string. - // allow this if people really want it; it is OK to rely on this - // when deployed in EC2. - WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE); - LOG.debug(SDK_REGION_CHAIN_IN_USE); - origin = "SDK region chain"; - } - boolean isCrossRegionAccessEnabled = conf.getBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, - AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT); - // s3 cross region access - if (isCrossRegionAccessEnabled) { - builder.crossRegionAccessEnabled(true); - } - LOG.debug("Setting region to {} from {} with cross region access {}", - region, origin, isCrossRegionAccessEnabled); - } - - /** - * Given a endpoint string, create the endpoint URI. - * - * @param endpoint possibly null endpoint. - * @param conf config to build the URI from. - * @return an endpoint uri - */ - protected static URI getS3Endpoint(String endpoint, final Configuration conf) { - - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS); - - String protocol = secureConnections ? "https" : "http"; - - if (endpoint == null || endpoint.isEmpty()) { - // don't set an endpoint if none is configured, instead let the SDK figure it out. 
- return null; - } - - if (!endpoint.contains("://")) { - endpoint = String.format("%s://%s", protocol, endpoint); - } - - try { - return new URI(endpoint); - } catch (URISyntaxException e) { - throw new IllegalArgumentException(e); - } - } - - /** - * Parses the endpoint to get the region. - * If endpoint is the central one, use US_EAST_2. - * - * @param endpoint the configure endpoint. - * @param endpointEndsWithCentral true if the endpoint is configured as central. - * @return the S3 region, null if unable to resolve from endpoint. - */ - @VisibleForTesting - static Region getS3RegionFromEndpoint(final String endpoint, - final boolean endpointEndsWithCentral) { - - if (!endpointEndsWithCentral) { - // S3 VPC endpoint parsing - Matcher matcher = VPC_ENDPOINT_PATTERN.matcher(endpoint); - if (matcher.find()) { - LOG.debug("Mapping to VPCE"); - LOG.debug("Endpoint {} is vpc endpoint; parsing region as {}", endpoint, matcher.group(1)); - return Region.of(matcher.group(1)); - } - - LOG.debug("Endpoint {} is not the default; parsing", endpoint); - return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null); - } - - // Select default region here to enable cross-region access. - // If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty, - // Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com". - // This applies to Spark versions with the changes of SPARK-35878. - // ref: - // https://github.com/apache/spark/blob/v3.5.0/core/ - // src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528 - // If we do not allow cross region access, Spark would not be able to - // access any bucket that is not present in the given region. - // Hence, we should use default region us-east-2 to allow cross-region - // access. 
- return Region.of(AWS_S3_DEFAULT_REGION); - } - -} diff --git a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java new file mode 100644 index 00000000000000..086fef1c560c68 --- /dev/null +++ b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java @@ -0,0 +1,702 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package software.amazon.awssdk.core.client.builder; + +import software.amazon.awssdk.annotations.SdkPreviewApi; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.annotations.SdkTestInternalApi; +import software.amazon.awssdk.core.ClientEndpointProvider; +import software.amazon.awssdk.core.ClientType; +import software.amazon.awssdk.core.CompressionConfiguration; +import software.amazon.awssdk.core.SdkPlugin; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkClientConfiguration; +import software.amazon.awssdk.core.client.config.SdkClientOption; +import software.amazon.awssdk.core.interceptor.ClasspathInterceptorChainFactory; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.core.internal.http.loader.DefaultSdkAsyncHttpClientBuilder; +import software.amazon.awssdk.core.internal.http.loader.DefaultSdkHttpClientBuilder; +import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage; +import software.amazon.awssdk.core.internal.interceptor.HttpChecksumValidationInterceptor; +import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy; +import software.amazon.awssdk.core.internal.useragent.AppIdResolver; +import software.amazon.awssdk.core.internal.useragent.SdkClientUserAgentProperties; +import software.amazon.awssdk.core.internal.useragent.SdkUserAgentBuilder; +import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.core.util.SystemUserAgent; +import software.amazon.awssdk.http.ExecutableHttpRequest; +import software.amazon.awssdk.http.HttpExecuteRequest; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.async.AsyncExecuteRequest; +import 
software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.identity.spi.IdentityProviders; +import software.amazon.awssdk.metrics.MetricPublisher; +import software.amazon.awssdk.profiles.ProfileFile; +import software.amazon.awssdk.profiles.ProfileFileSystemSetting; +import software.amazon.awssdk.profiles.ProfileProperty; +import software.amazon.awssdk.retries.api.RetryStrategy; +import software.amazon.awssdk.utils.AttributeMap; +import software.amazon.awssdk.utils.AttributeMap.LazyValueSource; +import software.amazon.awssdk.utils.Either; +import software.amazon.awssdk.utils.Lazy; +import software.amazon.awssdk.utils.OptionalUtils; +import software.amazon.awssdk.utils.StringUtils; +import software.amazon.awssdk.utils.ThreadFactoryBuilder; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.http.SdkHttpUtils; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import static software.amazon.awssdk.core.ClientType.ASYNC; +import static software.amazon.awssdk.core.ClientType.SYNC; +import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR; +import static software.amazon.awssdk.core.client.config.SdkAdvancedClientOption.USER_AGENT_PREFIX; +import static software.amazon.awssdk.core.client.config.SdkAdvancedClientOption.USER_AGENT_SUFFIX; +import static 
software.amazon.awssdk.core.client.config.SdkClientOption.ADDITIONAL_HTTP_HEADERS; +import static software.amazon.awssdk.core.client.config.SdkClientOption.ASYNC_HTTP_CLIENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CLIENT_TYPE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CLIENT_USER_AGENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.COMPRESSION_CONFIGURATION; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_ASYNC_HTTP_CLIENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_ASYNC_HTTP_CLIENT_BUILDER; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_COMPRESSION_CONFIGURATION; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_RETRY_CONFIGURATOR; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_RETRY_MODE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_RETRY_STRATEGY; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_SCHEDULED_EXECUTOR_SERVICE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_SYNC_HTTP_CLIENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_SYNC_HTTP_CLIENT_BUILDER; +import static software.amazon.awssdk.core.client.config.SdkClientOption.CRC32_FROM_COMPRESSED_DATA_ENABLED; +import static software.amazon.awssdk.core.client.config.SdkClientOption.DEFAULT_RETRY_MODE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.EXECUTION_INTERCEPTORS; +import static software.amazon.awssdk.core.client.config.SdkClientOption.HTTP_CLIENT_CONFIG; +import static software.amazon.awssdk.core.client.config.SdkClientOption.IDENTITY_PROVIDERS; +import static software.amazon.awssdk.core.client.config.SdkClientOption.INTERNAL_USER_AGENT; +import static 
software.amazon.awssdk.core.client.config.SdkClientOption.METRIC_PUBLISHERS; +import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_FILE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_FILE_SUPPLIER; +import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_NAME; +import static software.amazon.awssdk.core.client.config.SdkClientOption.RETRY_STRATEGY; +import static software.amazon.awssdk.core.client.config.SdkClientOption.SCHEDULED_EXECUTOR_SERVICE; +import static software.amazon.awssdk.core.client.config.SdkClientOption.SYNC_HTTP_CLIENT; +import static software.amazon.awssdk.core.client.config.SdkClientOption.USER_AGENT_APP_ID; +import static software.amazon.awssdk.core.internal.useragent.UserAgentConstant.APP_ID; +import static software.amazon.awssdk.core.internal.useragent.UserAgentConstant.HTTP; +import static software.amazon.awssdk.core.internal.useragent.UserAgentConstant.INTERNAL_METADATA_MARKER; +import static software.amazon.awssdk.core.internal.useragent.UserAgentConstant.IO; +import static software.amazon.awssdk.utils.CollectionUtils.mergeLists; +import static software.amazon.awssdk.utils.Validate.paramNotNull; + +/** + * An SDK-internal implementation of the methods in {@link SdkClientBuilder}, {@link SdkAsyncClientBuilder} and + * {@link SdkSyncClientBuilder}. This implements all methods required by those interfaces, allowing service-specific builders to + * just implement the configuration they wish to add. + * + *

By implementing both the sync and async interface's methods, service-specific builders can share code between their sync + * and + * async variants without needing one to extend the other. Note: This only defines the methods in the sync and async builder + * interfaces. It does not implement the interfaces themselves. This is because the sync and async client builder interfaces both + * require a type-constrained parameter for use in fluent chaining, and a generic type parameter conflict is introduced into the + * class hierarchy by this interface extending the builder interfaces themselves.

+ * + *

Like all {@link SdkClientBuilder}s, this class is not thread safe.

+ * + * @param The type of builder, for chaining. + * @param The type of client generated by this builder. + */ +@SdkProtectedApi +public abstract class SdkDefaultClientBuilder, C> implements SdkClientBuilder { + + private static final SdkHttpClient.Builder DEFAULT_HTTP_CLIENT_BUILDER = new DefaultSdkHttpClientBuilder(); + private static final SdkAsyncHttpClient.Builder DEFAULT_ASYNC_HTTP_CLIENT_BUILDER = new DefaultSdkAsyncHttpClientBuilder(); + + protected final SdkClientConfiguration.Builder clientConfiguration = SdkClientConfiguration.builder(); + + protected final AttributeMap.Builder clientContextParams = AttributeMap.builder(); + protected ClientOverrideConfiguration overrideConfig; + private final SdkHttpClient.Builder defaultHttpClientBuilder; + private final SdkAsyncHttpClient.Builder defaultAsyncHttpClientBuilder; + private final List plugins = new ArrayList<>(); + private static final ScheduledExecutorService awsSdkScheduler=Executors.newScheduledThreadPool(20, new ThreadFactoryBuilder().threadNamePrefix("aws-sdk-scheduler-%d").build()); + + + + protected SdkDefaultClientBuilder() { + this(DEFAULT_HTTP_CLIENT_BUILDER, DEFAULT_ASYNC_HTTP_CLIENT_BUILDER); + } + + @SdkTestInternalApi + protected SdkDefaultClientBuilder(SdkHttpClient.Builder defaultHttpClientBuilder, + SdkAsyncHttpClient.Builder defaultAsyncHttpClientBuilder) { + this.defaultHttpClientBuilder = defaultHttpClientBuilder; + this.defaultAsyncHttpClientBuilder = defaultAsyncHttpClientBuilder; + } + + /** + * Build a client using the current state of this builder. This is marked final in order to allow this class to add standard + * "build" logic between all service clients. Service clients are expected to implement the {@link #buildClient} method, that + * accepts the immutable client configuration generated by this build method. 
+ */ + @Override + public final C build() { + return buildClient(); + } + + /** + * Implemented by child classes to create a client using the provided immutable configuration objects. The async and sync + * configurations are not yet immutable. Child classes will need to make them immutable in order to validate them and pass + * them to the client's constructor. + * + * @return A client based on the provided configuration. + */ + protected abstract C buildClient(); + + /** + * Return a client configuration object, populated with the following chain of priorities. + *
    + *
  1. Client Configuration Overrides
  2. + *
  3. Customer Configuration
  4. + *
  5. Service-Specific Defaults
  6. + *
  7. Global Defaults
  8. + *
+ */ + protected final SdkClientConfiguration syncClientConfiguration() { + clientConfiguration.option(SdkClientOption.CLIENT_CONTEXT_PARAMS, clientContextParams.build()); + SdkClientConfiguration configuration = clientConfiguration.build(); + + // Apply overrides + configuration = setOverrides(configuration); + + // Apply defaults + configuration = mergeChildDefaults(configuration); + configuration = mergeGlobalDefaults(configuration); + + // Create additional configuration from the default-applied configuration + configuration = finalizeChildConfiguration(configuration); + configuration = finalizeSyncConfiguration(configuration); + configuration = finalizeConfiguration(configuration); + + // Invoke the plugins + configuration = invokePlugins(configuration); + + return configuration; + } + + /** + * Return a client configuration object, populated with the following chain of priorities. + *
    + *
  1. Client Configuration Overrides
  2. + *
  3. Customer Configuration
  4. + *
  5. Implementation/Service-Specific Configuration
  6. + *
  7. Global Default Configuration
  8. + *
+ */ + protected final SdkClientConfiguration asyncClientConfiguration() { + clientConfiguration.option(SdkClientOption.CLIENT_CONTEXT_PARAMS, clientContextParams.build()); + SdkClientConfiguration configuration = clientConfiguration.build(); + + // Apply overrides + configuration = setOverrides(configuration); + + // Apply defaults + configuration = mergeChildDefaults(configuration); + configuration = mergeGlobalDefaults(configuration); + + // Create additional configuration from the default-applied configuration + configuration = finalizeChildConfiguration(configuration); + configuration = finalizeAsyncConfiguration(configuration); + configuration = finalizeConfiguration(configuration); + + // Invoke the plugins + configuration = invokePlugins(configuration); + + return configuration; + } + + /** + * Apply the client override configuration to the provided configuration. This generally does not need to be overridden by + * child classes, but some previous client versions override it. + */ + protected SdkClientConfiguration setOverrides(SdkClientConfiguration configuration) { + if (overrideConfig == null) { + return configuration; + } + SdkClientConfiguration.Builder builder = configuration.toBuilder(); + overrideConfig.retryStrategy().ifPresent(retryStrategy -> builder.option(RETRY_STRATEGY, retryStrategy)); + overrideConfig.retryMode().ifPresent(retryMode -> builder.option(RETRY_STRATEGY, + SdkDefaultRetryStrategy.forRetryMode(retryMode))); + overrideConfig.retryStrategyConfigurator().ifPresent(configurator -> { + RetryStrategy.Builder defaultBuilder = SdkDefaultRetryStrategy.defaultRetryStrategy().toBuilder(); + configurator.accept(defaultBuilder); + builder.option(RETRY_STRATEGY, defaultBuilder.build()); + }); + builder.putAll(overrideConfig); + // Forget anything we configured in the override configuration else it might be re-applied. 
+ builder.option(CONFIGURED_RETRY_MODE, null); + builder.option(CONFIGURED_RETRY_STRATEGY, null); + builder.option(CONFIGURED_RETRY_CONFIGURATOR, null); + return builder.build(); + } + + + /** + * Optionally overridden by child implementations to apply implementation-specific default configuration. + * (eg. AWS's default credentials providers) + */ + protected SdkClientConfiguration mergeChildDefaults(SdkClientConfiguration configuration) { + return configuration; + } + + /** + * Apply global default configuration + */ + private SdkClientConfiguration mergeGlobalDefaults(SdkClientConfiguration configuration) { + Supplier defaultProfileFileSupplier = new Lazy<>(ProfileFile::defaultProfileFile)::getValue; + + configuration = configuration.merge(c -> c.option(EXECUTION_INTERCEPTORS, new ArrayList<>()) + .option(METRIC_PUBLISHERS, new ArrayList<>()) + .option(ADDITIONAL_HTTP_HEADERS, new LinkedHashMap<>()) + .option(PROFILE_FILE_SUPPLIER, defaultProfileFileSupplier) + .lazyOption(PROFILE_FILE, conf -> conf.get(PROFILE_FILE_SUPPLIER).get()) + .option(PROFILE_NAME, + ProfileFileSystemSetting.AWS_PROFILE.getStringValueOrThrow()) + .option(USER_AGENT_PREFIX, "") + .option(USER_AGENT_SUFFIX, "") + .option(CRC32_FROM_COMPRESSED_DATA_ENABLED, false) + .option(CONFIGURED_COMPRESSION_CONFIGURATION, + CompressionConfiguration.builder().build())); + return configuration; + } + + /** + * Optionally overridden by child implementations to derive implementation-specific configuration from the + * default-applied configuration. (eg. AWS's endpoint, derived from the region). + */ + protected SdkClientConfiguration finalizeChildConfiguration(SdkClientConfiguration configuration) { + return configuration; + } + + /** + * Finalize sync-specific configuration from the default-applied configuration. 
+ */ + private SdkClientConfiguration finalizeSyncConfiguration(SdkClientConfiguration config) { + return config.toBuilder() + .lazyOption(SdkClientOption.SYNC_HTTP_CLIENT, c -> resolveSyncHttpClient(c, config)) + .option(SdkClientOption.CLIENT_TYPE, SYNC) + .build(); + } + + /** + * Finalize async-specific configuration from the default-applied configuration. + */ + private SdkClientConfiguration finalizeAsyncConfiguration(SdkClientConfiguration config) { + return config.toBuilder() + .lazyOptionIfAbsent(FUTURE_COMPLETION_EXECUTOR, this::resolveAsyncFutureCompletionExecutor) + .lazyOption(ASYNC_HTTP_CLIENT, c -> resolveAsyncHttpClient(c, config)) + .option(SdkClientOption.CLIENT_TYPE, ASYNC) + .build(); + } + + /** + * Finalize global configuration from the default-applied configuration. + */ + private SdkClientConfiguration finalizeConfiguration(SdkClientConfiguration config) { + return config.toBuilder() + .lazyOption(SCHEDULED_EXECUTOR_SERVICE, this::resolveScheduledExecutorService) + .lazyOptionIfAbsent(RETRY_STRATEGY, this::resolveRetryStrategy) + .option(EXECUTION_INTERCEPTORS, resolveExecutionInterceptors(config)) + .lazyOption(CLIENT_USER_AGENT, this::resolveClientUserAgent) + .lazyOption(COMPRESSION_CONFIGURATION, this::resolveCompressionConfiguration) + .lazyOptionIfAbsent(IDENTITY_PROVIDERS, c -> IdentityProviders.builder().build()) + .build(); + } + + private CompressionConfiguration resolveCompressionConfiguration(LazyValueSource config) { + CompressionConfiguration compressionConfig = config.get(CONFIGURED_COMPRESSION_CONFIGURATION); + return compressionConfig.toBuilder() + .requestCompressionEnabled(resolveCompressionEnabled(config, compressionConfig)) + .minimumCompressionThresholdInBytes(resolveMinCompressionThreshold(config, compressionConfig)) + .build(); + } + + private Boolean resolveCompressionEnabled(LazyValueSource config, CompressionConfiguration compressionConfig) { + Supplier> systemSettingConfiguration = + () -> 
SdkSystemSetting.AWS_DISABLE_REQUEST_COMPRESSION.getBooleanValue() + .map(v -> !v); + + Supplier> profileFileConfiguration = + () -> config.get(PROFILE_FILE_SUPPLIER).get() + .profile(config.get(PROFILE_NAME)) + .flatMap(p -> p.booleanProperty(ProfileProperty.DISABLE_REQUEST_COMPRESSION)) + .map(v -> !v); + + return OptionalUtils.firstPresent(Optional.ofNullable(compressionConfig.requestCompressionEnabled()), + systemSettingConfiguration, + profileFileConfiguration) + .orElse(true); + } + + private Integer resolveMinCompressionThreshold(LazyValueSource config, CompressionConfiguration compressionConfig) { + Supplier> systemSettingConfiguration = + SdkSystemSetting.AWS_REQUEST_MIN_COMPRESSION_SIZE_BYTES::getIntegerValue; + + Supplier> profileFileConfiguration = + () -> config.get(PROFILE_FILE_SUPPLIER).get() + .profile(config.get(PROFILE_NAME)) + .flatMap(p -> p.property(ProfileProperty.REQUEST_MIN_COMPRESSION_SIZE_BYTES)) + .map(Integer::parseInt); + + return OptionalUtils.firstPresent(Optional.ofNullable(compressionConfig.minimumCompressionThresholdInBytes()), + systemSettingConfiguration, + profileFileConfiguration) + .orElse(CompressRequestStage.DEFAULT_MIN_COMPRESSION_SIZE); + } + + /** + * By default, returns the configuration as-is. Classes extending this method will take care of running the plugins and + * return the updated configuration if plugins are supported. + */ + @SdkPreviewApi + protected SdkClientConfiguration invokePlugins(SdkClientConfiguration config) { + return config; + } + + private String resolveClientUserAgent(LazyValueSource config) { + SdkClientUserAgentProperties clientProperties = new SdkClientUserAgentProperties(); + + ClientType clientType = config.get(CLIENT_TYPE); + ClientType resolvedClientType = clientType == null ? 
ClientType.UNKNOWN : clientType; + + clientProperties.putProperty(INTERNAL_METADATA_MARKER, StringUtils.trimToEmpty(config.get(INTERNAL_USER_AGENT))); + clientProperties.putProperty(IO, StringUtils.lowerCase(resolvedClientType.name())); + clientProperties.putProperty(HTTP, SdkHttpUtils.urlEncode(clientName(resolvedClientType, + config.get(SYNC_HTTP_CLIENT), + config.get(ASYNC_HTTP_CLIENT)))); + String appId = config.get(USER_AGENT_APP_ID); + String resolvedAppId = appId == null ? resolveAppId(config) : appId; + clientProperties.putProperty(APP_ID, resolvedAppId); + return SdkUserAgentBuilder.buildClientUserAgentString(SystemUserAgent.getOrCreate(), clientProperties); + } + + private String resolveAppId(LazyValueSource config) { + Optional appIdFromConfig = AppIdResolver.create() + .profileFile(config.get(PROFILE_FILE_SUPPLIER)) + .profileName(config.get(PROFILE_NAME)) + .resolve(); + return appIdFromConfig.orElse(null); + } + + private static String clientName(ClientType clientType, SdkHttpClient syncHttpClient, SdkAsyncHttpClient asyncHttpClient) { + if (clientType == SYNC) { + return syncHttpClient == null ? "null" : syncHttpClient.clientName(); + } + + if (clientType == ASYNC) { + return asyncHttpClient == null ? "null" : asyncHttpClient.clientName(); + } + + return ClientType.UNKNOWN.name(); + } + + private RetryStrategy resolveRetryStrategy(LazyValueSource config) { + RetryMode retryMode = RetryMode.resolver() + .profileFile(config.get(PROFILE_FILE_SUPPLIER)) + .profileName(config.get(PROFILE_NAME)) + .defaultRetryMode(config.get(DEFAULT_RETRY_MODE)) + .resolve(); + return SdkDefaultRetryStrategy.forRetryMode(retryMode); + } + + /** + * Finalize which sync HTTP client will be used for the created client. 
+ */ + private SdkHttpClient resolveSyncHttpClient(LazyValueSource config, + SdkClientConfiguration deprecatedConfigDoNotUseThis) { + SdkHttpClient httpClient = config.get(CONFIGURED_SYNC_HTTP_CLIENT); + SdkHttpClient.Builder httpClientBuilder = config.get(CONFIGURED_SYNC_HTTP_CLIENT_BUILDER); + Validate.isTrue(httpClient == null || + httpClientBuilder == null, + "The httpClient and the httpClientBuilder can't both be configured."); + + AttributeMap httpClientConfig = getHttpClientConfig(config, deprecatedConfigDoNotUseThis); + + return Either.fromNullable(httpClient, httpClientBuilder) + .map(e -> e.map(Function.identity(), b -> b.buildWithDefaults(httpClientConfig))) + .orElseGet(() -> defaultHttpClientBuilder.buildWithDefaults(httpClientConfig)); + } + + /** + * Finalize which async HTTP client will be used for the created client. + */ + private SdkAsyncHttpClient resolveAsyncHttpClient(LazyValueSource config, + SdkClientConfiguration deprecatedConfigDoNotUseThis) { + Validate.isTrue(config.get(CONFIGURED_ASYNC_HTTP_CLIENT) == null || + config.get(CONFIGURED_ASYNC_HTTP_CLIENT_BUILDER) == null, + "The asyncHttpClient and the asyncHttpClientBuilder can't both be configured."); + + AttributeMap httpClientConfig = getHttpClientConfig(config, deprecatedConfigDoNotUseThis); + + return Either.fromNullable(config.get(CONFIGURED_ASYNC_HTTP_CLIENT), config.get(CONFIGURED_ASYNC_HTTP_CLIENT_BUILDER)) + .map(e -> e.map(Function.identity(), b -> b.buildWithDefaults(httpClientConfig))) + .orElseGet(() -> defaultAsyncHttpClientBuilder.buildWithDefaults(httpClientConfig)); + } + + private AttributeMap getHttpClientConfig(LazyValueSource config, SdkClientConfiguration deprecatedConfigDoNotUseThis) { + AttributeMap httpClientConfig = config.get(HTTP_CLIENT_CONFIG); + if (httpClientConfig == null) { + // We must be using an old client, use the deprecated way of loading HTTP_CLIENT_CONFIG, instead. This won't take + // into account any configuration changes (e.g. 
defaults mode) from plugins, but this is the best we can do without + // breaking protected APIs. TODO: if we ever break protected APIs, remove these "childHttpConfig" hooks. + httpClientConfig = childHttpConfig(deprecatedConfigDoNotUseThis); + } + return httpClientConfig; + } + + /** + * @deprecated Configure {@link SdkClientOption#HTTP_CLIENT_CONFIG} from {@link #finalizeChildConfiguration} instead. + */ + @Deprecated + protected AttributeMap childHttpConfig(SdkClientConfiguration configuration) { + return childHttpConfig(); + } + + /** + * @deprecated Configure {@link SdkClientOption#HTTP_CLIENT_CONFIG} from {@link #finalizeChildConfiguration} instead. + */ + @Deprecated + protected AttributeMap childHttpConfig() { + return AttributeMap.empty(); + } + + /** + * Finalize which async executor service will be used for the created client. The default async executor + * service has at least 8 core threads and can scale up to at least 64 threads when needed depending + * on the number of processors available. + */ + private Executor resolveAsyncFutureCompletionExecutor(LazyValueSource config) { + int processors = Runtime.getRuntime().availableProcessors(); + int corePoolSize = Math.max(8, processors); + int maxPoolSize = Math.max(64, processors * 2); + ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, + 10, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1_000), + new ThreadFactoryBuilder() + .threadNamePrefix("sdk-async-response").build()); + // Allow idle core threads to time out + executor.allowCoreThreadTimeOut(true); + return executor; + } + + /** + * Finalize the internal SDK scheduled executor service that is used for scheduling tasks such as async retry attempts and + * timeout task. 
+ */ + private ScheduledExecutorService resolveScheduledExecutorService(LazyValueSource c) { + ScheduledExecutorService executor = c.get(CONFIGURED_SCHEDULED_EXECUTOR_SERVICE); + if (executor != null) { + return executor; + } + + return awsSdkScheduler; + } + + /** + * Finalize which execution interceptors will be used for the created client. + */ + private List resolveExecutionInterceptors(SdkClientConfiguration config) { + List globalInterceptors = new ArrayList<>(); + globalInterceptors.addAll(sdkInterceptors()); + globalInterceptors.addAll(new ClasspathInterceptorChainFactory().getGlobalInterceptors()); + return mergeLists(globalInterceptors, config.option(EXECUTION_INTERCEPTORS)); + } + + + /** + * The set of interceptors that should be included with all services. + */ + private List sdkInterceptors() { + return Collections.unmodifiableList(Arrays.asList( + new HttpChecksumValidationInterceptor() + )); + } + + @Override + public final B endpointOverride(URI endpointOverride) { + if (endpointOverride == null) { + clientConfiguration.option(SdkClientOption.CLIENT_ENDPOINT_PROVIDER, null); + } else { + clientConfiguration.option(SdkClientOption.CLIENT_ENDPOINT_PROVIDER, + ClientEndpointProvider.forEndpointOverride(endpointOverride)); + } + return thisBuilder(); + } + + public final void setEndpointOverride(URI endpointOverride) { + endpointOverride(endpointOverride); + } + + public final B asyncConfiguration(ClientAsyncConfiguration asyncConfiguration) { + clientConfiguration.option(FUTURE_COMPLETION_EXECUTOR, asyncConfiguration.advancedOption(FUTURE_COMPLETION_EXECUTOR)); + return thisBuilder(); + } + + public final void setAsyncConfiguration(ClientAsyncConfiguration asyncConfiguration) { + asyncConfiguration(asyncConfiguration); + } + + @Override + public final B overrideConfiguration(ClientOverrideConfiguration overrideConfig) { + this.overrideConfig = overrideConfig; + return thisBuilder(); + } + + public final void 
setOverrideConfiguration(ClientOverrideConfiguration overrideConfiguration) { + overrideConfiguration(overrideConfiguration); + } + + @Override + public final ClientOverrideConfiguration overrideConfiguration() { + if (overrideConfig == null) { + return ClientOverrideConfiguration.builder().build(); + } + return overrideConfig; + } + + public final B httpClient(SdkHttpClient httpClient) { + if (httpClient != null) { + httpClient = new NonManagedSdkHttpClient(httpClient); + } + clientConfiguration.option(CONFIGURED_SYNC_HTTP_CLIENT, httpClient); + return thisBuilder(); + } + + public final B httpClientBuilder(SdkHttpClient.Builder httpClientBuilder) { + clientConfiguration.option(CONFIGURED_SYNC_HTTP_CLIENT_BUILDER, httpClientBuilder); + return thisBuilder(); + } + + public final B httpClient(SdkAsyncHttpClient httpClient) { + if (httpClient != null) { + httpClient = new NonManagedSdkAsyncHttpClient(httpClient); + } + clientConfiguration.option(CONFIGURED_ASYNC_HTTP_CLIENT, httpClient); + return thisBuilder(); + } + + public final B httpClientBuilder(SdkAsyncHttpClient.Builder httpClientBuilder) { + clientConfiguration.option(CONFIGURED_ASYNC_HTTP_CLIENT_BUILDER, httpClientBuilder); + return thisBuilder(); + } + + public final B metricPublishers(List metricPublishers) { + clientConfiguration.option(METRIC_PUBLISHERS, metricPublishers); + return thisBuilder(); + } + + @Override + public final B addPlugin(SdkPlugin plugin) { + plugins.add(paramNotNull(plugin, "plugin")); + return thisBuilder(); + } + + @Override + public final List plugins() { + return Collections.unmodifiableList(plugins); + } + + /** + * Return "this" for method chaining. + */ + @SuppressWarnings("unchecked") + protected B thisBuilder() { + return (B) this; + } + + /** + * Wrapper around {@link SdkHttpClient} to prevent it from being closed. Used when the customer provides + * an already built client in which case they are responsible for the lifecycle of it. 
+ */ + @SdkTestInternalApi + public static final class NonManagedSdkHttpClient implements SdkHttpClient { + + private final SdkHttpClient delegate; + + private NonManagedSdkHttpClient(SdkHttpClient delegate) { + this.delegate = paramNotNull(delegate, "SdkHttpClient"); + } + + @Override + public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) { + return delegate.prepareRequest(request); + } + + @Override + public void close() { + // Do nothing, this client is managed by the customer. + } + + @Override + public String clientName() { + return delegate.clientName(); + } + } + + /** + * Wrapper around {@link SdkAsyncHttpClient} to prevent it from being closed. Used when the customer provides + * an already built client in which case they are responsible for the lifecycle of it. + */ + @SdkTestInternalApi + public static final class NonManagedSdkAsyncHttpClient implements SdkAsyncHttpClient { + + private final SdkAsyncHttpClient delegate; + + NonManagedSdkAsyncHttpClient(SdkAsyncHttpClient delegate) { + this.delegate = paramNotNull(delegate, "SdkAsyncHttpClient"); + } + + @Override + public CompletableFuture execute(AsyncExecuteRequest request) { + return delegate.execute(request); + } + + @Override + public String clientName() { + return delegate.clientName(); + } + + @Override + public void close() { + // Do nothing, this client is managed by the customer. + } + } +} From 2bf7308de164dfc4ca48cf03f71d8c40c0ac0d64 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 5 Jan 2026 17:43:58 +0800 Subject: [PATCH 07/11] [Fix](catalog)Resources should be closed when dropping a Catalog. 
--- .../awssdk/core/client/builder/SdkDefaultClientBuilder.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java index 086fef1c560c68..92a4e65b0eb3fd 100644 --- a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java +++ b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.core.client.builder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkProtectedApi; import software.amazon.awssdk.annotations.SdkTestInternalApi; @@ -136,6 +138,7 @@ */ @SdkProtectedApi public abstract class SdkDefaultClientBuilder, C> implements SdkClientBuilder { + private static final Logger LOG = LogManager.getLogger(SdkDefaultClientBuilder.class); private static final SdkHttpClient.Builder DEFAULT_HTTP_CLIENT_BUILDER = new DefaultSdkHttpClientBuilder(); private static final SdkAsyncHttpClient.Builder DEFAULT_ASYNC_HTTP_CLIENT_BUILDER = new DefaultSdkAsyncHttpClientBuilder(); From a17ad1d5989ef8829769e9008d189414de30ced1 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 5 Jan 2026 19:30:38 +0800 Subject: [PATCH 08/11] [Fix](catalog)Resources should be closed when dropping a Catalog. 
--- fe/check/checkstyle/suppressions.xml | 1 + .../builder/SdkDefaultClientBuilder.java | 15 +- .../UncloseableScheduledExecutorService.java | 139 ++++++++++++++++++ 3 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java diff --git a/fe/check/checkstyle/suppressions.xml b/fe/check/checkstyle/suppressions.xml index 8e1c40e31620fe..5ac4e39f408953 100644 --- a/fe/check/checkstyle/suppressions.xml +++ b/fe/check/checkstyle/suppressions.xml @@ -75,4 +75,5 @@ under the License. + diff --git a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java index 92a4e65b0eb3fd..bbff46bc624729 100644 --- a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java +++ b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java @@ -150,8 +150,21 @@ public abstract class SdkDefaultClientBuilder, private final SdkHttpClient.Builder defaultHttpClientBuilder; private final SdkAsyncHttpClient.Builder defaultAsyncHttpClientBuilder; private final List plugins = new ArrayList<>(); - private static final ScheduledExecutorService awsSdkScheduler=Executors.newScheduledThreadPool(20, new ThreadFactoryBuilder().threadNamePrefix("aws-sdk-scheduler-%d").build()); + private static final ScheduledExecutorService awsSdkScheduler; + static { + ScheduledExecutorService realScheduler = + Executors.newScheduledThreadPool( + 20, + r -> { + Thread t = new Thread(r, "aws-sdk-scheduler"); + t.setDaemon(true); + return t; + } + ); + + awsSdkScheduler = new UncloseableScheduledExecutorService(realScheduler); + } protected SdkDefaultClientBuilder() { diff --git a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java 
b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java new file mode 100644 index 00000000000000..d5645f285fd1e0 --- /dev/null +++ b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java @@ -0,0 +1,139 @@ +package software.amazon.awssdk.core.client.builder; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public final class UncloseableScheduledExecutorService + implements ScheduledExecutorService { + + private final ScheduledExecutorService delegate; + + public UncloseableScheduledExecutorService( + ScheduledExecutorService delegate) { + this.delegate = Objects.requireNonNull(delegate); + } + + // ================= Lifecycle methods (NO-OP) ================= + + @Override + public void shutdown() { + // NO-OP + } + + @Override + public List shutdownNow() { + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + return false; + } + + // ================= Scheduled methods ================= + + @Override + public ScheduledFuture schedule( + Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule( + Callable callable, long delay, TimeUnit unit) { + return delegate.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, + long initialDelay, + 
long period, + TimeUnit unit) { + return delegate.scheduleAtFixedRate( + command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, + long initialDelay, + long delay, + TimeUnit unit) { + return delegate.scheduleWithFixedDelay( + command, initialDelay, delay, unit); + } + + // ================= Executor methods ================= + + @Override + public void execute(Runnable command) { + delegate.execute(command); + } + + @Override + public Future submit(Callable task) { + return delegate.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public List> invokeAll( + Collection> tasks) + throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll( + Collection> tasks, + long timeout, + TimeUnit unit) + throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny( + Collection> tasks) + throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny( + Collection> tasks, + long timeout, + TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } +} \ No newline at end of file From ac4a6b9e2bb921570526fa38d65b9204b334b28a Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 6 Jan 2026 11:02:42 +0800 Subject: [PATCH 09/11] [Fix](catalog)Resources should be closed when dropping a Catalog. 
--- .licenserc.yaml | 1 + .../java/org/apache/doris/common/Config.java | 7 +++++++ .../builder/SdkDefaultClientBuilder.java | 3 ++- .../UncloseableScheduledExecutorService.java | 20 ++++++++++++++++--- 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/.licenserc.yaml b/.licenserc.yaml index fa146036fdc529..8bdc7d8113bc4c 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -44,6 +44,7 @@ header: - "**/*.parquet" - "docs/.markdownlintignore" - "fe/fe-core/src/test/resources/data/net_snmp_normal" + - "fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java" - "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaLexer.g4" - "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaParser.g4" - "be/dict/ik/*" diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 029420e5f19019..58e0562daf37ab 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3771,6 +3771,13 @@ public static int metaServiceRpcRetryTimes() { @ConfField(mutable = true) public static String aws_credentials_provider_version = "v2"; + @ConfField(description = { + "AWS SDK 用于调度异步重试、超时任务以及其他后台操作的线程池大小,全局共享", + "The thread pool size used by the AWS SDK to schedule asynchronous retries, timeout tasks, " + + "and other background operations, shared globally" + }) + public static int aws_sdk_async_scheduler_thread_pool_size = 20; + @ConfField(description = { "agent tasks 健康检查的时间间隔,默认五分钟,小于等于 0 时不做健康检查", "agent tasks health check interval, default is five minutes, no health check when less than or equal to 0" diff --git a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java index bbff46bc624729..3607e8035f6ee1 100644 --- 
a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java +++ b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.core.client.builder; +import org.apache.doris.common.Config; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import software.amazon.awssdk.annotations.SdkPreviewApi; @@ -155,7 +156,7 @@ public abstract class SdkDefaultClientBuilder, static { ScheduledExecutorService realScheduler = Executors.newScheduledThreadPool( - 20, + Config.aws_sdk_async_scheduler_thread_pool_size, r -> { Thread t = new Thread(r, "aws-sdk-scheduler"); t.setDaemon(true); diff --git a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java index d5645f285fd1e0..1324668b27b333 100644 --- a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java +++ b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + package software.amazon.awssdk.core.client.builder; import java.util.Collection; @@ -23,7 +40,6 @@ public UncloseableScheduledExecutorService( } // ================= Lifecycle methods (NO-OP) ================= - @Override public void shutdown() { // NO-OP @@ -50,7 +66,6 @@ public boolean awaitTermination(long timeout, TimeUnit unit) { } // ================= Scheduled methods ================= - @Override public ScheduledFuture schedule( Runnable command, long delay, TimeUnit unit) { @@ -84,7 +99,6 @@ public ScheduledFuture scheduleWithFixedDelay( } // ================= Executor methods ================= - @Override public void execute(Runnable command) { delegate.execute(command); From a34190b405ca56086e87c2245e08deb102157cc4 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 6 Jan 2026 18:15:46 +0800 Subject: [PATCH 10/11] [Fix](catalog)Resources should be closed when dropping a Catalog. --- fe/fe-core/pom.xml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 9e3dfd6006feb4..287285edd27ca8 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -345,14 +345,6 @@ under the License. software.amazon.awssdk s3
- - software.amazon.awssdk - apache-client - - - software.amazon.awssdk - netty-nio-client - com.tencentcloudapi From a132dc90439208b382dacac4e192375c8f8e835d Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 7 Jan 2026 10:48:07 +0800 Subject: [PATCH 11/11] [Fix](catalog)Resources should be closed when dropping a Catalog. --- LICENSE.txt | 7 +++++ NOTICE.txt | 30 +++++++++++++++++++ .../UncloseableScheduledExecutorService.java | 4 +-- .../builder/SdkDefaultClientBuilder.java | 5 ++++ .../paimon/test_paimon_dlf_catalog.groovy | 2 -- ...t_paimon_dlf_catalog_miss_dlf_param.groovy | 1 - .../test_paimon_dlf_catalog_new_param.groovy | 2 -- .../test_paimon_dlf_rest_catalog.groovy | 2 -- .../paimon/test_paimon_hms_catalog.groovy | 3 -- .../hive_on_hms_and_dlf.groovy | 16 ---------- .../iceberg_and_hive_on_glue.groovy | 17 ----------- ...eberg_on_hms_and_filesystem_and_dlf.groovy | 7 ----- 12 files changed, 44 insertions(+), 52 deletions(-) rename fe/fe-core/src/main/java/{software/amazon/awssdk/core/client/builder => org/apache/doris/common/util}/UncloseableScheduledExecutorService.java (98%) diff --git a/LICENSE.txt b/LICENSE.txt index ce0e98bc85111c..1d1ecffc1de8d5 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -202,6 +202,13 @@ -------------------------------------------------------------------------------- +The following components are provided under the Apache License. See project link for details. +The text of each license is the standard Apache 2.0 license. + +software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder from AWS SDK v2 (sdk-core 2.29.52) + +-------------------------------------------------------------------------------- + be/src/common/status.* : BSD-style license Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
diff --git a/NOTICE.txt b/NOTICE.txt index 5f1ae973c4f23a..62fc72efbadca3 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -52,3 +52,33 @@ its NOTICE file: This product includes cryptographic software written by Eric Young (eay@cryptsoft.com). This product includes software written by Tim Hudson (tjh@cryptsoft.com). + +-------------------------------------------------------------------------------- +This product includes code from AWS SDK, which includes the following in +its NOTICE file: + +AWS SDK for Java 2.0 +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +This product includes software developed by +Amazon Technologies, Inc (http://www.amazon.com/). + +********************** +THIRD PARTY COMPONENTS +********************** +This software includes third party software subject to the following copyrights: +- XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty. +- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc. +- Apache Commons Lang - https://github.com/apache/commons-lang +- Netty Reactive Streams - https://github.com/playframework/netty-reactive-streams +- Jackson-core - https://github.com/FasterXML/jackson-core +- Jackson-dataformat-cbor - https://github.com/FasterXML/jackson-dataformats-binary + +The licenses for these third party components are included in LICENSE.txt + +- For Apache Commons Lang see also this required NOTICE: + Apache Commons Lang + Copyright 2001-2020 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (https://www.apache.org/). 
diff --git a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/UncloseableScheduledExecutorService.java similarity index 98% rename from fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java rename to fe/fe-core/src/main/java/org/apache/doris/common/util/UncloseableScheduledExecutorService.java index 1324668b27b333..7cb50b98d239f7 100644 --- a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/UncloseableScheduledExecutorService.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package software.amazon.awssdk.core.client.builder; +package org.apache.doris.common.util; import java.util.Collection; import java.util.Collections; @@ -150,4 +150,4 @@ public T invokeAny( throws InterruptedException, ExecutionException, TimeoutException { return delegate.invokeAny(tasks, timeout, unit); } -} \ No newline at end of file +} diff --git a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java index 3607e8035f6ee1..d92e0fcbf1e2bf 100644 --- a/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java +++ b/fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.core.client.builder; import org.apache.doris.common.Config; +import org.apache.doris.common.util.UncloseableScheduledExecutorService; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import software.amazon.awssdk.annotations.SdkPreviewApi; @@ -137,6 +138,10 @@ * @param The type of 
builder, for chaining. * @param The type of client generated by this builder. */ +/** + * This class(software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder) is copied from AWS SDK v2 (sdk-core 2.29.52), + * with minor modifications. + */ @SdkProtectedApi public abstract class SdkDefaultClientBuilder, C> implements SdkClientBuilder { private static final Logger LOG = LogManager.getLogger(SdkDefaultClientBuilder.class); diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog.groovy index bc1e2c96453808..5a68844baa8c31 100644 --- a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog.groovy +++ b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog.groovy @@ -64,8 +64,6 @@ suite("test_paimon_dlf_catalog", "p2,external,paimon,external_remote,external_re sql """SELECT * FROM tb_simple\$snapshots;""" } finally { sql """set force_jni_scanner=false""" - - sql """drop catalog if exists ${catalog};""" } } diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_miss_dlf_param.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_miss_dlf_param.groovy index 11e331481ebbbd..b2deafc316acab 100644 --- a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_miss_dlf_param.groovy +++ b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_miss_dlf_param.groovy @@ -69,7 +69,6 @@ suite("test_paimon_dlf_catalog_miss_dlf_param", "p2,external,paimon,external_rem } finally { sql """set force_jni_scanner=false""" - sql """drop catalog if exists ${catalog};""" } } diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_new_param.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_new_param.groovy index 6594c55dc64065..1044dc8b778ee4 100644 --- 
a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_new_param.groovy +++ b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_catalog_new_param.groovy @@ -71,8 +71,6 @@ suite("test_paimon_dlf_catalog_new_param", "p2,external,paimon,external_remote,e } finally { sql """set force_jni_scanner=false""" - - sql """drop catalog if exists ${catalog};""" } } diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_rest_catalog.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_rest_catalog.groovy index 308f0ca8a97c7f..e195c6dd180296 100644 --- a/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_rest_catalog.groovy +++ b/regression-test/suites/external_table_p2/paimon/test_paimon_dlf_rest_catalog.groovy @@ -44,8 +44,6 @@ suite("test_paimon_dlf_rest_catalog", "p2,external,paimon,external_remote,extern } finally { sql """set force_jni_scanner=false""" - - sql """drop catalog if exists ${catalog};""" } } diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_hms_catalog.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_hms_catalog.groovy index 79292df1c165ab..790f1095d4e602 100644 --- a/regression-test/suites/external_table_p2/paimon/test_paimon_hms_catalog.groovy +++ b/regression-test/suites/external_table_p2/paimon/test_paimon_hms_catalog.groovy @@ -56,9 +56,6 @@ suite("test_paimon_hms_catalog", "p2,external,paimon,new_catalog_property") { // TODO(zgx): add branch/tag // system table sql """SELECT * FROM external_test_table\$snapshots;""" - - - sql """drop catalog if exists ${catalog};""" } String enabled = context.config.otherConfigs.get("enablePaimonTest") if (enabled == null || !enabled.equalsIgnoreCase("true")) { diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/hive_on_hms_and_dlf.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/hive_on_hms_and_dlf.groovy index 5926754f88f3f2..f0d711c7a8cffa 
100644 --- a/regression-test/suites/external_table_p2/refactor_catalog_param/hive_on_hms_and_dlf.groovy +++ b/regression-test/suites/external_table_p2/refactor_catalog_param/hive_on_hms_and_dlf.groovy @@ -81,10 +81,6 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 - - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } /*--------test partition table insert---------*/ @@ -185,10 +181,6 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 - - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } /*--------test insert overwrite---------*/ @@ -286,10 +278,6 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 - - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } /*--------only execute query---------*/ @@ -323,10 +311,6 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") { SELECT count(1) FROM ${table_name}; """ assert queryResult.get(0).get(0) == data_count - - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") String keytab_root_dir = "/keytabs" diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_and_hive_on_glue.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_and_hive_on_glue.groovy index 208a5557cb2884..6212fa8c20898c 100644 --- a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_and_hive_on_glue.groovy +++ b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_and_hive_on_glue.groovy @@ -80,9 +80,6 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; 
- """ } def testQueryAndInsertIcerberg = { String catalogProperties, String prefix -> @@ -144,9 +141,6 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } /*--------test insert overwrite for hive---------*/ @@ -235,9 +229,6 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } /*--------test insert overwrite for iceberg---------*/ @@ -322,10 +313,6 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") { show databases like "${db_name}"; """ assert dropResult.size() == 0 - - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } /*--------only execute query---------*/ @@ -359,10 +346,6 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") { SELECT count(1) FROM ${table_name}; """ assert queryResult.get(0).get(0) == data_count - - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } /*--------GLUE START-----------*/ diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy index 2fbe9b607c597d..45b18889767cf4 100644 --- a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy +++ b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy @@ -178,9 +178,6 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", "p2,external,new_catalog_property show databases like "${db_name}"; """ assert dropResult.size() == 0 - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } /*--------only execute query---------*/ @@ -214,10 +211,6 @@ 
suite("iceberg_on_hms_and_filesystem_and_dlf", "p2,external,new_catalog_property SELECT count(1) FROM ${table_name}; """ assert queryResult.get(0).get(0) == data_count - - sql """ - DROP CATALOG IF EXISTS ${catalog_name}; - """ } String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")