56 changes: 54 additions & 2 deletions native/spark-expr/src/json_funcs/to_json.rs
@@ -124,12 +124,19 @@ fn array_to_json_string(arr: &Arc<dyn Array>, timezone: &str) -> Result<ArrayRef
     if let Some(struct_array) = arr.as_any().downcast_ref::<StructArray>() {
         struct_to_json(struct_array, timezone)
     } else {
-        spark_cast(
+        let array = spark_cast(
             ColumnarValue::Array(Arc::clone(arr)),
             &DataType::Utf8,
             &SparkCastOptions::new(EvalMode::Legacy, timezone, false),
         )?
-        .into_array(arr.len())
+        .into_array(arr.len())?;
+
+        let string_array = array
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("Utf8 array");
+
+        Ok(normalize_special_floats(string_array))
     }
 }

@@ -181,6 +188,23 @@ fn escape_string(input: &str) -> String {
     escaped_string
 }
 
+fn normalize_special_floats(arr: &StringArray) -> ArrayRef {
+    let mut builder = StringBuilder::with_capacity(arr.len(), arr.len() * 8);
+
+    for i in 0..arr.len() {
+        if arr.is_null(i) {
+            builder.append_null();
+        } else {
+            match arr.value(i) {
+                "Infinity" | "-Infinity" | "NaN" => builder.append_null(),
Member:
I haven't reviewed this in detail yet, but it seems odd to handle these values after they have already been converted to strings. Could the check not happen when converting float to string instead?

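A rough sketch of the alternative suggested here, handling non-finite values during the float-to-string conversion itself; the helper below is hypothetical and is not taken from spark_cast:

fn float_to_json_fragment(v: f64) -> Option<String> {
    // Map Infinity, -Infinity, and NaN to None (a JSON null downstream)
    // instead of post-processing their string representations.
    if v.is_finite() {
        Some(v.to_string())
    } else {
        None
    }
}
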
Author:
I agree that handling this earlier would be preferable in general. In this case, to_json delegates primitive type handling to spark_cast, and the goal here was to avoid changing spark_cast behavior globally since it is used by other expressions where preserving "NaN" / "Infinity" string output may be expected.

Normalizing the values at the to_json layer keeps the change scoped specifically to JSON semantics while still aligning the output with Spark’s behavior.

That said, I'm happy to move the check earlier or adjust the approach if you think handling this during float-to-string conversion would be more appropriate for Comet.

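For reference, a minimal sketch of what the normalization layer does to spark_cast's output; the test name is hypothetical, and it assumes arrow's StringArray along with the normalize_special_floats function from this diff:

#[test]
fn normalize_special_floats_sketch() {
    use arrow::array::{Array, ArrayRef, StringArray};

    // Strings as they would come back from spark_cast for Float64 input.
    let cast_output = StringArray::from(vec![Some("Infinity"), Some("NaN"), Some("1.5"), None]);
    let normalized: ArrayRef = normalize_special_floats(&cast_output);

    // Non-finite values become nulls (omitted from the JSON object);
    // ordinary numbers pass through and existing nulls are preserved.
    assert!(normalized.is_null(0));
    assert!(normalized.is_null(1));
    assert!(!normalized.is_null(2));
    assert!(normalized.is_null(3));
}
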
Contributor:
@andygrove For more details, run the unit tests from this pull request: #3011

Member:
Thanks @Brijesh-Thakkar and @kazantsev-maksim. I will review this PR more carefully today.

Member:
I checked out this branch locally and then merged the changes from #3011 and saw different results from Spark and Comet for infinity:

Spark: [{"c0":false, ... "c5":"-Infinity", ...}]
Comet: [{"c0":false, ... "c5":-Infinity, ...}]

Let's get the unit tests merged first and then pull them into this PR.

Member:
Spark doesn't follow the JSON spec:

scala> spark.sql("""SELECT to_json(struct(cast('Infinity' as double) as inf, 
     |                       cast('NaN' as double) as nan))""").show(false)
+----------------------------------------------------------------------------+
|to_json(struct(CAST(Infinity AS DOUBLE) AS inf, CAST(NaN AS DOUBLE) AS nan))|
+----------------------------------------------------------------------------+
|{"inf":"Infinity","nan":"NaN"}                                              |
+----------------------------------------------------------------------------+

+                v => builder.append_value(v),
+            }
+        }
+    }
+
+    Arc::new(builder.finish())
+}

 fn struct_to_json(array: &StructArray, timezone: &str) -> Result<ArrayRef> {
     // get field names and escape any quotes
     let field_names: Vec<String> = array
@@ -331,6 +355,34 @@ mod test {
         Ok(())
     }
 
+    #[test]
+    fn test_to_json_infinity() -> Result<()> {
+        use arrow::array::{Float64Array, StructArray};
+        use arrow::datatypes::{DataType, Field};
+
+        let values: ArrayRef = Arc::new(Float64Array::from(vec![
+            Some(f64::INFINITY),
+            Some(f64::NEG_INFINITY),
+            Some(f64::NAN),
+            Some(1.5),
+        ]));
+
+        let struct_array = StructArray::from(vec![(
+            Arc::new(Field::new("a", DataType::Float64, true)),
+            values,
+        )]);
+
+        let json = struct_to_json(&struct_array, "UTC")?;
+        let json = json.as_any().downcast_ref::<StringArray>().unwrap();
+
+        assert_eq!(r#"{}"#, json.value(0));
+        assert_eq!(r#"{}"#, json.value(1));
+        assert_eq!(r#"{}"#, json.value(2));
+        assert_eq!(r#"{"a":1.5}"#, json.value(3));
+
+        Ok(())
+    }
+
     fn create_ints() -> Arc<PrimitiveArray<Int32Type>> {
         Arc::new(Int32Array::from(vec![
             Some(123),