113 changes: 77 additions & 36 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometLoaded, withInfo, withInfos}
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
import org.apache.comet.DataTypeSupport.isComplexType
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
import org.apache.comet.objectstore.NativeConfig
@@ -145,21 +145,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
}
val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options)

var scanImpl = COMET_NATIVE_SCAN_IMPL.get()

val hadoopConf = scanExec.relation.sparkSession.sessionState
.newHadoopConfWithOptions(scanExec.relation.options)

// if scan is auto then pick the best available scan
if (scanImpl == SCAN_AUTO) {
scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
}

if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
return scanExec
}

// TODO is this restriction valid for all native scan types?
Contributor: Are default values for primitive types supported by native_datafusion?

@mbutrovich (Contributor, Jan 6, 2026): Yep. #1756
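
For context, an existence default value arises when a column with a DEFAULT is added after data files were written, so the scan has to synthesize the value for old files. A minimal sketch of the primitive case confirmed above (hypothetical table name, assuming Spark 3.4+ default-column support):

// Files written before the ALTER lack `flag`; the reader fills it in from the
// existence default (42). Primitive defaults like this are supported; the
// check below falls back only for struct, map, and array defaults.
spark.sql("CREATE TABLE t (id INT) USING parquet")
spark.sql("INSERT INTO t VALUES (1)")
spark.sql("ALTER TABLE t ADD COLUMN flag INT DEFAULT 42")
spark.sql("SELECT id, flag FROM t").show() // -> 1, 42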

val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema)
if (possibleDefaultValues.exists(d => {
d != null && (d.isInstanceOf[ArrayBasedMapData] || d
@@ -170,32 +158,73 @@
// and arrays respectively.
withInfo(
scanExec,
"Full native scan disabled because nested types for default values are not supported")
}

if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
if (!isEncryptionConfigSupported(hadoopConf)) {
withInfo(scanExec, s"$scanImpl does not support encryption")
}
"Full native scan disabled because default values for nested types are not supported")
return scanExec
}

// check that schema is supported
checkSchema(scanExec, scanImpl, r)

if (hasExplainInfo(scanExec)) {
// could not accelerate, and plan is already tagged with fallback reasons
scanExec
} else {
// this is confusing, but we always insert a CometScanExec here, which may be replaced
// with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
CometScanExec(scanExec, session, scanImpl)
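// Dispatch on the configured scan implementation. Each native*Scan helper
// below returns Option[SparkPlan], and Option.orElse takes its argument by
// name, so under SCAN_AUTO the iceberg-compat scan is tried first and
// nativeCometScan is only evaluated if it returns None.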
COMET_NATIVE_SCAN_IMPL.get() match {
case SCAN_AUTO =>
// TODO add support for native_datafusion in the future
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
.orElse(nativeCometScan(session, scanExec, r, hadoopConf))
.getOrElse(scanExec)
case SCAN_NATIVE_DATAFUSION =>
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_ICEBERG_COMPAT =>
nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_COMET =>
nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
}

case _ =>
withInfo(scanExec, s"Unsupported relation ${scanExec.relation}")
}
}

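// Each helper attempts a single scan implementation, returning
// Some(CometScanExec) on success or None after recording fallback reasons
// on scanExec via withInfo where applicable.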
private def nativeDataFusionScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (!CometNativeScan.isSupported(scanExec)) {
return None
}
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
withInfo(scanExec, s"$SCAN_NATIVE_DATAFUSION does not support encryption")
return None
}
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_DATAFUSION))
}

private def nativeIcebergCompatScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
withInfo(scanExec, s"$SCAN_NATIVE_ICEBERG_COMPAT does not support encryption")
return None
}
if (!isSchemaSupported(scanExec, SCAN_NATIVE_ICEBERG_COMPAT, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
}

private def nativeCometScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
}

private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {

scanExec.scan match {
@@ -612,20 +641,32 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
private def isDynamicPruningFilter(e: Expression): Boolean =
e.exists(_.isInstanceOf[PlanExpression[_]])

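// Validates both the required (data) schema and the partition schema against
// the type checker for the given scan implementation; on failure, tags
// scanExec with the accumulated fallback reasons and returns false.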
def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
private def isSchemaSupported(
scanExec: FileSourceScanExec,
scanImpl: String,
r: HadoopFsRelation): Boolean = {
val fallbackReasons = new ListBuffer[String]()
val typeChecker = CometScanTypeChecker(scanImpl)
val schemaSupported =
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
if (!schemaSupported) {
withInfo(scanExec, s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl")
withInfo(
scanExec,
s"Unsupported schema ${scanExec.requiredSchema} " +
s"for $scanImpl: ${fallbackReasons.mkString(", ")}")
return false
}
val partitionSchemaSupported =
typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
if (!partitionSchemaSupported) {
fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
withInfo(
scanExec,
s"Unsupported partitioning schema ${scanExec.requiredSchema} " +
s"for $scanImpl: ${fallbackReasons
.mkString(", ")}")
return false
}
withInfos(scanExec, fallbackReasons.toSet)
true
}
}
