Skip to content

Commit d4e3265

Browse files
committed
Promote dashboard service impls to rqueue-web; introduce MessageBrowsingRepository
Eliminate per-backend stub classes for the dashboard's web services. Move all single-impl-capable service classes to rqueue-web and push the genuinely backend-shaped storage primitives behind a new repository abstraction. - Define MessageBrowsingRepository (rqueue-core/repository): three methods — getDataSize / getDataSizes (with bulk pipelining contract) / viewData. Replaces direct RqueueRedisTemplate access from RqueueQDetailServiceImpl. - Implement RedisMessageBrowsingRepository (rqueue-redis): wraps RqueueRedisTemplate, pipelines getDataSizes via RedisUtils.executePipeLine for the home-dashboard size tables, renders all four viewData paths (LIST / ZSET / SET / KEY). - Implement NatsMessageBrowsingRepository (rqueue-nats): sizes return 0; viewData throws BackendCapabilityException("nats", "viewData", ...) — JetStream KV does not model arbitrary keyed reads. RqueueWebExceptionAdvice maps that to HTTP 501. - Promote 4 service impls from rqueue-redis to rqueue-web (single shared impl, no @conditional gating, no per-backend stub): - RqueueDashboardChartServiceImpl (already pure RqueueQStatsDao consumer) - RqueueJobServiceImpl (already pure RqueueJobDao consumer) - RqueueSystemManagerServiceImpl (drop RqueueStringDao for queue-name set; use EndpointRegistry instead. RqueueStringDao kept as required=false for the Redis-only deleteQueue hard-cleanup path; on NATS that returns code=1 "not supported".) - RqueueQDetailServiceImpl (replace 14 stringRqueueRedisTemplate calls with repository calls; collapse 4 pipelined-size methods into one helper) - Add NatsRqueueQStatsDao (no-op) so the now-unconditional RqueueJobMetricsAggregatorService boots on NATS without a missing-bean failure. - Drop RedisBackendCondition from controllers + aggregator + view-controller service impl. The dashboard now wires on both backends; capability gaps surface as 501s from BackendCapabilityException, not as bean-graph failures. - Wire the new MessageBrowsingRepository bean in both RqueueRedisListenerConfig and RqueueNatsAutoConfig. - Move 5 unit-test files alongside the impls. Update mocks to target the repository's higher-level methods; add EndpointRegistry.delete() to test setup to prevent state leaks across the static singleton. Verified: ./gradlew compileJava compileTestJava clean across all modules; ./gradlew :rqueue-{core,web,redis,nats}:test -DincludeTags=unit all green. Assisted-By: Claude Code
1 parent df9a5b7 commit d4e3265

21 files changed

Lines changed: 553 additions & 348 deletions

File tree

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.repository;
18+
19+
import com.github.sonus21.rqueue.models.enums.DataType;
20+
import com.github.sonus21.rqueue.models.response.DataViewResponse;
21+
import java.util.List;
22+
23+
/**
24+
* Backend-shaped browsing primitives consumed by the dashboard's queue-detail and data-explorer
25+
* pages. Replaces direct {@code RqueueRedisTemplate} access from
26+
* {@code RqueueQDetailServiceImpl}, allowing a single backend-neutral service impl in
27+
* {@code rqueue-web}.
28+
*
29+
* <p>The interface deliberately exposes Redis-shaped operations on arbitrary keys (LIST / ZSET /
30+
* SET / KEY); it is the storage-layer abstraction, not a feature contract. Backends without an
31+
* equivalent (e.g. NATS JetStream KV) throw {@code BackendCapabilityException} from
32+
* {@link #viewData} and return {@code 0} from the size queries; the dashboard either hides the
33+
* panel (via capability flags) or surfaces the 501 cleanly.
34+
*/
35+
public interface MessageBrowsingRepository {
36+
37+
/**
38+
* Size of a single data structure addressed by {@code name}. Returns {@code 0} when the key
39+
* does not exist or the backend cannot model the structure.
40+
*/
41+
long getDataSize(String name, DataType type);
42+
43+
/**
44+
* Bulk size — same semantics as {@link #getDataSize} per element, but Redis impls are
45+
* expected to pipeline the round-trips. {@code names} and {@code types} must be the same
46+
* length; the returned list is the same length and order.
47+
*/
48+
List<Long> getDataSizes(List<String> names, List<DataType> types);
49+
50+
/**
51+
* Raw data-explorer browser used by the dashboard's "Data" page. Reads paginated rows out of
52+
* a backing data structure (LIST / ZSET / SET) or returns a single keyed value (KEY / single
53+
* ZSET member score). Backends without arbitrary keyed reads throw
54+
* {@code BackendCapabilityException}.
55+
*
56+
* @param name the storage key to read.
57+
* @param type the structure type at {@code name}.
58+
* @param key optional sub-key (e.g. ZSET member name to fetch a single score). Empty / null
59+
* returns a paginated range.
60+
* @param pageNumber zero-based page index.
61+
* @param itemPerPage page size.
62+
*/
63+
DataViewResponse viewData(
64+
String name, DataType type, String key, int pageNumber, int itemPerPage);
65+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
11+
package com.github.sonus21.rqueue.nats.dao;
12+
13+
import com.github.sonus21.rqueue.config.NatsBackendCondition;
14+
import com.github.sonus21.rqueue.dao.RqueueQStatsDao;
15+
import com.github.sonus21.rqueue.models.db.QueueStatistics;
16+
import java.util.Collection;
17+
import java.util.Collections;
18+
import java.util.List;
19+
import org.springframework.context.annotation.Conditional;
20+
import org.springframework.stereotype.Repository;
21+
22+
/**
23+
* NATS-backend stub for {@link RqueueQStatsDao}. The Redis impl persists per-queue daily
24+
* aggregates as serialized {@link QueueStatistics} objects driving the dashboard charts; on
25+
* NATS we no-op writes and return empty reads in v1 so that
26+
* {@code RqueueJobMetricsAggregatorService} can boot without a missing-bean failure even
27+
* though the chart panel is empty.
28+
*
29+
* <p>Replace with a NATS-native impl (a dedicated {@code rqueue-queue-stats} KV bucket
30+
* mirroring the pattern of {@code NatsRqueueSystemConfigDao}) when chart support lands for
31+
* NATS.
32+
*/
33+
@Repository
34+
@Conditional(NatsBackendCondition.class)
35+
public class NatsRqueueQStatsDao implements RqueueQStatsDao {
36+
37+
@Override
38+
public QueueStatistics findById(String id) {
39+
return null;
40+
}
41+
42+
@Override
43+
public List<QueueStatistics> findAll(Collection<String> ids) {
44+
return Collections.emptyList();
45+
}
46+
47+
@Override
48+
public void save(QueueStatistics queueStatistics) {
49+
// intentionally no-op until a NATS-native chart store lands
50+
}
51+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.nats.repository;
18+
19+
import com.github.sonus21.rqueue.exception.BackendCapabilityException;
20+
import com.github.sonus21.rqueue.models.enums.DataType;
21+
import com.github.sonus21.rqueue.models.response.DataViewResponse;
22+
import com.github.sonus21.rqueue.repository.MessageBrowsingRepository;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
/**
27+
* NATS-backend impl of {@link MessageBrowsingRepository}. JetStream KV doesn't expose the
28+
* positional list / sorted-set primitives the dashboard's data-explorer panel requires, so
29+
* {@link #viewData} throws {@link BackendCapabilityException} (mapped to HTTP 501 by the web
30+
* advice). The size queries return {@code 0} — total in-flight / pending counts on a NATS
31+
* backend are surfaced through {@code MessageBroker.size(QueueDetail)} elsewhere; the raw
32+
* dashboard counts here represent Redis-data-structure sizes that have no NATS counterpart.
33+
*/
34+
public class NatsMessageBrowsingRepository implements MessageBrowsingRepository {
35+
36+
@Override
37+
public long getDataSize(String name, DataType type) {
38+
return 0L;
39+
}
40+
41+
@Override
42+
public List<Long> getDataSizes(List<String> names, List<DataType> types) {
43+
if (names == null || names.isEmpty()) {
44+
return new ArrayList<>();
45+
}
46+
List<Long> out = new ArrayList<>(names.size());
47+
for (int i = 0; i < names.size(); i++) {
48+
out.add(0L);
49+
}
50+
return out;
51+
}
52+
53+
@Override
54+
public DataViewResponse viewData(
55+
String name, DataType type, String key, int pageNumber, int itemPerPage) {
56+
throw new BackendCapabilityException(
57+
"nats",
58+
"viewData",
59+
"JetStream does not expose positional reads on arbitrary keys; the dashboard's"
60+
+ " data-explorer panel is Redis-only in v1.");
61+
}
62+
}

rqueue-redis/src/main/java/com/github/sonus21/rqueue/redis/config/RqueueRedisListenerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
import com.github.sonus21.rqueue.redis.dao.RqueueStringDaoImpl;
3131
import com.github.sonus21.rqueue.redis.lock.RqueueRedisLock;
3232
import com.github.sonus21.rqueue.redis.metrics.RedisRqueueQueueMetricsProvider;
33+
import com.github.sonus21.rqueue.redis.repository.RedisMessageBrowsingRepository;
3334
import com.github.sonus21.rqueue.redis.worker.RedisWorkerRegistryStore;
35+
import com.github.sonus21.rqueue.repository.MessageBrowsingRepository;
3436
import com.github.sonus21.rqueue.utils.RedisUtils;
3537
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
3638
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistryImpl;
@@ -126,6 +128,14 @@ public WorkerRegistryStore redisWorkerRegistryStore(RqueueConfig rqueueConfig) {
126128
return new RedisWorkerRegistryStore(rqueueConfig);
127129
}
128130

131+
@Bean
132+
@Conditional(RedisBackendCondition.class)
133+
public MessageBrowsingRepository messageBrowsingRepository(
134+
@Qualifier("stringRqueueRedisTemplate")
135+
RqueueRedisTemplate<String> stringRqueueRedisTemplate) {
136+
return new RedisMessageBrowsingRepository(stringRqueueRedisTemplate);
137+
}
138+
129139
@Bean
130140
@Conditional(RedisBackendCondition.class)
131141
public RqueueWorkerRegistry rqueueWorkerRegistry(
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.redis.repository;
18+
19+
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
20+
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
21+
import com.github.sonus21.rqueue.models.enums.DataType;
22+
import com.github.sonus21.rqueue.models.response.DataViewResponse;
23+
import com.github.sonus21.rqueue.models.response.TableColumn;
24+
import com.github.sonus21.rqueue.models.response.TableRow;
25+
import com.github.sonus21.rqueue.repository.MessageBrowsingRepository;
26+
import com.github.sonus21.rqueue.utils.RedisUtils;
27+
import com.github.sonus21.rqueue.utils.StringUtils;
28+
import java.util.ArrayList;
29+
import java.util.Arrays;
30+
import java.util.Collections;
31+
import java.util.List;
32+
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
33+
34+
/**
35+
* Redis-backed {@link MessageBrowsingRepository}. Wraps {@link RqueueRedisTemplate} for the
36+
* size and range primitives; delegates raw {@link org.springframework.data.redis.core.RedisTemplate}
37+
* access for the bulk pipelining used by {@link #getDataSizes} (one round-trip for N queues
38+
* instead of N).
39+
*
40+
* <p>This class exists to keep the Redis-shaped storage calls behind a stable interface so the
41+
* single {@code RqueueQDetailServiceImpl} in {@code rqueue-web} can serve both backends.
42+
*/
43+
public class RedisMessageBrowsingRepository implements MessageBrowsingRepository {
44+
45+
private final RqueueRedisTemplate<String> stringTemplate;
46+
47+
public RedisMessageBrowsingRepository(RqueueRedisTemplate<String> stringTemplate) {
48+
this.stringTemplate = stringTemplate;
49+
}
50+
51+
@Override
52+
public long getDataSize(String name, DataType type) {
53+
Long size;
54+
switch (type) {
55+
case LIST:
56+
size = stringTemplate.getListSize(name);
57+
break;
58+
case ZSET:
59+
size = stringTemplate.getZsetSize(name);
60+
break;
61+
default:
62+
// SET / KEY sizes are not used by the dashboard; return 0 rather than throw.
63+
return 0L;
64+
}
65+
return size == null ? 0L : size;
66+
}
67+
68+
@Override
69+
public List<Long> getDataSizes(List<String> names, List<DataType> types) {
70+
if (names == null || names.isEmpty()) {
71+
return Collections.emptyList();
72+
}
73+
if (types == null || names.size() != types.size()) {
74+
throw new IllegalArgumentException(
75+
"names and types must be the same length; names=" + names.size()
76+
+ " types=" + (types == null ? "null" : types.size()));
77+
}
78+
List<Object> raw = RedisUtils.executePipeLine(
79+
stringTemplate.getRedisTemplate(),
80+
(connection, keySerializer, valueSerializer) -> {
81+
for (int i = 0; i < names.size(); i++) {
82+
byte[] key = keySerializer.serialize(names.get(i));
83+
switch (types.get(i)) {
84+
case LIST:
85+
connection.lLen(key);
86+
break;
87+
case ZSET:
88+
connection.zCard(key);
89+
break;
90+
default:
91+
// Unknown size: emit something to keep pipeline alignment with the input.
92+
connection.exists(key);
93+
}
94+
}
95+
});
96+
List<Long> out = new ArrayList<>(names.size());
97+
for (Object o : raw) {
98+
if (o instanceof Number) {
99+
out.add(((Number) o).longValue());
100+
} else {
101+
out.add(0L);
102+
}
103+
}
104+
return out;
105+
}
106+
107+
@Override
108+
public DataViewResponse viewData(
109+
String name, DataType type, String key, int pageNumber, int itemPerPage) {
110+
switch (type) {
111+
case SET:
112+
return responseForSet(name);
113+
case ZSET:
114+
return responseForZset(name, key, pageNumber, itemPerPage);
115+
case LIST:
116+
return responseForList(name, pageNumber, itemPerPage);
117+
case KEY:
118+
return responseForKeyVal(name);
119+
default:
120+
throw new UnknownSwitchCase(type.name());
121+
}
122+
}
123+
124+
private DataViewResponse responseForSet(String name) {
125+
List<Object> items = new ArrayList<>(stringTemplate.getMembers(name));
126+
DataViewResponse response = new DataViewResponse();
127+
response.setHeaders(Collections.singletonList("Item"));
128+
List<TableRow> tableRows = new ArrayList<>();
129+
for (Object item : items) {
130+
tableRows.add(new TableRow(new TableColumn(item.toString())));
131+
}
132+
response.setRows(tableRows);
133+
return response;
134+
}
135+
136+
private DataViewResponse responseForKeyVal(String name) {
137+
DataViewResponse response = new DataViewResponse();
138+
response.setHeaders(Collections.singletonList("Value"));
139+
Object val = stringTemplate.get(name);
140+
response.addRow(new TableRow(new TableColumn(String.valueOf(val))));
141+
return response;
142+
}
143+
144+
private DataViewResponse responseForZset(
145+
String name, String key, int pageNumber, int itemPerPage) {
146+
DataViewResponse response = new DataViewResponse();
147+
int start = pageNumber * itemPerPage;
148+
int end = start + itemPerPage - 1;
149+
List<TableRow> tableRows = new ArrayList<>();
150+
if (!StringUtils.isEmpty(key)) {
151+
Double score = stringTemplate.getZsetMemberScore(name, key);
152+
response.setHeaders(Collections.singletonList("Score"));
153+
tableRows.add(new TableRow(new TableColumn(score)));
154+
} else {
155+
response.setHeaders(Arrays.asList("Value", "Score"));
156+
for (TypedTuple<String> tuple : stringTemplate.zrangeWithScore(name, start, end)) {
157+
tableRows.add(new TableRow(Arrays.asList(
158+
new TableColumn(String.valueOf(tuple.getValue())), new TableColumn(tuple.getScore()))));
159+
}
160+
}
161+
response.setRows(tableRows);
162+
return response;
163+
}
164+
165+
private DataViewResponse responseForList(String name, int pageNumber, int itemPerPage) {
166+
DataViewResponse response = new DataViewResponse();
167+
response.setHeaders(Collections.singletonList("Item"));
168+
int start = pageNumber * itemPerPage;
169+
int end = start + itemPerPage - 1;
170+
List<TableRow> tableRows = new ArrayList<>();
171+
for (Object s : stringTemplate.lrange(name, start, end)) {
172+
tableRows.add(new TableRow(new TableColumn(String.valueOf(s))));
173+
}
174+
response.setRows(tableRows);
175+
return response;
176+
}
177+
}

0 commit comments

Comments
 (0)