Skip to content

[SPARK-55886][SQL][PYTHON] Add DataFrame.zip for merging column-projected DataFrames#54976

Draft
zhengruifeng wants to merge 3 commits into
apache:masterfrom
zhengruifeng:df-zip
Draft

[SPARK-55886][SQL][PYTHON] Add DataFrame.zip for merging column-projected DataFrames#54976
zhengruifeng wants to merge 3 commits into
apache:masterfrom
zhengruifeng:df-zip

Conversation

@zhengruifeng
Copy link
Copy Markdown
Contributor

@zhengruifeng zhengruifeng commented Mar 24, 2026

What changes were proposed in this pull request?

Add a new DataFrame.zip(other) API that combines the columns of two DataFrames side-by-side without a join key by reusing the shared base plan rather than emitting a relational join.

  • Logical plan: introduces Zip(left, right), always unresolved, with a ZIP tree pattern.
  • Analyzer: a new ResolveZip rule walks both sides, strips Project chains, and -- when the two bases produce the same result -- rewrites the Zip into a single Project over that shared base. The rule rejects non-scalar Python UDFs (e.g. GROUPED_MAP) that would break the 1:1 row mapping.
  • CheckAnalysis: if ResolveZip cannot merge (different base plans), the surviving Zip triggers a new ZIP_PLANS_NOT_MERGEABLE error (sqlState 42K03).
  • Scala API: Dataset.zip declared in sql/api and implemented in the classic Dataset; Spark Connect throws UnsupportedOperationException (planned for a follow-up).
  • PySpark: abstract DataFrame.zip on the parent class, classic implementation delegating to the JVM via _jdf.zip, Connect raises PySparkNotImplementedError. New entry in the API reference index.

Why are the changes needed?

RDD.zip is a natural way to project two views of the same data and recombine them row-for-row. There has been no DataFrame equivalent: users porting that pattern have to fall back to a join on a synthetic row id, or recompute the source and select both column sets, which adds unnecessary work to the plan (a shuffle/join, or duplicated source evaluation) when the two sides are known to be row-aligned by construction.

DataFrame.zip lifts the RDD pattern to the DataFrame API. Because the analyzer rewrites the operator into a single Project over the shared base, the operation is free at runtime: no join, no extra scan, no shuffle.

Side-by-side:

# RDD: rdd.zip lines up two row-aligned projections of the same source.
square  = lambda x: x * x
is_even = lambda x: x % 2 == 0

rdd   = sc.range(10)
rdd1  = rdd.map(square)
rdd2  = rdd.map(is_even)
zipped_rdd = rdd1.zip(rdd2).collect()
# [(0, True), (1, False), (4, True), (9, False), (16, True),
#  (25, False), (36, True), (49, False), (64, True), (81, False)]

# DataFrame: the same pattern, now expressible directly.
square_udf  = sf.udf(square,  LongType())
is_even_udf = sf.udf(is_even, BooleanType())

df  = spark.range(10)
df1 = df.select(square_udf("id").alias("square"))
df2 = df.select(is_even_udf("id").alias("is_even"))
df1.zip(df2).show()
# +------+-------+
# |square|is_even|
# +------+-------+
# |     0|   true|
# |     1|  false|
# |     4|   true|
# |     9|  false|
# |    16|   true|
# |    25|  false|
# |    36|   true|
# |    49|  false|
# |    64|   true|
# |    81|  false|
# +------+-------+

Additional patterns this enables:

  • Computing a derived column on one branch and aligning it with a derived column from the same source.
  • Splitting a single transformation into independently named sub-DataFrames and recombining them.

Does this PR introduce any user-facing change?

Yes. New public API:

  • Scala: Dataset.zip(other: Dataset[_]): DataFrame, @since 4.3.0.
  • PySpark: DataFrame.zip(other), versionadded:: 4.3.0.

How was this patch tested?

  • New ResolveZipSuite (catalyst) covering the analyzer rewrite: matching bases, mismatched bases, expression projections, partial Project on one side, unresolved children, longer chains of Projects, alias composition through chains, stacked withColumn-style projections, and the ZIP_PLANS_NOT_MERGEABLE CheckAnalysis path.
  • New DataFrameZipSuite (sql/core) covering end-to-end results, the "resolved plan is a single Project" invariant, withColumn/withColumnRenamed on both sides, longer chains, parent-with-chained-child, and the two main error cases (unrelated DataFrames, spark.range sources).
  • New python/pyspark/sql/tests/test_zip.py mirroring the Scala suite plus scalar Python UDF and pandas UDF cases.
  • MIMA exclusion added for the new abstract method on Dataset.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

@zhengruifeng zhengruifeng changed the title [DO-NOT-REVIEW][SPARK-55886][SQL] Add DataFrame.zip for merging column-projected DataFrames [WIP][DO-NOT-REVIEW][SPARK-55886][SQL] Add DataFrame.zip for merging column-projected DataFrames Mar 24, 2026
@zhengruifeng zhengruifeng force-pushed the df-zip branch 2 times, most recently from 4a909d5 to eec3de6 Compare April 10, 2026 05:59
@zhengruifeng zhengruifeng force-pushed the df-zip branch 2 times, most recently from 158319b to c2cc51f Compare May 14, 2026 11:42
@zhengruifeng zhengruifeng changed the title [WIP][DO-NOT-REVIEW][SPARK-55886][SQL] Add DataFrame.zip for merging column-projected DataFrames [SPARK-55886][SQL] Add DataFrame.zip for merging column-projected DataFrames May 19, 2026
@zhengruifeng zhengruifeng changed the title [SPARK-55886][SQL] Add DataFrame.zip for merging column-projected DataFrames [SPARK-55886][SQL][PYTHON] Add DataFrame.zip for merging column-projected DataFrames May 19, 2026
ResolveZip previously stripped at most one Project layer on each side
before comparing bases with `sameResult`. CheckAnalysis on the other
hand stripped Project chains recursively, so plans like
`df.select(...).select(...).zip(df.select(...))` were:
- not rewritten by ResolveZip (one-level strip leaves mismatched bases),
- silently accepted by CheckAnalysis (recursive strip matches),
- and then crashed at execution because Zip stays unresolved.

Make `extractProjectAndBase` recursive: walk down nested Projects while
composing alias substitutions so each layer's output references are
rewritten back to the original base attributes. Bare Attribute
references to an inner alias are wrapped in a fresh Alias to preserve
the outer name/exprId; other expressions get their inner-alias
attributes inlined via `transform`.

Tests:
- ResolveZipSuite: longer chain, aliased chain (verifies the resolved
  Project sits directly on the shared base), stacked withColumn-style.
- DataFrameZipSuite: withColumn on both sides, chained withColumn,
  longer chain, parent-with-chained-child, withColumnRenamed.
- test_zip.py: parallel PySpark coverage of the same scenarios.

Generated-by: Claude Code
The connect doctest harness in pyspark.sql.connect.dataframe._test runs
doctest.testmod against pyspark.sql.dataframe under a connect session.
The Examples block in DataFrame.zip calls .show(), which invokes the
connect zip stub and raises NOT_IMPLEMENTED, failing the build.

Match the existing pattern used for toJSON/rdd and drop the parent
docstring before testmod runs.

Generated-by: Claude Code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant