4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -25,7 +25,7 @@ on:
java:
required: false
type: string
- default: 17
+ default: 21
Contributor review comment: The modifications made to this file need to be reverted before merging.

branch:
description: Branch to run the build against
required: false
@@ -374,7 +374,7 @@ jobs:
# Hive "other tests" test needs larger metaspace size based on experiment.
if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi
# SPARK-46283: should delete the following env replacement after SPARK 3.x EOL
if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then
if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then
MODULES_TO_TEST=${MODULES_TO_TEST//streaming-kinesis-asl, /}
fi
export SERIAL_SBT_TESTS=1
199 changes: 172 additions & 27 deletions common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -138,7 +138,7 @@ private[spark] object ClosureCleaner extends Logging {
obj: AnyRef,
outerClass: Class[_],
accessedFields: Map[Class[_], Set[String]]): AnyRef = {
- val clone = instantiateClass(outerClass, parent)
+ val clone = instantiateClass(outerClass, parent, "$outer")

var currentClass = outerClass
assert(currentClass != null, "The outer class can't be null.")
@@ -263,6 +263,73 @@ private[spark] object ClosureCleaner extends Logging {
true
}

/**
* Clean the given closure and return a new closure with only accessed fields.
* This method is designed for Java 22 and newer (matching the call sites'
* `Runtime.version().feature() > 21` guard), where final closure fields can
* no longer be modified via reflection.
*
* @param func the closure to clean
* @param cleanTransitively whether to clean enclosing closures transitively
* @param accessedFields a map from a class to a set of its fields that are accessed by
* the starting closure
* @return a new closure with only accessed fields
*/
private[spark] def cleanAndRet(
func: AnyRef,
cleanTransitively: Boolean,
accessedFields: Map[Class[_], Set[String]]): AnyRef = {
if (func == null) {
return func
}

// Only handle indylambda closures on Java 22+
val maybeIndylambdaProxy = IndylambdaScalaClosures.getSerializationProxy(func)

if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
logDebug(s"Expected a closure; got ${func.getClass.getName}")
return func
}

if (maybeIndylambdaProxy.isEmpty) {
// Non-indylambda closures can't be cleaned without mutating final fields; return unchanged
logWarning(s"Non-indylambda closure not supported on Java 22+: ${func.getClass.getName}")
return func
}

val lambdaProxy = maybeIndylambdaProxy.get
val implMethodName = lambdaProxy.getImplMethodName

logDebug(s"Cloning indylambda closure and returning the new cleaned closure: $implMethodName")

// capturing class is the class that declared this lambda
val capturingClassName = lambdaProxy.getCapturingClass.replace('/', '.')
val classLoader = func.getClass.getClassLoader
// scalastyle:off classforname
val capturingClass = Class.forName(capturingClassName, false, classLoader)
// scalastyle:on classforname

// Fail fast if we detect return statements in closures
val capturingClassReader = getClassReader(capturingClass)
capturingClassReader.accept(new ReturnStatementFinder(Option(implMethodName)), 0)

val outerThis = if (lambdaProxy.getCapturedArgCount > 0) {
// only need to clean when there is an enclosing non-null "this" captured by the closure
Option(lambdaProxy.getCapturedArg(0)).getOrElse(return func)
} else {
return func
}

val isClosureDeclaredInScalaRepl = capturingClassName.startsWith("$line") &&
capturingClassName.endsWith("$iw")
if (isClosureDeclaredInScalaRepl && outerThis.getClass.getName == capturingClassName) {
assert(accessedFields.isEmpty)
cleanupScalaReplClosureAndRet(func, lambdaProxy, outerThis, cleanTransitively)
} else {
// For other closures, return original func
logWarning(s"Unsupported closure type for Java 21+: ${func.getClass.getName}")
func
}
}
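
To make the new contract concrete, here is a minimal caller sketch contrasting the two variants. It assumes the pre-existing in-place `clean(...)` overload takes the same `(closure, cleanTransitively, accessedFields)` parameters; `Map.empty` stands in for the caller-supplied accessed-fields accumulator (a mutable Map in this file's imports):

```scala
// Minimal sketch, not the authoritative API: on Java 22+ the original
// closure is left untouched and the returned copy must be used; on older
// JVMs clean() mutates the closure in place, so the original is reused.
val f: AnyRef = (x: Int) => x + 1

val cleaned: AnyRef =
  if (Runtime.version().feature() > 21) {
    ClosureCleaner.cleanAndRet(f, true, Map.empty)
  } else {
    ClosureCleaner.clean(f, true, Map.empty) // assumed in-place overload
    f
  }
```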

/**
* Cleans non-indylambda closure in place
*
@@ -390,17 +457,21 @@
}

/**
-  * Null out fields of enclosing class which are not actually accessed by a closure
-  * @param func the starting closure to clean
-  * @param lambdaProxy starting closure proxy
-  * @param outerThis lambda enclosing class
-  * @param cleanTransitively whether to clean enclosing closures transitively
+  * Clone the outer `this` of a Scala REPL closure, keeping only accessed fields.
+  * This is the common logic shared between cleanupScalaReplClosure and
+  * cleanupScalaReplClosureAndRet.
+  *
+  * @param func the closure to clean
+  * @param lambdaProxy starting closure proxy
+  * @param outerThis the lambda's enclosing object
+  * @param cleanTransitively whether to clean transitively
+  * @return the outer `this` (original, or a clone holding only accessed fields)
 */
- private def cleanupScalaReplClosure(
+ private def cloneOuterThisForScalaReplClosure(
func: AnyRef,
lambdaProxy: SerializedLambda,
outerThis: AnyRef,
- cleanTransitively: Boolean): Unit = {
+ cleanTransitively: Boolean): AnyRef = {

val capturingClass = outerThis.getClass
val accessedFields: Map[Class[_], Set[String]] = Map.empty
@@ -422,16 +493,59 @@
logDebug(s" + cloning instance of REPL class ${capturingClass.getName}")
val clonedOuterThis = cloneAndSetFields(
parent = null, outerThis, capturingClass, accessedFields)

- val outerField = func.getClass.getDeclaredField("arg$1")
- // SPARK-37072: When Java 17 is used and `outerField` is read-only,
- // the content of `outerField` cannot be set by reflect api directly.
- // But we can remove the `final` modifier of `outerField` before set value
- // and reset the modifier after set value.
- setFieldAndIgnoreModifiers(func, outerField, clonedOuterThis)
+ clonedOuterThis
} else {
logDebug(s"All fields are accessed, return original outerThis")
outerThis
}
}
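
For intuition, "cloning with only accessed fields" amounts to instantiating a bare outer object and copying across just the fields the closure actually reads. A self-contained sketch under that assumption (`instantiateBare` and `cloneWithAccessedFields` are illustrative names, not the real helpers, which are `instantiateClass` and `cloneAndSetFields`):

```scala
import java.lang.reflect.Constructor

// Serialization-style instantiation that skips user constructors,
// mirroring the ReflectionFactory code used elsewhere in this file.
def instantiateBare(cls: Class[_]): AnyRef = {
  val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
  val parentCtor: Constructor[_] = classOf[Object].getDeclaredConstructor()
  rf.newConstructorForSerialization(cls, parentCtor).newInstance().asInstanceOf[AnyRef]
}

def cloneWithAccessedFields(original: AnyRef, accessed: Set[String]): AnyRef = {
  val cls = original.getClass
  val clone = instantiateBare(cls)
  for (field <- cls.getDeclaredFields if accessed.contains(field.getName)) {
    field.setAccessible(true)
    // Note: writing a *final* field here would still need the SPARK-37072
    // treatment on Java 17+, which is what the surrounding code handles.
    field.set(clone, field.get(original)) // unaccessed fields keep defaults
  }
  clone
}
```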

/**
* Null out fields of enclosing class which are not actually accessed by a closure
* @param func the starting closure to clean
* @param lambdaProxy starting closure proxy
* @param outerThis lambda enclosing class
* @param cleanTransitively whether to clean enclosing closures transitively
*/
private def cleanupScalaReplClosure(
func: AnyRef,
lambdaProxy: SerializedLambda,
outerThis: AnyRef,
cleanTransitively: Boolean): Unit = {

val outerThisToUse = cloneOuterThisForScalaReplClosure(
func, lambdaProxy, outerThis, cleanTransitively)
val outerField = func.getClass.getDeclaredField("arg$1")
// SPARK-37072: When Java 17 is used and `outerField` is read-only,
// the content of `outerField` cannot be set via the reflection API directly.
// But we can remove the `final` modifier of `outerField` before setting the
// value and restore the modifier afterwards.
setFieldAndIgnoreModifiers(func, outerField, outerThisToUse)
}
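
The SPARK-37072 comment above is terse, so here is a sketch of the classic final-modifier trick it alludes to. This is an assumption about what `setFieldAndIgnoreModifiers` does internally, not a copy of it; on modern JDKs the internal `modifiers` field is filtered from reflection, so the approach eventually breaks, which is exactly why the `AndRet` variants below avoid field mutation altogether:

```scala
import java.lang.reflect.{Field, Modifier}

// Clear `final` before the write, restore it afterwards (illustrative only).
def setFinalField(target: AnyRef, field: Field, value: AnyRef): Unit = {
  field.setAccessible(true)
  val modifiersField = classOf[Field].getDeclaredField("modifiers") // filtered on modern JDKs
  modifiersField.setAccessible(true)
  val originalModifiers = field.getModifiers
  modifiersField.setInt(field, originalModifiers & ~Modifier.FINAL)
  try field.set(target, value)
  finally modifiersField.setInt(field, originalModifiers)
}
```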

/**
* Clean Scala REPL closure and return a new closure with only accessed fields.
* This is the variant for Java 22 and newer; it returns a new closure instead of
* modifying the original in place.
*/
private def cleanupScalaReplClosureAndRet(
func: AnyRef,
lambdaProxy: SerializedLambda,
outerThis: AnyRef,
cleanTransitively: Boolean): AnyRef = {

val outerThisToUse = cloneOuterThisForScalaReplClosure(
func, lambdaProxy, outerThis, cleanTransitively)

if (outerThisToUse ne outerThis) {
// Fields were cleaned, create a new closure instance with the cleaned outer this
val newFunc = instantiateClass(func.getClass, outerThisToUse, "arg$1")
logDebug(s" +++ Scala REPL closure is now cleaned and returned +++")
newFunc
} else {
logDebug(s"No fields to clean, return original func")
func
}
}
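
A note on the "arg$1" field name used above: for indylambda closures the generated lambda class stores its captured arguments in synthetic fields conventionally named `arg$1`, `arg$2`, and so on, with the enclosing `this` (when captured) first, and it typically exposes a constructor taking those captured values. That constructor is what lets `instantiateClass` rebuild the closure around the cleaned outer object instead of mutating a final field.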

/**
* Cleans up Ammonite closures and nulls out fields captured from cmd & cmd$Helper objects
@@ -501,7 +615,7 @@
val cmdClones = Map[Class[_], AnyRef]()
for ((cmdClass, _) <- ammCmdInstances if !cmdClass.getName.contains("Helper")) {
logDebug(s" + Cloning instance of Ammonite command class ${cmdClass.getName}")
- cmdClones(cmdClass) = instantiateClass(cmdClass, enclosingObject = null)
+ cmdClones(cmdClass) = instantiateClass(cmdClass, enclosingObject = null, "$outer")
}
for ((cmdHelperClass, cmdHelperInstance) <- ammCmdInstances
if cmdHelperClass.getName.contains("Helper")) {
@@ -514,7 +628,7 @@
val outerClone = cmdHelperOuter.flatMap(o => cmdClones.get(o.getClass)).orNull
logDebug(s" + Cloning instance of Ammonite command helper class ${cmdHelperClass.getName}")
cmdClones(cmdHelperClass) =
- instantiateClass(cmdHelperClass, enclosingObject = outerClone)
+ instantiateClass(cmdHelperClass, enclosingObject = outerClone, "$outer")
}

// set accessed fields
@@ -583,18 +697,49 @@
} else None
}

- private def instantiateClass(cls: Class[_], enclosingObject: AnyRef): AnyRef = {
- // Use reflection to instantiate object without calling constructor
- val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
- val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
- val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
- val obj = newCtor.newInstance().asInstanceOf[AnyRef]
+ private def instantiateClass(
+ cls: Class[_], enclosingObject: AnyRef, fieldName: String): AnyRef = {
if (enclosingObject != null) {
- val field = cls.getDeclaredField("$outer")
- field.setAccessible(true)
- field.set(obj, enclosingObject)
// Log all available constructors for debugging
val constructors = cls.getDeclaredConstructors
logDebug(s"Available constructors for ${cls.getName}:")
constructors.foreach { ctor =>
val paramTypes = ctor.getParameterTypes.map(_.getSimpleName).mkString(", ")
logDebug(s" - ${ctor.getName}($paramTypes)")
}

// Try to find a constructor that takes the enclosing object as parameter
val field = cls.getDeclaredField(fieldName)
val fieldType = field.getType
logDebug(s"Looking for constructor with parameter type: ${fieldType.getSimpleName}")

try {
val constructor = cls.getDeclaredConstructor(fieldType)
constructor.setAccessible(true)
logDebug(s"Found matching constructor: ${constructor.getName}(${fieldType.getSimpleName})")
constructor.newInstance(enclosingObject).asInstanceOf[AnyRef]
} catch {
case _: NoSuchMethodException =>
logDebug(s"No constructor found with parameter type ${fieldType.getSimpleName}, " +
s"falling back to serialization constructor")
// Fallback to serialization constructor if no suitable constructor found
val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
val obj = newCtor.newInstance().asInstanceOf[AnyRef]
// Set the outer field via setFieldAndIgnoreModifiers for Java 17+
// compatibility, reusing the `field` handle resolved above
setFieldAndIgnoreModifiers(obj, field, enclosingObject)
obj
}
} else {
logDebug(s"No enclosing object provided for ${cls.getName}, using serialization constructor")
// Use serialization constructor when no enclosing object
val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
newCtor.newInstance().asInstanceOf[AnyRef]
}
- obj
}
}
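
To see why the constructor lookup usually succeeds: scalac materializes a nested class's outer pointer as a constructor parameter backing the `$outer` field. A tiny demonstration of the preferred path, with invented class names:

```scala
// Hypothetical classes illustrating the constructor-based path above.
class Enclosing { val payload = new Array[Byte](1 << 20) }
class Captured(val `$outer`: Enclosing) // scalac-style outer pointer as a ctor param

val cleanedOuter = new Enclosing
val ctor = classOf[Captured].getDeclaredConstructor(classOf[Enclosing])
ctor.setAccessible(true)
val rebuilt = ctor.newInstance(cleanedOuter) // no final-field write required
```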

8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2840,8 +2840,12 @@ class SparkContext(config: SparkConf) extends Logging {
* @return the cleaned closure
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
- SparkClosureCleaner.clean(f, checkSerializable)
- f
+ if (Runtime.version().feature() > 21) {
+ SparkClosureCleaner.cleanAndRet(f, checkSerializable)
+ } else {
+ SparkClosureCleaner.clean(f, checkSerializable)
+ f
+ }
}
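
`Runtime.version().feature()`, available since Java 10, returns the major Java version as an `Int`, so this guard takes the returning cleaner only on Java 22 and newer:

```scala
// 17 on Java 17, 21 on Java 21, 22 on Java 22, and so on.
val javaMajor: Int = Runtime.version().feature()
println(s"Java $javaMajor; returning cleaner used: ${javaMajor > 21}")
```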

/**
@@ -46,4 +46,28 @@ private[spark] object SparkClosureCleaner {
}
}
}

/**
* Clean the given closure and return a new closure with only accessed fields.
* This method is designed for Java 22 and newer, where direct modification of
* final closure fields is not allowed.
*
* @param closure the closure to clean
* @param checkSerializable whether to verify that the closure is serializable after cleaning
* @param cleanTransitively whether to clean enclosing closures transitively
* @return a new closure with only accessed fields
*/
def cleanAndRet[T <: AnyRef](
closure: T,
checkSerializable: Boolean = true,
cleanTransitively: Boolean = true): T = {
val cleanedClosure = ClosureCleaner.cleanAndRet(closure, cleanTransitively, mutable.Map.empty)
try {
if (checkSerializable && SparkEnv.get != null) {
SparkEnv.get.closureSerializer.newInstance().serialize(cleanedClosure)
}
} catch {
case ex: Exception => throw new SparkException("Task not serializable", ex)
}
cleanedClosure.asInstanceOf[T]
}
}
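
A usage sketch for the new entry point. `Session` is an invented stand-in for a REPL line object; the REPL-specific path in ClosureCleaner only triggers for classes named like `$lineN...$iw`, so a class literally named `Session` would take the warn-and-return branch. The sketch illustrates the intended contract rather than an exact trace:

```scala
class Session { // hypothetical stand-in for a REPL line object
  val small = 1
  val huge = new Array[Byte](64 << 20) // never read by the closure below
  def makeF: Int => Int = x => x + small // captures `this`, and with it `huge`
}

val f = new Session().makeF
// Intended contract: returns a closure whose cloned outer object keeps only
// `small`; the original `f` is never mutated.
val cleaned: Int => Int = SparkClosureCleaner.cleanAndRet(f)
```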
@@ -43,7 +43,6 @@ public class JavaModuleOptions {
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
"--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED",
"-Djdk.reflect.useDirectMethodHandle=false",
"-Dio.netty.tryReflectionSetAccessible=true",
"--enable-native-access=ALL-UNNAMED"};

1 change: 0 additions & 1 deletion pom.xml
@@ -335,7 +335,6 @@
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
- -Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true
--enable-native-access=ALL-UNNAMED
</extraJavaTestArgs>
1 change: 0 additions & 1 deletion project/SparkBuild.scala
@@ -1702,7 +1702,6 @@ object TestSettings {
"--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
"-Djdk.reflect.useDirectMethodHandle=false",
"-Dio.netty.tryReflectionSetAccessible=true",
"--enable-native-access=ALL-UNNAMED").mkString(" ")
s"-Xmx$heapSize -Xss4m -XX:MaxMetaspaceSize=$metaspaceSize -XX:ReservedCodeCacheSize=128m -Dfile.encoding=UTF-8 $extraTestJavaArgs"
1 change: 0 additions & 1 deletion sql/connect/bin/spark-connect-scala-client
@@ -68,7 +68,6 @@ JVM_ARGS="-XX:+IgnoreUnrecognizedVMOptions \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED \
- -Djdk.reflect.useDirectMethodHandle=false \
-Dio.netty.tryReflectionSetAccessible=true \
--enable-native-access=ALL-UNNAMED"

@@ -157,8 +157,13 @@ object StateSpec {
def function[KeyType, ValueType, StateType, MappedType](
mappingFunction: (Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType]
): StateSpec[KeyType, ValueType, StateType, MappedType] = {
- SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
- new StateSpecImpl(mappingFunction)
+ val cleanedFunction = if (Runtime.version().feature() > 21) {
+ SparkClosureCleaner.cleanAndRet(mappingFunction, checkSerializable = true)
+ } else {
+ SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
+ mappingFunction
+ }
+ new StateSpecImpl(cleanedFunction)
}

/**
@@ -175,10 +180,15 @@
def function[KeyType, ValueType, StateType, MappedType](
mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
): StateSpec[KeyType, ValueType, StateType, MappedType] = {
- SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
+ val cleanedFunction = if (Runtime.version().feature() > 21) {
+ SparkClosureCleaner.cleanAndRet(mappingFunction, checkSerializable = true)
+ } else {
+ SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
+ mappingFunction
+ }
val wrappedFunction =
(time: Time, key: KeyType, value: Option[ValueType], state: State[StateType]) => {
- Some(mappingFunction(key, value, state))
+ Some(cleanedFunction(key, value, state))
}
new StateSpecImpl(wrappedFunction)
}
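
For reference, a typical call site for this overload; the surrounding StreamingContext and DStream setup is omitted, and `pairDStream` is an illustrative name:

```scala
// Sums values per key across batches; the mapping function below is exactly
// what the version-gated cleaning above operates on.
val spec = StateSpec.function(
  (key: String, value: Option[Int], state: State[Int]) => {
    val sum = state.getOption.getOrElse(0) + value.getOrElse(0)
    state.update(sum)
    sum
  })
// val stateStream = pairDStream.mapWithState(spec) // pairDStream: DStream[(String, Int)]
```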