From a507d0e006c8aac2b37b3b58ae1e7e17dc867970 Mon Sep 17 00:00:00 2001 From: zhtttylz Date: Fri, 8 May 2026 11:38:57 +0800 Subject: [PATCH 1/3] [AURON #2257] Avoid URI reparsing in JNI Hadoop paths --- auron-core/pom.xml | 6 ++ .../java/org/apache/auron/jni/JniBridge.java | 6 +- .../org/apache/auron/jni/JniBridgeTest.java | 89 +++++++++++++++++++ 3 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java diff --git a/auron-core/pom.xml b/auron-core/pom.xml index 414553e31..8565aeb7e 100644 --- a/auron-core/pom.xml +++ b/auron-core/pom.xml @@ -77,6 +77,12 @@ provided + + org.apache.hadoop + hadoop-client-runtime + test + + org.junit.jupiter junit-jupiter-api diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java index 48f10ab01..1f145be3f 100644 --- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java +++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.lang.management.BufferPoolMXBean; import java.lang.management.ManagementFactory; -import java.net.URI; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -71,12 +70,11 @@ public static void putResource(String key, Object value) { } public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception { - // the path is a URI string, so we need to convert it to a URI object - return FSDataInputWrapper.wrap(fs.open(new Path(new URI(path)))); + return FSDataInputWrapper.wrap(fs.open(new Path(path))); } public static FSDataOutputWrapper createFileAsDataOutputWrapper(FileSystem fs, String path) throws Exception { - return FSDataOutputWrapper.wrap(fs.create(new Path(new URI(path)))); + return FSDataOutputWrapper.wrap(fs.create(new Path(path))); } public static long getDirectMemoryUsed() { diff --git a/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java b/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java new file mode 100644 index 000000000..1d52e47bd --- /dev/null +++ b/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.jni; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.jupiter.api.Test; + +public class JniBridgeTest { + + @Test + public void testFileWrappersPreserveLiteralHashInHdfsPath() throws Exception { + String path = "hdfs://mycluster/tmp/channel=wx_repro#mini/spark-submit-123/part-00000.json"; + CapturingFileSystem cfs = new CapturingFileSystem(); + + JniBridge.openFileAsDataInputWrapper(cfs, path).close(); + JniBridge.createFileAsDataOutputWrapper(cfs, path).close(); + + assertPathPreservesHash(cfs.openedPath); + assertPathPreservesHash(cfs.createdPath); + } + + private static void assertPathPreservesHash(Path path) { + assertEquals( + "/tmp/channel=wx_repro#mini/spark-submit-123/part-00000.json", + path.toUri().getPath()); + assertNull(path.toUri().getFragment()); + } + + private static class CapturingFileSystem extends RawLocalFileSystem { + private final Statistics statistics = new Statistics("hdfs"); + private Path openedPath; + private Path createdPath; + + @Override + public FSDataInputStream open(Path path) throws IOException { + openedPath = path; + return new FSDataInputStream(new EmptyFSInputStream()); + } + + @Override + public FSDataOutputStream create(Path path) throws IOException { + createdPath = path; + return new FSDataOutputStream(new ByteArrayOutputStream(), statistics); + } + } + + private static class EmptyFSInputStream extends FSInputStream { + @Override + public void seek(long pos) {} + + @Override + public long getPos() { + return 0; + } + + @Override + public boolean seekToNewSource(long targetPos) { + return false; + } + + @Override + public int read() { + return -1; + } + } +} From 0c4fc476fcf8a9252f2a933c7c4544b6627d8ce8 Mon Sep 17 00:00:00 2001 From: zhtttylz Date: Thu, 14 May 2026 14:20:54 +0800 Subject: [PATCH 2/3] Fix JNI Hadoop path decoding --- .../java/org/apache/auron/jni/JniBridge.java | 9 ++++++++- .../org/apache/auron/jni/JniBridgeTest.java | 17 +++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java index 1f145be3f..df2c8dd6a 100644 --- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java +++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.lang.management.BufferPoolMXBean; import java.lang.management.ManagementFactory; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -70,13 +72,18 @@ public static void putResource(String key, Object value) { } public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception { - return FSDataInputWrapper.wrap(fs.open(new Path(path))); + return FSDataInputWrapper.wrap(fs.open(toInputPath(path))); } public static FSDataOutputWrapper createFileAsDataOutputWrapper(FileSystem fs, String path) throws Exception { return FSDataOutputWrapper.wrap(fs.create(new Path(path))); } + private static Path toInputPath(String path) throws URISyntaxException { + String safePath = path.indexOf('#') >= 0 ? path.replace("#", "%23") : path; + return new Path(new URI(safePath)); + } + public static long getDirectMemoryUsed() { return directMXBeans.stream() .mapToLong(BufferPoolMXBean::getTotalCapacity) diff --git a/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java b/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java index 1d52e47bd..5f8128393 100644 --- a/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java +++ b/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java @@ -42,6 +42,23 @@ public void testFileWrappersPreserveLiteralHashInHdfsPath() throws Exception { assertPathPreservesHash(cfs.createdPath); } + @Test + public void testFileWrappersHandleReadUriAndWriteRawPercentPaths() throws Exception { + String readPath = "file:/tmp/t1/part=test%2520test/part-00000.parquet"; + String writePath = "file:/tmp/t1/part=test%20test/part-00001.parquet"; + CapturingFileSystem cfs = new CapturingFileSystem(); + + JniBridge.openFileAsDataInputWrapper(cfs, readPath).close(); + JniBridge.createFileAsDataOutputWrapper(cfs, writePath).close(); + + assertEquals( + "/tmp/t1/part=test%20test/part-00000.parquet", + cfs.openedPath.toUri().getPath()); + assertEquals( + "/tmp/t1/part=test%20test/part-00001.parquet", + cfs.createdPath.toUri().getPath()); + } + private static void assertPathPreservesHash(Path path) { assertEquals( "/tmp/channel=wx_repro#mini/spark-submit-123/part-00000.json", From a759cdbbb91cd32c74e092b824023305bf162e6d Mon Sep 17 00:00:00 2001 From: zhtttylz Date: Tue, 26 May 2026 20:12:30 +0800 Subject: [PATCH 3/3] Fix JNI Hadoop path handling --- .../java/org/apache/auron/jni/JniBridge.java | 9 +------ .../org/apache/auron/jni/JniBridgeTest.java | 25 ++++++++++++++----- .../apache/spark/sql/auron/ShimsImpl.scala | 16 ++++++++++++ .../org/apache/spark/sql/auron/Shims.scala | 2 ++ .../auron/plan/NativeFileSourceScanBase.scala | 7 +++--- 5 files changed, 41 insertions(+), 18 deletions(-) diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java index df2c8dd6a..1f145be3f 100644 --- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java +++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java @@ -19,8 +19,6 @@ import java.io.IOException; import java.lang.management.BufferPoolMXBean; import java.lang.management.ManagementFactory; -import java.net.URI; -import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -72,18 +70,13 @@ public static void putResource(String key, Object value) { } public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception { - return FSDataInputWrapper.wrap(fs.open(toInputPath(path))); + return FSDataInputWrapper.wrap(fs.open(new Path(path))); } public static FSDataOutputWrapper createFileAsDataOutputWrapper(FileSystem fs, String path) throws Exception { return FSDataOutputWrapper.wrap(fs.create(new Path(path))); } - private static Path toInputPath(String path) throws URISyntaxException { - String safePath = path.indexOf('#') >= 0 ? path.replace("#", "%23") : path; - return new Path(new URI(safePath)); - } - public static long getDirectMemoryUsed() { return directMXBeans.stream() .mapToLong(BufferPoolMXBean::getTotalCapacity) diff --git a/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java b/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java index 5f8128393..05489643d 100644 --- a/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java +++ b/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java @@ -18,9 +18,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSInputStream; @@ -43,22 +46,32 @@ public void testFileWrappersPreserveLiteralHashInHdfsPath() throws Exception { } @Test - public void testFileWrappersHandleReadUriAndWriteRawPercentPaths() throws Exception { - String readPath = "file:/tmp/t1/part=test%2520test/part-00000.parquet"; - String writePath = "file:/tmp/t1/part=test%20test/part-00001.parquet"; + public void testFileWrappersPreserveNormalizedPercentPathStrings() throws Exception { + String path = "file:/tmp/t1/part=test%20test/part-00000.parquet"; CapturingFileSystem cfs = new CapturingFileSystem(); - JniBridge.openFileAsDataInputWrapper(cfs, readPath).close(); - JniBridge.createFileAsDataOutputWrapper(cfs, writePath).close(); + JniBridge.openFileAsDataInputWrapper(cfs, path).close(); + JniBridge.createFileAsDataOutputWrapper(cfs, path).close(); assertEquals( "/tmp/t1/part=test%20test/part-00000.parquet", cfs.openedPath.toUri().getPath()); assertEquals( - "/tmp/t1/part=test%20test/part-00001.parquet", + "/tmp/t1/part=test%20test/part-00000.parquet", cfs.createdPath.toUri().getPath()); } + @Test + public void testHadoopPathUriAcceptsFilePathWithSpace() throws Exception { + String path = "file:/tmp/t1/part=test test/part-00000.parquet"; + + assertThrows(URISyntaxException.class, () -> new URI(path)); + assertEquals("file", new Path(path).toUri().getScheme()); + assertEquals( + "/tmp/t1/part=test test/part-00000.parquet", + new Path(path).toUri().getPath()); + } + private static void assertPathPreservesHash(Path path) { assertEquals( "/tmp/channel=wx_repro#mini/spark-submit-123/part-00000.json", diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala index 0e8a2c8e7..607882e0b 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala @@ -971,6 +971,18 @@ class ShimsImpl extends Shims with Logging { size: Long): PartitionedFile = PartitionedFile(partitionValues, filePath, offset, size) + @sparkver("3.0 / 3.1 / 3.2 / 3.3") + override def getPartitionedFilePathString(file: PartitionedFile): String = { + import org.apache.hadoop.fs.Path + import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils + + // Spark 3.0-3.3 PartitionedFile.filePath is URI-encoded. + file.filePath + .split(Path.SEPARATOR, -1) + .map(ExternalCatalogUtils.unescapePathName) + .mkString(Path.SEPARATOR) + } + @sparkver("3.4 / 3.5 / 4.0 / 4.1") override def getPartitionedFile( partitionValues: InternalRow, @@ -982,6 +994,10 @@ class ShimsImpl extends Shims with Logging { PartitionedFile(partitionValues, SparkPath.fromPath(new Path(filePath)), offset, size) } + @sparkver("3.4 / 3.5 / 4.0 / 4.1") + override def getPartitionedFilePathString(file: PartitionedFile): String = + file.toPath.toString + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0 / 4.1") override def getMinPartitionNum(sparkSession: SparkSession): Int = sparkSession.sessionState.conf.filesMinPartitionNum diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala index 19f98b415..896a8d4cd 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala @@ -272,6 +272,8 @@ abstract class Shims { offset: Long, size: Long): PartitionedFile + def getPartitionedFilePathString(file: PartitionedFile): String + def getMinPartitionNum(sparkSession: SparkSession): Int @nowarn("cat=unused") // Some params temporarily unused diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala index 84e886060..bf038184a 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala @@ -16,13 +16,12 @@ */ package org.apache.spark.sql.execution.auron.plan -import java.net.URI import java.security.PrivilegedExceptionAction import scala.jdk.CollectionConverters._ import org.apache.commons.lang3.reflect.MethodUtils -import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.MapPartitionsRDD import org.apache.spark.sql.auron.NativeConverters @@ -114,7 +113,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec) } pb.PartitionedFile .newBuilder() - .setPath(s"${file.filePath}") + .setPath(Shims.get.getPartitionedFilePathString(file)) .setSize(fileSizes(file.filePath)) .addAllPartitionValues(nativePartitionValues.asJava) .setLastModifiedNs(0) @@ -149,7 +148,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec) val currentTimeMillis = System.currentTimeMillis() val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] { override def run(): FileSystem = { - FileSystem.get(new URI(location), sharedConf) + FileSystem.get(new Path(location).toUri, sharedConf) } }) getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000)