|
27 | 27 | import java.util.ArrayList; |
28 | 28 | import java.util.List; |
29 | 29 | import java.util.Map; |
| 30 | +import java.util.Objects; |
30 | 31 | import java.util.concurrent.ConcurrentHashMap; |
31 | 32 | import net.bytebuddy.ByteBuddy; |
32 | 33 | import net.bytebuddy.description.field.FieldDescription; |
|
106 | 107 | import org.apache.beam.sdk.util.UserCodeException; |
107 | 108 | import org.apache.beam.sdk.values.TypeDescriptor; |
108 | 109 | import org.apache.beam.sdk.values.TypeDescriptors; |
| 110 | +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; |
109 | 111 | import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; |
110 | 112 | import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Primitives; |
111 | 113 | import org.checkerframework.checker.nullness.qual.MonotonicNonNull; |
@@ -166,15 +168,66 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, Ou |
166 | 168 | private static final String FN_DELEGATE_FIELD_NAME = "delegate"; |
167 | 169 |
|
168 | 170 | /** |
169 | | - * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class. |
170 | | - * Needed because generating an invoker class is expensive, and to avoid generating an excessive |
171 | | - * number of classes consuming PermGen memory. |
| 171 | + * Cache key for DoFnInvoker constructors that includes both the DoFn class and its generic type |
| 172 | + * parameters to prevent collisions when the same DoFn class is used with different generic types. |
| 173 | + */ |
| 174 | + private static final class InvokerCacheKey { |
| 175 | + private final Class<? extends DoFn<?, ?>> fnClass; |
| 176 | + private final TypeDescriptor<?> inputType; |
| 177 | + private final TypeDescriptor<?> outputType; |
| 178 | + |
| 179 | + InvokerCacheKey( |
| 180 | + Class<? extends DoFn<?, ?>> fnClass, |
| 181 | + TypeDescriptor<?> inputType, |
| 182 | + TypeDescriptor<?> outputType) { |
| 183 | + this.fnClass = fnClass; |
| 184 | + this.inputType = inputType; |
| 185 | + this.outputType = outputType; |
| 186 | + } |
| 187 | + |
| 188 | + @Override |
| 189 | + public boolean equals(@Nullable Object o) { |
| 190 | + if (this == o) { |
| 191 | + return true; |
| 192 | + } |
| 193 | + if (!(o instanceof InvokerCacheKey)) { |
| 194 | + return false; |
| 195 | + } |
| 196 | + InvokerCacheKey that = (InvokerCacheKey) o; |
| 197 | + return Objects.equals(fnClass, that.fnClass) |
| 198 | + && Objects.equals(inputType, that.inputType) |
| 199 | + && Objects.equals(outputType, that.outputType); |
| 200 | + } |
| 201 | + |
| 202 | + @Override |
| 203 | + public int hashCode() { |
| 204 | + return Objects.hash(fnClass, inputType, outputType); |
| 205 | + } |
| 206 | + |
| 207 | + @Override |
| 208 | + public String toString() { |
| 209 | + return MoreObjects.toStringHelper(this) |
| 210 | + .add("fnClass", fnClass.getName()) |
| 211 | + .add("inputType", inputType) |
| 212 | + .add("outputType", outputType) |
| 213 | + .toString(); |
| 214 | + } |
| 215 | + } |
| 216 | + |
| 217 | + /** |
| 218 | + * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class |
| 219 | + * and its generic type parameters. Needed because generating an invoker class is expensive, and |
| 220 | + * to avoid generating an excessive number of classes consuming PermGen memory. |
| 221 | + * |
| 222 | + * <p>The cache key includes generic type information to prevent collisions when the same DoFn |
| 223 | + * class is used with different generic types (e.g., MyDoFn<String> vs |
| 224 | + * MyDoFn<Integer>). |
172 | 225 | * |
173 | 226 | * <p>Note that special care must be taken to enumerate this object as concurrent hash maps are <a |
174 | 227 | * href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly">weakly |
175 | 228 | * consistent</a>. |
176 | 229 | */ |
177 | | - private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache = |
| 230 | + private final Map<InvokerCacheKey, Constructor<?>> byteBuddyInvokerConstructorCache = |
178 | 231 | new ConcurrentHashMap<>(); |
179 | 232 |
|
180 | 233 | private ByteBuddyDoFnInvokerFactory() {} |
@@ -265,11 +318,39 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker( |
265 | 318 | signature.fnClass(), |
266 | 319 | fn.getClass()); |
267 | 320 |
|
| 321 | + // Extract input and output type descriptors to distinguish generic instantiations. |
| 322 | + // Fall back to Object.class if unavailable. When type info is lost, different generic |
| 323 | + // instantiations share an invoker, which is acceptable since the DoFn class in the cache |
| 324 | + // key prevents collisions between different DoFn classes. |
| 325 | + TypeDescriptor<InputT> inputType; |
| 326 | + try { |
| 327 | + inputType = fn.getInputTypeDescriptor(); |
| 328 | + } catch (Exception e) { |
| 329 | + // Some DoFns (like MapElements) throw IllegalStateException if queried after |
| 330 | + // serialization. |
| 331 | + // In this case, we fall back to the raw class behavior (Object). |
| 332 | + inputType = null; |
| 333 | + } |
| 334 | + if (inputType == null) { |
| 335 | + inputType = (TypeDescriptor<InputT>) TypeDescriptor.of(Object.class); |
| 336 | + } |
| 337 | + |
| 338 | + TypeDescriptor<OutputT> outputType; |
| 339 | + try { |
| 340 | + outputType = fn.getOutputTypeDescriptor(); |
| 341 | + } catch (Exception e) { |
| 342 | + // Same as above: fall back to Object if type info is unavailable. |
| 343 | + outputType = null; |
| 344 | + } |
| 345 | + if (outputType == null) { |
| 346 | + outputType = (TypeDescriptor<OutputT>) TypeDescriptor.of(Object.class); |
| 347 | + } |
| 348 | + |
268 | 349 | try { |
269 | 350 | @SuppressWarnings("unchecked") |
270 | 351 | DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker = |
271 | 352 | (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>) |
272 | | - getByteBuddyInvokerConstructor(signature).newInstance(fn); |
| 353 | + getByteBuddyInvokerConstructor(signature, inputType, outputType).newInstance(fn); |
273 | 354 |
|
274 | 355 | if (signature.onTimerMethods() != null) { |
275 | 356 | for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) { |
@@ -297,19 +378,24 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker( |
297 | 378 | } |
298 | 379 |
|
299 | 380 | /** |
300 | | - * Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFn} class. |
| 381 | + * Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFnSignature} |
| 382 | + * and specific generic types. |
301 | 383 | * |
302 | 384 | * <p>These are cached such that at most one {@link DoFnInvoker} class exists for a given {@link |
303 | | - * DoFn} class. |
| 385 | + * DoFn} class with specific generic type parameters. Different generic instantiations of the same |
| 386 | + * DoFn class will have separate cached invoker classes. |
304 | 387 | */ |
305 | | - private Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) { |
| 388 | + private Constructor<?> getByteBuddyInvokerConstructor( |
| 389 | + DoFnSignature signature, TypeDescriptor<?> inputType, TypeDescriptor<?> outputType) { |
306 | 390 | Class<? extends DoFn<?, ?>> fnClass = signature.fnClass(); |
| 391 | + InvokerCacheKey cacheKey = new InvokerCacheKey(fnClass, inputType, outputType); |
307 | 392 | return byteBuddyInvokerConstructorCache.computeIfAbsent( |
308 | | - fnClass, |
309 | | - clazz -> { |
310 | | - Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature); |
| 393 | + cacheKey, |
| 394 | + key -> { |
| 395 | + Class<? extends DoFnInvoker<?, ?>> invokerClass = |
| 396 | + generateInvokerClass(signature, inputType, outputType); |
311 | 397 | try { |
312 | | - return invokerClass.getConstructor(clazz); |
| 398 | + return invokerClass.getConstructor(fnClass); |
313 | 399 | } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { |
314 | 400 | throw new RuntimeException(e); |
315 | 401 | } |
@@ -456,19 +542,42 @@ public static double validateSize(double size) { |
456 | 542 | } |
457 | 543 | } |
458 | 544 |
|
| 545 | + /** |
| 546 | + * Generates a type suffix string for use in invoker class names. |
| 547 | + * |
| 548 | + * <p>This creates a unique suffix based on the input and output type descriptors to avoid class |
| 549 | + * name collisions when the same DoFn class is used with different generic types. |
| 550 | + * |
| 551 | + * <p>The format is: {@code DoFnInvoker$<8-digit hex hash>} |
| 552 | + * |
| 553 | + * @param inputType the input type descriptor |
| 554 | + * @param outputType the output type descriptor |
| 555 | + * @return a string suffix for the invoker class name |
| 556 | + */ |
| 557 | + public static String generateTypeSuffix( |
| 558 | + TypeDescriptor<?> inputType, TypeDescriptor<?> outputType) { |
| 559 | + return String.format( |
| 560 | + "%s$%08x", |
| 561 | + DoFnInvoker.class.getSimpleName(), |
| 562 | + (inputType.toString() + "|" + outputType.toString()).hashCode()); |
| 563 | + } |
| 564 | + |
459 | 565 | /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */ |
460 | | - private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) { |
| 566 | + private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass( |
| 567 | + DoFnSignature signature, TypeDescriptor<?> inputType, TypeDescriptor<?> outputType) { |
461 | 568 | Class<? extends DoFn<?, ?>> fnClass = signature.fnClass(); |
462 | 569 |
|
| 570 | + // Create a unique suffix based on the type descriptors to avoid class name collisions |
| 571 | + // when the same DoFn class is used with different generic types. |
| 572 | + String typeSuffix = generateTypeSuffix(inputType, outputType); |
| 573 | + |
463 | 574 | final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass); |
464 | 575 |
|
465 | 576 | DynamicType.Builder<?> builder = |
466 | 577 | new ByteBuddy() |
467 | 578 | // Create subclasses inside the target class, to have access to |
468 | 579 | // private and package-private bits |
469 | | - .with( |
470 | | - StableInvokerNamingStrategy.forDoFnClass(fnClass) |
471 | | - .withSuffix(DoFnInvoker.class.getSimpleName())) |
| 580 | + .with(StableInvokerNamingStrategy.forDoFnClass(fnClass).withSuffix(typeSuffix)) |
472 | 581 |
|
473 | 582 | // class <invoker class> extends DoFnInvokerBase { |
474 | 583 | .subclass(DoFnInvokerBase.class, ConstructorStrategy.Default.NO_CONSTRUCTORS) |
|
0 commit comments