|
22 | 22 | import java.util.Map; |
23 | 23 | import java.util.concurrent.atomic.AtomicLong; |
24 | 24 | import org.cloudfoundry.logcache.v1.Envelope; |
25 | | -import org.cloudfoundry.logcache.v1.EnvelopeBatch; |
26 | 25 | import org.cloudfoundry.logcache.v1.EnvelopeType; |
27 | 26 | import org.cloudfoundry.logcache.v1.InfoRequest; |
28 | 27 | import org.cloudfoundry.logcache.v1.InfoResponse; |
@@ -63,85 +62,84 @@ Mono<ReadResponse> recentLogs(ReadRequest request) { |
63 | 62 | } |
64 | 63 |
|
65 | 64 | /** |
66 | | - * Continuously polls Log Cache and emits new {@link Envelope}s as they appear. |
| 65 | + * Continuously polls Log Cache and emits new {@link Envelope}s as they arrive. |
67 | 66 | * |
68 | | - * <p>Algorithm (mirrors the Go {@code logcache.Walk()} implementation): |
| 67 | + * <p>Mirrors the Go {@code logcache.Walk()} / {@code cf tail --follow} semantics: |
69 | 68 | * <ol> |
70 | | - * <li>Start from {@code startTime} (defaults to now – 5 s in nanoseconds).</li> |
71 | | - * <li>Issue a {@code /api/v1/read} with {@code start_time = cursor}.</li> |
72 | | - * <li>Emit every returned envelope in ascending timestamp order and advance cursor |
73 | | - * to {@code lastTimestamp + 1}.</li> |
74 | | - * <li>If the response was empty, wait {@code pollInterval} before next poll.</li> |
75 | | - * <li>Repeat indefinitely until cancelled.</li> |
| 69 | + * <li>Start the cursor at {@code startTime} (defaults to now − 5 s in |
| 70 | + * nanoseconds).</li> |
| 71 | + * <li>Issue {@code GET /api/v1/read/{sourceId}?start_time=cursor}.</li> |
| 72 | + * <li>Emit every returned envelope in ascending timestamp order and advance |
| 73 | + * the cursor to {@code lastTimestamp + 1}.</li> |
| 74 | + * <li>When the batch is empty, wait {@code pollInterval} before the next poll.</li> |
| 75 | + * <li>Repeat forever – the caller cancels the subscription to stop.</li> |
76 | 76 | * </ol> |
| 77 | + * Fully non-blocking: no {@code Thread.sleep}. |
77 | 78 | */ |
78 | 79 | Flux<Envelope> logsTail(TailLogsRequest request) { |
79 | 80 | long defaultStartNanos = (System.currentTimeMillis() - 5_000L) * 1_000_000L; |
80 | | - AtomicLong cursor = |
81 | | - new AtomicLong( |
82 | | - request.getStartTime() != null |
83 | | - ? request.getStartTime() |
84 | | - : defaultStartNanos); |
| 81 | + AtomicLong cursor = new AtomicLong( |
| 82 | + request.getStartTime() != null ? request.getStartTime() : defaultStartNanos); |
85 | 83 |
|
86 | 84 | List<EnvelopeType> envelopeTypes = |
87 | 85 | request.getEnvelopeTypes() != null |
88 | 86 | ? request.getEnvelopeTypes() |
89 | 87 | : Collections.emptyList(); |
90 | 88 | String nameFilter = request.getNameFilter(); |
91 | 89 |
|
92 | | - // One poll: read from cursor, sort, emit, advance cursor. |
93 | | - // Returns empty Mono when batch is empty (triggers delay+repeat). |
94 | | - Flux<Envelope> poll = |
95 | | - Mono.defer( |
96 | | - () -> { |
97 | | - ReadRequest.Builder builder = |
98 | | - ReadRequest.builder() |
99 | | - .sourceId(request.getSourceId()) |
100 | | - .startTime(cursor.get()); |
101 | | - if (!envelopeTypes.isEmpty()) { |
102 | | - builder.envelopeTypes(envelopeTypes); |
103 | | - } |
104 | | - if (nameFilter != null && !nameFilter.isEmpty()) { |
105 | | - builder.nameFilter(nameFilter); |
106 | | - } |
107 | | - return read(builder.build()) |
108 | | - .map(ReadResponse::getEnvelopes) |
109 | | - .map(EnvelopeBatch::getBatch) |
110 | | - .onErrorReturn(Collections.emptyList()); |
111 | | - }) |
112 | | - .flatMapMany( |
113 | | - batch -> { |
114 | | - if (batch.isEmpty()) { |
115 | | - return Flux.empty(); |
116 | | - } |
117 | | - List<Envelope> sorted = new ArrayList<>(batch); |
118 | | - sorted.sort( |
119 | | - (a, b) -> { |
120 | | - long ta = |
121 | | - a.getTimestamp() != null |
122 | | - ? a.getTimestamp() |
123 | | - : 0L; |
124 | | - long tb = |
125 | | - b.getTimestamp() != null |
126 | | - ? b.getTimestamp() |
127 | | - : 0L; |
128 | | - return Long.compare(ta, tb); |
129 | | - }); |
130 | | - long last = |
131 | | - sorted.get(sorted.size() - 1).getTimestamp() != null |
132 | | - ? sorted.get(sorted.size() - 1).getTimestamp() |
133 | | - : cursor.get(); |
134 | | - cursor.set(last + 1); |
135 | | - return Flux.fromIterable(sorted); |
136 | | - }); |
137 | | - |
138 | | - // Repeat poll indefinitely; when empty (no new data) delay before next attempt. |
139 | | - return poll.repeatWhen( |
140 | | - flux -> |
141 | | - flux.flatMap( |
142 | | - count -> |
143 | | - count == 0 |
144 | | - ? Mono.delay(request.getPollInterval()) |
145 | | - : Mono.just(count))); |
| 90 | + /* |
| 91 | + * Strategy (mirrors Go's logcache.Walk): |
| 92 | + * – Mono.defer builds a fresh ReadRequest from the mutable cursor on every repetition. |
| 93 | + * – The Mono returns either the sorted batch (non-empty) or an empty list. |
| 94 | + * – flatMapMany turns each batch into a stream of individual Envelope items. |
| 95 | + * – repeat() subscribes again after each completion. |
| 96 | + * – When the batch was empty we insert a delay via Mono.delay before the next |
| 97 | + * repetition so we do not hammer the server. We signal "empty" by returning |
| 98 | + * a sentinel Mono<Boolean> (false = was empty, true = had data) and use |
| 99 | + * repeatWhen to conditionally delay. |
| 100 | + */ |
| 101 | + return Flux.defer(() -> { |
| 102 | + // Build the read request from the current cursor position. |
| 103 | + ReadRequest.Builder builder = ReadRequest.builder() |
| 104 | + .sourceId(request.getSourceId()) |
| 105 | + .startTime(cursor.get()); |
| 106 | + if (!envelopeTypes.isEmpty()) { |
| 107 | + builder.envelopeTypes(envelopeTypes); |
| 108 | + } |
| 109 | + if (nameFilter != null && !nameFilter.isEmpty()) { |
| 110 | + builder.nameFilter(nameFilter); |
| 111 | + } |
| 112 | + |
| 113 | + return read(builder.build()) |
| 114 | + .onErrorReturn(ReadResponse.builder().build()) |
| 115 | + .flatMapMany(resp -> { |
| 116 | + List<Envelope> raw = resp.getEnvelopes() != null |
| 117 | + ? resp.getEnvelopes().getBatch() |
| 118 | + : Collections.emptyList(); |
| 119 | + |
| 120 | + if (raw.isEmpty()) { |
| 121 | + // Signal "no data" so repeatWhen can insert the back-off delay. |
| 122 | + return Flux.empty(); |
| 123 | + } |
| 124 | + |
| 125 | + // Sort ascending by timestamp and advance the cursor. |
| 126 | + List<Envelope> sorted = new ArrayList<>(raw); |
| 127 | + sorted.sort((a, b) -> Long.compare( |
| 128 | + a.getTimestamp() != null ? a.getTimestamp() : 0L, |
| 129 | + b.getTimestamp() != null ? b.getTimestamp() : 0L)); |
| 130 | + |
| 131 | + Envelope last = sorted.get(sorted.size() - 1); |
| 132 | + cursor.set((last.getTimestamp() != null |
| 133 | + ? last.getTimestamp() : cursor.get()) + 1); |
| 134 | + |
| 135 | + return Flux.fromIterable(sorted); |
| 136 | + }); |
| 137 | + }) |
| 138 | + // repeatWhen receives a Flux<Long> where each element is the count of items |
| 139 | + // emitted in the previous cycle (0 = empty batch → insert delay). |
| 140 | + .repeatWhen(companion -> companion.flatMap(count -> |
| 141 | + count == 0 |
| 142 | + ? Mono.delay(request.getPollInterval()) |
| 143 | + : Mono.just(count))); |
146 | 144 | } |
147 | 145 | } |
0 commit comments