Skip to content

Commit 08fb796

Browse files
committed
feat: Add Log Cache log streaming support
1 parent b236ad3 commit 08fb796

File tree

7 files changed

+369
-0
lines changed

7 files changed

+369
-0
lines changed

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/ReactorLogCacheEndpoints.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,23 @@
1616

1717
package org.cloudfoundry.reactor.logcache.v1;
1818

19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.List;
1922
import java.util.Map;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
import org.cloudfoundry.logcache.v1.Envelope;
25+
import org.cloudfoundry.logcache.v1.EnvelopeType;
2026
import org.cloudfoundry.logcache.v1.InfoRequest;
2127
import org.cloudfoundry.logcache.v1.InfoResponse;
2228
import org.cloudfoundry.logcache.v1.MetaRequest;
2329
import org.cloudfoundry.logcache.v1.MetaResponse;
2430
import org.cloudfoundry.logcache.v1.ReadRequest;
2531
import org.cloudfoundry.logcache.v1.ReadResponse;
32+
import org.cloudfoundry.logcache.v1.TailLogsRequest;
2633
import org.cloudfoundry.reactor.ConnectionContext;
2734
import org.cloudfoundry.reactor.TokenProvider;
35+
import reactor.core.publisher.Flux;
2836
import reactor.core.publisher.Mono;
2937

3038
final class ReactorLogCacheEndpoints extends AbstractLogCacheOperations {
@@ -52,4 +60,107 @@ Mono<ReadResponse> read(ReadRequest request) {
5260
Mono<ReadResponse> recentLogs(ReadRequest request) {
5361
return read(request);
5462
}
63+
64+
/**
65+
* Continuously polls Log Cache and emits new {@link Envelope}s as they arrive.
66+
*
67+
* <p>Mirrors the Go {@code logcache.Walk()} / {@code cf tail --follow} semantics:
68+
* <ol>
69+
* <li>Start the cursor at {@code startTime} (defaults to now&nbsp;&minus;&nbsp;5&nbsp;s in
70+
* nanoseconds).</li>
71+
* <li>Issue {@code GET /api/v1/read/{sourceId}?start_time=cursor}.</li>
72+
* <li>Emit every returned envelope in ascending timestamp order and advance
73+
* the cursor to {@code lastTimestamp + 1}.</li>
74+
* <li>When the batch is empty, wait {@code pollInterval} before the next poll.</li>
75+
* <li>Repeat forever – the caller cancels the subscription to stop.</li>
76+
* </ol>
77+
* Fully non-blocking: no {@code Thread.sleep}.
78+
*/
79+
Flux<Envelope> logsTail(TailLogsRequest request) {
80+
long defaultStartNanos = (System.currentTimeMillis() - 5_000L) * 1_000_000L;
81+
AtomicLong cursor =
82+
new AtomicLong(
83+
request.getStartTime() != null
84+
? request.getStartTime()
85+
: defaultStartNanos);
86+
87+
List<EnvelopeType> envelopeTypes =
88+
request.getEnvelopeTypes() != null
89+
? request.getEnvelopeTypes()
90+
: Collections.emptyList();
91+
String nameFilter = request.getNameFilter();
92+
93+
/*
94+
* Strategy (mirrors Go's logcache.Walk):
95+
* – Mono.defer builds a fresh ReadRequest from the mutable cursor on every repetition.
96+
* – The Mono returns either the sorted batch (non-empty) or an empty list.
97+
* – flatMapMany turns each batch into a stream of individual Envelope items.
98+
* – repeat() subscribes again after each completion.
99+
* – When the batch was empty we insert a delay via Mono.delay before the next
100+
* repetition so we do not hammer the server. We signal "empty" by returning
101+
* a sentinel Mono<Boolean> (false = was empty, true = had data) and use
102+
* repeatWhen to conditionally delay.
103+
*/
104+
return Flux.defer(
105+
() -> {
106+
// Build the read request from the current cursor position.
107+
ReadRequest.Builder builder =
108+
ReadRequest.builder()
109+
.sourceId(request.getSourceId())
110+
.startTime(cursor.get());
111+
if (!envelopeTypes.isEmpty()) {
112+
builder.envelopeTypes(envelopeTypes);
113+
}
114+
if (nameFilter != null && !nameFilter.isEmpty()) {
115+
builder.nameFilter(nameFilter);
116+
}
117+
118+
return read(builder.build())
119+
.onErrorReturn(ReadResponse.builder().build())
120+
.flatMapMany(
121+
resp -> {
122+
List<Envelope> raw =
123+
resp.getEnvelopes() != null
124+
? resp.getEnvelopes().getBatch()
125+
: Collections.emptyList();
126+
127+
if (raw.isEmpty()) {
128+
// Signal "no data" so repeatWhen can insert the
129+
// back-off delay.
130+
return Flux.empty();
131+
}
132+
133+
// Sort ascending by timestamp and advance the
134+
// cursor.
135+
List<Envelope> sorted = new ArrayList<>(raw);
136+
sorted.sort(
137+
(a, b) ->
138+
Long.compare(
139+
a.getTimestamp() != null
140+
? a.getTimestamp()
141+
: 0L,
142+
b.getTimestamp() != null
143+
? b.getTimestamp()
144+
: 0L));
145+
146+
Envelope last = sorted.get(sorted.size() - 1);
147+
cursor.set(
148+
(last.getTimestamp() != null
149+
? last.getTimestamp()
150+
: cursor.get())
151+
+ 1);
152+
153+
return Flux.fromIterable(sorted);
154+
});
155+
})
156+
// repeatWhen receives a Flux<Long> where each element is the count of items
157+
// emitted in the previous cycle (0 = empty batch → insert delay).
158+
.repeatWhen(
159+
companion ->
160+
companion.flatMap(
161+
count ->
162+
count == 0
163+
? Mono.delay(request.getPollInterval())
164+
: Mono.just(count)));
165+
}
55166
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/_ReactorLogCacheClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@
1616

1717
package org.cloudfoundry.reactor.logcache.v1;
1818

19+
import org.cloudfoundry.logcache.v1.Envelope;
1920
import org.cloudfoundry.logcache.v1.InfoRequest;
2021
import org.cloudfoundry.logcache.v1.InfoResponse;
2122
import org.cloudfoundry.logcache.v1.LogCacheClient;
2223
import org.cloudfoundry.logcache.v1.MetaRequest;
2324
import org.cloudfoundry.logcache.v1.MetaResponse;
2425
import org.cloudfoundry.logcache.v1.ReadRequest;
2526
import org.cloudfoundry.logcache.v1.ReadResponse;
27+
import org.cloudfoundry.logcache.v1.TailLogsRequest;
2628
import org.cloudfoundry.reactor.ConnectionContext;
2729
import org.cloudfoundry.reactor.TokenProvider;
2830
import org.immutables.value.Value;
31+
import reactor.core.publisher.Flux;
2932
import reactor.core.publisher.Mono;
3033

3134
import java.net.URI;
@@ -58,6 +61,11 @@ public Mono<ReadResponse> recentLogs(ReadRequest request) {
5861
return getReactorLogCacheEndpoints().recentLogs(request);
5962
}
6063

64+
@Override
65+
public Flux<Envelope> logsTail(TailLogsRequest request) {
66+
return getReactorLogCacheEndpoints().logsTail(request);
67+
}
68+
6169
/**
6270
* The connection context
6371
*/

cloudfoundry-client/src/main/java/org/cloudfoundry/logcache/v1/LogCacheClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.cloudfoundry.logcache.v1;
1818

19+
import reactor.core.publisher.Flux;
1920
import reactor.core.publisher.Mono;
2021

2122
/**
@@ -54,4 +55,17 @@ public interface LogCacheClient {
5455
* @return the events from the recent logs
5556
*/
5657
Mono<ReadResponse> recentLogs(ReadRequest request);
58+
59+
/**
60+
* Continuously polls the Log Cache /api/v1/read endpoint and streams new {@link Envelope}s
61+
* as they appear. This is the Java equivalent of the Go {@code logcache.Walk()} API and
62+
* {@code cf tail --follow}.
63+
* <p>
64+
* The returned {@link Flux} will never complete on its own – unsubscribe (or cancel) it to
65+
* stop streaming.
66+
*
67+
* @param request the tail request (source id, optional filters, poll interval)
68+
* @return an infinite stream of envelopes
69+
*/
70+
Flux<Envelope> logsTail(TailLogsRequest request);
5771
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.logcache.v1;
18+
19+
import org.cloudfoundry.Nullable;
20+
import org.immutables.value.Value;
21+
22+
import java.time.Duration;
23+
import java.util.List;
24+
25+
/**
26+
* The request options for the Log Cache tail (streaming follow) operation.
27+
* This continuously polls the Log Cache /api/v1/read endpoint, emitting new envelopes
28+
* as they appear – equivalent to {@code cf tail --follow} or the Go {@code logcache.Walk()} API.
29+
*/
30+
@Value.Immutable
31+
abstract class _TailLogsRequest {
32+
33+
/**
34+
* The source id (application guid or service guid) to stream logs for.
35+
*/
36+
abstract String getSourceId();
37+
38+
/**
39+
* Optional start time (UNIX nanoseconds). Defaults to "now – 5 seconds" when not set.
40+
*/
41+
@Nullable
42+
abstract Long getStartTime();
43+
44+
/**
45+
* Optional envelope type filter.
46+
*/
47+
@Nullable
48+
abstract List<EnvelopeType> getEnvelopeTypes();
49+
50+
/**
51+
* Optional regex name filter (requires Log Cache ≥ 2.1.0).
52+
*/
53+
@Nullable
54+
abstract String getNameFilter();
55+
56+
/**
57+
* How long to wait between successive polls when no new envelopes are available.
58+
* Defaults to 250 ms (matching the Go client's {@code AlwaysRetryBackoff}).
59+
*/
60+
@Value.Default
61+
Duration getPollInterval() {
62+
return Duration.ofMillis(250);
63+
}
64+
}
65+

cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/Applications.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package org.cloudfoundry.operations.applications;
1818

1919
import org.cloudfoundry.doppler.LogMessage;
20+
import org.cloudfoundry.logcache.v1.Envelope;
2021
import org.cloudfoundry.logcache.v1.Log;
2122
import org.cloudfoundry.logcache.v1.ReadRequest;
23+
import org.cloudfoundry.logcache.v1.TailLogsRequest;
2224
import reactor.core.publisher.Flux;
2325
import reactor.core.publisher.Mono;
2426

@@ -137,6 +139,16 @@ public interface Applications {
137139
*/
138140
Flux<Log> logsRecent(ReadRequest request);
139141

142+
/**
143+
* Continuously streams application log envelopes from Log Cache by repeatedly polling
144+
* the {@code /api/v1/read} endpoint. The returned {@link Flux} is infinite – cancel it
145+
* to stop streaming. This is the Java equivalent of {@code cf tail --follow}.
146+
*
147+
* @param request the tail request (source id, optional filters, poll interval)
148+
* @return an infinite stream of envelopes
149+
*/
150+
Flux<Envelope> logsTail(TailLogsRequest request);
151+
140152
/**
141153
* List the applications logs.
142154
* Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3}

cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/DefaultApplications.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@
158158
import org.cloudfoundry.logcache.v1.Log;
159159
import org.cloudfoundry.logcache.v1.LogCacheClient;
160160
import org.cloudfoundry.logcache.v1.ReadRequest;
161+
import org.cloudfoundry.logcache.v1.TailLogsRequest;
161162
import org.cloudfoundry.operations.util.OperationsLogging;
162163
import org.cloudfoundry.util.DateUtils;
163164
import org.cloudfoundry.util.DelayTimeoutException;
@@ -565,6 +566,14 @@ public Flux<Log> logsRecent(ReadRequest request) {
565566
.checkpoint();
566567
}
567568

569+
@Override
570+
public Flux<org.cloudfoundry.logcache.v1.Envelope> logsTail(TailLogsRequest request) {
571+
return this.logCacheClient
572+
.flatMapMany(client -> client.logsTail(request))
573+
.transform(OperationsLogging.log("Tail Application Logs"))
574+
.checkpoint();
575+
}
576+
568577
@Override
569578
public Flux<ApplicationLog> logs(ApplicationLogsRequest request) {
570579
return logs(LogsRequest.builder()

0 commit comments

Comments
 (0)