From 5c0356b9f31c48302da0da57a816196df00b9d8a Mon Sep 17 00:00:00 2001 From: zhangli20 Date: Tue, 27 Jan 2026 17:43:25 +0800 Subject: [PATCH] Refactor SparkAuronConfiguration and remove deprecated AuronConf classes --- .../configuration/AuronConfiguration.java | 92 +- .../auron/configuration/ConfigOption.java | 109 ++- .../auron/configuration/ConfigOptions.java | 174 ---- .../java/org/apache/auron/jni/JniBridge.java | 34 + .../configuration/AuronConfigurationTest.java | 54 -- .../auron/configuration/ConfigOptionTest.java | 51 -- .../configuration/MockAuronConfiguration.java | 64 +- native-engine/auron-jni-bridge/src/conf.rs | 10 +- .../auron-jni-bridge/src/jni_bridge.rs | 95 +- ...ForceApplyShuffledHashJoinInterceptor.java | 3 +- .../apache/spark/sql/auron/ShimsImpl.scala | 14 +- .../org/apache/auron/AuronQuerySuite.scala | 6 +- .../apache/auron/NativeConvertersSuite.scala | 10 +- .../apache/auron/jni/SparkAuronAdaptor.java | 3 +- .../SparkAuronConfiguration.java | 832 ++++++++++++------ .../SparkAuronConfigurationDocGenerator.java | 73 ++ .../org/apache/spark/sql/auron/AuronConf.java | 197 ----- .../org/apache/spark/sql/auron/JniBridge.java | 146 --- .../sql/auron/AuronCallNativeWrapper.scala | 238 ----- .../spark/sql/auron/AuronConverters.scala | 76 +- .../auron/AuronSparkSessionExtension.scala | 11 +- .../spark/sql/auron/NativeConverters.scala | 38 +- .../org/apache/spark/sql/auron/Shims.scala | 1 + .../SparkAuronConfigurationTest.java | 77 -- .../auron/paimon/PaimonConvertProvider.scala | 6 +- 25 files changed, 904 insertions(+), 1510 deletions(-) delete mode 100644 auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java delete mode 100644 auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java delete mode 100644 auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java create mode 100644 spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java delete mode 100644 spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java delete mode 100644 spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java delete mode 100644 spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala delete mode 100644 spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java diff --git a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java index d6acc4897..31017d8bd 100644 --- a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java +++ b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java @@ -23,26 +23,24 @@ */ public abstract class AuronConfiguration { - public static final ConfigOption BATCH_SIZE = ConfigOptions.key("auron.batchSize") - .description("Suggested batch size for arrow batches.") - .intType() - .defaultValue(10000); - - public static final ConfigOption MEMORY_FRACTION = ConfigOptions.key("auron.memoryFraction") - .description("Suggested fraction of off-heap memory used in native execution. 
" + public static final ConfigOption BATCH_SIZE = new ConfigOption<>(Integer.class) + .withKey("auron.batchSize") + .withDescription("Suggested batch size for arrow batches.") + .withDefaultValue(10000); + + public static final ConfigOption MEMORY_FRACTION = new ConfigOption<>(Double.class) + .withKey("auron.memoryFraction") + .withDescription("Suggested fraction of off-heap memory used in native execution. " + "actual off-heap memory usage is expected to be spark.executor.memoryOverhead * fraction.") - .doubleType() - .defaultValue(0.6); + .withDefaultValue(0.6); - public static final ConfigOption NATIVE_LOG_LEVEL = ConfigOptions.key("auron.native.log.level") - .description("Log level for native execution.") - .stringType() - .defaultValue("info"); + public static final ConfigOption NATIVE_LOG_LEVEL = new ConfigOption<>(String.class) + .withKey("auron.native.log.level") + .withDescription("Log level for native execution.") + .withDefaultValue("info"); public abstract Optional getOptional(ConfigOption option); - public abstract Optional getOptional(String key); - public T get(ConfigOption option) { return getOptional(option).orElseGet(() -> getOptionDefaultValue(option)); } @@ -57,18 +55,6 @@ public String getString(ConfigOption configOption) { return getOptional(configOption).orElseGet(() -> getOptionDefaultValue(configOption)); } - /** - * Returns the value associated with the given config option as a string. If no value is mapped - * under any key of the option, it returns the specified default instead of the option's default - * value. - * - * @param configOption The configuration option - * @return the (default) value associated with the given config option - */ - public String getString(ConfigOption configOption, String overrideDefault) { - return getOptional(configOption).orElse(overrideDefault); - } - /** * Returns the value associated with the given config option as an integer. * @@ -79,19 +65,6 @@ public int getInteger(ConfigOption configOption) { return getOptional(configOption).orElseGet(() -> getOptionDefaultValue(configOption)); } - /** - * Returns the value associated with the given config option as an integer. If no value is - * mapped under any key of the option, it returns the specified default instead of the option's - * default value. - * - * @param configOption The configuration option - * @param overrideDefault The value to return if no value was mapped for any key of the option - * @return the configured value associated with the given config option, or the overrideDefault - */ - public int getInteger(ConfigOption configOption, int overrideDefault) { - return getOptional(configOption).orElse(overrideDefault); - } - /** * Returns the value associated with the given config option as a long integer. * @@ -102,19 +75,6 @@ public long getLong(ConfigOption configOption) { return getOptional(configOption).orElseGet(() -> getOptionDefaultValue(configOption)); } - /** - * Returns the value associated with the given config option as a long integer. If no value is - * mapped under any key of the option, it returns the specified default instead of the option's - * default value. 
- * - * @param configOption The configuration option - * @param overrideDefault The value to return if no value was mapped for any key of the option - * @return the configured value associated with the given config option, or the overrideDefault - */ - public long getLong(ConfigOption configOption, long overrideDefault) { - return getOptional(configOption).orElse(overrideDefault); - } - /** * Returns the value associated with the given config option as a boolean. * @@ -148,19 +108,6 @@ public float getFloat(ConfigOption configOption) { return getOptional(configOption).orElseGet(() -> getOptionDefaultValue(configOption)); } - /** - * Returns the value associated with the given config option as a float. If no value is mapped - * under any key of the option, it returns the specified default instead of the option's default - * value. - * - * @param configOption The configuration option - * @param overrideDefault The value to return if no value was mapped for any key of the option - * @return the configured value associated with the given config option, or the overrideDefault - */ - public float getFloat(ConfigOption configOption, float overrideDefault) { - return getOptional(configOption).orElse(overrideDefault); - } - /** * Returns the value associated with the given config option as a {@code double}. * @@ -171,19 +118,6 @@ public double getDouble(ConfigOption configOption) { return getOptional(configOption).orElseGet(() -> getOptionDefaultValue(configOption)); } - /** - * Returns the value associated with the given config option as a {@code double}. If no value is - * mapped under any key of the option, it returns the specified default instead of the option's - * default value. - * - * @param configOption The configuration option - * @param overrideDefault The value to return if no value was mapped for any key of the option - * @return the configured value associated with the given config option, or the overrideDefault - */ - public double getDouble(ConfigOption configOption, double overrideDefault) { - return getOptional(configOption).orElse(overrideDefault); - } - /** * Returns the value associated with the given config option as a {@code double}. * diff --git a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java index 925f3ddd8..04a4e0f22 100644 --- a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java +++ b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java @@ -16,9 +16,10 @@ */ package org.apache.auron.configuration; -import static org.apache.auron.util.Preconditions.checkNotNull; - +import java.util.ArrayList; +import java.util.List; import java.util.function.Function; +import org.apache.auron.jni.AuronAdaptor; /** * A {@code ConfigOption} describes a configuration parameter. It encapsulates the configuration @@ -35,16 +36,22 @@ public class ConfigOption { public static final String EMPTY_DESCRIPTION = ""; /** The current key for that config option. */ - private final String key; + private String key; + + /** The current key for that config option. */ + private List altKeys = new ArrayList<>(); /** The default value for this option. */ - private final T defaultValue; + private T defaultValue; + + /** The current category for that config option. */ + private String category = "Uncategorized"; /** The description for this option. */ - private final String description; + private String description; /** The function to compute the default value. 
*/
-    private final Function<AuronConfiguration, T> dynamicDefaultValueFunction;
+    private Function<AuronConfiguration, T> dynamicDefaultValueFunction;
 
     /**
      * Type of the value that this ConfigOption describes.
      *
      *
  • typeClass == atomic class and isList == true for {@code ConfigOption>} * */ - private final Class clazz; + private Class clazz; - /** - * Creates a new config option with fallback keys. - * - * @param key The current key for that config option - * @param clazz describes type of the ConfigOption, see description of the clazz field - * @param description Description for that option - * @param defaultValue The default value for this option - */ - ConfigOption( - String key, - Class clazz, - T defaultValue, - String description, - Function dynamicDefaultValueFunction) { - this.key = checkNotNull(key); - this.description = description; + public ConfigOption(Class clazz) { + this.clazz = clazz; + } + + public ConfigOption withKey(String key) { + this.key = key; + return this; + } + + public ConfigOption addAltKey(String altKey) { + this.altKeys.add(altKey); + return this; + } + + public ConfigOption withDefaultValue(T defaultValue) { this.defaultValue = defaultValue; - this.clazz = checkNotNull(clazz); + return this; + } + + public ConfigOption withCategory(String category) { + this.category = category; + return this; + } + + public ConfigOption withDescription(String description) { + this.description = description; + return this; + } + + public ConfigOption withDynamicDefaultValue(Function dynamicDefaultValueFunction) { this.dynamicDefaultValueFunction = dynamicDefaultValueFunction; + return this; } /** @@ -88,21 +108,24 @@ public String key() { } /** - * Gets the description of configuration key - * - * @return + * Gets the alternative configuration keys. */ - public String description() { - return description; + public List altKeys() { + return altKeys; } /** - * Checks if this option has a default value. - * - * @return True if it has a default value, false if not. + * Gets the category of configuration key */ - public boolean hasDefaultValue() { - return defaultValue != null; + public String category() { + return category; + } + + /** + * Gets the description of configuration key + */ + public String description() { + return description; } /** @@ -131,4 +154,22 @@ public boolean hasDynamicDefaultValue() { public Function dynamicDefaultValueFunction() { return dynamicDefaultValueFunction; } + + /** + * Gets the type class of the value that this ConfigOption describes. + * + * @return The type class of the value that this ConfigOption describes. + */ + public Class getValueClass() { + return clazz; + } + + /** + * Retrieves the current value of this configuration option. + * + * @return the current value associated with this configuration option. + */ + public T get() { + return AuronAdaptor.getInstance().getAuronConfiguration().get(this); + } } diff --git a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java deleted file mode 100644 index 47efc98e3..000000000 --- a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.configuration;
-
-import static org.apache.auron.util.Preconditions.checkNotNull;
-
-import java.util.function.Function;
-
-/**
- * Refer to the design of the Flink engine.
- * {@code ConfigOptions} are used to build a {@link ConfigOption}. The option is typically built in
- * one of the following pattern:
- *
- * <pre>{@code
- * // simple string-valued option with a default value
- * ConfigOption<String> tempDirs = ConfigOptions
- *     .key("tmp.dir")
- *     .stringType()
- *     .defaultValue("/tmp");
- *
- * // simple string-valued option with a default value and with the description
- * ConfigOption<String> tempDirs = ConfigOptions
- *     .key("tmp.dir")
- *     .description("this is a example of string")
- *     .stringType()
- *     .defaultValue("/tmp");
- *
- * // simple integer-valued option with a default value
- * ConfigOption<Integer> batchSize = ConfigOptions
- *     .key("batch.size")
- *     .intType()
- *     .defaultValue(100);
- *
- * // option with no default value
- * ConfigOption<String> userName = ConfigOptions
- *     .key("user.name")
- *     .stringType()
- *     .noDefaultValue();
- * }</pre>
    - */ -public class ConfigOptions { - - /** - * Starts building a new {@link ConfigOption}. - * - * @param key The key for the config option. - * @return The builder for the config option with the given key. - */ - public static OptionBuilder key(String key) { - checkNotNull(key); - return new OptionBuilder(key); - } - - // ------------------------------------------------------------------------ - - /** - * The option builder is used to create a {@link ConfigOption}. It is instantiated via {@link - * ConfigOptions#key(String)}. - */ - public static final class OptionBuilder { - - /** The key for the config option. */ - private final String key; - - private String description = ConfigOption.EMPTY_DESCRIPTION; - - /** - * Creates a new OptionBuilder. - * @param key The key for the config option - */ - OptionBuilder(String key) { - this.key = key; - } - - OptionBuilder(String key, String description) { - this.key = key; - this.description = description; - } - - public OptionBuilder description(String description) { - return new OptionBuilder(key, description); - } - - /** Defines that the value of the option should be of {@link Boolean} type. */ - public TypedConfigOptionBuilder booleanType() { - return new TypedConfigOptionBuilder<>(key, Boolean.class, description); - } - - /** Defines that the value of the option should be of {@link Integer} type. */ - public TypedConfigOptionBuilder intType() { - return new TypedConfigOptionBuilder<>(key, Integer.class, description); - } - - /** Defines that the value of the option should be of {@link Long} type. */ - public TypedConfigOptionBuilder longType() { - return new TypedConfigOptionBuilder<>(key, Long.class, description); - } - - /** Defines that the value of the option should be of {@link Float} type. */ - public TypedConfigOptionBuilder floatType() { - return new TypedConfigOptionBuilder<>(key, Float.class, description); - } - - /** Defines that the value of the option should be of {@link Double} type. */ - public TypedConfigOptionBuilder doubleType() { - return new TypedConfigOptionBuilder<>(key, Double.class, description); - } - - /** Defines that the value of the option should be of {@link String} type. */ - public TypedConfigOptionBuilder stringType() { - return new TypedConfigOptionBuilder<>(key, String.class, description); - } - } - - /** - * Builder for {@link ConfigOption} with a defined atomic type. - * - * @param atomic type of the option - */ - public static class TypedConfigOptionBuilder { - private final String key; - private final Class clazz; - - private final String description; - - TypedConfigOptionBuilder(String key, Class clazz, String description) { - this.key = key; - this.clazz = clazz; - this.description = description; - } - - /** - * Creates a ConfigOption with the given default value. - * - * @param value The default value for the config option - * @return The config option with the default value. - */ - public ConfigOption defaultValue(T value) { - return new ConfigOption<>(key, clazz, value, description, null); - } - - /** - * Creates a ConfigOption without a default value. - * - * @return The config option without a default value. 
- */ - public ConfigOption noDefaultValue() { - return new ConfigOption<>(key, clazz, null, description, null); - } - - public ConfigOption dynamicDefaultValue(Function dynamicDefaultValueFunction) { - return new ConfigOption<>(key, clazz, null, description, dynamicDefaultValueFunction); - } - } - - // ------------------------------------------------------------------------ - - /** Not intended to be instantiated. */ - private ConfigOptions() {} -} diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java index 4d7edbf8f..121970c24 100644 --- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java +++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java @@ -23,10 +23,13 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import org.apache.auron.configuration.AuronConfiguration; +import org.apache.auron.configuration.ConfigOption; import org.apache.auron.functions.AuronUDFWrapperContext; import org.apache.auron.hadoop.fs.FSDataInputWrapper; import org.apache.auron.hadoop.fs.FSDataOutputWrapper; import org.apache.auron.memory.OnHeapSpillManager; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -106,4 +109,35 @@ public static String getDirectWriteSpillToDiskFile() throws IOException { public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer udfSerialized) { return AuronAdaptor.getInstance().getAuronUDFWrapperContext(udfSerialized); } + + public static int intConf(String confKey) { + return getConfValue(confKey); + } + + public static long longConf(String confKey) { + return getConfValue(confKey); + } + + public static double doubleConf(String confKey) { + return getConfValue(confKey); + } + + public static boolean booleanConf(String confKey) { + return getConfValue(confKey); + } + + public static String stringConf(String confKey) { + return getConfValue(confKey); + } + + static T getConfValue(String confKey) { + Class confClass = + AuronAdaptor.getInstance().getAuronConfiguration().getClass(); + try { + ConfigOption configOption = (ConfigOption) FieldUtils.readStaticField(confClass, confKey); + return configOption.get(); + } catch (IllegalAccessException | ClassCastException e) { + throw new RuntimeException("error reading conf value: " + confKey, e); + } + } } diff --git a/auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java b/auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java deleted file mode 100644 index afee2f594..000000000 --- a/auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.auron.configuration; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** - * This is a test class for {@link AuronConfiguration}. - */ -public class AuronConfigurationTest { - - private MockAuronConfiguration config; - - @BeforeEach - public void setUp() { - config = new MockAuronConfiguration(); - config.addConfig(MockAuronConfiguration.STRING_WITHOUT_DEFAULT_CONFIG_OPTION.key(), "str1"); - config.addConfig(MockAuronConfiguration.INT_CONFIG_OPTION.key(), 100); - config.addConfig(MockAuronConfiguration.BOOLEAN_CONFIG_OPTION.key(), false); - config.addConfig(MockAuronConfiguration.DOUBLE_CONFIG_OPTION.key(), 99.9); - config.addConfig(MockAuronConfiguration.LONG_CONFIG_OPTION.key(), 10000000000L); - config.addConfig(MockAuronConfiguration.FLOAT_CONFIG_OPTION.key(), 1.2f); - } - - @Test - public void testGetConfig() { - assertEquals("str1", config.get(MockAuronConfiguration.STRING_WITHOUT_DEFAULT_CONFIG_OPTION)); - assertEquals("zm", config.get(MockAuronConfiguration.STRING_CONFIG_OPTION)); - assertEquals(100, config.getInteger(MockAuronConfiguration.INT_CONFIG_OPTION)); - assertEquals(false, config.get(MockAuronConfiguration.BOOLEAN_CONFIG_OPTION)); - assertEquals(99.9, config.get(MockAuronConfiguration.DOUBLE_CONFIG_OPTION), 0.0000000001); - assertEquals(10000000000L, config.getLong(MockAuronConfiguration.LONG_CONFIG_OPTION)); - assertEquals(1.2f, config.get(MockAuronConfiguration.FLOAT_CONFIG_OPTION), 0.0000000001); - // test dynamic default value - assertEquals(500, config.getInteger(MockAuronConfiguration.INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION)); - } -} diff --git a/auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java b/auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java deleted file mode 100644 index 0242974a7..000000000 --- a/auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.auron.configuration; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.junit.jupiter.api.Test; - -/** Tests for the {@link ConfigOption}. 
*/ -public class ConfigOptionTest { - - @Test - public void testConfigOption() { - ConfigOption keyOption = ConfigOptions.key("key").stringType().noDefaultValue(); - assertEquals("key", keyOption.key()); - assertEquals(null, keyOption.defaultValue()); - assertEquals(false, keyOption.hasDefaultValue()); - ConfigOption booleanOption = - ConfigOptions.key("boolean").booleanType().defaultValue(true); - assertEquals(true, booleanOption.defaultValue()); - } - - @Test - public void testConfigOptionAddDesc() { - ConfigOption keyOption = ConfigOptions.key("key") - .description("this is a description of the key") - .stringType() - .noDefaultValue(); - assertEquals("key", keyOption.key()); - assertEquals(null, keyOption.defaultValue()); - assertEquals(false, keyOption.hasDefaultValue()); - ConfigOption booleanOption = - ConfigOptions.key("boolean").booleanType().defaultValue(true); - assertEquals(true, booleanOption.defaultValue()); - assertEquals("this is a description of the key", keyOption.description()); - } -} diff --git a/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java b/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java index eb14c38c3..ec6098cc8 100644 --- a/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java +++ b/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java @@ -16,53 +16,51 @@ */ package org.apache.auron.configuration; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; public class MockAuronConfiguration extends AuronConfiguration { - public static final ConfigOption STRING_CONFIG_OPTION = - ConfigOptions.key("string").stringType().defaultValue("zm"); + // Basic configuration options with descriptions + public static final ConfigOption STRING_CONFIG_OPTION = new ConfigOption<>(String.class) + .withKey("string") + .withDescription("A string configuration option for testing.") + .withDefaultValue("zm"); - public static final ConfigOption STRING_WITHOUT_DEFAULT_CONFIG_OPTION = - ConfigOptions.key("string_without_default").stringType().noDefaultValue(); + public static final ConfigOption INT_CONFIG_OPTION = new ConfigOption<>(Integer.class) + .withKey("int") + .withDescription("An integer configuration option for testing.") + .withDefaultValue(1); - public static final ConfigOption INT_CONFIG_OPTION = - ConfigOptions.key("int").intType().defaultValue(1); + public static final ConfigOption LONG_CONFIG_OPTION = new ConfigOption<>(Long.class) + .withKey("long") + .withDescription("A long configuration option for testing.") + .withDefaultValue(1L); - public static final ConfigOption LONG_CONFIG_OPTION = - ConfigOptions.key("long").longType().defaultValue(1L); + public static final ConfigOption BOOLEAN_CONFIG_OPTION = new ConfigOption<>(Boolean.class) + .withKey("boolean") + .withDescription("A boolean configuration option for testing.") + .withDefaultValue(true); - public static final ConfigOption BOOLEAN_CONFIG_OPTION = - ConfigOptions.key("boolean").booleanType().defaultValue(true); + public static final ConfigOption DOUBLE_CONFIG_OPTION = new ConfigOption<>(Double.class) + .withKey("double") + .withDescription("A double configuration option for testing.") + .withDefaultValue(1.0); - public static final ConfigOption DOUBLE_CONFIG_OPTION = - ConfigOptions.key("double").doubleType().defaultValue(1.0); + public static final ConfigOption FLOAT_CONFIG_OPTION = new ConfigOption<>(Float.class) + .withKey("float") + .withDescription("A float 
configuration option for testing.") + .withDefaultValue(1.0f); - public static final ConfigOption FLOAT_CONFIG_OPTION = - ConfigOptions.key("float").floatType().defaultValue(1.0f); - - public static final ConfigOption INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION = ConfigOptions.key( - "int_with_dynamic_default") - .intType() - .dynamicDefaultValue(config -> config.getInteger(INT_CONFIG_OPTION) * 5); - - private Map configMap = new HashMap<>(); + public static final ConfigOption INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION = new ConfigOption<>(Integer.class) + .withKey("int_with_dynamic_default") + .withDescription("An integer configuration option with dynamic default value.") + .withDynamicDefaultValue( + config -> config.getOptional(INT_CONFIG_OPTION).orElse(1) * 5); public MockAuronConfiguration() {} - public void addConfig(String key, Object value) { - configMap.put(key, value); - } - @Override public Optional getOptional(ConfigOption option) { - return Optional.ofNullable((T) configMap.getOrDefault(option.key(), getOptionDefaultValue(option))); - } - - @Override - public Optional getOptional(String key) { - return Optional.ofNullable((T) configMap.get(key)); + return Optional.empty(); // always use default value } } diff --git a/native-engine/auron-jni-bridge/src/conf.rs b/native-engine/auron-jni-bridge/src/conf.rs index 383596d60..351432f96 100644 --- a/native-engine/auron-jni-bridge/src/conf.rs +++ b/native-engine/auron-jni-bridge/src/conf.rs @@ -67,7 +67,7 @@ pub trait BooleanConf { fn value(&self) -> Result { ensure_jni_bridge_inited()?; let key = jni_new_string!(self.key())?; - jni_call_static!(AuronConf.booleanConf(key.as_obj()) -> bool) + jni_call_static!(JniBridge.booleanConf(key.as_obj()) -> bool) } } @@ -76,7 +76,7 @@ pub trait IntConf { fn value(&self) -> Result { ensure_jni_bridge_inited()?; let key = jni_new_string!(self.key())?; - jni_call_static!(AuronConf.intConf(key.as_obj()) -> i32) + jni_call_static!(JniBridge.intConf(key.as_obj()) -> i32) } } @@ -85,7 +85,7 @@ pub trait LongConf { fn value(&self) -> Result { ensure_jni_bridge_inited()?; let key = jni_new_string!(self.key())?; - jni_call_static!(AuronConf.longConf(key.as_obj()) -> i64) + jni_call_static!(JniBridge.longConf(key.as_obj()) -> i64) } } @@ -94,7 +94,7 @@ pub trait DoubleConf { fn value(&self) -> Result { ensure_jni_bridge_inited()?; let key = jni_new_string!(self.key())?; - jni_call_static!(AuronConf.doubleConf(key.as_obj()) -> f64) + jni_call_static!(JniBridge.doubleConf(key.as_obj()) -> f64) } } @@ -104,7 +104,7 @@ pub trait StringConf { ensure_jni_bridge_inited()?; let key = jni_new_string!(self.key())?; let value = jni_get_string!( - jni_call_static!(AuronConf.stringConf(key.as_obj()) -> JObject)? + jni_call_static!(JniBridge.stringConf(key.as_obj()) -> JObject)? 
.as_obj() .into() )?; diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs b/native-engine/auron-jni-bridge/src/jni_bridge.rs index 5b76d8290..f058901ae 100644 --- a/native-engine/auron-jni-bridge/src/jni_bridge.rs +++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs @@ -447,7 +447,6 @@ pub struct JavaClasses<'a> { pub cSparkUDAFWrapperContext: SparkUDAFWrapperContext<'a>, pub cSparkUDTFWrapperContext: SparkUDTFWrapperContext<'a>, pub cSparkUDAFMemTracker: SparkUDAFMemTracker<'a>, - pub cAuronConf: AuronConf<'a>, pub cAuronRssPartitionWriterBase: AuronRssPartitionWriterBase<'a>, pub cAuronCallNativeWrapper: AuronCallNativeWrapper<'a>, pub cAuronOnHeapSpillManager: AuronOnHeapSpillManager<'a>, @@ -513,7 +512,6 @@ impl JavaClasses<'static> { cSparkUDAFWrapperContext: SparkUDAFWrapperContext::new(env)?, cSparkUDTFWrapperContext: SparkUDTFWrapperContext::new(env)?, cSparkUDAFMemTracker: SparkUDAFMemTracker::new(env)?, - cAuronConf: AuronConf::new(env)?, cAuronRssPartitionWriterBase: AuronRssPartitionWriterBase::new(env)?, cAuronCallNativeWrapper: AuronCallNativeWrapper::new(env)?, cAuronOnHeapSpillManager: AuronOnHeapSpillManager::new(env)?, @@ -577,9 +575,18 @@ pub struct JniBridge<'a> { pub method_getTotalMemoryLimited_ret: ReturnType, pub method_getDirectWriteSpillToDiskFile: JStaticMethodID, pub method_getDirectWriteSpillToDiskFile_ret: ReturnType, - pub method_getAuronUDFWrapperContext: JStaticMethodID, pub method_getAuronUDFWrapperContext_ret: ReturnType, + pub method_intConf: JStaticMethodID, + pub method_intConf_ret: ReturnType, + pub method_longConf: JStaticMethodID, + pub method_longConf_ret: ReturnType, + pub method_doubleConf: JStaticMethodID, + pub method_doubleConf_ret: ReturnType, + pub method_booleanConf: JStaticMethodID, + pub method_booleanConf_ret: ReturnType, + pub method_stringConf: JStaticMethodID, + pub method_stringConf_ret: ReturnType, } impl<'a> JniBridge<'a> { pub const SIG_TYPE: &'static str = "org/apache/auron/jni/JniBridge"; @@ -663,6 +670,36 @@ impl<'a> JniBridge<'a> { "(Ljava/nio/ByteBuffer;)Lorg/apache/auron/functions/AuronUDFWrapperContext;", )?, method_getAuronUDFWrapperContext_ret: ReturnType::Object, + method_intConf: env.get_static_method_id( + class, + "intConf", + "(Ljava/lang/String;)I", + )?, + method_intConf_ret: ReturnType::Primitive(Primitive::Int), + method_longConf: env.get_static_method_id( + class, + "longConf", + "(Ljava/lang/String;)J", + )?, + method_longConf_ret: ReturnType::Primitive(Primitive::Long), + method_doubleConf: env.get_static_method_id( + class, + "doubleConf", + "(Ljava/lang/String;)D", + )?, + method_doubleConf_ret: ReturnType::Primitive(Primitive::Double), + method_booleanConf: env.get_static_method_id( + class, + "booleanConf", + "(Ljava/lang/String;)Z", + )?, + method_booleanConf_ret: ReturnType::Primitive(Primitive::Boolean), + method_stringConf: env.get_static_method_id( + class, + "stringConf", + "(Ljava/lang/String;)Ljava/lang/String;", + )?, + method_stringConf_ret: ReturnType::Object, }) } } @@ -1121,58 +1158,6 @@ impl<'a> SparkMetricNode<'a> { } } -#[allow(non_snake_case)] -pub struct AuronConf<'a> { - pub class: JClass<'a>, - pub method_booleanConf: JStaticMethodID, - pub method_booleanConf_ret: ReturnType, - pub method_intConf: JStaticMethodID, - pub method_intConf_ret: ReturnType, - pub method_longConf: JStaticMethodID, - pub method_longConf_ret: ReturnType, - pub method_doubleConf: JStaticMethodID, - pub method_doubleConf_ret: ReturnType, - pub method_stringConf: JStaticMethodID, - pub 
method_stringConf_ret: ReturnType, -} - -impl<'a> AuronConf<'_> { - pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/AuronConf"; - - pub fn new(env: &JNIEnv<'a>) -> JniResult> { - let class = get_global_jclass(env, Self::SIG_TYPE)?; - Ok(AuronConf { - class, - method_booleanConf: env.get_static_method_id( - class, - "booleanConf", - "(Ljava/lang/String;)Z", - )?, - method_booleanConf_ret: ReturnType::Primitive(Primitive::Boolean), - method_intConf: env.get_static_method_id(class, "intConf", "(Ljava/lang/String;)I")?, - method_intConf_ret: ReturnType::Primitive(Primitive::Int), - method_longConf: env.get_static_method_id( - class, - "longConf", - "(Ljava/lang/String;)J", - )?, - method_longConf_ret: ReturnType::Primitive(Primitive::Long), - method_doubleConf: env.get_static_method_id( - class, - "doubleConf", - "(Ljava/lang/String;)D", - )?, - method_doubleConf_ret: ReturnType::Primitive(Primitive::Double), - method_stringConf: env.get_static_method_id( - class, - "stringConf", - "(Ljava/lang/String;)Ljava/lang/String;", - )?, - method_stringConf_ret: ReturnType::Object, - }) - } -} - #[allow(non_snake_case)] pub struct AuronRssPartitionWriterBase<'a> { pub class: JClass<'a>, diff --git a/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java index 8d7d0dd52..3cc6cbd11 100644 --- a/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java +++ b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java @@ -18,6 +18,7 @@ import net.bytebuddy.implementation.bind.annotation.Argument; import net.bytebuddy.implementation.bind.annotation.RuntimeType; +import org.apache.auron.spark.configuration.SparkAuronConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +28,6 @@ public class ForceApplyShuffledHashJoinInterceptor { @RuntimeType public static Object intercept(@Argument(0) Object conf) { logger.debug("calling JoinSelectionHelper.forceApplyShuffledHashJoin() intercepted by auron"); - return AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf(); + return SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get(); } } diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala index c3a1861de..6353d7cb7 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala @@ -107,6 +107,7 @@ import org.apache.spark.storage.FileSegment import org.apache.auron.{protobuf => pb, sparkver} import org.apache.auron.common.AuronBuildInfo import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.spark.configuration.SparkAuronConfiguration import org.apache.auron.spark.ui.AuronBuildInfoEvent class ShimsImpl extends Shims with Logging { @@ -128,7 +129,7 @@ class ShimsImpl extends Shims with Logging { override def initExtension(): Unit = { ValidateSparkPlanInjector.inject() - if (AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) { + if (SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get()) { ForceApplyShuffledHashJoinInjector.inject() } @@ -141,19 +142,18 @@ class ShimsImpl extends Shims with Logging { @sparkver("3.0 / 3.1") override def 
initExtension(): Unit = { - if (AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) { - logWarning(s"${AuronConf.FORCE_SHUFFLED_HASH_JOIN.key} is not supported in $shimVersion") + if (SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get()) { + logWarning( + s"${SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.key} is not supported in $shimVersion") } } // set Auron spark ui if spark.auron.ui.enabled is true override def onApplyingExtension(): Unit = { - logInfo( - " onApplyingExtension get ui_enabled : " + SparkEnv.get.conf - .get(AuronConf.UI_ENABLED.key, "true")) + logInfo(s"onApplyingExtension get ui_enabled: ${SparkAuronConfiguration.UI_ENABLED.get()}") - if (SparkEnv.get.conf.get(AuronConf.UI_ENABLED.key, "true").equals("true")) { + if (SparkAuronConfiguration.UI_ENABLED.get()) { val sparkContext = SparkContext.getActive.getOrElse { throw new IllegalStateException("No active spark context found that should not happen") } diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala index 3e9909789..e82eb78f3 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala @@ -17,9 +17,9 @@ package org.apache.auron import org.apache.spark.sql.{AuronQueryTest, Row} -import org.apache.spark.sql.auron.AuronConf import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec +import org.apache.auron.spark.configuration.SparkAuronConfiguration import org.apache.auron.util.AuronTestUtils class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQLTestHelper { @@ -197,7 +197,7 @@ class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQ if (AuronTestUtils.isSparkV32OrGreater) { Seq(true, false).foreach { forcePositionalEvolution => withEnvConf( - AuronConf.ORC_FORCE_POSITIONAL_EVOLUTION.key -> forcePositionalEvolution.toString) { + SparkAuronConfiguration.ORC_FORCE_POSITIONAL_EVOLUTION.key -> forcePositionalEvolution.toString) { withTempPath { f => val path = f.getCanonicalPath Seq[(Integer, Integer)]((1, 2), (3, 4), (5, 6), (null, null)) @@ -220,7 +220,7 @@ class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQ if (AuronTestUtils.isSparkV32OrGreater) { Seq(true, false).foreach { forcePositionalEvolution => withEnvConf( - AuronConf.ORC_FORCE_POSITIONAL_EVOLUTION.key -> forcePositionalEvolution.toString) { + SparkAuronConfiguration.ORC_FORCE_POSITIONAL_EVOLUTION.key -> forcePositionalEvolution.toString) { withTempPath { f => val path = f.getCanonicalPath Seq[(Integer, Integer, Integer)]((1, 2, 1), (3, 4, 2), (5, 6, 3), (null, null, 4)) diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala index dc96731cc..8bc931905 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala @@ -17,7 +17,7 @@ package org.apache.auron import org.apache.spark.sql.AuronQueryTest -import org.apache.spark.sql.auron.{AuronConf, NativeConverters} +import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, 
StringType} @@ -51,25 +51,25 @@ class NativeConvertersSuite } test("cast from string to numeric adds trim wrapper before native cast when enabled") { - withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "true") { + withSQLConf("spark.auron.cast.trimString" -> "true") { assertTrimmedCast(" 42 ", IntegerType) } } test("cast from string to boolean adds trim wrapper before native cast when enabled") { - withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "true") { + withSQLConf("spark.auron.cast.trimString" -> "true") { assertTrimmedCast(" true ", BooleanType) } } test("cast trim disabled via auron conf") { - withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") { + withSQLConf("spark.auron.cast.trimString" -> "false") { assertNonTrimmedCast(" 42 ", IntegerType) } } test("cast trim disabled via auron conf for boolean cast") { - withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") { + withSQLConf("spark.auron.cast.trimString" -> "false") { assertNonTrimmedCast(" true ", BooleanType) } } diff --git a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java index c88f84570..005e97534 100644 --- a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java +++ b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java @@ -28,7 +28,6 @@ import org.apache.auron.spark.configuration.SparkAuronConfiguration; import org.apache.auron.spark.sql.SparkAuronUDFWrapperContext; import org.apache.spark.SparkEnv; -import org.apache.spark.SparkEnv$; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; import org.apache.spark.sql.auron.NativeHelper$; @@ -88,7 +87,7 @@ public OnHeapSpillManager getOnHeapSpillManager() { @Override public AuronConfiguration getAuronConfiguration() { - return new SparkAuronConfiguration(SparkEnv$.MODULE$.get().conf()); + return new SparkAuronConfiguration(); } @Override diff --git a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java index 8e66efb1e..8846700f1 100644 --- a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java +++ b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java @@ -16,15 +16,16 @@ */ package org.apache.auron.spark.configuration; -import static org.apache.auron.util.Preconditions.checkNotNull; - +import java.util.List; import java.util.Optional; +import java.util.function.Supplier; import org.apache.auron.configuration.AuronConfiguration; import org.apache.auron.configuration.ConfigOption; -import org.apache.auron.configuration.ConfigOptions; -import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.SparkEnv; import org.apache.spark.internal.config.ConfigEntry; -import org.apache.spark.internal.config.ConfigEntryWithDefault; +import org.apache.spark.internal.config.ConfigEntryWithDefaultFunction; +import org.apache.spark.sql.internal.SQLConf; import scala.Option; import scala.collection.immutable.List$; @@ -38,283 +39,592 @@ public class SparkAuronConfiguration extends AuronConfiguration { // please manually add the prefix. 
public static final String SPARK_PREFIX = "spark."; - public static final ConfigOption UI_ENABLED = ConfigOptions.key("auron.ui.enabled") - .description("support spark.auron.ui.enabled.") - .booleanType() - .defaultValue(true); - - public static final ConfigOption PROCESS_MEMORY_FRACTION = ConfigOptions.key( - "auron.process.vmrss.memoryFraction") - .description("suggested fraction of process total memory (on-heap and off-heap). " - + "this limit is for process's resident memory usage.") - .doubleType() - .defaultValue(0.9); - - public static final ConfigOption CASE_CONVERT_FUNCTIONS_ENABLE = ConfigOptions.key( - "auron.enable.caseconvert.functions") - .description("enable converting upper/lower functions to native, special cases may provide different, " - + "outputs from spark due to different unicode versions. ") - .booleanType() - .defaultValue(true); - - public static final ConfigOption INPUT_BATCH_STATISTICS_ENABLE = ConfigOptions.key( - "auron.enableInputBatchStatistics") - .description("enable extra metrics of input batch statistics. ") - .booleanType() - .defaultValue(true); - - public static final ConfigOption UDAF_FALLBACK_ENABLE = ConfigOptions.key("auron.udafFallback.enable") - .description("supports UDAF and other aggregate functions not implemented. ") - .booleanType() - .defaultValue(true); - - public static final ConfigOption SUGGESTED_UDAF_ROW_MEM_USAGE = ConfigOptions.key( - "auron.suggested.udaf.memUsedSize") - .description("TypedImperativeAggregate one row mem use size. ") - .intType() - .defaultValue(64); - - public static final ConfigOption UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG = ConfigOptions.key( - "auron.udafFallback.num.udafs.trigger.sortAgg") - .description( - "number of udafs to trigger sort-based aggregation, by default, all aggs containing udafs are converted to sort-based.") - .intType() - .defaultValue(1); - - public static final ConfigOption UDAF_FALLBACK_ESTIM_ROW_SIZE = ConfigOptions.key( - "auron.udafFallback.typedImperativeEstimatedRowSize") - .description("TypedImperativeAggregate one row mem use size.") - .intType() - .defaultValue(256); - - public static final ConfigOption CAST_STRING_TRIM_ENABLE = ConfigOptions.key("auron.cast.trimString") - .description("enable trimming string inputs before casting to numeric/boolean types. ") - .booleanType() - .defaultValue(true); - - public static final ConfigOption IGNORE_CORRUPTED_FILES = ConfigOptions.key("files.ignoreCorruptFiles") - .description("ignore corrupted input files. ") - .booleanType() - .defaultValue(false); - - public static final ConfigOption PARTIAL_AGG_SKIPPING_ENABLE = ConfigOptions.key( - "auron.partialAggSkipping.enable") - .description("enable partial aggregate skipping (see https://github.com/apache/auron/issues/327). ") - .booleanType() - .defaultValue(true); - - public static final ConfigOption PARTIAL_AGG_SKIPPING_RATIO = ConfigOptions.key( - "auron.partialAggSkipping.ratio") - .description("partial aggregate skipping ratio. 
") - .doubleType() - .defaultValue(0.9); - - public static final ConfigOption PARTIAL_AGG_SKIPPING_MIN_ROWS = ConfigOptions.key( - "auron.partialAggSkipping.minRows") - .description("minimum number of rows to trigger partial aggregate skipping.") - .intType() - .dynamicDefaultValue( + public static final ConfigOption AURON_ENABLED = new SQLConfOption<>(Boolean.class) + .withKey("auron.enabled") + .addAltKey("auron.enable") + .withCategory("Runtime Configuration") + .withDescription("Enable Spark Auron support to accelerate query execution with native implementations.") + .withDefaultValue(true); + + public static final ConfigOption UI_ENABLED = new ConfigOption<>(Boolean.class) + .withKey("auron.ui.enabled") + .addAltKey("auron.ui.enable") + .withCategory("Runtime Configuration") + .withDescription( + "Enable Spark Auron UI support to display Auron-specific metrics and statistics in Spark UI.") + .withDefaultValue(true); + + public static final ConfigOption PROCESS_MEMORY_FRACTION = new ConfigOption<>(Double.class) + .withKey("auron.process.vmrss.memoryFraction") + .withCategory("Runtime Configuration") + .withDescription( + "Suggested fraction of process total memory (on-heap and off-heap) to use for resident memory. " + + "This controls the memory limit for the process's virtual memory resident set size (VMRSS).") + .withDefaultValue(0.9); + + public static final ConfigOption CASE_CONVERT_FUNCTIONS_ENABLE = new ConfigOption<>(Boolean.class) + .withKey("auron.enable.caseconvert.functions") + .withCategory("Expression/Function Supports") + .withDescription( + "Enable converting UPPER/LOWER string functions to native implementations for better performance. " + + "Note: May produce different outputs from Spark in special cases due to different Unicode versions.") + .withDefaultValue(true); + + public static final ConfigOption INPUT_BATCH_STATISTICS_ENABLE = new ConfigOption<>(Boolean.class) + .withKey("auron.enableInputBatchStatistics") + .withCategory("Runtime Configuration") + .withDescription( + "Enable collection of additional metrics for input batch statistics to monitor data processing performance.") + .withDefaultValue(true); + + public static final ConfigOption UDAF_FALLBACK_ENABLE = new ConfigOption<>(Boolean.class) + .withKey("auron.udafFallback.enable") + .withCategory("UDAF Fallback") + .withDescription( + "Enable fallback support for UDAF and other aggregate functions that are not implemented in Auron, " + + "allowing them to be executed using Spark's native implementation.") + .withDefaultValue(true); + + public static final ConfigOption SUGGESTED_UDAF_ROW_MEM_USAGE = new ConfigOption<>(Integer.class) + .withKey("auron.suggested.udaf.memUsedSize") + .withCategory("UDAF Fallback") + .withDescription("Suggested memory usage size per row for TypedImperativeAggregate functions in bytes. 
" + + "This helps in memory allocation planning for UDAF operations.") + .withDefaultValue(64); + + public static final ConfigOption UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG = new ConfigOption<>( + Integer.class) + .withKey("auron.udafFallback.num.udafs.trigger.sortAgg") + .withCategory("UDAF Fallback") + .withDescription( + "Number of UDAFs to trigger sort-based aggregation, by default, all aggs containing udafs are converted to sort-based.") + .withDefaultValue(1); + + public static final ConfigOption UDAF_FALLBACK_ESTIM_ROW_SIZE = new ConfigOption<>(Integer.class) + .withKey("auron.udafFallback.typedImperativeEstimatedRowSize") + .withCategory("UDAF Fallback") + .withDescription("Estimated memory size per row for TypedImperativeAggregate functions in bytes. " + + "This estimation is used for memory planning and allocation during UDAF fallback operations.") + .withDefaultValue(256); + + public static final ConfigOption CAST_STRING_TRIM_ENABLE = new SQLConfOption<>(Boolean.class) + .withKey("auron.cast.trimString") + .withCategory("Expression/Function Supports") + .withDescription( + "Enable automatic trimming of whitespace from string inputs before casting to numeric or boolean types. " + + "This helps prevent casting errors due to leading/trailing whitespace.") + .withDefaultValue(true); + + public static final ConfigOption IGNORE_CORRUPTED_FILES = new ConfigOption<>(Boolean.class) + .withKey("auron.files.ignoreCorruptFiles") + .withCategory("Data Sources") + .withDescription("Ignore corrupted input files, defaults to spark.sql.files.ignoreCorruptFiles") + .withDynamicDefaultValue( + conf -> SparkEnv.get().conf().getBoolean("spark.sql.files.ignoreCorruptFiles", false)); + + public static final ConfigOption PARTIAL_AGG_SKIPPING_ENABLE = new ConfigOption<>(Boolean.class) + .withKey("auron.partialAggSkipping.enable") + .withCategory("Partial Aggregate Skipping") + .withDescription( + "Enable partial aggregate skipping optimization to improve performance by skipping unnecessary " + + "partial aggregation stages when certain conditions are met. See issue #327 for detailed implementation.") + .withDefaultValue(true); + + public static final ConfigOption PARTIAL_AGG_SKIPPING_RATIO = new ConfigOption<>(Double.class) + .withKey("auron.partialAggSkipping.ratio") + .withCategory("Partial Aggregate Skipping") + .withDescription( + "Threshold ratio for partial aggregate skipping optimization. When the ratio of unique keys to total rows " + + "exceeds this value, partial aggregation may be skipped to improve performance.") + .withDefaultValue(0.9); + + public static final ConfigOption PARTIAL_AGG_SKIPPING_MIN_ROWS = new ConfigOption<>(Integer.class) + .withKey("auron.partialAggSkipping.minRows") + .withCategory("Partial Aggregate Skipping") + .withDescription("Minimum number of rows required to trigger partial aggregate skipping optimization. " + + "This prevents the optimization from being applied to very small datasets where it may not be beneficial. " + + "Defaults to spark.auron.batchSize * 5") + .withDynamicDefaultValue( config -> config.getOptional(AuronConfiguration.BATCH_SIZE).get() * 5); - public static final ConfigOption PARTIAL_AGG_SKIPPING_SKIP_SPILL = ConfigOptions.key( - "auron.partialAggSkipping.skipSpill") - .description("always skip partial aggregate when triggered spilling. 
") - .booleanType() - .defaultValue(false); - - public static final ConfigOption PARQUET_ENABLE_PAGE_FILTERING = ConfigOptions.key( - "auron.parquet.enable.pageFiltering") - .description("parquet enable page filtering. ") - .booleanType() - .defaultValue(false); - - public static final ConfigOption PARQUET_ENABLE_BLOOM_FILTER = ConfigOptions.key( - "auron.parquet.enable.bloomFilter") - .description("parquet enable bloom filter. ") - .booleanType() - .defaultValue(false); - - public static final ConfigOption PARQUET_MAX_OVER_READ_SIZE = ConfigOptions.key( - "auron.parquet.maxOverReadSize") - .description("parquet max over read size.") - .intType() - .defaultValue(16384); - - public static final ConfigOption PARQUET_METADATA_CACHE_SIZE = ConfigOptions.key( - "auron.parquet.metadataCacheSize") - .description("parquet metadata cache size.") - .intType() - .defaultValue(5); - - public static final ConfigOption SPARK_IO_COMPRESSION_CODEC = ConfigOptions.key("io.compression.codec") - .description("spark io compression codec.") - .stringType() - .defaultValue("lz4"); - - public static final ConfigOption SPARK_IO_COMPRESSION_ZSTD_LEVEL = ConfigOptions.key( - "io.compression.zstd.level") - .description("spark io compression zstd level.") - .intType() - .defaultValue(1); - - public static final ConfigOption TOKIO_WORKER_THREADS_PER_CPU = ConfigOptions.key( - "auron.tokio.worker.threads.per.cpu") - .description("tokio worker threads per cpu (spark.task.cpus), 0 for auto detection.") - .intType() - .defaultValue(0); - - public static final ConfigOption SPARK_TASK_CPUS = ConfigOptions.key("task.cpus") - .description("number of cpus per task.") - .intType() - .defaultValue(1); - - public static final ConfigOption FORCE_SHUFFLED_HASH_JOIN = ConfigOptions.key( - "auron.forceShuffledHashJoin") - .description("replace all sort-merge join to shuffled-hash join, only used for benchmarking. ") - .booleanType() - .defaultValue(false); - - public static final ConfigOption SHUFFLE_COMPRESSION_TARGET_BUF_SIZE = ConfigOptions.key( - "auron.shuffle.compression.targetBufSize") - .description("shuffle compression target buffer size, default is 4MB.") - .intType() - .defaultValue(4194304); - - public static final ConfigOption SPILL_COMPRESSION_CODEC = ConfigOptions.key( - "auron.spill.compression.codec") - .description("spark spill compression codec.") - .stringType() - .defaultValue("lz4"); - - public static final ConfigOption SMJ_FALLBACK_ENABLE = ConfigOptions.key("auron.smjfallback.enable") - .description("enable hash join falling back to sort merge join when hash table is too big. ") - .booleanType() - .defaultValue(false); - - public static final ConfigOption SMJ_FALLBACK_ROWS_THRESHOLD = ConfigOptions.key( - "auron.smjfallback.rows.threshold") - .description("smj fallback threshold.") - .intType() - .defaultValue(10000000); - - public static final ConfigOption SMJ_FALLBACK_MEM_SIZE_THRESHOLD = ConfigOptions.key( - "auron.smjfallback.mem.threshold") - .description("smj fallback mem threshold.") - .intType() - .defaultValue(134217728); - - public static final ConfigOption ON_HEAP_SPILL_MEM_FRACTION = ConfigOptions.key( - "auron.onHeapSpill.memoryFraction") - .description("max memory fraction of on-heap spills. 
") - .doubleType() - .defaultValue(0.9); - - public static final ConfigOption SUGGESTED_BATCH_MEM_SIZE = ConfigOptions.key( - "auron.suggested.batch.memSize") - .description("suggested memory size for record batch.") - .intType() - .defaultValue(8388608); - - public static final ConfigOption PARSE_JSON_ERROR_FALLBACK = ConfigOptions.key( - "auron.parseJsonError.fallback") - .description("fallback to UDFJson when error parsing json in native implementation. ") - .booleanType() - .defaultValue(true); - - public static final ConfigOption SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE = ConfigOptions.key( - "auron.suggested.batch.memSize.multiwayMerging") - .description("suggested memory size for k-way merging use smaller batch memory size for " - + "k-way merging since there will be multiple batches in memory at the same time.") - .intType() - .defaultValue(1048576); - public static final ConfigOption ORC_FORCE_POSITIONAL_EVOLUTION = ConfigOptions.key( - "auron.orc.force.positional.evolution") - .description("orc force positional evolution. ") - .booleanType() - .defaultValue(false); - public static final ConfigOption ORC_TIMESTAMP_USE_MICROSECOND = ConfigOptions.key( - "auron.orc.timestamp.use.microsecond") - .description("use microsecond precision when reading ORC timestamp columns. ") - .booleanType() - .defaultValue(false); - public static final ConfigOption ORC_SCHEMA_CASE_SENSITIVE = ConfigOptions.key( - "auron.orc.schema.caseSensitive.enable") - .description("whether ORC file schema matching distinguishes between uppercase and lowercase. ") - .booleanType() - .defaultValue(false); - - public static final ConfigOption FORCE_SHORT_CIRCUIT_AND_OR = ConfigOptions.key( - "auron.forceShortCircuitAndOr") - .description("force using short-circuit evaluation (PhysicalSCAndExprNode/PhysicalSCOrExprNode) " - + "for And/Or expressions, regardless of whether rhs contains HiveUDF. ") - .booleanType() - .defaultValue(false); - - private final SparkConf sparkConf; - - public SparkAuronConfiguration(SparkConf conf) { - this.sparkConf = checkNotNull(conf, "spark conf cannot be null"); - } + public static final ConfigOption PARTIAL_AGG_SKIPPING_SKIP_SPILL = new ConfigOption<>(Boolean.class) + .withKey("auron.partialAggSkipping.skipSpill") + .withCategory("Partial Aggregate Skipping") + .withDescription("Always skip partial aggregation when spilling is triggered to prevent memory pressure. " + + "When enabled, the system will bypass partial aggregation stages if memory spilling occurs, " + + "potentially trading off some optimization for memory stability.") + .withDefaultValue(false); + + public static final ConfigOption PARQUET_ENABLE_PAGE_FILTERING = new ConfigOption<>(Boolean.class) + .withKey("auron.parquet.enable.pageFiltering") + .withCategory("Data Sources") + .withDescription( + "Enable Parquet page-level filtering to skip reading unnecessary data pages during query execution. " + + "This optimization can significantly improve read performance by avoiding I/O for pages that don't match filter predicates.") + .withDefaultValue(false); + + public static final ConfigOption PARQUET_ENABLE_BLOOM_FILTER = new ConfigOption<>(Boolean.class) + .withKey("auron.parquet.enable.bloomFilter") + .withCategory("Data Sources") + .withDescription( + "Enable Parquet bloom filter support for efficient equality predicate filtering. 
" + + "Bloom filters can quickly determine if a value might exist in a data block, reducing unnecessary I/O operations.") + .withDefaultValue(false); + + public static final ConfigOption PARQUET_MAX_OVER_READ_SIZE = new ConfigOption<>(Integer.class) + .withKey("auron.parquet.maxOverReadSize") + .withCategory("Data Sources") + .withDescription( + "Maximum over-read size in bytes for Parquet file operations. This controls how much extra data " + + "can be read beyond the required data to optimize I/O operations and improve read performance.") + .withDefaultValue(16384); + + public static final ConfigOption PARQUET_METADATA_CACHE_SIZE = new ConfigOption<>(Integer.class) + .withKey("auron.parquet.metadataCacheSize") + .withCategory("Data Sources") + .withDescription("Size of the Parquet metadata cache in number of entries. This cache stores file metadata " + + "to avoid repeated metadata reads and improve query performance for frequently accessed files.") + .withDefaultValue(5); + + public static final ConfigOption SPARK_IO_COMPRESSION_CODEC = new ConfigOption<>(String.class) + .withKey("io.compression.codec") + .withCategory("Runtime Configuration") + .withDescription( + "Compression codec used for Spark I/O operations. Common options include lz4, snappy, gzip, and zstd. " + + "The choice of codec affects both compression ratio and decompression speed.") + .withDynamicDefaultValue(_conf -> SparkEnv.get().conf().get("spark.io.compression.codec", "lz4")); + + public static final ConfigOption SPARK_IO_COMPRESSION_ZSTD_LEVEL = new ConfigOption<>(Integer.class) + .withKey("io.compression.zstd.level") + .withCategory("Runtime Configuration") + .withDescription("Compression level for Zstandard (zstd) compression codec used in Spark I/O operations. " + + "Valid values range from 1 (fastest) to 22 (highest compression). Higher levels provide better compression " + + "but require more CPU time and memory.") + .withDynamicDefaultValue(_conf -> SparkEnv.get().conf().getInt("spark.io.compression.zstd.level", 1)); + + public static final ConfigOption TOKIO_WORKER_THREADS_PER_CPU = new ConfigOption<>(Integer.class) + .withKey("auron.tokio.worker.threads.per.cpu") + .withCategory("Runtime Configuration") + .withDescription( + "Number of Tokio worker threads to create per CPU core (spark.task.cpus). Set to 0 for automatic detection " + + "based on available CPU cores. This setting controls the thread pool size for Tokio-based asynchronous operations.") + .withDefaultValue(0); + + public static final ConfigOption SPARK_TASK_CPUS = new ConfigOption<>(Integer.class) + .withKey("task.cpus") + .withCategory("Runtime Configuration") + .withDescription( + "Number of CPU cores allocated per Spark task. This setting determines the parallelism level " + + "for individual tasks and affects resource allocation and task scheduling. " + + "Defaults to spark.task.cpus.") + .withDynamicDefaultValue(_conf -> SparkEnv.get().conf().getInt("spark.task.cpus", 1)); + + public static final ConfigOption FORCE_SHUFFLED_HASH_JOIN = new ConfigOption<>(Boolean.class) + .withKey("auron.forceShuffledHashJoin") + .withCategory("Operator Supports") + .withDescription( + "Force replacement of all sort-merge joins with shuffled-hash joins for performance comparison and benchmarking. 
" + + "This setting is primarily used for testing and performance analysis, as different join strategies may be optimal " + + "for different data distributions and query patterns.") + .withDefaultValue(false); + + public static final ConfigOption SHUFFLE_COMPRESSION_TARGET_BUF_SIZE = new ConfigOption<>(Integer.class) + .withKey("auron.shuffle.compression.targetBufSize") + .withCategory("Runtime Configuration") + .withDescription( + "Target buffer size in bytes for shuffle compression operations. This setting controls the buffer size " + + "used during shuffle data compression, affecting both compression efficiency and memory usage. Default is 4MB (4,194,304 bytes).") + .withDefaultValue(4194304); + + public static final ConfigOption SPILL_COMPRESSION_CODEC = new ConfigOption<>(String.class) + .withKey("auron.spill.compression.codec") + .withCategory("Runtime Configuration") + .withDescription( + "Compression codec used for Spark spill operations when data is written to disk due to memory pressure. " + + "Common options include lz4, snappy, and gzip. The choice affects both spill performance and disk space usage.") + .withDefaultValue("lz4"); + + public static final ConfigOption SMJ_FALLBACK_ENABLE = new ConfigOption<>(Boolean.class) + .withKey("auron.smjfallback.enable") + .withCategory("Operator Supports") + .withDescription( + "Enable fallback from hash join to sort-merge join when the hash table becomes too large to fit in memory. " + + "This prevents out-of-memory errors by switching to a more memory-efficient join strategy when necessary.") + .withDefaultValue(false); + + public static final ConfigOption SMJ_FALLBACK_ROWS_THRESHOLD = new ConfigOption<>(Integer.class) + .withKey("auron.smjfallback.rows.threshold") + .withCategory("Operator Supports") + .withDescription( + "Row count threshold that triggers fallback from hash join to sort-merge join. When the number of rows " + + "in the hash table exceeds this threshold, the system will switch to sort-merge join to avoid memory issues.") + .withDefaultValue(10000000); + + public static final ConfigOption SMJ_FALLBACK_MEM_SIZE_THRESHOLD = new ConfigOption<>(Integer.class) + .withKey("auron.smjfallback.mem.threshold") + .withCategory("Operator Supports") + .withDescription("Memory size threshold in bytes that triggers fallback from hash join to sort-merge join. " + + "When the hash table memory usage exceeds this threshold (128MB by default), the system switches " + + "to sort-merge join to prevent memory overflow.") + .withDefaultValue(134217728); + + public static final ConfigOption ON_HEAP_SPILL_MEM_FRACTION = new ConfigOption<>(Double.class) + .withKey("auron.onHeapSpill.memoryFraction") + .withCategory("Runtime Configuration") + .withDescription( + "Maximum memory fraction allocated for on-heap spilling operations. This controls what portion " + + "of the available on-heap memory can be used for spilling data to disk when memory pressure occurs.") + .withDefaultValue(0.9); + + public static final ConfigOption SUGGESTED_BATCH_MEM_SIZE = new ConfigOption<>(Integer.class) + .withKey("auron.suggested.batch.memSize") + .withCategory("Runtime Configuration") + .withDescription( + "Suggested memory size in bytes for record batches. This setting controls the target memory allocation " + + "for individual data batches to optimize memory usage and processing efficiency. 
Default is 8MB (8,388,608 bytes).") + .withDefaultValue(8388608); + + public static final ConfigOption PARSE_JSON_ERROR_FALLBACK = new ConfigOption<>(Boolean.class) + .withKey("auron.parseJsonError.fallback") + .withCategory("Expression/Function Supports") + .withDescription( + "Enable fallback to UDFJson implementation when native JSON parsing encounters errors. " + + "This ensures query execution continues even when the native JSON parser fails, at the cost of potentially slower performance.") + .withDefaultValue(true); + + public static final ConfigOption SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE = new ConfigOption<>(Integer.class) + .withKey("auron.suggested.batch.memSize.multiwayMerging") + .withCategory("Runtime Configuration") + .withDescription( + "Suggested memory size in bytes for k-way merging operations. This uses a smaller batch memory size " + + "compared to regular operations since multiple batches are kept in memory simultaneously during k-way merging. " + + "Default is 1MB (1,048,576 bytes).") + .withDefaultValue(1048576); + + public static final ConfigOption ORC_FORCE_POSITIONAL_EVOLUTION = new ConfigOption<>(Boolean.class) + .withKey("auron.orc.force.positional.evolution") + .withCategory("Data Sources") + .withDescription( + "Force ORC positional evolution mode for schema evolution operations. When enabled, column mapping " + + "will be based on column position rather than column name, which can be useful for certain schema evolution scenarios.") + .withDefaultValue(false); + + public static final ConfigOption ORC_TIMESTAMP_USE_MICROSECOND = new ConfigOption<>(Boolean.class) + .withKey("auron.orc.timestamp.use.microsecond") + .withCategory("Data Sources") + .withDescription( + "Use microsecond precision when reading ORC timestamp columns instead of the default millisecond precision. " + + "This provides higher temporal resolution for timestamp data but may require more storage space.") + .withDefaultValue(false); + + public static final ConfigOption ORC_SCHEMA_CASE_SENSITIVE = new ConfigOption<>(Boolean.class) + .withKey("auron.orc.schema.caseSensitive.enable") + .withCategory("Data Sources") + .withDescription( + "Enable case-sensitive schema matching for ORC files. When true, column names in the schema must match " + + "the case of columns in the ORC file exactly. When false, column name matching is case-insensitive.") + .withDefaultValue(false); + + public static final ConfigOption FORCE_SHORT_CIRCUIT_AND_OR = new ConfigOption<>(Boolean.class) + .withKey("auron.forceShortCircuitAndOr") + .withCategory("Expression/Function Supports") + .withDescription( + "Force the use of short-circuit evaluation (PhysicalSCAndExprNode/PhysicalSCOrExprNode) for AND/OR expressions, " + + "regardless of whether the right-hand side contains Hive UDFs. 
This can improve performance by avoiding unnecessary " + + "evaluation of expressions when the result is already determined.") + .withDefaultValue(false); + + public static final ConfigOption ENABLE_SCAN = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.scan") + .withCategory("Operator Supports") + .withDescription("Enable ScanExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_PAIMON_SCAN = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.paimon.scan") + .withCategory("Operator Supports") + .withDescription("Enable PaimonScanExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_PROJECT = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.project") + .withCategory("Operator Supports") + .withDescription("Enable ProjectExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_FILTER = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.filter") + .withCategory("Operator Supports") + .withDescription("Enable FilterExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_SORT = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.sort") + .withCategory("Operator Supports") + .withDescription("Enable SortExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_UNION = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.union") + .withCategory("Operator Supports") + .withDescription("Enable UnionExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_SMJ = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.smj") + .withCategory("Operator Supports") + .withDescription("Enable SortMergeJoinExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_SHJ = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.shj") + .withCategory("Operator Supports") + .withDescription("Enable ShuffledHashJoinExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_BHJ = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.bhj") + .withCategory("Operator Supports") + .withDescription("Enable BroadcastHashJoinExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_BNLJ = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.bnlj") + .withCategory("Operator Supports") + .withDescription("Enable BroadcastNestedLoopJoinExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_LOCAL_LIMIT = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.local.limit") + .withCategory("Operator Supports") + .withDescription("Enable LocalLimitExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_GLOBAL_LIMIT = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.global.limit") + .withCategory("Operator Supports") + .withDescription("Enable GlobalLimitExec operation conversion to native Auron implementations.") + 
.withDefaultValue(true); + + public static final ConfigOption ENABLE_TAKE_ORDERED_AND_PROJECT = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.take.ordered.and.project") + .withCategory("Operator Supports") + .withDescription("Enable TakeOrderedAndProjectExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_COLLECT_LIMIT = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.collectLimit") + .withCategory("Operator Supports") + .withDescription("Enable CollectLimitExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_AGGR = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.aggr") + .withCategory("Operator Supports") + .withDescription("Enable AggregateExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_EXPAND = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.expand") + .withCategory("Operator Supports") + .withDescription("Enable ExpandExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_WINDOW = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.window") + .withCategory("Operator Supports") + .withDescription("Enable WindowExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_WINDOW_GROUP_LIMIT = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.window.group.limit") + .withCategory("Operator Supports") + .withDescription("Enable WindowGroupLimitExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_GENERATE = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.generate") + .withCategory("Operator Supports") + .withDescription("Enable GenerateExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_LOCAL_TABLE_SCAN = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.local.table.scan") + .withCategory("Operator Supports") + .withDescription("Enable LocalTableScanExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_DATA_WRITING = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.data.writing") + .withCategory("Operator Supports") + .withDescription("Enable DataWritingExec operation conversion to native Auron implementations.") + .withDefaultValue(false); + + public static final ConfigOption ENABLE_SCAN_PARQUET = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.scan.parquet") + .withCategory("Data Sources") + .withDescription("Enable ParquetScanExec operation conversion to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_SCAN_PARQUET_TIMESTAMP = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.scan.parquet.timestamp") + .withCategory("Data Sources") + .withDescription( + "Enable ParquetScanExec operation conversion with timestamp fields to native Auron implementations.") + .withDefaultValue(true); + + public static final ConfigOption ENABLE_SCAN_ORC = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.scan.orc") + .withCategory("Data Sources") + .withDescription("Enable OrcScanExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption ENABLE_SCAN_ORC_TIMESTAMP = new SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.scan.orc.timestamp")
+            .withCategory("Data Sources")
+            .withDescription(
+                    "Enable OrcScanExec operation conversion with timestamp fields to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption ENABLE_BROADCAST_EXCHANGE = new SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.broadcastExchange")
+            .withCategory("Operator Supports")
+            .withDescription("Enable BroadcastExchangeExec operation conversion to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption ENABLE_SHUFFLE_EXCHANGE = new SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.shuffleExchange")
+            .withCategory("Operator Supports")
+            .withDescription("Enable ShuffleExchangeExec operation conversion to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption UDF_JSON_ENABLED = new SQLConfOption<>(Boolean.class)
+            .withKey("auron.udf.UDFJson.enabled")
+            .withCategory("Expression/Function Supports")
+            .withDescription("Enable UDFJson function conversion to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption UDF_BRICKHOUSE_ENABLED = new SQLConfOption<>(Boolean.class)
+            .withKey("auron.udf.brickhouse.enabled")
+            .withCategory("Expression/Function Supports")
+            .withDescription("Enable Brickhouse UDF conversion to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption DECIMAL_ARITH_OP_ENABLED = new SQLConfOption<>(Boolean.class)
+            .withKey("auron.decimal.arithOp.enabled")
+            .withCategory("Expression/Function Supports")
+            .withDescription("Enable decimal arithmetic operations conversion to native Auron implementations.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption DATETIME_EXTRACT_ENABLED = new SQLConfOption<>(Boolean.class)
+            .withKey("auron.datetime.extract.enabled")
+            .withCategory("Expression/Function Supports")
+            .withDescription("Enable datetime extract operations conversion to native Auron implementations.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption UDF_SINGLE_CHILD_FALLBACK_ENABLED = new SQLConfOption<>(Boolean.class)
+            .withKey("auron.udf.singleChildFallback.enabled")
+            .withCategory("Expression/Function Supports")
+            .withDescription("Enable fallback of UDFs/expressions with a single child.")
+            .withDefaultValue(true);
 
     @Override
     public Optional getOptional(ConfigOption option) {
-        if (option.key().startsWith(SPARK_PREFIX)) {
-            return Optional.ofNullable(getSparkConf(option.key(), getOptionDefaultValue(option)));
-        } else {
-            return Optional.ofNullable(getSparkConf(SPARK_PREFIX + option.key(), getOptionDefaultValue(option)));
-        }
+        GetFromSparkType getFromSparkType;
+        if (option instanceof SQLConfOption) {
+            getFromSparkType = GetFromSparkType.FROM_SQL_CONF;
+        } else if (option instanceof SparkContextOption) {
+            getFromSparkType = GetFromSparkType.FROM_SPARK_CONTEXT;
+        } else {
+            getFromSparkType = GetFromSparkType.FROM_SPARK_ENV;
+        }
+        return Optional.ofNullable(getFromSpark(
+                option.key(),
+                option.altKeys(),
+                option.getValueClass(),
+                () -> getOptionDefaultValue(option),
+                getFromSparkType));
     }
 
-    @Override
-    public Optional getOptional(String key) {
-        if (key.startsWith(SPARK_PREFIX)) {
-            return Optional.ofNullable(getSparkConf(key, null));
-        } else {
-            return Optional.ofNullable(getSparkConf(SPARK_PREFIX + key, null));
-        }
+    enum GetFromSparkType {
+        FROM_SPARK_ENV,
+        FROM_SPARK_CONTEXT,
+        FROM_SQL_CONF;
     }
 
-    private synchronized T getSparkConf(String key, T defaultValue) {
-        // Use synchronized to avoid issues with multiple threads.
-        synchronized (ConfigEntry.class) {
-            ConfigEntry entry = (ConfigEntry) ConfigEntry.findEntry(key);
-            if (entry == null) {
-                entry = new ConfigEntryWithDefault<>(
-                        key,
-                        Option.empty(),
+    @SuppressWarnings("unchecked")
+    private T getFromSpark(
+            String key,
+            List altKeys,
+            Class valueClass,
+            Supplier defaultValueSupplier,
+            GetFromSparkType getFromSparkType) {
+        Object configEntry;
+
+        synchronized (SparkAuronConfiguration.class) {
+            String sparkConfKey = key.startsWith(SPARK_PREFIX) ? key : SPARK_PREFIX + key;
+            configEntry = ConfigEntry.findEntry(sparkConfKey);
+            for (String altKey : altKeys) {
+                if (configEntry != null) {
+                    break;
+                }
+                String sparkConfAltKey = altKey.startsWith(SPARK_PREFIX) ? altKey : SPARK_PREFIX + altKey;
+                configEntry = ConfigEntry.findEntry(sparkConfAltKey);
+            }
+
+            if (configEntry == null) {
+                configEntry = new ConfigEntryWithDefaultFunction<>(
+                        sparkConfKey,
+                        Option.empty(),
                         "",
                         List$.MODULE$.empty(),
-                        defaultValue,
-                        (val) -> valueConverter(val, defaultValue, defaultValue == null),
+                        defaultValueSupplier::get,
+                        val -> valueConverter(val, valueClass),
                         String::valueOf,
                         null,
                         true,
                         null);
             }
-            return sparkConf.get(entry);
+        }
+
+        if (getFromSparkType == GetFromSparkType.FROM_SPARK_ENV) {
+            return SparkEnv.get().conf().get((ConfigEntry) configEntry);
+
+        } else if (getFromSparkType == GetFromSparkType.FROM_SPARK_CONTEXT) {
+            return SparkContext.getOrCreate().getConf().get((ConfigEntry) configEntry);
+
+        } else if (getFromSparkType == GetFromSparkType.FROM_SQL_CONF) {
+            return ((ConfigEntry) configEntry).readFrom(SQLConf.get().reader());
+
+        } else {
+            throw new IllegalArgumentException("unknown getFromSparkType: " + getFromSparkType);
         }
     }
 
-    private T valueConverter(String value, T defaultValue, boolean defaultValueIsNull) {
-        if (defaultValueIsNull) {
+    private T valueConverter(String value, Class valueClass) {
+        if (valueClass == Integer.class) {
+            return (T) Integer.valueOf(value);
+        } else if (valueClass == Long.class) {
+            return (T) Long.valueOf(value);
+        } else if (valueClass == Boolean.class) {
+            return (T) Boolean.valueOf(value);
+        } else if (valueClass == Float.class) {
+            return (T) Float.valueOf(value);
+        } else if (valueClass == Double.class) {
+            return (T) Double.valueOf(value);
+        } else if (valueClass == String.class) {
             return (T) value;
         } else {
-            if (defaultValue instanceof Integer) {
-                return (T) Integer.valueOf(value);
-            } else if (defaultValue instanceof Long) {
-                return (T) Long.valueOf(value);
-            } else if (defaultValue instanceof Boolean) {
-                return (T) Boolean.valueOf(value);
-            } else if (defaultValue instanceof Float) {
-                return (T) Float.valueOf(value);
-            } else if (defaultValue instanceof Double) {
-                return (T) Double.valueOf(value);
-            } else if (defaultValue instanceof String) {
-                return (T) String.valueOf(value);
-            } else {
-                throw new IllegalArgumentException("Unsupported default value type: "
-                        + defaultValue.getClass().getName());
-            }
+            throw new IllegalArgumentException("Unsupported config value type: " + valueClass.getName());
         }
     }
 }
+
+class SparkContextOption extends ConfigOption {
+    SparkContextOption(Class clazz) {
+        super(clazz);
+    }
+}
+
+class SQLConfOption extends ConfigOption {
+    SQLConfOption(Class clazz) {
+        super(clazz);
+    }
+}
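
The fluent builder above replaces the old ConfigOptions.key(...).intType().defaultValue(...) chain: each option now carries its key, category, description, and a static or dynamic default, and the SQLConfOption/SparkContextOption subtypes route reads through SQLConf or the SparkContext configuration instead of SparkEnv. The following is a minimal usage sketch, not part of the patch: ConfigOptionUsageSketch, EXAMPLE_THREADS, and the key auron.example.threads are hypothetical, and ConfigOption.get() is assumed to resolve through the registered SparkAuronConfiguration in the same way as the Scala call sites later in this patch (e.g. SparkAuronConfiguration.ENABLE_SCAN.get()).

import org.apache.auron.configuration.ConfigOption;

public class ConfigOptionUsageSketch {
    // Declared like the options in SparkAuronConfiguration; at read time the key is
    // resolved against the "spark." prefix, so this hypothetical option is looked up
    // as spark.auron.example.threads.
    static final ConfigOption<Integer> EXAMPLE_THREADS = new ConfigOption<>(Integer.class)
            .withKey("auron.example.threads")
            .withCategory("Runtime Configuration")
            .withDescription("Hypothetical option illustrating the fluent builder.")
            .withDefaultValue(4);

    public static void main(String[] args) {
        // Falls back to the declared default (4) when the Spark conf does not set the key.
        int threads = EXAMPLE_THREADS.get();
        System.out.println("threads = " + threads);
    }
}
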
diff --git a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java
new file mode 100644
index 000000000..ea587651a
--- /dev/null
+++ b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.spark.configuration;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.auron.configuration.ConfigOption;
+
+public class SparkAuronConfigurationDocGenerator {
+
+    // Generate documentation for SparkAuronConfiguration
+    public static void main(String[] args) {
+        // Categories array based on SparkAuronConfiguration categories
+        String[] categories = {
+            "Runtime Configuration",
+            "Operator Supports",
+            "Data Sources",
+            "Expression/Function Supports",
+            "UDAF Fallback",
+            "Partial Aggregate Skipping"
+        };
+        Class auronConfigurationClass = SparkAuronConfiguration.class;
+        List configOptions = new ArrayList<>();
+
+        for (Field field : auronConfigurationClass.getFields()) {
+            try {
+                configOptions.add((ConfigOption) field.get(null));
+            } catch (IllegalAccessException | ClassCastException e) {
+                // skip fields that are not config options
+            }
+        }
+        configOptions.sort(Comparator.comparing(option -> option.category() + option.key()));
+
+        for (String category : categories) {
+            System.out.println();
+            System.out.println("### " + category);
+            System.out.println("| Conf Key | Type | Default Value | Description |");
+            System.out.println("| -------- | ---- | ------------- | ----------- |");
+            for (ConfigOption configOption : configOptions) {
+                if (!configOption.category().equals(category)) {
+                    continue;
+                }
+                String sparkConfKey =
+                        configOption.key().startsWith("spark.") ? configOption.key() : "spark." + configOption.key();
+                for (String altKey : configOption.altKeys()) {
+                    String sparkConfAltKey = altKey.startsWith("spark.") ? altKey : "spark." + altKey;
+                    sparkConfKey += " _alternative: " + sparkConfAltKey + "_";
+                }
+                String sparkConfDesc = configOption.description();
+                Class sparkConfValueClass = configOption.getValueClass();
+                Object sparkConfDefaultValue = configOption.defaultValue() == null ? "-" : configOption.defaultValue();
+                System.out.println("| " + sparkConfKey + " | " + sparkConfValueClass.getSimpleName() + " | "
+                        + sparkConfDefaultValue + " | " + sparkConfDesc + " |");
+            }
+        }
+    }
+}
diff --git a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
deleted file mode 100644
index 2943d4b2a..000000000
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.auron;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkEnv$;
-
-/**
- * This class has been deprecated and migrated to {@link org.apache.auron.spark.configuration.SparkAuronConfiguration}.
- * Will be removed in the future.
- */
-@Deprecated
-@SuppressWarnings("unused")
-public enum AuronConf {
-    // support spark.auron.ui.enabled
-    UI_ENABLED("spark.auron.ui.enabled", true),
-
-    /// suggested batch size for arrow batches.
-    BATCH_SIZE("spark.auron.batchSize", 10000),
-
-    /// suggested fraction of off-heap memory used in native execution.
-    /// actual off-heap memory usage is expected to be spark.executor.memoryOverhead * fraction.
-    MEMORY_FRACTION("spark.auron.memoryFraction", 0.6),
-
-    /// suggested fraction of process total memory (on-heap and off-heap).
-    /// this limit is for process's resident memory usage
-    PROCESS_MEMORY_FRACTION("spark.auron.process.vmrss.memoryFraction", 0.9),
-
-    /// enable converting upper/lower functions to native, special cases may provide different
-    /// outputs from spark due to different unicode versions.
- CASE_CONVERT_FUNCTIONS_ENABLE("spark.auron.enable.caseconvert.functions", true), - - /// enable extra metrics of input batch statistics - INPUT_BATCH_STATISTICS_ENABLE("spark.auron.enableInputBatchStatistics", true), - - /// supports UDAF and other aggregate functions not implemented - UDAF_FALLBACK_ENABLE("spark.auron.udafFallback.enable", true), - - // TypedImperativeAggregate one row mem use size - SUGGESTED_UDAF_ROW_MEM_USAGE("spark.auron.suggested.udaf.memUsedSize", 64), - - /// number of udafs to trigger sort-based aggregation - /// by default, all aggs containing udafs are converted to sort-based - UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG("spark.auron.udafFallback.num.udafs.trigger.sortAgg", 1), - - // TypedImperativeAggregate one row mem use size - UDAF_FALLBACK_ESTIM_ROW_SIZE("spark.auron.udafFallback.typedImperativeEstimatedRowSize", 256), - - /// enable trimming string inputs before casting to numeric/boolean types - CAST_STRING_TRIM_ENABLE("spark.auron.cast.trimString", true), - - /// ignore corrupted input files - IGNORE_CORRUPTED_FILES("spark.files.ignoreCorruptFiles", false), - - /// enable partial aggregate skipping (see https://github.com/apache/auron/issues/327) - PARTIAL_AGG_SKIPPING_ENABLE("spark.auron.partialAggSkipping.enable", true), - - /// partial aggregate skipping ratio - PARTIAL_AGG_SKIPPING_RATIO("spark.auron.partialAggSkipping.ratio", 0.9), - - /// minimum number of rows to trigger partial aggregate skipping - PARTIAL_AGG_SKIPPING_MIN_ROWS("spark.auron.partialAggSkipping.minRows", BATCH_SIZE.intConf() * 5), - - /// always skip partial aggregate when triggered spilling - PARTIAL_AGG_SKIPPING_SKIP_SPILL("spark.auron.partialAggSkipping.skipSpill", false), - - // parquet enable page filtering - PARQUET_ENABLE_PAGE_FILTERING("spark.auron.parquet.enable.pageFiltering", false), - - // parquet enable bloom filter - PARQUET_ENABLE_BLOOM_FILTER("spark.auron.parquet.enable.bloomFilter", false), - - // parquet max over read size - PARQUET_MAX_OVER_READ_SIZE("spark.auron.parquet.maxOverReadSize", 16384), - - // parquet metadata cache size - PARQUET_METADATA_CACHE_SIZE("spark.auron.parquet.metadataCacheSize", 5), - - // spark io compression codec - SPARK_IO_COMPRESSION_CODEC("spark.io.compression.codec", "lz4"), - - // spark io compression zstd level - SPARK_IO_COMPRESSION_ZSTD_LEVEL("spark.io.compression.zstd.level", 1), - - // tokio worker threads per cpu (spark.task.cpus), 0 for auto detection - TOKIO_WORKER_THREADS_PER_CPU("spark.auron.tokio.worker.threads.per.cpu", 0), - - // number of cpus per task - SPARK_TASK_CPUS("spark.task.cpus", 1), - - // replace all sort-merge join to shuffled-hash join, only used for benchmarking - FORCE_SHUFFLED_HASH_JOIN("spark.auron.forceShuffledHashJoin", false), - - // shuffle compression target buffer size, default is 4MB - SHUFFLE_COMPRESSION_TARGET_BUF_SIZE("spark.auron.shuffle.compression.targetBufSize", 4194304), - - // spark spill compression codec - SPILL_COMPRESSION_CODEC("spark.auron.spill.compression.codec", "lz4"), - - // enable hash join falling back to sort merge join when hash table is too big - SMJ_FALLBACK_ENABLE("spark.auron.smjfallback.enable", false), - - // smj fallback threshold - SMJ_FALLBACK_ROWS_THRESHOLD("spark.auron.smjfallback.rows.threshold", 10000000), - - // smj fallback threshold - SMJ_FALLBACK_MEM_SIZE_THRESHOLD("spark.auron.smjfallback.mem.threshold", 134217728), - - // max memory fraction of on-heap spills - ON_HEAP_SPILL_MEM_FRACTION("spark.auron.onHeapSpill.memoryFraction", 0.9), - - // suggested 
memory size for record batch - SUGGESTED_BATCH_MEM_SIZE("spark.auron.suggested.batch.memSize", 8388608), - - // fallback to UDFJson when error parsing json in native implementation - PARSE_JSON_ERROR_FALLBACK("spark.auron.parseJsonError.fallback", true), - - // suggested memory size for k-way merging - // use smaller batch memory size for kway merging since there will be multiple - // batches in memory at the same time - SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE("spark.auron.suggested.batch.memSize.multiwayMerging", 1048576), - - ORC_FORCE_POSITIONAL_EVOLUTION("spark.auron.orc.force.positional.evolution", false), - - // use microsecond precision when reading ORC timestamp columns - ORC_TIMESTAMP_USE_MICROSECOND("spark.auron.orc.timestamp.use.microsecond", false), - - ORC_SCHEMA_CASE_SENSITIVE("spark.auron.orc.schema.caseSensitive.enable", false), - - NATIVE_LOG_LEVEL("spark.auron.native.log.level", "info"); - - public final String key; - private final Object defaultValue; - - AuronConf(String key, Object defaultValue) { - this.key = key; - this.defaultValue = defaultValue; - } - - public boolean booleanConf() { - return conf().getBoolean(key, (boolean) defaultValue); - } - - public int intConf() { - return conf().getInt(key, (int) defaultValue); - } - - public long longConf() { - return conf().getLong(key, (long) defaultValue); - } - - public double doubleConf() { - return conf().getDouble(key, (double) defaultValue); - } - - public String stringConf() { - return conf().get(key, (String) defaultValue); - } - - public static boolean booleanConf(String confName) { - return AuronConf.valueOf(confName).booleanConf(); - } - - public static int intConf(String confName) { - return AuronConf.valueOf(confName).intConf(); - } - - public static long longConf(String confName) { - return AuronConf.valueOf(confName).longConf(); - } - - public static double doubleConf(String confName) { - return AuronConf.valueOf(confName).doubleConf(); - } - - public static String stringConf(String confName) { - return AuronConf.valueOf(confName).stringConf(); - } - - private static SparkConf conf() { - return SparkEnv$.MODULE$.get().conf(); - } -} diff --git a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java deleted file mode 100644 index 3bea6e17a..000000000 --- a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.auron; - -import java.lang.management.BufferPoolMXBean; -import java.lang.management.ManagementFactory; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.auron.functions.AuronUDFWrapperContext; -import org.apache.auron.hadoop.fs.FSDataInputWrapper; -import org.apache.auron.hadoop.fs.FSDataInputWrapper$; -import org.apache.auron.hadoop.fs.FSDataOutputWrapper; -import org.apache.auron.hadoop.fs.FSDataOutputWrapper$; -import org.apache.auron.memory.OnHeapSpillManager; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.SparkEnv; -import org.apache.spark.TaskContext; -import org.apache.spark.TaskContext$; -import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager$; -import org.apache.spark.sql.auron.util.TaskContextHelper$; - -/** - * This class has been deprecated and migrated to {@link org.apache.auron.jni.JniBridge}. - * Will be removed in the future. - */ -@Deprecated -@SuppressWarnings("unused") -public class JniBridge { - - @Deprecated - public static final ConcurrentHashMap resourcesMap = new ConcurrentHashMap<>(); - - @Deprecated - public static native long callNative(long initNativeMemory, String logLevel, AuronCallNativeWrapper wrapper); - - @Deprecated - public static native boolean nextBatch(long ptr); - - @Deprecated - public static native void finalizeNative(long ptr); - - @Deprecated - public static native void onExit(); - - @Deprecated - public static ClassLoader getContextClassLoader() { - return Thread.currentThread().getContextClassLoader(); - } - - @Deprecated - public static void setContextClassLoader(ClassLoader cl) { - Thread.currentThread().setContextClassLoader(cl); - } - - @Deprecated - public static Object getResource(String key) { - return resourcesMap.remove(key); - } - - @Deprecated - public static TaskContext getTaskContext() { - return TaskContext$.MODULE$.get(); - } - - @Deprecated - public static OnHeapSpillManager getTaskOnHeapSpillManager() { - return SparkOnHeapSpillManager$.MODULE$.current(); - } - - @Deprecated - public static boolean isTaskRunning() { - TaskContext tc = getTaskContext(); - if (tc == null) { // driver is always running - return true; - } - return !tc.isCompleted() && !tc.isInterrupted(); - } - - @Deprecated - public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception { - // the path is a URI string, so we need to convert it to a URI object, ref: - // org.apache.spark.paths.SparkPath.toPath - return FSDataInputWrapper$.MODULE$.wrap(fs.open(new Path(new URI(path)))); - } - - @Deprecated - public static FSDataOutputWrapper createFileAsDataOutputWrapper(FileSystem fs, String path) throws Exception { - return FSDataOutputWrapper$.MODULE$.wrap(fs.create(new Path(new URI(path)))); - } - - @Deprecated - private static final List directMXBeans = - ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); - - @Deprecated - public static long getTotalMemoryLimited() { - return NativeHelper$.MODULE$.totalMemory(); - } - - @Deprecated - public static long getDirectMemoryUsed() { - return directMXBeans.stream() - .mapToLong(BufferPoolMXBean::getTotalCapacity) - .sum(); - } - - @Deprecated - public static String getDirectWriteSpillToDiskFile() { - return SparkEnv.get() - .blockManager() - .diskBlockManager() - .createTempLocalBlock() - ._2 - .getPath(); - } - - @Deprecated - public static void initNativeThread(ClassLoader 
cl, TaskContext tc) { - setContextClassLoader(cl); - TaskContext$.MODULE$.setTaskContext(tc); - TaskContextHelper$.MODULE$.setNativeThreadName(); - TaskContextHelper$.MODULE$.setHDFSCallerContext(); - } - - @Deprecated - public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer udfSerialized) { - throw new UnsupportedOperationException("This API is designed to support next-generation multi-engine."); - } -} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala deleted file mode 100644 index b4028c1e2..000000000 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.auron - -import java.io.File -import java.io.IOException -import java.nio.file.Files -import java.nio.file.StandardCopyOption -import java.util.concurrent.atomic.AtomicReference - -import scala.annotation.nowarn -import scala.collection.mutable.ArrayBuffer - -import org.apache.arrow.c.ArrowArray -import org.apache.arrow.c.ArrowSchema -import org.apache.arrow.c.CDataDictionaryProvider -import org.apache.arrow.c.Data -import org.apache.arrow.vector.VectorSchemaRoot -import org.apache.arrow.vector.types.pojo.Schema -import org.apache.spark.Partition -import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging -import org.apache.spark.sql.auron.util.Using -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection -import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils -import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils.ROOT_ALLOCATOR -import org.apache.spark.sql.execution.auron.columnar.ColumnarHelper -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.CompletionIterator -import org.apache.spark.util.ShutdownHookManager -import org.apache.spark.util.Utils - -import org.apache.auron.metric.{MetricNode, SparkMetricNode} -import org.apache.auron.protobuf.PartitionId -import org.apache.auron.protobuf.PhysicalPlanNode -import org.apache.auron.protobuf.TaskDefinition - -/** - * This class has been deprecated and migrated to {@link - * org.apache.auron.jni.AuronCallNativeWrapper}. Will be removed in the future. 
- */ -@nowarn("cat=deprecation") // JniBridge is temporarily used (deprecated) -@Deprecated -case class AuronCallNativeWrapper( - nativePlan: PhysicalPlanNode, - partition: Partition, - context: Option[TaskContext], - metrics: SparkMetricNode) - extends Logging { - - AuronCallNativeWrapper.initNative() - - private val error: AtomicReference[Throwable] = new AtomicReference(null) - private val dictionaryProvider = new CDataDictionaryProvider() - private var arrowSchema: Schema = _ - private var schema: StructType = _ - private var toUnsafe: UnsafeProjection = _ - private val batchRows: ArrayBuffer[InternalRow] = ArrayBuffer() - private var batchCurRowIdx = 0 - - logInfo(s"Start executing native plan ${nativePlan.getPhysicalPlanTypeCase}") - private var nativeRuntimePtr = - JniBridge.callNative(NativeHelper.nativeMemory, AuronConf.NATIVE_LOG_LEVEL.stringConf(), this) - - private lazy val rowIterator = new Iterator[InternalRow] { - override def hasNext: Boolean = { - checkError() - - if (batchCurRowIdx < batchRows.length) { - return true - } - - // clear current batch - batchRows.clear() - batchCurRowIdx = 0 - - // load next batch - try { - if (nativeRuntimePtr != 0 && JniBridge.nextBatch(nativeRuntimePtr)) { - return hasNext - } - } finally { - // if error has been set, throw set error instead of this caught exception - checkError() - } - false - } - - @Deprecated - override def next(): InternalRow = { - val batchRow = batchRows(batchCurRowIdx) - batchCurRowIdx += 1 - batchRow - } - } - - context.foreach(_.addTaskCompletionListener[Unit]((_: TaskContext) => close())) - context.foreach(_.addTaskFailureListener((_, _) => close())) - - @Deprecated - def getRowIterator: Iterator[InternalRow] = { - CompletionIterator[InternalRow, Iterator[InternalRow]](rowIterator, close()) - } - - @Deprecated - protected def getMetrics: MetricNode = - metrics - - @Deprecated - protected def importSchema(ffiSchemaPtr: Long): Unit = { - Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema => - arrowSchema = Data.importSchema(ROOT_ALLOCATOR, ffiSchema, dictionaryProvider) - schema = ArrowUtils.fromArrowSchema(arrowSchema) - toUnsafe = UnsafeProjection.create(schema) - } - } - - @Deprecated - protected def importBatch(ffiArrayPtr: Long): Unit = { - if (nativeRuntimePtr == 0) { - throw new RuntimeException("Native runtime is finalized") - } - - Using.resources( - ArrowArray.wrap(ffiArrayPtr), - VectorSchemaRoot.create(arrowSchema, ROOT_ALLOCATOR)) { case (ffiArray, root) => - Data.importIntoVectorSchemaRoot(ROOT_ALLOCATOR, ffiArray, root, dictionaryProvider) - - batchRows.append( - ColumnarHelper - .rootRowsIter(root) - .map(row => toUnsafe(row).copy().asInstanceOf[InternalRow]) - .toSeq: _*) - } - } - - @Deprecated - protected def setError(error: Throwable): Unit = { - this.error.set(error) - } - - @Deprecated - protected def checkError(): Unit = { - val throwable = error.getAndSet(null) - if (throwable != null) { - close() - throw throwable - } - } - - @Deprecated - protected def getRawTaskDefinition: Array[Byte] = { - val partitionId: PartitionId = PartitionId - .newBuilder() - .setPartitionId(partition.index) - .setStageId(context.map(_.stageId()).getOrElse(0)) - .setTaskId(context.map(_.taskAttemptId()).getOrElse(0)) - .build() - - val taskDefinition = TaskDefinition - .newBuilder() - .setTaskId(partitionId) - .setPlan(nativePlan) - .build() - taskDefinition.toByteArray - } - - private def close(): Unit = { - synchronized { - batchRows.clear() - batchCurRowIdx = 0 - - if (nativeRuntimePtr != 0) { - 
JniBridge.finalizeNative(nativeRuntimePtr) - nativeRuntimePtr = 0 - dictionaryProvider.close() - checkError() - } - } - } -} - -@nowarn("cat=deprecation") // JniBridge is temporarily used (deprecated) -object AuronCallNativeWrapper extends Logging { - def initNative(): Unit = { - lazyInitNative - } - - private lazy val lazyInitNative: Unit = { - logInfo( - "Initializing native environment (" + - s"batchSize=${AuronConf.BATCH_SIZE.intConf()}, " + - s"nativeMemory=${NativeHelper.nativeMemory}, " + - s"memoryFraction=${AuronConf.MEMORY_FRACTION.doubleConf()})") - - // arrow configuration - System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND") - - assert(classOf[JniBridge] != null) // preload JNI bridge classes - AuronCallNativeWrapper.loadLibAuron() - ShutdownHookManager.addShutdownHook(() => JniBridge.onExit()) - } - - private def loadLibAuron(): Unit = { - val libName = System.mapLibraryName("auron") - try { - val classLoader = classOf[NativeSupports].getClassLoader - val tempFile = File.createTempFile("libauron-", ".tmp") - tempFile.deleteOnExit() - - Utils.tryWithResource { - val is = classLoader.getResourceAsStream(libName) - assert(is != null, s"cannot load $libName") - is - }(Files.copy(_, tempFile.toPath, StandardCopyOption.REPLACE_EXISTING)) - System.load(tempFile.getAbsolutePath) - - } catch { - case e: IOException => - throw new IllegalStateException("error loading native libraries: " + e) - } - } -} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index 9b8bed91b..def645d51 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -80,58 +80,34 @@ import org.apache.auron.protobuf.PhysicalPlanNode import org.apache.auron.spark.configuration.SparkAuronConfiguration object AuronConverters extends Logging { - def enableScan: Boolean = - getBooleanConf("spark.auron.enable.scan", defaultValue = true) - def enableProject: Boolean = - getBooleanConf("spark.auron.enable.project", defaultValue = true) - def enableFilter: Boolean = - getBooleanConf("spark.auron.enable.filter", defaultValue = true) - def enableSort: Boolean = - getBooleanConf("spark.auron.enable.sort", defaultValue = true) - def enableUnion: Boolean = - getBooleanConf("spark.auron.enable.union", defaultValue = true) - def enableSmj: Boolean = - getBooleanConf("spark.auron.enable.smj", defaultValue = true) - def enableShj: Boolean = - getBooleanConf("spark.auron.enable.shj", defaultValue = true) - def enableBhj: Boolean = - getBooleanConf("spark.auron.enable.bhj", defaultValue = true) - def enableBnlj: Boolean = - getBooleanConf("spark.auron.enable.bnlj", defaultValue = true) - def enableLocalLimit: Boolean = - getBooleanConf("spark.auron.enable.local.limit", defaultValue = true) - def enableGlobalLimit: Boolean = - getBooleanConf("spark.auron.enable.global.limit", defaultValue = true) + def enableScan: Boolean = SparkAuronConfiguration.ENABLE_SCAN.get() + def enableProject: Boolean = SparkAuronConfiguration.ENABLE_PROJECT.get() + def enableFilter: Boolean = SparkAuronConfiguration.ENABLE_FILTER.get() + def enableSort: Boolean = SparkAuronConfiguration.ENABLE_SORT.get() + def enableUnion: Boolean = SparkAuronConfiguration.ENABLE_UNION.get() + def enableSmj: Boolean = SparkAuronConfiguration.ENABLE_SMJ.get() + def enableShj: Boolean = 
SparkAuronConfiguration.ENABLE_SHJ.get() + def enableBhj: Boolean = SparkAuronConfiguration.ENABLE_BHJ.get() + def enableBnlj: Boolean = SparkAuronConfiguration.ENABLE_BNLJ.get() + def enableLocalLimit: Boolean = SparkAuronConfiguration.ENABLE_LOCAL_LIMIT.get() + def enableGlobalLimit: Boolean = SparkAuronConfiguration.ENABLE_GLOBAL_LIMIT.get() def enableTakeOrderedAndProject: Boolean = - getBooleanConf("spark.auron.enable.take.ordered.and.project", defaultValue = true) - def enableCollectLimit: Boolean = - getBooleanConf("spark.auron.enable.collectLimit", defaultValue = true) - def enableAggr: Boolean = - getBooleanConf("spark.auron.enable.aggr", defaultValue = true) - def enableExpand: Boolean = - getBooleanConf("spark.auron.enable.expand", defaultValue = true) - def enableWindow: Boolean = - getBooleanConf("spark.auron.enable.window", defaultValue = true) - def enableWindowGroupLimit: Boolean = - getBooleanConf("spark.auron.enable.window.group.limit", defaultValue = true) - def enableGenerate: Boolean = - getBooleanConf("spark.auron.enable.generate", defaultValue = true) - def enableLocalTableScan: Boolean = - getBooleanConf("spark.auron.enable.local.table.scan", defaultValue = true) - def enableDataWriting: Boolean = - getBooleanConf("spark.auron.enable.data.writing", defaultValue = false) - def enableScanParquet: Boolean = - getBooleanConf("spark.auron.enable.scan.parquet", defaultValue = true) + SparkAuronConfiguration.ENABLE_TAKE_ORDERED_AND_PROJECT.get() + def enableCollectLimit: Boolean = SparkAuronConfiguration.ENABLE_COLLECT_LIMIT.get() + def enableAggr: Boolean = SparkAuronConfiguration.ENABLE_AGGR.get() + def enableExpand: Boolean = SparkAuronConfiguration.ENABLE_EXPAND.get() + def enableWindow: Boolean = SparkAuronConfiguration.ENABLE_WINDOW.get() + def enableWindowGroupLimit: Boolean = SparkAuronConfiguration.ENABLE_WINDOW_GROUP_LIMIT.get() + def enableGenerate: Boolean = SparkAuronConfiguration.ENABLE_GENERATE.get() + def enableLocalTableScan: Boolean = SparkAuronConfiguration.ENABLE_LOCAL_TABLE_SCAN.get() + def enableDataWriting: Boolean = SparkAuronConfiguration.ENABLE_DATA_WRITING.get() + def enableScanParquet: Boolean = SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get() def enableScanParquetTimestamp: Boolean = - getBooleanConf("spark.auron.enable.scan.parquet.timestamp", defaultValue = true) - def enableScanOrc: Boolean = - getBooleanConf("spark.auron.enable.scan.orc", defaultValue = true) - def enableScanOrcTimestamp: Boolean = - getBooleanConf("spark.auron.enable.scan.orc.timestamp", defaultValue = true) - def enableBroadcastExchange: Boolean = - getBooleanConf("spark.auron.enable.broadcastExchange", defaultValue = true) - def enableShuffleExechange: Boolean = - getBooleanConf("spark.auron.enable.shuffleExchange", defaultValue = true) + SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.get() + def enableScanOrc: Boolean = SparkAuronConfiguration.ENABLE_SCAN_ORC.get() + def enableScanOrcTimestamp: Boolean = SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get() + def enableBroadcastExchange: Boolean = SparkAuronConfiguration.ENABLE_BROADCAST_EXCHANGE.get() + def enableShuffleExechange: Boolean = SparkAuronConfiguration.ENABLE_SHUFFLE_EXCHANGE.get() private val extConvertProviders = ServiceLoader.load(classOf[AuronConvertProvider]).asScala diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala index 1f8b6421d..b68b04954 
100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.auron import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.rules.Rule @@ -27,6 +26,8 @@ import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.internal.SQLConf +import org.apache.auron.spark.configuration.SparkAuronConfiguration + class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with Logging { Shims.get.initExtension() @@ -35,7 +36,6 @@ class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with L SparkEnv.get.conf.set(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key, "true") logInfo(s"${classOf[AuronSparkSessionExtension].getName} enabled") - assert(AuronSparkSessionExtension.auronEnabledKey != null) Shims.get.onApplyingExtension() extensions.injectColumnar(sparkSession => { @@ -45,11 +45,6 @@ class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with L } object AuronSparkSessionExtension extends Logging { - lazy val auronEnabledKey: ConfigEntry[Boolean] = SQLConf - .buildConf("spark.auron.enable") - .booleanConf - .createWithDefault(true) - def dumpSimpleSparkPlanTreeNode(exec: SparkPlan, depth: Int = 0): Unit = { val nodeName = exec.nodeName val convertible = exec @@ -68,7 +63,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu override def preColumnarTransitions: Rule[SparkPlan] = { new Rule[SparkPlan] { override def apply(sparkPlan: SparkPlan): SparkPlan = { - if (!sparkPlan.conf.getConf(auronEnabledKey)) { + if (!SparkAuronConfiguration.AURON_ENABLED.get()) { return sparkPlan // performs no conversion if auron is not enabled } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala index 8b48c39aa..7a3bde2c8 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala @@ -81,31 +81,17 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils import org.apache.auron.{protobuf => pb} -import org.apache.auron.configuration.AuronConfiguration -import org.apache.auron.jni.AuronAdaptor import org.apache.auron.protobuf.PhysicalExprNode import org.apache.auron.spark.configuration.SparkAuronConfiguration object NativeConverters extends Logging { - - private def sparkAuronConfig: AuronConfiguration = - AuronAdaptor.getInstance.getAuronConfiguration - def udfEnabled: Boolean = - AuronConverters.getBooleanConf("spark.auron.udf.enabled", defaultValue = true) - def udfJsonEnabled: Boolean = - AuronConverters.getBooleanConf("spark.auron.udf.UDFJson.enabled", defaultValue = true) - def udfBrickHouseEnabled: Boolean = - AuronConverters.getBooleanConf("spark.auron.udf.brickhouse.enabled", defaultValue = true) - def decimalArithOpEnabled: Boolean = - AuronConverters.getBooleanConf("spark.auron.decimal.arithOp.enabled", defaultValue = false) - def datetimeExtractEnabled: Boolean = - 
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 8b48c39aa..7a3bde2c8 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -81,31 +81,17 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
 import org.apache.auron.{protobuf => pb}
-import org.apache.auron.configuration.AuronConfiguration
-import org.apache.auron.jni.AuronAdaptor
 import org.apache.auron.protobuf.PhysicalExprNode
 import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 object NativeConverters extends Logging {
-
-  private def sparkAuronConfig: AuronConfiguration =
-    AuronAdaptor.getInstance.getAuronConfiguration
-  def udfEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.udf.enabled", defaultValue = true)
-  def udfJsonEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.udf.UDFJson.enabled", defaultValue = true)
-  def udfBrickHouseEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.udf.brickhouse.enabled", defaultValue = true)
-  def decimalArithOpEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.decimal.arithOp.enabled", defaultValue = false)
-  def datetimeExtractEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.datetime.extract.enabled", defaultValue = false)
-  def castTrimStringEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.cast.trimString", defaultValue = true)
+  def udfJsonEnabled: Boolean = SparkAuronConfiguration.UDF_JSON_ENABLED.get()
+  def udfBrickHouseEnabled: Boolean = SparkAuronConfiguration.UDF_BRICKHOUSE_ENABLED.get()
+  def decimalArithOpEnabled: Boolean = SparkAuronConfiguration.DECIMAL_ARITH_OP_ENABLED.get()
+  def datetimeExtractEnabled: Boolean = SparkAuronConfiguration.DATETIME_EXTRACT_ENABLED.get()
+  def castTrimStringEnabled: Boolean = SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE.get()
   def singleChildFallbackEnabled: Boolean =
-    AuronConverters.getBooleanConf(
-      "spark.auron.expression.singleChildFallback.enabled",
-      defaultValue = true)
+    SparkAuronConfiguration.UDF_SINGLE_CHILD_FALLBACK_ENABLED.get()
 
   /**
    * Is the data type(scalar or complex) supported by Auron.
@@ -781,7 +767,7 @@ object NativeConverters extends Logging {
       // if rhs is complex in and/or operators, use short-circuiting implementation
       // or if forceShortCircuitAndOr is enabled, always use short-circuiting
       case And(lhs, rhs)
-          if sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR)
+          if SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR.get()
            || rhs.find(HiveUDFUtil.isHiveUDF).isDefined =>
         buildExprNode {
           _.setScAndExpr(
@@ -791,7 +777,7 @@ object NativeConverters extends Logging {
             .setRight(convertExprWithFallback(rhs, isPruningExpr, fallback)))
       }
       case Or(lhs, rhs)
-          if sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR)
+          if SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR.get()
            || rhs.find(HiveUDFUtil.isHiveUDF).isDefined =>
         buildExprNode {
           _.setScOrExpr(
@@ -889,11 +875,9 @@ object NativeConverters extends Logging {
       case Length(arg) if arg.dataType == StringType =>
         buildScalarFunction(pb.ScalarFunction.CharacterLength, arg :: Nil, IntegerType)
 
-      case e: Lower
-          if sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE) =>
+      case e: Lower if SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE.get() =>
         buildExtScalarFunction("Spark_StringLower", e.children, e.dataType)
-      case e: Upper
-          if sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE) =>
+      case e: Upper if SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE.get() =>
         buildExtScalarFunction("Spark_StringUpper", e.children, e.dataType)
 
       case e: StringTrim =>
@@ -1254,7 +1238,7 @@ object NativeConverters extends Logging {
     }
 
     // fallback to UDAF
-    if (sparkAuronConfig.getBoolean(SparkAuronConfiguration.UDAF_FALLBACK_ENABLE)) {
+    if (SparkAuronConfiguration.UDAF_FALLBACK_ENABLE.get()) {
       udaf match {
         case _: DeclarativeAggregate =>
         case _: TypedImperativeAggregate[_] =>
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
index 8d8175219..19f98b415 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
@@ -292,6 +292,7 @@ object Shims {
   lazy val get: Shims = {
     classOf[Shims].getClassLoader
       .loadClass("org.apache.spark.sql.auron.ShimsImpl")
+      .getConstructor()
       .newInstance()
       .asInstanceOf[Shims]
   }
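The extra `.getConstructor()` in `Shims.get` replaces `Class#newInstance`, which has been deprecated since Java 9 because it propagates the constructor's checked exceptions without wrapping them; `Constructor#newInstance` instead surfaces them as `InvocationTargetException`. The resulting loading pattern, shown standalone:

    // Load the Spark-version-specific shim by name and instantiate it via
    // the explicit no-arg constructor; a missing class or missing constructor
    // each fails with its own distinct, checked exception.
    val shims: Shims = classOf[Shims].getClassLoader
      .loadClass("org.apache.spark.sql.auron.ShimsImpl")
      .getConstructor()
      .newInstance()
      .asInstanceOf[Shims]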
diff --git a/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java b/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
deleted file mode 100644
index 99345250c..000000000
--- a/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.spark.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.apache.spark.SparkConf;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * This class is used to test the {@link SparkAuronConfiguration) class.
- */
-public class SparkAuronConfigurationTest {
-
-    private SparkAuronConfiguration sparkAuronConfiguration;
-
-    @BeforeEach
-    public void setUp() {
-        SparkConf sparkConf = new SparkConf();
-        sparkConf.set("spark.auron.ui.enabled", "false");
-        sparkConf.set("spark.auron.process.vmrss.memoryFraction", "0.66");
-        sparkConf.set("spark.auron.suggested.udaf.memUsedSize", "1024");
-        sparkConf.set("spark.io.compression.codec", "gzip");
-        sparkAuronConfiguration = new SparkAuronConfiguration(sparkConf);
-    }
-
-    @Test
-    public void testGetSparkConfig() {
-        assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.UI_ENABLED), false);
-        assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.PROCESS_MEMORY_FRACTION), 0.66);
-        assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.SUGGESTED_UDAF_ROW_MEM_USAGE), 1024);
-        assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.SPARK_IO_COMPRESSION_CODEC), "gzip");
-
-        assertEquals(
-                sparkAuronConfiguration
-                        .getOptional(SparkAuronConfiguration.UI_ENABLED.key())
-                        .get(),
-                false);
-        assertEquals(
-                sparkAuronConfiguration
-                        .getOptional(SparkAuronConfiguration.PROCESS_MEMORY_FRACTION.key())
-                        .get(),
-                0.66);
-        assertEquals(
-                sparkAuronConfiguration
-                        .getOptional(SparkAuronConfiguration.SUGGESTED_UDAF_ROW_MEM_USAGE.key())
-                        .get(),
-                1024);
-        assertEquals(
-                sparkAuronConfiguration
-                        .getOptional(SparkAuronConfiguration.SPARK_IO_COMPRESSION_CODEC.key())
-                        .get(),
-                "gzip");
-
-        // Test default value
-        assertEquals(
-                sparkAuronConfiguration
-                        .getOptional(SparkAuronConfiguration.PARSE_JSON_ERROR_FALLBACK)
-                        .get(),
-                true);
-    }
-}
diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
index 5b320d18e..f9d75c439 100644
--- a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
+++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
@@ -24,11 +24,11 @@
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.execution.auron.plan.NativePaimonTableScanExec
 
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
 class PaimonConvertProvider extends AuronConvertProvider with Logging {
-  override def isEnabled: Boolean = {
-    AuronConverters.getBooleanConf("spark.auron.enable.paimon.scan", defaultValue = false)
-  }
+  override def isEnabled: Boolean = SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
 
   override def isSupported(exec: SparkPlan): Boolean = {
     exec match {