Skip to content

Commit f88891d

Browse files
authored
Merge branch 'master' into error_prone_bulk1
2 parents 2d3aff0 + 914e425 commit f88891d

27 files changed

Lines changed: 493 additions & 150 deletions

File tree

.github/workflows/build_wheels.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,13 +244,13 @@ jobs:
244244
- name: Install Python
245245
uses: actions/setup-python@v5
246246
with:
247-
python-version: '3.10'
247+
python-version: '3.11'
248248
- uses: docker/setup-qemu-action@v3
249249
if: ${{matrix.os_python.arch == 'aarch64'}}
250250
name: Set up QEMU
251251
- name: Install cibuildwheel
252252
# note: sync cibuildwheel version with gradle task sdks:python:bdistPy* steps
253-
run: pip install cibuildwheel==2.23.3 setuptools
253+
run: pip install cibuildwheel==3.4.0 setuptools
254254
- name: Build wheel
255255
# Only build wheel if it is one of the target versions for this platform, otherwise no-op
256256
if: ${{ contains(matrix.os_python.python, matrix.py_version) }}

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,14 +1554,13 @@ class BeamModulePlugin implements Plugin<Project> {
15541554
"NonCanonicalType",
15551555
"Slf4jFormatShouldBeConst",
15561556
"Slf4jSignOnlyFormat",
1557-
"StaticAssignmentInConstructor",
1557+
"ThreadPriorityCheck",
15581558
"TimeUnitConversionChecker",
15591559
"UndefinedEquals",
15601560
"UnescapedEntity",
15611561
"UnnecessaryMethodReference",
15621562
"UnnecessaryParentheses",
15631563
"UnrecognisedJavadocTag",
1564-
"UnsafeReflectiveConstructionCast",
15651564
// errorprone 3.2.0+ checks
15661565
"DirectInvocationOnMock",
15671566
"JUnitIncompatibleType",

runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.NoSuchElementException;
3838
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3940
import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
4041
import org.apache.beam.sdk.coders.BigEndianLongCoder;
4142
import org.apache.beam.sdk.coders.Coder;
@@ -306,7 +307,7 @@ public void boundedSourceEvaluatorClosesReader() throws Exception {
306307
evaluator.finishBundle();
307308
CommittedBundle<Long> committed = output.commit(Instant.now());
308309
assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
309-
assertThat(TestSource.readerClosed, is(true));
310+
assertThat(TestSource.readerClosed.get(), is(true));
310311
}
311312

312313
@Test
@@ -326,7 +327,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
326327
evaluator.finishBundle();
327328
CommittedBundle<Long> committed = output.commit(Instant.now());
328329
assertThat(committed.getElements(), emptyIterable());
329-
assertThat(TestSource.readerClosed, is(true));
330+
assertThat(TestSource.readerClosed.get(), is(true));
330331
}
331332

332333
@Test
@@ -336,7 +337,7 @@ public void cleanupShutsDownExecutor() {
336337
}
337338

338339
private static class TestSource<T> extends OffsetBasedSource<T> {
339-
private static boolean readerClosed;
340+
private static final AtomicBoolean readerClosed = new AtomicBoolean(false);
340341
private final Coder<T> coder;
341342
private final T[] elems;
342343
private final int firstSplitIndex;
@@ -352,7 +353,7 @@ public TestSource(Coder<T> coder, int firstSplitIndex, T... elems) {
352353
this.elems = elems;
353354
this.coder = coder;
354355
this.firstSplitIndex = firstSplitIndex;
355-
readerClosed = false;
356+
readerClosed.set(false);
356357

357358
subrangesCompleted = new CountDownLatch(2);
358359
}
@@ -449,7 +450,7 @@ public T getCurrent() throws NoSuchElementException {
449450

450451
@Override
451452
public void close() throws IOException {
452-
TestSource.readerClosed = true;
453+
TestSource.readerClosed.set(true);
453454
}
454455
}
455456

runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,8 @@ public void evaluatorReusesReaderAndClosesAtTheEnd() throws Exception {
339339
} while (!Iterables.isEmpty(residual.getElements()));
340340

341341
verify(output, times(numElements)).add(any());
342-
assertThat(TestUnboundedSource.readerCreatedCount, equalTo(1));
343-
assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
342+
assertThat(TestUnboundedSource.READER_CREATED_COUNT.get(), equalTo(1));
343+
assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(1));
344344
}
345345

346346
@Test
@@ -382,7 +382,7 @@ public void evaluatorClosesReaderAndResumesFromCheckpoint() throws Exception {
382382
secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
383383
secondEvaluator.finishBundle();
384384

385-
assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
385+
assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(2));
386386
assertThat(
387387
Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(),
388388
is(true));
@@ -421,12 +421,12 @@ public void evaluatorThrowsInCloseRethrows() throws Exception {
421421

422422
@Test // before this was throwing a NPE
423423
public void emptySource() throws Exception {
424-
TestUnboundedSource.readerClosedCount = 0;
424+
TestUnboundedSource.READER_CLOSED_COUNT.set(0);
425425
final TestUnboundedSource<String> source = new TestUnboundedSource<>(StringUtf8Coder.of());
426426
source.advanceWatermarkToInfinity = true;
427427
processElement(source);
428-
assertEquals(1, TestUnboundedSource.readerClosedCount);
429-
TestUnboundedSource.readerClosedCount = 0; // reset
428+
assertEquals(1, TestUnboundedSource.READER_CLOSED_COUNT.get());
429+
TestUnboundedSource.READER_CLOSED_COUNT.set(0); // reset
430430
}
431431

432432
@Test(expected = IOException.class)
@@ -472,7 +472,7 @@ private void processElement(final TestUnboundedSource<String> source) throws Exc
472472
final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> value =
473473
WindowedValues.of(
474474
shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
475-
TestUnboundedSource.readerClosedCount = 0;
475+
TestUnboundedSource.READER_CLOSED_COUNT.set(0);
476476
evaluator.processElement(value);
477477
}
478478

@@ -492,11 +492,15 @@ public Instant apply(Long input) {
492492
}
493493

494494
private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
495-
private static int getWatermarkCalls = 0;
496-
497-
static int readerCreatedCount;
498-
static int readerClosedCount;
499-
static int readerAdvancedCount;
495+
private static final java.util.concurrent.atomic.AtomicInteger getWatermarkCalls =
496+
new java.util.concurrent.atomic.AtomicInteger(0);
497+
498+
static final java.util.concurrent.atomic.AtomicInteger READER_CREATED_COUNT =
499+
new java.util.concurrent.atomic.AtomicInteger(0);
500+
static final java.util.concurrent.atomic.AtomicInteger READER_CLOSED_COUNT =
501+
new java.util.concurrent.atomic.AtomicInteger(0);
502+
static final java.util.concurrent.atomic.AtomicInteger READER_ADVANCED_COUNT =
503+
new java.util.concurrent.atomic.AtomicInteger(0);
500504
private final Coder<T> coder;
501505
private final List<T> elems;
502506
private boolean dedupes = false;
@@ -508,9 +512,9 @@ public TestUnboundedSource(Coder<T> coder, T... elems) {
508512
}
509513

510514
private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> elems) {
511-
readerCreatedCount = 0;
512-
readerClosedCount = 0;
513-
readerAdvancedCount = 0;
515+
READER_CREATED_COUNT.set(0);
516+
READER_CLOSED_COUNT.set(0);
517+
READER_ADVANCED_COUNT.set(0);
514518
this.coder = coder;
515519
this.elems = elems;
516520
this.throwOnClose = throwOnClose;
@@ -528,7 +532,7 @@ public UnboundedSource.UnboundedReader<T> createReader(
528532
checkState(
529533
checkpointMark == null || checkpointMark.decoded,
530534
"Cannot resume from a checkpoint that has not been decoded");
531-
readerCreatedCount++;
535+
READER_CREATED_COUNT.incrementAndGet();
532536
return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
533537
}
534538

@@ -568,7 +572,7 @@ public boolean start() throws IOException {
568572

569573
@Override
570574
public boolean advance() throws IOException {
571-
readerAdvancedCount++;
575+
READER_ADVANCED_COUNT.incrementAndGet();
572576
if (index + 1 < elems.size()) {
573577
index++;
574578
return true;
@@ -578,11 +582,11 @@ public boolean advance() throws IOException {
578582

579583
@Override
580584
public Instant getWatermark() {
581-
getWatermarkCalls++;
585+
getWatermarkCalls.incrementAndGet();
582586
if (index + 1 == elems.size() && TestUnboundedSource.this.advanceWatermarkToInfinity) {
583587
return BoundedWindow.TIMESTAMP_MAX_VALUE;
584588
} else {
585-
return new Instant(index + getWatermarkCalls);
589+
return new Instant(index + getWatermarkCalls.get());
586590
}
587591
}
588592

@@ -618,7 +622,7 @@ public byte[] getCurrentRecordId() {
618622
@Override
619623
public void close() throws IOException {
620624
try {
621-
readerClosedCount++;
625+
READER_CLOSED_COUNT.incrementAndGet();
622626
// Enforce the AutoCloseable contract. Close is not idempotent.
623627
assertThat(closed, is(false));
624628
if (throwOnClose) {

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4796,17 +4796,18 @@ public void run() {
47964796

47974797
private static class FakeSlowDoFn extends DoFn<String, String> {
47984798

4799-
private static FakeClock clock; // A static variable keeps this DoFn serializable.
4799+
private static final AtomicReference<FakeClock> clock =
4800+
new AtomicReference<>(); // A static variable keeps this DoFn serializable.
48004801
private final Duration sleep;
48014802

48024803
FakeSlowDoFn(FakeClock clock, Duration sleep) {
4803-
FakeSlowDoFn.clock = clock;
4804+
FakeSlowDoFn.clock.set(clock);
48044805
this.sleep = sleep;
48054806
}
48064807

48074808
@ProcessElement
48084809
public void processElement(ProcessContext c) throws Exception {
4809-
clock.sleep(sleep);
4810+
clock.get().sleep(sleep);
48104811
c.output(c.element());
48114812
}
48124813
}

runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,40 +22,43 @@
2222
import com.codahale.metrics.MetricRegistry;
2323
import java.util.Collection;
2424
import java.util.Properties;
25+
import java.util.concurrent.atomic.AtomicReference;
2526
import org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
2627
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
2728
import org.apache.spark.metrics.sink.Sink;
2829

2930
/** An in-memory {@link Sink} implementation for tests. */
3031
public class InMemoryMetrics implements Sink {
3132

32-
private static WithMetricsSupport extendedMetricsRegistry;
33-
private static MetricRegistry internalMetricRegistry;
33+
private static final AtomicReference<WithMetricsSupport> extendedMetricsRegistry =
34+
new AtomicReference<>();
35+
private static final AtomicReference<MetricRegistry> internalMetricRegistry =
36+
new AtomicReference<>();
3437

3538
// Constructor for Spark 3.1
3639
@SuppressWarnings("UnusedParameters")
3740
public InMemoryMetrics(
3841
final Properties properties,
3942
final MetricRegistry metricRegistry,
4043
final org.apache.spark.SecurityManager securityMgr) {
41-
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
42-
internalMetricRegistry = metricRegistry;
44+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
45+
internalMetricRegistry.set(metricRegistry);
4346
}
4447

4548
// Constructor for Spark >= 3.2
4649
@SuppressWarnings("UnusedParameters")
4750
public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry) {
48-
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
49-
internalMetricRegistry = metricRegistry;
51+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
52+
internalMetricRegistry.set(metricRegistry);
5053
}
5154

5255
@SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"}) // because of getGauges
5356
public static <T> T valueOf(final String name) {
5457
// this might fail in case we have multiple aggregators with the same suffix after
5558
// the last dot, but it should be good enough for tests.
56-
if (extendedMetricsRegistry != null) {
57-
Collection<Gauge> matches =
58-
extendedMetricsRegistry.getGauges((n, m) -> n.endsWith(name)).values();
59+
WithMetricsSupport extended = extendedMetricsRegistry.get();
60+
if (extended != null) {
61+
Collection<Gauge> matches = extended.getGauges((n, m) -> n.endsWith(name)).values();
5962
return matches.isEmpty() ? null : (T) Iterables.getOnlyElement(matches).getValue();
6063
} else {
6164
return null;
@@ -64,8 +67,9 @@ public static <T> T valueOf(final String name) {
6467

6568
@SuppressWarnings("WeakerAccess")
6669
public static void clearAll() {
67-
if (internalMetricRegistry != null) {
68-
internalMetricRegistry.removeMatching(MetricFilter.ALL);
70+
MetricRegistry internal = internalMetricRegistry.get();
71+
if (internal != null) {
72+
internal.removeMatching(MetricFilter.ALL);
6973
}
7074
}
7175

runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,40 +22,43 @@
2222
import com.codahale.metrics.MetricRegistry;
2323
import java.util.Collection;
2424
import java.util.Properties;
25+
import java.util.concurrent.atomic.AtomicReference;
2526
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
2627
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
2728
import org.apache.spark.metrics.sink.Sink;
2829

2930
/** An in-memory {@link Sink} implementation for tests. */
3031
public class InMemoryMetrics implements Sink {
3132

32-
private static WithMetricsSupport extendedMetricsRegistry;
33-
private static MetricRegistry internalMetricRegistry;
33+
private static final AtomicReference<WithMetricsSupport> extendedMetricsRegistry =
34+
new AtomicReference<>();
35+
private static final AtomicReference<MetricRegistry> internalMetricRegistry =
36+
new AtomicReference<>();
3437

3538
// Constructor for Spark 3.1
3639
@SuppressWarnings("UnusedParameters")
3740
public InMemoryMetrics(
3841
final Properties properties,
3942
final MetricRegistry metricRegistry,
4043
final org.apache.spark.SecurityManager securityMgr) {
41-
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
42-
internalMetricRegistry = metricRegistry;
44+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
45+
internalMetricRegistry.set(metricRegistry);
4346
}
4447

4548
// Constructor for Spark >= 3.2
4649
@SuppressWarnings("UnusedParameters")
4750
public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry) {
48-
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
49-
internalMetricRegistry = metricRegistry;
51+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
52+
internalMetricRegistry.set(metricRegistry);
5053
}
5154

5255
@SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"})
5356
public static <T> T valueOf(final String name) {
5457
// this might fail in case we have multiple aggregators with the same suffix after
5558
// the last dot, but it should be good enough for tests.
56-
if (extendedMetricsRegistry != null) {
57-
Collection<Gauge> matches =
58-
extendedMetricsRegistry.getGauges((n, m) -> n.endsWith(name)).values();
59+
WithMetricsSupport extended = extendedMetricsRegistry.get();
60+
if (extended != null) {
61+
Collection<Gauge> matches = extended.getGauges((n, m) -> n.endsWith(name)).values();
5962
return matches.isEmpty() ? null : (T) Iterables.getOnlyElement(matches).getValue();
6063
} else {
6164
return null;
@@ -64,8 +67,9 @@ public static <T> T valueOf(final String name) {
6467

6568
@SuppressWarnings("WeakerAccess")
6669
public static void clearAll() {
67-
if (internalMetricRegistry != null) {
68-
internalMetricRegistry.removeMatching(MetricFilter.ALL);
70+
MetricRegistry internal = internalMetricRegistry.get();
71+
if (internal != null) {
72+
internal.removeMatching(MetricFilter.ALL);
6973
}
7074
}
7175

runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
* tests requiring a different context have to be forked using separate test classes.
3939
*/
4040
@SuppressWarnings({
41-
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
41+
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
42+
"StaticAssignmentInConstructor" // used for testing purposes
4243
})
4344
@RunWith(Enclosed.class)
4445
public class SparkRunnerKryoRegistratorTest {

sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,16 @@ private String windowToString(BoundedWindow window) {
353353
}
354354
if (window instanceof IntervalWindow) {
355355
IntervalWindow iw = (IntervalWindow) window;
356-
return String.format("%s-%s", iw.start().toString(), iw.end().toString());
356+
// Use ISO-8601 format but replace colons with underscores for Windows compatibility
357+
// since colons are illegal characters in Windows file paths.
358+
String startStr = iw.start().toString();
359+
String endStr = iw.end().toString();
360+
String osName = System.getProperty("os.name");
361+
if (osName != null && osName.startsWith("Windows")) {
362+
startStr = startStr.replace(':', '_');
363+
endStr = endStr.replace(':', '_');
364+
}
365+
return String.format("%s-%s", startStr, endStr);
357366
}
358367
return window.toString();
359368
}

0 commit comments

Comments
 (0)