6565import org .apache .beam .sdk .transforms .WithKeys ;
6666import org .apache .beam .sdk .transforms .windowing .BoundedWindow ;
6767import org .apache .beam .sdk .transforms .windowing .PaneInfo ;
68- import org .apache .beam .sdk .util .ShardedKey ;
6968import org .apache .beam .sdk .values .KV ;
7069import org .apache .beam .sdk .values .PCollection ;
7170import org .apache .beam .sdk .values .PCollectionRowTuple ;
@@ -226,8 +225,8 @@ public PCollectionRowTuple expand(PCollection<String> input) {
226225 batchManifestFiles .withMaxBufferingDuration (checkStateNotNull (intervalTrigger ));
227226 }
228227
229- PCollection <KV <ShardedKey < Integer > , Iterable <SerializableDataFile >>> groupedFiles =
230- keyedFiles .apply ("GroupDataFilesIntoBatches" , batchDataFiles . withShardedKey () );
228+ PCollection <KV <Integer , Iterable <SerializableDataFile >>> groupedFiles =
229+ keyedFiles .apply ("GroupDataFilesIntoBatches" , batchDataFiles );
231230
232231 PCollection <KV <String , byte []>> manifests =
233232 groupedFiles .apply (
@@ -660,7 +659,7 @@ static String getPartitionFromMetrics(Metrics metrics, InputFile inputFile, Tabl
660659 * downstream {@link CommitManifestFilesDoFn}.
661660 */
662661 static class CreateManifests
663- extends DoFn <KV <ShardedKey < Integer > , Iterable <SerializableDataFile >>, KV <String , byte []>> {
662+ extends DoFn <KV <Integer , Iterable <SerializableDataFile >>, KV <String , byte []>> {
664663 private final IcebergCatalogConfig catalogConfig ;
665664 private final String identifier ;
666665 private transient @ MonotonicNonNull Table table ;
@@ -672,7 +671,7 @@ public CreateManifests(IcebergCatalogConfig catalogConfig, String identifier) {
672671
673672 @ ProcessElement
674673 public void process (
675- @ Element KV <ShardedKey < Integer > , Iterable <SerializableDataFile >> batch ,
674+ @ Element KV <Integer , Iterable <SerializableDataFile >> batch ,
676675 OutputReceiver <KV <String , byte []>> output )
677676 throws IOException {
678677 if (!batch .getValue ().iterator ().hasNext ()) {
@@ -682,7 +681,7 @@ public void process(
682681 table = catalogConfig .catalog ().loadTable (TableIdentifier .parse (identifier ));
683682 }
684683
685- PartitionSpec spec = checkStateNotNull (table .specs ().get (batch .getKey (). getKey () ));
684+ PartitionSpec spec = checkStateNotNull (table .specs ().get (batch .getKey ()));
686685
687686 String manifestPath =
688687 String .format (
0 commit comments