Skip to content

Commit 74677b6

Browse files
committed
feat: Add Log Cache log streaming support
1 parent b236ad3 commit 74677b6

File tree

7 files changed

+234
-0
lines changed

7 files changed

+234
-0
lines changed

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/ReactorLogCacheEndpoints.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,24 @@
1616

1717
package org.cloudfoundry.reactor.logcache.v1;
1818

19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.List;
1922
import java.util.Map;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
import org.cloudfoundry.logcache.v1.Envelope;
25+
import org.cloudfoundry.logcache.v1.EnvelopeBatch;
26+
import org.cloudfoundry.logcache.v1.EnvelopeType;
2027
import org.cloudfoundry.logcache.v1.InfoRequest;
2128
import org.cloudfoundry.logcache.v1.InfoResponse;
2229
import org.cloudfoundry.logcache.v1.MetaRequest;
2330
import org.cloudfoundry.logcache.v1.MetaResponse;
2431
import org.cloudfoundry.logcache.v1.ReadRequest;
2532
import org.cloudfoundry.logcache.v1.ReadResponse;
33+
import org.cloudfoundry.logcache.v1.TailLogsRequest;
2634
import org.cloudfoundry.reactor.ConnectionContext;
2735
import org.cloudfoundry.reactor.TokenProvider;
36+
import reactor.core.publisher.Flux;
2837
import reactor.core.publisher.Mono;
2938

3039
final class ReactorLogCacheEndpoints extends AbstractLogCacheOperations {
@@ -52,4 +61,87 @@ Mono<ReadResponse> read(ReadRequest request) {
5261
Mono<ReadResponse> recentLogs(ReadRequest request) {
5362
return read(request);
5463
}
64+
65+
/**
66+
* Continuously polls Log Cache and emits new {@link Envelope}s as they appear.
67+
*
68+
* <p>Algorithm (mirrors the Go {@code logcache.Walk()} implementation):
69+
* <ol>
70+
* <li>Start from {@code startTime} (defaults to now – 5 s in nanoseconds).</li>
71+
* <li>Issue a {@code /api/v1/read} with {@code start_time = cursor}.</li>
72+
* <li>Emit every returned envelope in ascending timestamp order and advance cursor
73+
* to {@code lastTimestamp + 1}.</li>
74+
* <li>If the response was empty, wait {@code pollInterval} before next poll.</li>
75+
* <li>Repeat indefinitely until cancelled.</li>
76+
* </ol>
77+
*/
78+
Flux<Envelope> logsTail(TailLogsRequest request) {
79+
long defaultStartNanos = (System.currentTimeMillis() - 5_000L) * 1_000_000L;
80+
AtomicLong cursor =
81+
new AtomicLong(
82+
request.getStartTime() != null
83+
? request.getStartTime()
84+
: defaultStartNanos);
85+
86+
List<EnvelopeType> envelopeTypes =
87+
request.getEnvelopeTypes() != null
88+
? request.getEnvelopeTypes()
89+
: Collections.emptyList();
90+
String nameFilter = request.getNameFilter();
91+
92+
// One poll: read from cursor, sort, emit, advance cursor.
93+
// Returns empty Mono when batch is empty (triggers delay+repeat).
94+
Flux<Envelope> poll =
95+
Mono.defer(
96+
() -> {
97+
ReadRequest.Builder builder =
98+
ReadRequest.builder()
99+
.sourceId(request.getSourceId())
100+
.startTime(cursor.get());
101+
if (!envelopeTypes.isEmpty()) {
102+
builder.envelopeTypes(envelopeTypes);
103+
}
104+
if (nameFilter != null && !nameFilter.isEmpty()) {
105+
builder.nameFilter(nameFilter);
106+
}
107+
return read(builder.build())
108+
.map(ReadResponse::getEnvelopes)
109+
.map(EnvelopeBatch::getBatch)
110+
.onErrorReturn(Collections.emptyList());
111+
})
112+
.flatMapMany(
113+
batch -> {
114+
if (batch.isEmpty()) {
115+
return Flux.empty();
116+
}
117+
List<Envelope> sorted = new ArrayList<>(batch);
118+
sorted.sort(
119+
(a, b) -> {
120+
long ta =
121+
a.getTimestamp() != null
122+
? a.getTimestamp()
123+
: 0L;
124+
long tb =
125+
b.getTimestamp() != null
126+
? b.getTimestamp()
127+
: 0L;
128+
return Long.compare(ta, tb);
129+
});
130+
long last =
131+
sorted.get(sorted.size() - 1).getTimestamp() != null
132+
? sorted.get(sorted.size() - 1).getTimestamp()
133+
: cursor.get();
134+
cursor.set(last + 1);
135+
return Flux.fromIterable(sorted);
136+
});
137+
138+
// Repeat poll indefinitely; when empty (no new data) delay before next attempt.
139+
return poll.repeatWhen(
140+
flux ->
141+
flux.flatMap(
142+
count ->
143+
count == 0
144+
? Mono.delay(request.getPollInterval())
145+
: Mono.just(count)));
146+
}
55147
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/_ReactorLogCacheClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@
1616

1717
package org.cloudfoundry.reactor.logcache.v1;
1818

19+
import org.cloudfoundry.logcache.v1.Envelope;
1920
import org.cloudfoundry.logcache.v1.InfoRequest;
2021
import org.cloudfoundry.logcache.v1.InfoResponse;
2122
import org.cloudfoundry.logcache.v1.LogCacheClient;
2223
import org.cloudfoundry.logcache.v1.MetaRequest;
2324
import org.cloudfoundry.logcache.v1.MetaResponse;
2425
import org.cloudfoundry.logcache.v1.ReadRequest;
2526
import org.cloudfoundry.logcache.v1.ReadResponse;
27+
import org.cloudfoundry.logcache.v1.TailLogsRequest;
2628
import org.cloudfoundry.reactor.ConnectionContext;
2729
import org.cloudfoundry.reactor.TokenProvider;
2830
import org.immutables.value.Value;
31+
import reactor.core.publisher.Flux;
2932
import reactor.core.publisher.Mono;
3033

3134
import java.net.URI;
@@ -58,6 +61,11 @@ public Mono<ReadResponse> recentLogs(ReadRequest request) {
5861
return getReactorLogCacheEndpoints().recentLogs(request);
5962
}
6063

64+
@Override
65+
public Flux<Envelope> logsTail(TailLogsRequest request) {
66+
return getReactorLogCacheEndpoints().logsTail(request);
67+
}
68+
6169
/**
6270
* The connection context
6371
*/

cloudfoundry-client/src/main/java/org/cloudfoundry/logcache/v1/LogCacheClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.cloudfoundry.logcache.v1;
1818

19+
import reactor.core.publisher.Flux;
1920
import reactor.core.publisher.Mono;
2021

2122
/**
@@ -54,4 +55,17 @@ public interface LogCacheClient {
5455
* @return the events from the recent logs
5556
*/
5657
Mono<ReadResponse> recentLogs(ReadRequest request);
58+
59+
/**
60+
* Continuously polls the Log Cache /api/v1/read endpoint and streams new {@link Envelope}s
61+
* as they appear. This is the Java equivalent of the Go {@code logcache.Walk()} API and
62+
* {@code cf tail --follow}.
63+
* <p>
64+
* The returned {@link Flux} will never complete on its own – unsubscribe (or cancel) it to
65+
* stop streaming.
66+
*
67+
* @param request the tail request (source id, optional filters, poll interval)
68+
* @return an infinite stream of envelopes
69+
*/
70+
Flux<Envelope> logsTail(TailLogsRequest request);
5771
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.logcache.v1;
18+
19+
import org.cloudfoundry.Nullable;
20+
import org.immutables.value.Value;
21+
22+
import java.time.Duration;
23+
import java.util.List;
24+
25+
/**
26+
* The request options for the Log Cache tail (streaming follow) operation.
27+
* This continuously polls the Log Cache /api/v1/read endpoint, emitting new envelopes
28+
* as they appear – equivalent to {@code cf tail --follow} or the Go {@code logcache.Walk()} API.
29+
*/
30+
@Value.Immutable
31+
abstract class _TailLogsRequest {
32+
33+
/**
34+
* The source id (application guid or service guid) to stream logs for.
35+
*/
36+
abstract String getSourceId();
37+
38+
/**
39+
* Optional start time (UNIX nanoseconds). Defaults to "now – 5 seconds" when not set.
40+
*/
41+
@Nullable
42+
abstract Long getStartTime();
43+
44+
/**
45+
* Optional envelope type filter.
46+
*/
47+
@Nullable
48+
abstract List<EnvelopeType> getEnvelopeTypes();
49+
50+
/**
51+
* Optional regex name filter (requires Log Cache ≥ 2.1.0).
52+
*/
53+
@Nullable
54+
abstract String getNameFilter();
55+
56+
/**
57+
* How long to wait between successive polls when no new envelopes are available.
58+
* Defaults to 250 ms (matching the Go client's {@code AlwaysRetryBackoff}).
59+
*/
60+
@Value.Default
61+
Duration getPollInterval() {
62+
return Duration.ofMillis(250);
63+
}
64+
}
65+

cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/Applications.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package org.cloudfoundry.operations.applications;
1818

1919
import org.cloudfoundry.doppler.LogMessage;
20+
import org.cloudfoundry.logcache.v1.Envelope;
2021
import org.cloudfoundry.logcache.v1.Log;
2122
import org.cloudfoundry.logcache.v1.ReadRequest;
23+
import org.cloudfoundry.logcache.v1.TailLogsRequest;
2224
import reactor.core.publisher.Flux;
2325
import reactor.core.publisher.Mono;
2426

@@ -137,6 +139,16 @@ public interface Applications {
137139
*/
138140
Flux<Log> logsRecent(ReadRequest request);
139141

142+
/**
143+
* Continuously streams application log envelopes from Log Cache by repeatedly polling
144+
* the {@code /api/v1/read} endpoint. The returned {@link Flux} is infinite – cancel it
145+
* to stop streaming. This is the Java equivalent of {@code cf tail --follow}.
146+
*
147+
* @param request the tail request (source id, optional filters, poll interval)
148+
* @return an infinite stream of envelopes
149+
*/
150+
Flux<Envelope> logsTail(TailLogsRequest request);
151+
140152
/**
141153
* List the applications logs.
142154
* Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3}

cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/DefaultApplications.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@
158158
import org.cloudfoundry.logcache.v1.Log;
159159
import org.cloudfoundry.logcache.v1.LogCacheClient;
160160
import org.cloudfoundry.logcache.v1.ReadRequest;
161+
import org.cloudfoundry.logcache.v1.TailLogsRequest;
161162
import org.cloudfoundry.operations.util.OperationsLogging;
162163
import org.cloudfoundry.util.DateUtils;
163164
import org.cloudfoundry.util.DelayTimeoutException;
@@ -565,6 +566,14 @@ public Flux<Log> logsRecent(ReadRequest request) {
565566
.checkpoint();
566567
}
567568

569+
@Override
570+
public Flux<org.cloudfoundry.logcache.v1.Envelope> logsTail(TailLogsRequest request) {
571+
return this.logCacheClient
572+
.flatMapMany(client -> client.logsTail(request))
573+
.transform(OperationsLogging.log("Tail Application Logs"))
574+
.checkpoint();
575+
}
576+
568577
@Override
569578
public Flux<ApplicationLog> logs(ApplicationLogsRequest request) {
570579
return logs(LogsRequest.builder()

cloudfoundry-operations/src/test/java/org/cloudfoundry/operations/applications/DefaultApplicationsTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@
150150
import org.cloudfoundry.logcache.v1.LogType;
151151
import org.cloudfoundry.logcache.v1.ReadRequest;
152152
import org.cloudfoundry.logcache.v1.ReadResponse;
153+
import org.cloudfoundry.logcache.v1.TailLogsRequest;
153154
import org.cloudfoundry.operations.AbstractOperationsTest;
154155
import org.cloudfoundry.util.DateUtils;
155156
import org.cloudfoundry.util.FluentMap;
@@ -1383,6 +1384,23 @@ void logsRecentLogCache() {
13831384
.verify(Duration.ofSeconds(5));
13841385
}
13851386

1387+
@Test
1388+
void logsTailLogCache() {
1389+
TailLogsRequest tailRequest = TailLogsRequest.builder().sourceId("test-source-id").build();
1390+
requestLogsTailLogCache(this.logCacheClient, tailRequest, "test-tail-payload");
1391+
1392+
this.applications
1393+
.logsTail(tailRequest)
1394+
.take(1)
1395+
.as(StepVerifier::create)
1396+
.expectNextMatches(
1397+
envelope ->
1398+
envelope.getLog() != null
1399+
&& LogType.OUT.equals(envelope.getLog().getType()))
1400+
.expectComplete()
1401+
.verify(Duration.ofSeconds(5));
1402+
}
1403+
13861404
@SuppressWarnings("deprecation")
13871405
@Test
13881406
void logsRecentNotSetDoppler() {
@@ -5384,6 +5402,22 @@ private static void requestLogsRecentLogCache(
53845402
.build()));
53855403
}
53865404

5405+
private static void requestLogsTailLogCache(
5406+
LogCacheClient logCacheClient, TailLogsRequest tailRequest, String payload) {
5407+
when(logCacheClient.logsTail(tailRequest))
5408+
.thenReturn(
5409+
Flux.just(
5410+
Envelope.builder()
5411+
.sourceId(tailRequest.getSourceId())
5412+
.timestamp(System.nanoTime())
5413+
.log(
5414+
Log.builder()
5415+
.payload(payload)
5416+
.type(LogType.OUT)
5417+
.build())
5418+
.build()));
5419+
}
5420+
53875421
private static void requestLogsStream(DopplerClient dopplerClient, String applicationId) {
53885422
when(dopplerClient.stream(StreamRequest.builder().applicationId(applicationId).build()))
53895423
.thenReturn(

0 commit comments

Comments
 (0)