Skip to content

Commit 9ec98b0

Browse files
authored
Merge pull request #1019: [tools] introduce ProximaShell for scripting
2 parents 2dbe6a4 + ce092db commit 9ec98b0

7 files changed

Lines changed: 467 additions & 299 deletions

File tree

tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java

Lines changed: 11 additions & 291 deletions
Original file line numberDiff line numberDiff line change
@@ -18,85 +18,47 @@
1818
import cz.o2.proxima.core.repository.AttributeDescriptor;
1919
import cz.o2.proxima.core.repository.EntityDescriptor;
2020
import cz.o2.proxima.core.repository.Repository;
21-
import cz.o2.proxima.core.scheme.ValueSerializer;
22-
import cz.o2.proxima.core.storage.StreamElement;
23-
import cz.o2.proxima.core.storage.commitlog.Position;
24-
import cz.o2.proxima.core.util.Optionals;
25-
import cz.o2.proxima.direct.core.DirectDataOperator;
26-
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
2721
import cz.o2.proxima.direct.server.rpc.proto.service.RetrieveServiceGrpc;
2822
import cz.o2.proxima.direct.server.rpc.proto.service.RetrieveServiceGrpc.RetrieveServiceBlockingStub;
2923
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc;
3024
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
3125
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
32-
import cz.o2.proxima.internal.com.google.common.collect.Streams;
3326
import cz.o2.proxima.tools.groovy.internal.ProximaInterpreter;
34-
import cz.o2.proxima.tools.io.ConsoleRandomReader;
3527
import cz.o2.proxima.typesafe.config.Config;
3628
import cz.o2.proxima.typesafe.config.ConfigFactory;
37-
import freemarker.template.Configuration;
38-
import freemarker.template.TemplateExceptionHandler;
3929
import groovy.lang.Binding;
40-
import groovy.lang.Closure;
4130
import io.grpc.Channel;
4231
import io.grpc.ManagedChannelBuilder;
4332
import java.io.IOException;
4433
import java.io.InputStream;
45-
import java.util.ArrayList;
46-
import java.util.Arrays;
47-
import java.util.List;
48-
import java.util.Optional;
49-
import java.util.ServiceLoader;
50-
import java.util.UUID;
5134
import java.util.concurrent.BlockingQueue;
52-
import java.util.concurrent.CountDownLatch;
5335
import java.util.concurrent.ExecutorService;
5436
import java.util.concurrent.Executors;
5537
import java.util.concurrent.LinkedBlockingDeque;
56-
import java.util.concurrent.atomic.AtomicReference;
57-
import java.util.stream.Collectors;
58-
import javax.annotation.Nullable;
59-
import lombok.Getter;
6038
import lombok.extern.slf4j.Slf4j;
6139
import org.apache.groovy.groovysh.Groovysh;
6240
import org.codehaus.groovy.tools.shell.IO;
6341

6442
/** This is the groovysh based console. */
6543
@Slf4j
66-
public class Console implements AutoCloseable {
67-
68-
private static AtomicReference<Console> INSTANCE = new AtomicReference<>();
44+
public class Console extends ShellRunnable implements AutoCloseable {
6945

7046
public static final String INITIAL_STATEMENT = "env = new Environment()";
7147

72-
/**
73-
* This is supposed to be called only from the groovysh initialized in this main method.
74-
*
75-
* @return the singleton instance
76-
*/
77-
public static final Console get() {
78-
return INSTANCE.get();
79-
}
80-
8148
public static Console get(String[] args) {
82-
if (INSTANCE.get() == null) {
83-
synchronized (Console.class) {
84-
if (INSTANCE.get() == null) {
85-
INSTANCE.set(new Console(args));
86-
}
87-
}
49+
if (ShellRunnable.get() == null) {
50+
return new Console(args);
8851
}
89-
return INSTANCE.get();
52+
Preconditions.checkState(ShellRunnable.get() instanceof Console);
53+
return (Console) ShellRunnable.get();
9054
}
9155

9256
public static Console create(Config config, Repository repo) {
93-
INSTANCE.set(new Console(config, repo, new String[] {}));
94-
return INSTANCE.get();
57+
return new Console(config, repo, new String[] {});
9558
}
9659

9760
public static Console create(Config config, Repository repo, String[] args) {
98-
INSTANCE.set(new Console(config, repo, args));
99-
return INSTANCE.get();
61+
return new Console(config, repo, args);
10062
}
10163

10264
public static void main(String[] args) throws Exception {
@@ -106,12 +68,7 @@ public static void main(String[] args) throws Exception {
10668
}
10769
}
10870

109-
private final ClassLoader previous;
110-
private final String[] args;
11171
private final BlockingQueue<Integer> input = new LinkedBlockingDeque<>();
112-
@Getter private final Repository repo;
113-
private final List<ConsoleRandomReader> readers = new ArrayList<>();
114-
private final Configuration conf;
11572
private final Config config;
11673
private final ExecutorService executor =
11774
Executors.newCachedThreadPool(
@@ -123,8 +80,6 @@ public static void main(String[] args) throws Exception {
12380
(thrd, err) -> log.error("Error in thread {}", thrd.getName(), err));
12481
return t;
12582
});
126-
StreamProvider streamProvider;
127-
@Nullable private final DirectDataOperator direct;
12883
Groovysh shell;
12984

13085
Console(String[] args) {
@@ -137,24 +92,8 @@ public static void main(String[] args) throws Exception {
13792

13893
@VisibleForTesting
13994
Console(Config config, Repository repo, String[] args) {
140-
this.args = args;
95+
super(repo, args);
14196
this.config = config;
142-
this.repo = repo;
143-
this.direct =
144-
repo.hasOperator("direct") ? repo.getOrCreateOperator(DirectDataOperator.class) : null;
145-
this.previous = Thread.currentThread().getContextClassLoader();
146-
conf = new Configuration(Configuration.VERSION_2_3_23);
147-
conf.setDefaultEncoding("utf-8");
148-
conf.setClassForTemplateLoading(getClass(), "/");
149-
conf.setTemplateExceptionHandler(TemplateExceptionHandler.RETHROW_HANDLER);
150-
conf.setLogTemplateExceptions(false);
151-
152-
initializeStreamProvider();
153-
updateClassLoader();
154-
155-
if (INSTANCE.get() == null) {
156-
INSTANCE.set(this);
157-
}
15897
}
15998

16099
@VisibleForTesting
@@ -180,226 +119,10 @@ private void setShell(Groovysh shell) {
180119
this.shell = shell;
181120
}
182121

183-
public void createWrapperClass() throws Exception {
184-
updateClassLoader();
185-
ToolsClassLoader classLoader =
186-
(ToolsClassLoader) Thread.currentThread().getContextClassLoader();
187-
log.debug("Creating Environment class in classloader {}", classLoader);
188-
GroovyEnv.createWrapperInLoader(conf, repo, classLoader);
189-
}
190-
191-
@VisibleForTesting
192-
void initializeStreamProvider() {
193-
ServiceLoader<StreamProvider> loader = ServiceLoader.load(StreamProvider.class);
194-
// sort possible test implementations on top
195-
streamProvider =
196-
Streams.stream(loader)
197-
.min(
198-
(a, b) -> {
199-
String cls1 = a.getClass().getSimpleName();
200-
String cls2 = b.getClass().getSimpleName();
201-
if (cls1.startsWith("Test") ^ cls2.startsWith("Test")) {
202-
if (cls1.startsWith("Test")) {
203-
return -1;
204-
}
205-
return 1;
206-
}
207-
return cls1.compareTo(cls2);
208-
})
209-
.orElseThrow(
210-
() ->
211-
new IllegalArgumentException(
212-
String.format(
213-
"Unable to find any StreamProvider in classpath. Please check dependencies. Looking for service implements '%s' interface.",
214-
StreamProvider.class.getName())));
215-
log.info("Using {} as StreamProvider", streamProvider);
216-
streamProvider.init(repo, args == null ? new String[] {} : args);
217-
}
218-
219-
private void updateClassLoader() {
220-
if (!(Thread.currentThread().getContextClassLoader() instanceof ToolsClassLoader)) {
221-
Thread.currentThread().setContextClassLoader(new ToolsClassLoader());
222-
}
223-
}
224-
225122
private static Config getConfig() {
226123
return ConfigFactory.load().resolve();
227124
}
228125

229-
public <T> Stream<StreamElement> getStream(
230-
AttributeDescriptor<T> attrDesc, Position position, boolean stopAtCurrent) {
231-
232-
return getStream(attrDesc, position, stopAtCurrent, false);
233-
}
234-
235-
public <T> Stream<StreamElement> getStream(
236-
AttributeDescriptor<T> attrDesc,
237-
Position position,
238-
boolean stopAtCurrent,
239-
boolean eventTime) {
240-
241-
return streamProvider.getStream(
242-
position, stopAtCurrent, eventTime, this::unboundedStreamInterrupt, attrDesc);
243-
}
244-
245-
public Stream<StreamElement> getUnionStream(
246-
Position position,
247-
boolean eventTime,
248-
boolean stopAtCurrent,
249-
AttributeDescriptorProvider<?>... descriptors) {
250-
251-
return streamProvider.getStream(
252-
position,
253-
stopAtCurrent,
254-
eventTime,
255-
this::unboundedStreamInterrupt,
256-
Arrays.stream(descriptors)
257-
.distinct()
258-
.map(AttributeDescriptorProvider::desc)
259-
.toArray(AttributeDescriptor[]::new));
260-
}
261-
262-
public WindowedStream<StreamElement> getBatchSnapshot(AttributeDescriptor<?> attrDesc) {
263-
return getBatchSnapshot(attrDesc, Long.MIN_VALUE, Long.MAX_VALUE);
264-
}
265-
266-
public WindowedStream<StreamElement> getBatchSnapshot(
267-
AttributeDescriptor<?> attrDesc, long fromStamp, long toStamp) {
268-
269-
return streamProvider.getBatchSnapshot(
270-
fromStamp, toStamp, this::unboundedStreamInterrupt, attrDesc);
271-
}
272-
273-
public WindowedStream<StreamElement> getBatchUpdates(
274-
long startStamp, long endStamp, AttributeDescriptorProvider<?>... attrs) {
275-
276-
List<AttributeDescriptor<?>> attrList =
277-
Arrays.stream(attrs).map(AttributeDescriptorProvider::desc).collect(Collectors.toList());
278-
279-
return streamProvider.getBatchUpdates(
280-
startStamp,
281-
endStamp,
282-
this::unboundedStreamInterrupt,
283-
attrList.toArray(new AttributeDescriptor[attrList.size()]));
284-
}
285-
286-
public <T> WindowedStream<T> getImpulse(@Nullable String name, Closure<T> factory) {
287-
return streamProvider.impulse(factory);
288-
}
289-
290-
public <T> WindowedStream<T> getPeriodicImpulse(
291-
@Nullable String name, Closure<T> factory, long durationMs) {
292-
return streamProvider.periodicImpulse(factory, durationMs);
293-
}
294-
295-
public ConsoleRandomReader getRandomAccessReader(String entity) {
296-
Preconditions.checkState(
297-
direct != null,
298-
"Can create random access reader with direct operator only. Add runtime dependency.");
299-
EntityDescriptor entityDesc = findEntityDescriptor(entity);
300-
ConsoleRandomReader reader = new ConsoleRandomReader(entityDesc, direct);
301-
readers.add(reader);
302-
return reader;
303-
}
304-
305-
public void put(
306-
EntityDescriptor entityDesc,
307-
AttributeDescriptor<?> attrDesc,
308-
String key,
309-
String attribute,
310-
String value)
311-
throws InterruptedException {
312-
313-
put(entityDesc, attrDesc, key, attribute, System.currentTimeMillis(), value);
314-
}
315-
316-
public void put(
317-
EntityDescriptor entityDesc,
318-
AttributeDescriptor<?> attrDesc,
319-
String key,
320-
String attribute,
321-
long stamp,
322-
String value)
323-
throws InterruptedException {
324-
325-
Preconditions.checkState(
326-
direct != null, "Can write with direct operator only. Add runtime dependency");
327-
328-
@SuppressWarnings("unchecked")
329-
ValueSerializer<Object> valueSerializer =
330-
(ValueSerializer<Object>) attrDesc.getValueSerializer();
331-
byte[] payload = valueSerializer.serialize(valueSerializer.fromJsonValue(value));
332-
OnlineAttributeWriter writer =
333-
Optionals.get(direct.getWriter(attrDesc), "Cannot find writer for attribute %s", attrDesc);
334-
CountDownLatch latch = new CountDownLatch(1);
335-
AtomicReference<Throwable> exc = new AtomicReference<>();
336-
writer.write(
337-
StreamElement.upsert(
338-
entityDesc, attrDesc, UUID.randomUUID().toString(), key, attribute, stamp, payload),
339-
(success, ex) -> {
340-
if (!success) {
341-
exc.set(ex);
342-
}
343-
latch.countDown();
344-
});
345-
latch.await();
346-
if (exc.get() != null) {
347-
throw new RuntimeException(exc.get());
348-
}
349-
}
350-
351-
public void delete(
352-
EntityDescriptor entityDesc, AttributeDescriptor<?> attrDesc, String key, String attribute)
353-
throws InterruptedException {
354-
355-
delete(entityDesc, attrDesc, key, attribute, System.currentTimeMillis());
356-
}
357-
358-
public void delete(
359-
EntityDescriptor entityDesc,
360-
AttributeDescriptor<?> attrDesc,
361-
String key,
362-
String attribute,
363-
long stamp)
364-
throws InterruptedException {
365-
366-
Preconditions.checkState(
367-
direct != null, "Can write with direct operator only. Add runtime dependency");
368-
OnlineAttributeWriter writer = Optionals.get(direct.getWriter(attrDesc));
369-
CountDownLatch latch = new CountDownLatch(1);
370-
AtomicReference<Throwable> exc = new AtomicReference<>();
371-
final StreamElement delete;
372-
if (attrDesc.isWildcard() && attribute.equals(attrDesc.getName())) {
373-
delete =
374-
StreamElement.deleteWildcard(
375-
entityDesc, attrDesc, UUID.randomUUID().toString(), key, stamp);
376-
} else {
377-
delete =
378-
StreamElement.delete(
379-
entityDesc, attrDesc, UUID.randomUUID().toString(), key, attribute, stamp);
380-
}
381-
writer.write(
382-
delete,
383-
(success, ex) -> {
384-
if (!success) {
385-
exc.set(ex);
386-
}
387-
latch.countDown();
388-
});
389-
latch.await();
390-
if (exc.get() != null) {
391-
throw new RuntimeException(exc.get());
392-
}
393-
}
394-
395-
public Optional<DirectDataOperator> getDirect() {
396-
return Optional.ofNullable(direct);
397-
}
398-
399-
public EntityDescriptor findEntityDescriptor(String entity) {
400-
return repo.getEntity(entity);
401-
}
402-
403126
public Rpc.ListResponse rpcList(
404127
EntityDescriptor entity,
405128
String key,
@@ -441,17 +164,14 @@ public Rpc.GetResponse rpcGet(
441164
@Override
442165
public void close() {
443166
log.debug("Console shutting down.");
444-
Optional.ofNullable(streamProvider).ifPresent(StreamProvider::close);
445-
readers.forEach(ConsoleRandomReader::close);
446-
readers.clear();
167+
super.close();
447168
executor.shutdownNow();
448169
input.clear();
449170
Preconditions.checkState(input.offer(-1));
450-
Optional.ofNullable(direct).ifPresent(DirectDataOperator::close);
451-
Thread.currentThread().setContextClassLoader(previous);
452171
}
453172

454-
private boolean unboundedStreamInterrupt() {
173+
@Override
174+
boolean unboundedStreamInterrupt() {
455175
try {
456176
return takeInputChar() == 'q';
457177
} catch (InterruptedException ex) {

0 commit comments

Comments
 (0)