The Reactive Extensions for JavaScript provides integration points to the core Node.js libraries.
fromStreamfromReadableStreamfromReadLineStreamfromWritableStreamfromTransformStreamwriteToStream
Converts the given observable sequence to an event emitter with the given event name. The errors are handled on the 'error' event and completion on the 'end' event.
observable(Observable): The observable sequence to convert to an EventEmitter.eventName(String): The event name to subscribe.
(EventEmitter): An EventEmitter which emits the given eventName for each onNext call in addition to 'error' and 'end' events.
var Rx = require('rx');
var RxNode = require('rx-node');
var source = Rx.Observable.return(42);
var emitter = RxNode.toEventEmitter(source, 'data');
emitter.on('data', function (data) {
console.log('Data: ' + data);
});
emitter.on('end', function () {
console.log('End');
});
// Ensure to call publish to fire events from the observable
emitter.publish();
// => Data: 42
// => End- index.js
Converts a flowing stream to an Observable sequence.
stream(Stream): A stream to convert to a observable sequence.[finishEventName](String): Event that notifies about closed stream. ("end" by default)[dataEventName](String): Event that notifies about incoming data. ("data" by default)
(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and finish events like end or finish.
var RxNode = require('rx-node');
var subscription = RxNode.fromStream(process.stdin, 'end')
.subscribe(function (x) { console.log(x); });
// => r<Buffer 72>
// => x<Buffer 78>- index.js
Converts a flowing readable stream to an Observable sequence.
stream(Stream): A stream to convert to a observable sequence.[dataEventName](String): Event that notifies about incoming data. ("data" by default)
(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and 'end' events.
var RxNode = require('rx-node');
var subscription = RxNode.fromReadableStream(process.stdin)
.subscribe(function (x) { console.log(x); });
// => r<Buffer 72>
// => x<Buffer 78>Converts a flowing readable stream to an Observable sequence.
stream(Stream): A stream to convert to a observable sequence.
(Observable): An observable sequence which fires on each 'line' event as well as handling 'error' and 'close' events.
var readline = require('readline');
var fs = require('fs');
var RxNode = require('rx-node');
var rl = readline.createInterface({
input: fs.createReadStream('sample.txt')
});
var subscription = RxNode.fromReadLineStream(rl)
.subscribe(function (x) { console.log(x); });
// Prints contents of 'sample.txt' line by line:
// => rx
// => supports 'readline'- index.js
Converts a flowing writeable stream to an Observable sequence.
stream(Stream): A stream to convert to a observable sequence.
(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
var RxNode = require('rx-node');
var subscription = RxNode.fromWritableStream(process.stdout)
.subscribe(function (x) { console.log(x); });
// => r<Buffer 72>
// => x<Buffer 78>- index.js
Converts a flowing transform stream to an Observable sequence.
stream(Stream): A stream to convert to a observable sequence.
(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
var RxNode = require('rx-node');
var subscription = RxNode.fromTransformStream(getTransformStreamSomehow());- index.js
Writes an observable sequence to a stream.
observable(Observable): Observable sequence to write to a stream.stream(Stream): The stream to write to.[encoding](String): The encoding of the item to write.
(Disposable): The subscription handle.
var Rx = require('rx');
var RxNode = require('rx-node');
var source = Rx.Observable.range(0, 5);
var subscription = RxNode.writeToStream(source, process.stdout, 'utf8');
// => 01234- index.js