Skip to content

Commit 27f4ae5

Browse files
committed
SOLR-18130: rename parameter "zkHost" to "solrCloud" in "solrj-streaming" module
1 parent 7ab7a51 commit 27f4ae5

28 files changed

Lines changed: 416 additions & 744 deletions

solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,6 @@ public static void register(StreamFactory streamFactory) {
370370
.withFunctionName("nodes", GatherNodesStream.class)
371371
.withFunctionName("select", SelectStream.class)
372372
.withFunctionName("shortestPath", ShortestPathStream.class)
373-
.withFunctionName("gatherNodes", GatherNodesStream.class)
374-
.withFunctionName("nodes", GatherNodesStream.class)
375373
.withFunctionName("scoreNodes", ScoreNodesStream.class)
376374
.withFunctionName("model", ModelStream.class)
377375
.withFunctionName("fetch", FetchStream.class)

solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
*/
6969
public class GatherNodesStream extends TupleStream implements Expressible {
7070

71-
private String zkHost;
71+
private String solrCloud;
7272
private String collection;
7373
private StreamContext streamContext;
7474
private Map<String, String> queryParams;
@@ -97,7 +97,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
9797
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
9898

9999
public GatherNodesStream(
100-
String zkHost,
100+
String solrCloud,
101101
String collection,
102102
TupleStream tupleStream,
103103
String traverseFrom,
@@ -110,7 +110,7 @@ public GatherNodesStream(
110110
int maxDocFreq) {
111111

112112
init(
113-
zkHost,
113+
solrCloud,
114114
collection,
115115
tupleStream,
116116
traverseFrom,
@@ -130,7 +130,6 @@ public GatherNodesStream(StreamExpression expression, StreamFactory factory) thr
130130

131131
String collectionName = factory.getValueOperand(expression, 0);
132132
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
133-
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
134133

135134
List<StreamExpression> streamExpressions =
136135
factory.getExpressionOperandsRepresentingTypes(
@@ -276,6 +275,7 @@ public GatherNodesStream(StreamExpression expression, StreamFactory factory) thr
276275
Map<String, String> params = new HashMap<String, String>();
277276
for (StreamExpressionNamedParameter namedParam : namedParams) {
278277
if (!namedParam.getName().equals("zkHost")
278+
&& !namedParam.getName().equals("solrCloud")
279279
&& !namedParam.getName().equals("gather")
280280
&& !namedParam.getName().equals("walk")
281281
&& !namedParam.getName().equals("scatter")
@@ -287,29 +287,12 @@ public GatherNodesStream(StreamExpression expression, StreamFactory factory) thr
287287
}
288288
}
289289

290-
// zkHost, optional - if not provided then will look into factory list to get
291-
String zkHost = null;
292-
if (null == zkHostExpression) {
293-
zkHost = factory.getCollectionZkHost(collectionName);
294-
if (zkHost == null) {
295-
zkHost = factory.getDefaultZkHost();
296-
}
297-
} else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
298-
zkHost = ((StreamExpressionValue) zkHostExpression.getParameter()).getValue();
299-
}
300-
301-
if (null == zkHost) {
302-
throw new IOException(
303-
String.format(
304-
Locale.ROOT,
305-
"invalid expression %s - zkHost not found for collection '%s'",
306-
expression,
307-
collectionName));
308-
}
290+
// solrCloud, optional - if not provided then will look into factory list to get
291+
String solrCloud = getSolrCloud(factory, expression, collectionName);
309292

310293
// We've got all the required items
311294
init(
312-
zkHost,
295+
solrCloud,
313296
collectionName,
314297
stream,
315298
traverseFrom,
@@ -326,7 +309,7 @@ public GatherNodesStream(StreamExpression expression, StreamFactory factory) thr
326309
}
327310

328311
private void init(
329-
String zkHost,
312+
String solrCloud,
330313
String collection,
331314
TupleStream tupleStream,
332315
String traverseFrom,
@@ -340,7 +323,7 @@ private void init(
340323
int window,
341324
int lag,
342325
int interval) {
343-
this.zkHost = zkHost;
326+
this.solrCloud = solrCloud;
344327
this.collection = collection;
345328
this.tupleStream = tupleStream;
346329
this.traverseFrom = traverseFrom;
@@ -405,8 +388,9 @@ private StreamExpression toExpression(StreamFactory factory, boolean includeStre
405388
}
406389
}
407390

408-
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
409-
expression.addParameter(new StreamExpressionNamedParameter("gather", zkHost));
391+
expression.addParameter(new StreamExpressionNamedParameter("zkHost", solrCloud));
392+
// TODO: is a bug? or is solrCloud (ex. zkHost) actually supposed to be passed as gather?
393+
expression.addParameter(new StreamExpressionNamedParameter("gather", solrCloud));
410394
if (maxDocFreq > -1) {
411395
expression.addParameter(
412396
new StreamExpressionNamedParameter("maxDocFreq", Integer.toString(maxDocFreq)));
@@ -582,7 +566,7 @@ public List<Tuple> call() {
582566
try {
583567
stream =
584568
new UniqueStream(
585-
new CloudSolrStream(zkHost, collection, joinSParams),
569+
new CloudSolrStream(solrCloud, collection, joinSParams),
586570
new MultipleFieldEqualitor(
587571
new FieldEqualitor(gather), new FieldEqualitor(traverseTo)));
588572
stream.setStreamContext(streamContext);

solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class ShortestPathStream extends TupleStream implements Expressible {
7070
private String toField;
7171
private int joinBatchSize;
7272
private int maxDepth;
73-
private String zkHost;
73+
private String solrCloud;
7474
private String collection;
7575
private final Deque<Tuple> shortestPaths = new ArrayDeque<>();
7676
private boolean found;
@@ -79,7 +79,7 @@ public class ShortestPathStream extends TupleStream implements Expressible {
7979
private SolrParams queryParams;
8080

8181
public ShortestPathStream(
82-
String zkHost,
82+
String solrCloud,
8383
String collection,
8484
String fromNode,
8585
String toNode,
@@ -91,7 +91,7 @@ public ShortestPathStream(
9191
int maxDepth) {
9292

9393
init(
94-
zkHost,
94+
solrCloud,
9595
collection,
9696
fromNode,
9797
toNode,
@@ -107,7 +107,6 @@ public ShortestPathStream(StreamExpression expression, StreamFactory factory) th
107107

108108
String collectionName = factory.getValueOperand(expression, 0);
109109
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
110-
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
111110

112111
// Collection Name
113112
if (null == collectionName) {
@@ -194,42 +193,24 @@ public ShortestPathStream(StreamExpression expression, StreamFactory factory) th
194193
Integer.parseInt(((StreamExpressionValue) depthExpression.getParameter()).getValue());
195194
}
196195

197-
ModifiableSolrParams params = new ModifiableSolrParams();
198-
for (StreamExpressionNamedParameter namedParam : namedParams) {
199-
if (!namedParam.getName().equals("zkHost")
200-
&& !namedParam.getName().equals("to")
201-
&& !namedParam.getName().equals("from")
202-
&& !namedParam.getName().equals("edge")
203-
&& !namedParam.getName().equals("maxDepth")
204-
&& !namedParam.getName().equals("threads")
205-
&& !namedParam.getName().equals("partitionSize")) {
206-
params.set(namedParam.getName(), namedParam.getParameter().toString().trim());
207-
}
208-
}
209-
210-
// zkHost, optional - if not provided then will look into factory list to get
211-
String zkHost = null;
212-
if (null == zkHostExpression) {
213-
zkHost = factory.getCollectionZkHost(collectionName);
214-
if (zkHost == null) {
215-
zkHost = factory.getDefaultZkHost();
216-
}
217-
} else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
218-
zkHost = ((StreamExpressionValue) zkHostExpression.getParameter()).getValue();
219-
}
220-
221-
if (null == zkHost) {
222-
throw new IOException(
223-
String.format(
224-
Locale.ROOT,
225-
"invalid expression %s - zkHost not found for collection '%s'",
226-
expression,
227-
collectionName));
228-
}
196+
ModifiableSolrParams params =
197+
getModifiableSolrParamsWithExclusions(
198+
namedParams,
199+
"solrCloud",
200+
"zkHost",
201+
"to",
202+
"from",
203+
"edge",
204+
"maxDepth",
205+
"threads",
206+
"partitionSize");
207+
208+
// solrCloud, optional - if not provided then will look into factory list to get
209+
String solrCloud = getSolrCloud(factory, expression, collectionName);
229210

230211
// We've got all the required items
231212
init(
232-
zkHost,
213+
solrCloud,
233214
collectionName,
234215
fromNode,
235216
toNode,
@@ -242,7 +223,7 @@ public ShortestPathStream(StreamExpression expression, StreamFactory factory) th
242223
}
243224

244225
private void init(
245-
String zkHost,
226+
String solrCloud,
246227
String collection,
247228
String fromNode,
248229
String toNode,
@@ -252,7 +233,7 @@ private void init(
252233
int joinBatchSize,
253234
int threads,
254235
int maxDepth) {
255-
this.zkHost = zkHost;
236+
this.solrCloud = solrCloud;
256237
this.collection = collection;
257238
this.fromNode = fromNode;
258239
this.toNode = toNode;
@@ -285,7 +266,7 @@ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOEx
285266
expression.addParameter(new StreamExpressionNamedParameter(param.getKey().toString(), value));
286267
}
287268

288-
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
269+
expression.addParameter(new StreamExpressionNamedParameter("solrCloud", solrCloud));
289270
expression.addParameter(
290271
new StreamExpressionNamedParameter("maxDepth", Integer.toString(maxDepth)));
291272
expression.addParameter(
@@ -497,7 +478,7 @@ public List<Edge> call() {
497478
try {
498479
stream =
499480
new UniqueStream(
500-
new CloudSolrStream(zkHost, collection, joinParams),
481+
new CloudSolrStream(solrCloud, collection, joinParams),
501482
new MultipleFieldEqualitor(
502483
new FieldEqualitor(toField), new FieldEqualitor(fromField)));
503484
stream.setStreamContext(streamContext);

solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
6666

6767
private static final long serialVersionUID = 1;
6868

69-
protected String zkHost;
69+
protected String solrCloud;
7070
protected String collection;
7171
protected ModifiableSolrParams params;
7272
protected Map<String, String> fieldMappings;
@@ -81,22 +81,21 @@ public class CloudSolrStream extends TupleStream implements Expressible {
8181
protected CloudSolrStream() {}
8282

8383
/**
84-
* @param zkHost Zookeeper ensemble connection string
84+
* @param solrCloud Zookeeper or HTTPS(s) ensemble connection string
8585
* @param collectionName Name of the collection to operate on
8686
* @param params Map&lt;String, String[]&gt; of parameter/value pairs
8787
* @throws IOException Something went wrong
8888
*/
89-
public CloudSolrStream(String zkHost, String collectionName, SolrParams params)
89+
public CloudSolrStream(String solrCloud, String collectionName, SolrParams params)
9090
throws IOException {
91-
init(collectionName, zkHost, params);
91+
init(collectionName, solrCloud, params);
9292
}
9393

9494
public CloudSolrStream(StreamExpression expression, StreamFactory factory) throws IOException {
9595
// grab all parameters out
9696
String collectionName = factory.getValueOperand(expression, 0);
9797
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
9898
StreamExpressionNamedParameter aliasExpression = factory.getNamedOperand(expression, "aliases");
99-
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
10099

101100
// Collection Name
102101
if (null == collectionName) {
@@ -107,7 +106,8 @@ public CloudSolrStream(StreamExpression expression, StreamFactory factory) throw
107106
expression));
108107
}
109108

110-
// Validate there are no unknown parameters - zkHost and alias are namedParameter, so we don't
109+
// Validate there are no unknown parameters - solrCloud/zkHost and alias are namedParameter, so
110+
// we don't
111111
// need to count it twice
112112
if (expression.getParameters().size() != 1 + namedParams.size()) {
113113
throw new IOException(
@@ -123,12 +123,8 @@ public CloudSolrStream(StreamExpression expression, StreamFactory factory) throw
123123
expression));
124124
}
125125

126-
ModifiableSolrParams mParams = new ModifiableSolrParams();
127-
for (StreamExpressionNamedParameter namedParam : namedParams) {
128-
if (!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")) {
129-
mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
130-
}
131-
}
126+
ModifiableSolrParams mParams =
127+
getModifiableSolrParamsWithExclusions(namedParams, "solrCloud", "zkHost", "aliases");
132128

133129
// Aliases, optional, if provided then need to split
134130
if (null != aliasExpression
@@ -149,19 +145,11 @@ public CloudSolrStream(StreamExpression expression, StreamFactory factory) throw
149145
}
150146
}
151147

152-
// zkHost, optional - if not provided then will look into factory list to get
153-
String zkHost = null;
154-
if (null == zkHostExpression) {
155-
zkHost = factory.getCollectionZkHost(collectionName);
156-
if (zkHost == null) {
157-
zkHost = factory.getDefaultZkHost();
158-
}
159-
} else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
160-
zkHost = ((StreamExpressionValue) zkHostExpression.getParameter()).getValue();
161-
}
148+
// solrCloud, optional - if not provided then will look into factory list to get
149+
String solrCloud = getSolrCloud(factory, expression, collectionName);
162150

163151
// We've got all the required items
164-
init(collectionName, zkHost, mParams);
152+
init(collectionName, solrCloud, mParams);
165153
}
166154

167155
@Override
@@ -189,8 +177,8 @@ public StreamExpression toExpression(StreamFactory factory) throws IOException {
189177
}
190178
}
191179

192-
// zkHost
193-
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
180+
// solrCloud
181+
expression.addParameter(new StreamExpressionNamedParameter("solrCloud", solrCloud));
194182

195183
// aliases
196184
if (null != fieldMappings && 0 != fieldMappings.size()) {
@@ -241,8 +229,8 @@ public Explanation toExplanation(StreamFactory factory) throws IOException {
241229
return explanation;
242230
}
243231

244-
void init(String collectionName, String zkHost, SolrParams params) throws IOException {
245-
this.zkHost = zkHost;
232+
void init(String collectionName, String solrCloud, SolrParams params) throws IOException {
233+
this.solrCloud = solrCloud;
246234
this.collection = collectionName;
247235
this.params = new ModifiableSolrParams(params);
248236

@@ -384,17 +372,17 @@ protected void constructStreams() throws IOException {
384372
if (streamContext != null && streamContext.get("shards") != null) {
385373
// stream of shard url with core
386374
final List<String> shards =
387-
getShards(this.zkHost, this.collection, this.streamContext, mParams);
375+
getShards(this.solrCloud, this.collection, this.streamContext, mParams);
388376
if (shards.isEmpty())
389-
throw new IOException("No shards available from ZooKeeper: " + this.zkHost);
377+
throw new IOException("No shards available from ZooKeeper: " + this.solrCloud);
390378
streamOfSolrStream = shards.stream().map(s -> new SolrStream(s, mParams));
391379
} else {
392380
// stream of replicas to reuse the same SolrHttpClient per baseUrl
393381
// avoids re-parsing data we already have in the replicas
394382
final List<Replica> replicas =
395-
getReplicas(this.zkHost, this.collection, this.streamContext, mParams);
383+
getReplicas(this.solrCloud, this.collection, this.streamContext, mParams);
396384
if (replicas.isEmpty())
397-
throw new IOException("No replicas available from ZooKeeper: " + this.zkHost);
385+
throw new IOException("No replicas available from ZooKeeper: " + this.solrCloud);
398386
streamOfSolrStream =
399387
replicas.stream().map(r -> new SolrStream(r.getBaseUrl(), mParams, r.getCoreName()));
400388
}

0 commit comments

Comments
 (0)