Skip to content

Commit 2840052

Browse files
committed
Add ReflectionCache for Iceberg serialization optimization (#3456)
1 parent 45b670a commit 2840052

3 files changed

Lines changed: 503 additions & 63 deletions

File tree

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,167 @@
1919

2020
package org.apache.comet.iceberg
2121

22+
import java.lang.reflect.Method
23+
2224
import org.apache.spark.internal.Logging
2325

26+
/**
 * Immutable bundle of pre-resolved Iceberg classes and `java.lang.reflect.Method` handles.
 *
 * Resolving these once and reusing them avoids repeating class loading and method lookups for
 * every task. An instance is meant to be built once per serializePartitions() call and handed to
 * the helper methods; doing so is reported to give roughly a 50% serialization speedup, since the
 * lookups would otherwise run per-task (tens of thousands of times for large tables).
 *
 * @param contentScanTaskClass
 *   class handle for org.apache.iceberg.ContentScanTask
 * @param fileScanTaskClass
 *   class handle for org.apache.iceberg.FileScanTask
 * @param contentFileClass
 *   class handle for org.apache.iceberg.ContentFile
 * @param deleteFileClass
 *   class handle for org.apache.iceberg.DeleteFile
 * @param schemaParserClass
 *   class handle for org.apache.iceberg.SchemaParser
 * @param schemaClass
 *   class handle for org.apache.iceberg.Schema
 * @param partitionSpecParserClass
 *   class handle for org.apache.iceberg.PartitionSpecParser
 * @param partitionSpecClass
 *   class handle for org.apache.iceberg.PartitionSpec
 * @param structLikeClass
 *   class handle for org.apache.iceberg.StructLike
 * @param fileMethod
 *   handle for ContentScanTask.file()
 * @param startMethod
 *   handle for ContentScanTask.start()
 * @param lengthMethod
 *   handle for ContentScanTask.length()
 * @param partitionMethod
 *   handle for ContentScanTask.partition()
 * @param residualMethod
 *   handle for ContentScanTask.residual()
 * @param taskSchemaMethod
 *   handle for FileScanTask.schema()
 * @param deletesMethod
 *   handle for FileScanTask.deletes()
 * @param specMethod
 *   handle for FileScanTask.spec()
 * @param schemaToJsonMethod
 *   handle for SchemaParser.toJson(Schema)
 * @param specToJsonMethod
 *   handle for PartitionSpecParser.toJson(PartitionSpec)
 * @param deleteContentMethod
 *   handle for DeleteFile.content()
 * @param deleteSpecIdMethod
 *   handle for DeleteFile.specId()
 * @param deleteEqualityIdsMethod
 *   handle for DeleteFile.equalityFieldIds()
 */
case class ReflectionCache(
    // ---- Class handles ----
    contentScanTaskClass: Class[_],
    fileScanTaskClass: Class[_],
    contentFileClass: Class[_],
    deleteFileClass: Class[_],
    schemaParserClass: Class[_],
    schemaClass: Class[_],
    partitionSpecParserClass: Class[_],
    partitionSpecClass: Class[_],
    structLikeClass: Class[_],
    // ---- Accessors looked up on ContentScanTask ----
    fileMethod: Method,
    startMethod: Method,
    lengthMethod: Method,
    partitionMethod: Method,
    residualMethod: Method,
    // ---- Accessors looked up on FileScanTask ----
    taskSchemaMethod: Method,
    deletesMethod: Method,
    specMethod: Method,
    // ---- JSON serializers (SchemaParser / PartitionSpecParser) ----
    schemaToJsonMethod: Method,
    specToJsonMethod: Method,
    // ---- Accessors looked up on DeleteFile ----
    deleteContentMethod: Method,
    deleteSpecIdMethod: Method,
    deleteEqualityIdsMethod: Method)
107+
108+
object ReflectionCache extends Logging {

  /**
   * Resolves every Iceberg class and method handle held by [[ReflectionCache]] in a single pass.
   *
   * Intended to be invoked once at the start of serializePartitions(), with the returned cache
   * shared by all helper methods.
   *
   * @return
   *   a ReflectionCache with all classes and methods already looked up
   * @throws java.lang.ClassNotFoundException
   *   if one of the Iceberg classes cannot be loaded
   * @throws java.lang.NoSuchMethodException
   *   if one of the expected methods is not present on its class
   */
  def create(): ReflectionCache = {
    import IcebergReflection.ClassNames

    // Step 1: load every class by name before touching any method.
    // scalastyle:off classforname
    val contentScanTask = Class.forName(ClassNames.CONTENT_SCAN_TASK)
    val fileScanTask = Class.forName(ClassNames.FILE_SCAN_TASK)
    val contentFile = Class.forName(ClassNames.CONTENT_FILE)
    val deleteFile = Class.forName(ClassNames.DELETE_FILE)
    val schemaParser = Class.forName(ClassNames.SCHEMA_PARSER)
    val schema = Class.forName(ClassNames.SCHEMA)
    val partitionSpecParser = Class.forName(ClassNames.PARTITION_SPEC_PARSER)
    val partitionSpec = Class.forName(ClassNames.PARTITION_SPEC)
    val structLike = Class.forName(ClassNames.STRUCT_LIKE)
    // scalastyle:on classforname

    // Step 2: look up the zero-argument accessors on ContentScanTask ...
    val file = contentScanTask.getMethod("file")
    val start = contentScanTask.getMethod("start")
    val length = contentScanTask.getMethod("length")
    val partition = contentScanTask.getMethod("partition")
    val residual = contentScanTask.getMethod("residual")

    // ... and on FileScanTask.
    val taskSchema = fileScanTask.getMethod("schema")
    val deletes = fileScanTask.getMethod("deletes")
    val spec = fileScanTask.getMethod("spec")

    // JSON serializers take the object to serialize as their single argument.
    val schemaToJson = schemaParser.getMethod("toJson", schema)
    schemaToJson.setAccessible(true)
    val specToJson = partitionSpecParser.getMethod("toJson", partitionSpec)

    // Zero-argument accessors on DeleteFile.
    val deleteContent = deleteFile.getMethod("content")
    val deleteSpecId = deleteFile.getMethod("specId")
    val deleteEqualityIds = deleteFile.getMethod("equalityFieldIds")

    // Step 3: assemble the cache; named arguments keep the 22 fields from being mixed up.
    ReflectionCache(
      contentScanTaskClass = contentScanTask,
      fileScanTaskClass = fileScanTask,
      contentFileClass = contentFile,
      deleteFileClass = deleteFile,
      schemaParserClass = schemaParser,
      schemaClass = schema,
      partitionSpecParserClass = partitionSpecParser,
      partitionSpecClass = partitionSpec,
      structLikeClass = structLike,
      fileMethod = file,
      startMethod = start,
      lengthMethod = length,
      partitionMethod = partition,
      residualMethod = residual,
      taskSchemaMethod = taskSchema,
      deletesMethod = deletes,
      specMethod = spec,
      schemaToJsonMethod = schemaToJson,
      specToJsonMethod = specToJson,
      deleteContentMethod = deleteContent,
      deleteSpecIdMethod = deleteSpecId,
      deleteEqualityIdsMethod = deleteEqualityIds)
  }
}
182+
24183
/**
25184
* Shared reflection utilities for Iceberg operations.
26185
*

0 commit comments

Comments
 (0)