|
27 | 27 | import java.util.ArrayList; |
28 | 28 | import java.util.List; |
29 | 29 | import java.util.Map; |
| 30 | +import java.util.Objects; |
30 | 31 | import java.util.concurrent.ConcurrentHashMap; |
31 | 32 | import net.bytebuddy.ByteBuddy; |
32 | 33 | import net.bytebuddy.description.field.FieldDescription; |
|
106 | 107 | import org.apache.beam.sdk.util.UserCodeException; |
107 | 108 | import org.apache.beam.sdk.values.TypeDescriptor; |
108 | 109 | import org.apache.beam.sdk.values.TypeDescriptors; |
| 110 | +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; |
109 | 111 | import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; |
110 | 112 | import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Primitives; |
111 | 113 | import org.checkerframework.checker.nullness.qual.MonotonicNonNull; |
@@ -192,24 +194,23 @@ public boolean equals(@Nullable Object o) { |
192 | 194 | return false; |
193 | 195 | } |
194 | 196 | InvokerCacheKey that = (InvokerCacheKey) o; |
195 | | - return fnClass.equals(that.fnClass) |
196 | | - && inputType.equals(that.inputType) |
197 | | - && outputType.equals(that.outputType); |
| 197 | + return Objects.equals(fnClass, that.fnClass) |
| 198 | + && Objects.equals(inputType, that.inputType) |
| 199 | + && Objects.equals(outputType, that.outputType); |
198 | 200 | } |
199 | 201 |
|
200 | 202 | @Override |
201 | 203 | public int hashCode() { |
202 | | - int result = fnClass.hashCode(); |
203 | | - result = 31 * result + inputType.hashCode(); |
204 | | - result = 31 * result + outputType.hashCode(); |
205 | | - return result; |
| 204 | + return Objects.hash(fnClass, inputType, outputType); |
206 | 205 | } |
207 | 206 |
|
208 | 207 | @Override |
209 | 208 | public String toString() { |
210 | | - return String.format( |
211 | | - "InvokerCacheKey{fnClass=%s, inputType=%s, outputType=%s}", |
212 | | - fnClass.getName(), inputType, outputType); |
| 209 | + return MoreObjects.toStringHelper(this) |
| 210 | + .add("fnClass", fnClass.getName()) |
| 211 | + .add("inputType", inputType) |
| 212 | + .add("outputType", outputType) |
| 213 | + .toString(); |
213 | 214 | } |
214 | 215 | } |
215 | 216 |
|
@@ -317,9 +318,10 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker( |
317 | 318 | signature.fnClass(), |
318 | 319 | fn.getClass()); |
319 | 320 |
|
320 | | - // Extract input and output type descriptors from the DoFn instance |
321 | | - // Fall back to Object.class if the type descriptors are null or unavailable (e.g., MapElements |
322 | | - // after serialization) |
| 321 | + // Extract input and output type descriptors to distinguish generic instantiations. |
| 322 | + // Fall back to Object.class if unavailable. When type info is lost, different generic |
| 323 | + // instantiations share an invoker, which is acceptable since the DoFn class in the cache |
| 324 | + // key prevents collisions between different DoFn classes. |
323 | 325 | TypeDescriptor<InputT> inputType; |
324 | 326 | try { |
325 | 327 | inputType = fn.getInputTypeDescriptor(); |
@@ -540,18 +542,34 @@ public static double validateSize(double size) { |
540 | 542 | } |
541 | 543 | } |
542 | 544 |
|
| 545 | + /** |
| 546 | + * Generates a type suffix string for use in invoker class names. |
| 547 | + * |
| 548 | + * <p>This creates a unique suffix based on the input and output type descriptors to avoid class |
| 549 | + * name collisions when the same DoFn class is used with different generic types. |
| 550 | + * |
| 551 | + * <p>The format is: {@code DoFnInvoker$<8-digit hex hash>} |
| 552 | + * |
| 553 | + * @param inputType the input type descriptor |
| 554 | + * @param outputType the output type descriptor |
| 555 | + * @return a string suffix for the invoker class name |
| 556 | + */ |
| 557 | + public static String generateTypeSuffix( |
| 558 | + TypeDescriptor<?> inputType, TypeDescriptor<?> outputType) { |
| 559 | + return String.format( |
| 560 | + "%s$%08x", |
| 561 | + DoFnInvoker.class.getSimpleName(), |
| 562 | + (inputType.toString() + "|" + outputType.toString()).hashCode()); |
| 563 | + } |
| 564 | + |
543 | 565 | /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */ |
544 | 566 | private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass( |
545 | 567 | DoFnSignature signature, TypeDescriptor<?> inputType, TypeDescriptor<?> outputType) { |
546 | 568 | Class<? extends DoFn<?, ?>> fnClass = signature.fnClass(); |
547 | 569 |
|
548 | 570 | // Create a unique suffix based on the type descriptors to avoid class name collisions |
549 | 571 | // when the same DoFn class is used with different generic types. |
550 | | - String typeSuffix = |
551 | | - String.format( |
552 | | - "%s$%08x", |
553 | | - DoFnInvoker.class.getSimpleName(), |
554 | | - (inputType.toString() + "|" + outputType.toString()).hashCode()); |
| 572 | + String typeSuffix = generateTypeSuffix(inputType, outputType); |
555 | 573 |
|
556 | 574 | final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass); |
557 | 575 |
|
|
0 commit comments