|
15 | 15 | */ |
16 | 16 | package org.springframework.data.couchbase.repository.query; |
17 | 17 |
|
| 18 | +import org.reactivestreams.Publisher; |
18 | 19 | import org.springframework.data.couchbase.core.ReactiveCouchbaseOperations; |
19 | 20 | import org.springframework.data.couchbase.core.ReactiveFindBySearchOperation; |
20 | 21 | import org.springframework.data.domain.Pageable; |
21 | 22 | import org.springframework.data.repository.query.ParametersParameterAccessor; |
22 | 23 | import org.springframework.data.repository.query.RepositoryQuery; |
23 | 24 | import org.springframework.data.repository.query.ResultProcessor; |
24 | | -import org.springframework.data.repository.util.ReactiveWrapperConverters; |
25 | 25 | import org.springframework.util.Assert; |
26 | 26 |
|
27 | 27 | import org.springframework.data.couchbase.repository.Search; |
|
31 | 31 | import com.couchbase.client.java.search.SearchRequest; |
32 | 32 | import com.couchbase.client.java.search.SearchScanConsistency; |
33 | 33 |
|
| 34 | +import reactor.core.publisher.Flux; |
| 35 | +import reactor.core.publisher.Mono; |
| 36 | + |
34 | 37 | /** |
35 | 38 | * Reactive {@link RepositoryQuery} implementation for FTS {@link Search}-annotated methods. |
36 | 39 | * <p> |
@@ -73,16 +76,21 @@ public ReactiveSearchBasedCouchbaseQuery(ReactiveCouchbaseQueryMethod method, |
73 | 76 | public Object execute(Object[] parameters) { |
74 | 77 | ReactiveCouchbaseParameterAccessor accessor = new ReactiveCouchbaseParameterAccessor(method, parameters); |
75 | 78 |
|
76 | | - return accessor.resolveParameters().flatMapMany(resolvedAccessor -> { |
77 | | - ResultProcessor processor = method.getResultProcessor().withDynamicProjection(resolvedAccessor); |
78 | | - SearchRepositoryQuerySupport.validateSort(resolvedAccessor); |
| 79 | + if (method.isCollectionQuery()) { |
| 80 | + return accessor.resolveParameters().flatMapMany(resolvedAccessor -> Flux.from(doExecute(resolvedAccessor))); |
| 81 | + } |
| 82 | + return accessor.resolveParameters().flatMap(resolvedAccessor -> Mono.from(doExecute(resolvedAccessor))); |
| 83 | + } |
| 84 | + |
| 85 | + private Publisher<?> doExecute(ParametersParameterAccessor resolvedAccessor) { |
| 86 | + ResultProcessor processor = method.getResultProcessor().withDynamicProjection(resolvedAccessor); |
| 87 | + SearchRepositoryQuerySupport.validateSort(resolvedAccessor); |
79 | 88 |
|
80 | | - String resolvedQuery = SearchBasedCouchbaseQuery.resolveParameters(searchQueryTemplate, resolvedAccessor); |
81 | | - SearchRequest request = SearchRequest.create(SearchQuery.queryString(resolvedQuery)); |
82 | | - Object result = executeDependingOnType(resolvedAccessor, request); |
| 89 | + String resolvedQuery = SearchBasedCouchbaseQuery.resolveParameters(searchQueryTemplate, resolvedAccessor); |
| 90 | + SearchRequest request = SearchRequest.create(SearchQuery.queryString(resolvedQuery)); |
| 91 | + Object result = executeDependingOnType(resolvedAccessor, request); |
83 | 92 |
|
84 | | - return ReactiveWrapperConverters.toWrapper(processor.processResult(result), reactor.core.publisher.Flux.class); |
85 | | - }); |
| 93 | + return (Publisher<?>) processor.processResult(result); |
86 | 94 | } |
87 | 95 |
|
88 | 96 | @Override |
|
0 commit comments