Skip to content

Commit 553cb45

Browse files
Use Conductor-based streams for StdIO (#107)
* Add StdIO streams + Bump conductor version * Abstract away error and output displays * Formatting changes * Ensure the write functions are called asynchronously and cache readers and writers * More formatting changes * Add input function * Pass the ErrorValue creation process to the displayError function * Remove console.log from streams file * Fix controller close + use allSettled instead of Promise.all --------- Co-authored-by: Martin Henz <henz@comp.nus.edu.sg>
1 parent b7ebd4c commit 553cb45

9 files changed

Lines changed: 188 additions & 48 deletions

File tree

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
"typescript-eslint": "^8.56.1"
5656
},
5757
"dependencies": {
58-
"@sourceacademy/conductor": "^0.2.3",
58+
"@sourceacademy/conductor": "^0.3.0",
5959
"@sourceacademy/wasm-util": "^1.0.4",
6060
"fast-levenshtein": "^3.0.0",
6161
"mathjs": "^14.9.1",

src/conductor/PyEvaluator.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,17 @@
22
// https://github.com/source-academy/conductor
33
// Original author(s): Source Academy Team
44

5+
import { ErrorType } from "@sourceacademy/conductor/common";
56
import { BasicEvaluator, IRunnerPlugin } from "@sourceacademy/conductor/runner";
67
import { Context } from "../cse-machine/context";
8+
import {
9+
createErrorStream,
10+
createInputStream,
11+
createOutputStream,
12+
destroyStreams,
13+
displayError,
14+
} from "../cse-machine/streams";
715
import { IOptions, runInContext } from "../runner/pyRunner";
8-
import { Finished } from "../types";
916

1017
const defaultContext = new Context();
1118
const defaultOptions: IOptions = {
@@ -26,16 +33,25 @@ export default class PyEvaluator extends BasicEvaluator {
2633

2734
async evaluateChunk(chunk: string): Promise<void> {
2835
try {
29-
const result = await runInContext(
36+
this.context.streams = {
37+
initialised: true,
38+
stdout: createOutputStream(this.conductor),
39+
stderr: createErrorStream(this.conductor),
40+
stdin: createInputStream(this.conductor),
41+
};
42+
await runInContext(
3043
chunk, // Code
3144
this.context,
3245
this.options,
3346
);
34-
this.conductor.sendOutput(
35-
`${(result as Finished).representation.toString((result as Finished).value)}`,
36-
);
3747
} catch (error) {
38-
this.conductor.sendOutput(`Error: ${error instanceof Error ? error.message : error}`);
48+
if (error instanceof SyntaxError) {
49+
await displayError(this.context, error, ErrorType.EVALUATOR_SYNTAX);
50+
return;
51+
}
52+
await displayError(this.context, error, ErrorType.INTERNAL);
53+
} finally {
54+
await destroyStreams(this.context);
3955
}
4056
}
4157
}

src/cse-machine/context.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,28 @@
1+
import { ConductorError } from "@sourceacademy/conductor/common";
12
import { StmtNS } from "../ast-types";
23
import { ModuleContext, NativeStorage } from "../types";
34
import { Control } from "./control";
45
import { Environment } from "./environment";
56
import { CseError } from "./error";
67
import { Heap } from "./heap";
78
import { Stash, Value } from "./stash";
9+
import { ReadableContext, WritableContext } from "./streams";
810
import { Node } from "./types";
911

1012
export class Context {
1113
public control: Control;
1214
public stash: Stash;
13-
public output: string = "";
15+
16+
public streams:
17+
| {
18+
initialised: false;
19+
}
20+
| {
21+
initialised: true;
22+
stdout: WritableContext<string>;
23+
stderr: WritableContext<ConductorError>;
24+
stdin: ReadableContext<string>;
25+
};
1426
//public environment: Environment;
1527
public errors: CseError[] = [];
1628
public moduleContexts: { [name: string]: ModuleContext };
@@ -46,6 +58,7 @@ export class Context {
4658
this.runtime.environments.push(globalEnvironment);
4759
this.runtime.environmentTree.insert(globalEnvironment);
4860
}
61+
this.streams = this.createEmptyStreams();
4962
this.nativeStorage = {
5063
builtins: new Map<string, Value>(),
5164
previousProgramsIdentifiers: new Set<string>(),
@@ -82,6 +95,10 @@ export class Context {
8295
changepointSteps: [],
8396
});
8497

98+
createEmptyStreams = (): { initialised: false } => ({
99+
initialised: false,
100+
});
101+
85102
public reset(program?: StmtNS.Stmt): void {
86103
this.control = new Control(program);
87104
this.stash = new Stash();

src/cse-machine/interpreter.ts

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
/* tslint:disable:max-classes-per-file */
88

9+
import { ErrorType } from "@sourceacademy/conductor/common";
910
import { ExprNS, StmtNS } from "../ast-types";
1011
import * as error from "../errors/errors";
1112
import { BuiltinReassignmentError } from "../errors/errors";
@@ -31,6 +32,7 @@ import {
3132
isFalsy,
3233
} from "./operators";
3334
import { Stash, Value } from "./stash";
35+
import { displayError } from "./streams";
3436
import {
3537
AppInstr,
3638
AssmtInstr,
@@ -53,12 +55,7 @@ type CmdEvaluator = (
5355
control: Control,
5456
stash: Stash,
5557
isPrelude: boolean,
56-
) => void;
57-
58-
let cseFinalPrint = "";
59-
export function addPrint(str: string) {
60-
cseFinalPrint = cseFinalPrint + str + "\n";
61-
}
58+
) => void | Promise<void>;
6259

6360
/**
6461
* Function that returns the appropriate Promise<Result> given the output of CSE machine evaluating, depending
@@ -73,7 +70,7 @@ export function CSEResultPromise(context: Context, value: Value): Promise<Result
7370
resolve({ status: "suspended-cse-eval", context });
7471
} else if (value.type === "error") {
7572
const msg = value.message;
76-
const representation = new Representation(cseFinalPrint + msg);
73+
const representation = new Representation(msg);
7774
resolve({ status: "finished", context, value, representation });
7875
} else {
7976
const representation = new Representation(toPythonString(value));
@@ -93,26 +90,26 @@ let source = "";
9390
* @param options Evaluation options.
9491
* @returns The result of running the CSE machine.
9592
*/
96-
export function evaluate(
93+
export async function evaluate(
9794
code: string,
9895
program: StmtNS.Stmt,
9996
context: Context,
10097
options: RecursivePartial<IOptions> = {},
101-
): Value {
98+
): Promise<Value> {
10299
source = code;
103100
try {
104101
// TODO: is undefined variables check necessary for Python?
105102
// checkProgramForUndefinedVariables(program, context)
106103
} catch (error) {
107-
return { type: "error", message: error instanceof Error ? error.message : String(error) };
104+
return displayError(context, error, ErrorType.EVALUATOR_RUNTIME);
108105
}
109106

110107
try {
111108
context.runtime.isRunning = true;
112109
context.control = new Control(program);
113110
context.stash = new Stash();
114111
// Adaptation for new feature
115-
const result = runCSEMachine(
112+
const result = await runCSEMachine(
116113
code,
117114
context,
118115
context.control,
@@ -121,9 +118,9 @@ export function evaluate(
121118
options.stepLimit!,
122119
options.isPrelude,
123120
);
124-
return context.output ? { type: "string", value: context.output } : result;
121+
return result;
125122
} catch (error) {
126-
return { type: "error", message: error instanceof Error ? error.message : String(error) };
123+
return await displayError(context, error, ErrorType.EVALUATOR_RUNTIME);
127124
} finally {
128125
context.runtime.isRunning = false;
129126
}
@@ -179,15 +176,15 @@ export function evaluate(
179176
* @param isPrelude Whether the program is the prelude.
180177
* @returns The top value of the stash after execution.
181178
*/
182-
export function runCSEMachine(
179+
export async function runCSEMachine(
183180
code: string,
184181
context: Context,
185182
control: Control,
186183
stash: Stash,
187184
envSteps: number,
188185
stepLimit: number,
189186
isPrelude: boolean = false,
190-
): Value {
187+
): Promise<Value> {
191188
const eceState = generateCSEMachineStateStream(
192189
code,
193190
context,
@@ -200,7 +197,7 @@ export function runCSEMachine(
200197

201198
// Execute the generator until it completes
202199
// eslint-disable-next-line @typescript-eslint/no-unused-vars
203-
for (const _value of eceState) {
200+
for await (const _value of eceState) {
204201
}
205202

206203
// Return the value at the top of the storage as the result
@@ -219,7 +216,7 @@ export function runCSEMachine(
219216
* @param isPrelude Whether the program is the prelude.
220217
* @yields The current state of the stash, control stack, and step count.
221218
*/
222-
export function* generateCSEMachineStateStream(
219+
export async function* generateCSEMachineStateStream(
223220
code: string,
224221
context: Context,
225222
control: Control,
@@ -265,7 +262,7 @@ export function* generateCSEMachineStateStream(
265262
context.runtime.nodes.shift();
266263
context.runtime.nodes.unshift(command);
267264

268-
cmdEvaluators[nodeType](code, command, context, control, stash, isPrelude);
265+
await cmdEvaluators[nodeType](code, command, context, control, stash, isPrelude);
269266

270267
if (context.runtime.break && context.runtime.debuggerOn) {
271268
// TODO
@@ -278,7 +275,7 @@ export function* generateCSEMachineStateStream(
278275
} else {
279276
// Command is an instruction
280277
const instr = command as Instr;
281-
cmdEvaluators[instr.instrType](code, command, context, control, stash, isPrelude);
278+
await cmdEvaluators[instr.instrType](code, command, context, control, stash, isPrelude);
282279
}
283280

284281
command = control.peek();
@@ -760,7 +757,7 @@ const cmdEvaluators: { [type: string]: CmdEvaluator } = {
760757
stash.pop();
761758
},
762759

763-
[InstrType.APPLICATION]: function (
760+
[InstrType.APPLICATION]: async function (
764761
code: string,
765762
command: ControlItem,
766763
context: Context,
@@ -802,7 +799,7 @@ const cmdEvaluators: { [type: string]: CmdEvaluator } = {
802799
}
803800
} else {
804801
if (callable && callable.type === "builtin") {
805-
const result = callable.func(args, code, instr.srcNode, context);
802+
const result = await callable.func(args, code, instr.srcNode, context);
806803
stash.push(result);
807804
}
808805
}

src/cse-machine/stash.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ export interface NoneValue {
8181
export interface BuiltinValue {
8282
type: "builtin";
8383
name: string;
84-
func: (args: Value[], code: string, command: ControlItem, context: Context) => Value;
84+
func:
85+
| ((args: Value[], code: string, command: ControlItem, context: Context) => Value)
86+
| ((args: Value[], code: string, command: ControlItem, context: Context) => Promise<Value>);
8587
}
8688

8789
export class Stash extends Stack<Value> {

src/cse-machine/streams.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import { ConductorError, ErrorType } from "@sourceacademy/conductor/common";
2+
import { IRunnerPlugin } from "@sourceacademy/conductor/runner";
3+
import { Context } from "./context";
4+
import { ErrorValue } from "./stash";
5+
6+
export type WritableContext<T> = {
7+
stream: WritableStream<T>;
8+
writer: WritableStreamDefaultWriter<T>;
9+
};
10+
export type ReadableContext<T> = {
11+
stream: ReadableStream<T>;
12+
reader: ReadableStreamDefaultReader<T>;
13+
};
14+
15+
export function createOutputStream(conductor: IRunnerPlugin): WritableContext<string> {
16+
const stream = new WritableStream<string>({
17+
write: chunk => {
18+
conductor.sendOutput(chunk);
19+
},
20+
});
21+
const writer = stream.getWriter();
22+
return { stream, writer };
23+
}
24+
25+
export function createErrorStream(conductor: IRunnerPlugin): WritableContext<ConductorError> {
26+
const stream = new WritableStream<ConductorError>({
27+
write: chunk => {
28+
conductor.sendError(chunk);
29+
},
30+
});
31+
32+
const writer = stream.getWriter();
33+
return { stream, writer };
34+
}
35+
36+
export const createInputStream = (conductor: IRunnerPlugin): ReadableContext<string> => {
37+
const stream = new ReadableStream<string>({
38+
async pull(controller) {
39+
const input = await conductor.requestInput();
40+
controller.enqueue(input);
41+
},
42+
});
43+
const reader = stream.getReader();
44+
return { stream, reader };
45+
};
46+
47+
export const displayError = async (
48+
context: Context,
49+
error: unknown,
50+
type: ErrorType,
51+
): Promise<ErrorValue> => {
52+
const name =
53+
typeof error === "object" && error !== null && "name" in error && typeof error.name === "string"
54+
? error.name
55+
: "Error";
56+
const message =
57+
typeof error === "object" &&
58+
error !== null &&
59+
"message" in error &&
60+
typeof error.message === "string"
61+
? error.message
62+
: String(error);
63+
if (context.streams.initialised) {
64+
await context.streams.stderr.writer.write({ name, message, errorType: type });
65+
}
66+
return { type: "error", message: message };
67+
};
68+
69+
export const displayOutput = async (context: Context, output: string) => {
70+
if (context.streams.initialised) {
71+
await context.streams.stdout.writer.write(output);
72+
}
73+
};
74+
75+
export const receiveInput = async (context: Context): Promise<string> => {
76+
if (context.streams.initialised) {
77+
const reader = context.streams.stdin.reader;
78+
const { value } = await reader.read();
79+
return value ?? "";
80+
}
81+
return "";
82+
};
83+
84+
export const destroyStreams = async (context: Context) => {
85+
if (context.streams.initialised) {
86+
context.streams.stdout.writer.releaseLock();
87+
context.streams.stderr.writer.releaseLock();
88+
context.streams.stdin.reader.releaseLock();
89+
90+
await Promise.allSettled([
91+
context.streams.stdout.stream.close(),
92+
context.streams.stderr.stream.close(),
93+
context.streams.stdin.stream.cancel(),
94+
]);
95+
}
96+
context.streams = { initialised: false };
97+
};

src/runner/pyRunner.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
import { StmtNS } from "../ast-types";
12
import { Context } from "../cse-machine/context";
23
import { CSEResultPromise, evaluate } from "../cse-machine/interpreter";
3-
import { RecursivePartial, Result } from "../types";
44
import { parse } from "../parser/parser-adapter";
55
import { analyze } from "../resolver/analysis";
6-
import { StmtNS } from "../ast-types";
6+
import { RecursivePartial, Result } from "../types";
77

88
type Stmt = StmtNS.Stmt;
99

@@ -33,12 +33,12 @@ export async function runInContext(
3333
return result;
3434
}
3535

36-
export function runCSEMachine(
36+
export async function runCSEMachine(
3737
code: string,
3838
program: Stmt,
3939
context: Context,
4040
options: RecursivePartial<IOptions> = {},
4141
): Promise<Result> {
42-
const result = evaluate(code, program, context, options as IOptions);
42+
const result = await evaluate(code, program, context, options as IOptions);
4343
return CSEResultPromise(context, result);
4444
}

0 commit comments

Comments
 (0)