Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Latest commit

 

History

History
239 lines (157 loc) · 6.75 KB

File metadata and controls

239 lines (157 loc) · 6.75 KB

Node.js Integration

The Reactive Extensions for JavaScript provides integration points to the core Node.js libraries.

RxNode Methods

Event Handlers

Stream Handlers

RxNode Methods

RxNode.toEventEmitter(observable, eventName)

#

Converts the given observable sequence to an event emitter with the given event name. The errors are handled on the 'error' event and completion on the 'end' event.

Arguments

  1. observable (Observable): The observable sequence to convert to an EventEmitter.
  2. eventName (String): The event name to subscribe.

Returns

(EventEmitter): An EventEmitter which emits the given eventName for each onNext call in addition to 'error' and 'end' events.

Example

var Rx = require('rx');
var RxNode = require('rx-node');

var source = Rx.Observable.return(42);

var emitter = RxNode.toEventEmitter(source, 'data');

emitter.on('data', function (data) {
    console.log('Data: ' + data);
});

emitter.on('end', function () {
    console.log('End');
});

// Ensure to call publish to fire events from the observable
emitter.publish();

// => Data: 42
// => End

Location

  • index.js

Stream Handlers

RxNode.fromStream(stream, finishEventName, dataEventName)

#

Converts a flowing stream to an Observable sequence.

Arguments

  1. stream (Stream): A stream to convert to a observable sequence.
  2. [finishEventName] (String): Event that notifies about closed stream. ("end" by default)
  3. [dataEventName] (String): Event that notifies about incoming data. ("data" by default)

Returns

(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and finish events like end or finish.

Example

var RxNode = require('rx-node');

var subscription = RxNode.fromStream(process.stdin, 'end')
    .subscribe(function (x) { console.log(x); });

// => r<Buffer 72>
// => x<Buffer 78>

Location

  • index.js

RxNode.fromReadableStream(stream, dataEventName)

#

Converts a flowing readable stream to an Observable sequence.

Arguments

  1. stream (Stream): A stream to convert to a observable sequence.
  2. [dataEventName] (String): Event that notifies about incoming data. ("data" by default)

Returns

(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and 'end' events.

Example

var RxNode = require('rx-node');

var subscription = RxNode.fromReadableStream(process.stdin)
    .subscribe(function (x) { console.log(x); });

// => r<Buffer 72>
// => x<Buffer 78>

RxNode.fromReadLineStream(stream)

#

Converts a flowing readable stream to an Observable sequence.

Arguments

  1. stream (Stream): A stream to convert to a observable sequence.

Returns

(Observable): An observable sequence which fires on each 'line' event as well as handling 'error' and 'close' events.

var readline = require('readline');
var fs = require('fs');
var RxNode = require('rx-node');

var rl = readline.createInterface({
  input: fs.createReadStream('sample.txt')
});

var subscription = RxNode.fromReadLineStream(rl)
    .subscribe(function (x) { console.log(x); });

// Prints contents of 'sample.txt' line by line:
// => rx
// => supports 'readline'

Location

  • index.js

RxNode.fromWritableStream(stream)

#

Converts a flowing writeable stream to an Observable sequence.

Arguments

  1. stream (Stream): A stream to convert to a observable sequence.

Returns

(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.

Example

var RxNode = require('rx-node');

var subscription = RxNode.fromWritableStream(process.stdout)
    .subscribe(function (x) { console.log(x); });

// => r<Buffer 72>
// => x<Buffer 78>

Location

  • index.js

RxNode.fromTransformStream(stream)

#

Converts a flowing transform stream to an Observable sequence.

Arguments

  1. stream (Stream): A stream to convert to a observable sequence.

Returns

(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.

Example

var RxNode = require('rx-node');

var subscription = RxNode.fromTransformStream(getTransformStreamSomehow());

Location

  • index.js

RxNode.writeToStream(observable, stream, [encoding])

#

Writes an observable sequence to a stream.

Arguments

  1. observable (Observable): Observable sequence to write to a stream.
  2. stream (Stream): The stream to write to.
  3. [encoding] (String): The encoding of the item to write.

Returns

(Disposable): The subscription handle.

Example

var Rx = require('rx');
var RxNode = require('rx-node');

var source = Rx.Observable.range(0, 5);

var subscription = RxNode.writeToStream(source, process.stdout, 'utf8');

// => 01234

Location

  • index.js