-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
152 lines (143 loc) · 6.03 KB
/
index.js
File metadata and controls
152 lines (143 loc) · 6.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/* SOURCE FILE - Copyright (c) 2018 rpc-json - Tanase Laurentiu Iulian - https://github.com/RealTimeCom/rpc-json */
const Transform = require('stream').Transform;
const cache = { // cache common values
n: Buffer.from('\r\n'), // header-body separator
z: Buffer.allocUnsafe(0) // alloc an empty buffer
};
const init = { // init object values
p: null, // data header request cache, next chunk is header
w: true, // connection is open?
c: cache.z // init empty cache buffer
};
class server extends Transform {
constructor(f) {
super();
this._ = {
f: typeof f === 'function' ? f.bind(this) : async (response, head, body) => await response(head, body), // cache server function
e: 'serverError' // custom error event name
};
Object.assign(this._, init);
}
}
server.prototype._transform = trans;
server.prototype._flush = flush;
class client extends Transform {
constructor() {
super();
this._ = {
resolve: undefined, // Promise resolve function
reject: undefined, // Promise reject function
e: 'clientError' // custom error event name
}
Object.assign(this._, init);
}
}
client.prototype._transform = trans;
client.prototype._flush = flush;
client.prototype.exec = async function(head, body) {
return await send.bind(this)(head, body);
};
function parse(t, chunk) {
return new Promise((resolve, reject) => {
t._.c = Buffer.concat([t._.c, chunk]);
if (t._.p === null) { // chunk is header
const i = t._.c.indexOf(cache.n); // search for separator
if (i !== -1) { // separator is found
const h = t._.c.slice(0, i).toString().trim();
try { // try JSON.parse()
const p = JSON.parse(h);
if (!('h' in p)) { p.h = undefined; } // head not found, set to 'undefined'
if ('l' in p && typeof p.l === 'number' && p.l >= 0) { // p.l - body length
const body = t._.c.slice(i + cache.n.length, i + cache.n.length + p.l);
if (body.length === p.l) { // body complete
t._.c = t._.c.slice(i + cache.n.length + p.l); // cache data left
if (t._.e === 'serverError') { // is server
t._.f(send.bind(t), p.h, body).then(resolve).catch(reject);
} else { // is client
if (t._.resolve) { t._.resolve({ head: p.h, body: body }); } // send().resolve
resolve(); // parse().resolve
}
} else { // need more data for body
t._.c = t._.c.slice(i + cache.n.length); // cache body part
t._.p = p; // cache header, next chunk is body
resolve();
}
} else {
t._.c = cache.z; // clear cache
t._.w = false; // prevent sending more data after push(null)
t.push(null); // close connection
if (t._.reject) { t._.reject(new Error('invalid header')); }
reject(new Error('invalid header')); // will emit() error event
}
} catch (e) { // JSON error
t._.c = cache.z; // clear cache
t._.w = false; // prevent sending more data after push(null)
t.push(null); // close connection
if (t._.reject) { t._.reject(e); }
reject(e); // will emit() error event
}
} else { // need more data for header
resolve();
}
} else { // chunk is body
const p = t._.p,
body = t._.c.slice(0, p.l);
if (body.length === p.l) { // body complete
t._.p = null; // next chunk is header
t._.c = t._.c.slice(p.l); // cache data left
if (t._.e === 'serverError') { // is server
t._.f(send.bind(t), p.h, body).then(resolve).catch(reject);
} else { // is client
if (t._.resolve) { t._.resolve({ head: p.h, body: body }); } // send().resolve
resolve(); // parse().resolve
}
} else {
resolve();
}
}
});
}
function send(head, body) {
let t = this;
return new Promise((resolve, reject) => {
if (t._.w) { // connection is open
if (body === undefined) { body = cache.z; }
if (!Buffer.isBuffer(body)) { // convert body value into Buffer
body = typeof body === 'string' ? Buffer.from(body) : Buffer.from(body + '');
}
try { // try JSON.stringify()
const h = JSON.stringify({ h: head, l: body.length }); // 'l' is header value of the body length, see 'p.l' above
if (t._.e !== 'serverError') { // is client, resolve after server response
t._.resolve = resolve;
t._.reject = reject;
}
t.push(Buffer.concat([Buffer.from(h), cache.n, body]));
if (t._.e === 'serverError') { resolve(); } // is server
} catch (e) { // JSON error
reject(e);
}
} else { // connection is close
reject(new Error('can not send data after close'));
}
});
}
async function trans(chunk, enc, cb) {
try {
await parse(this, chunk); // parse chunk
} catch (e) {
this.emit(this._.e, e);
}
cb();
}
async function flush(cb) {
this._.w = false; // connection is closed, prevent sending more data
if (this._.c.length > 0) {
try {
await parse(this, cache.z); // parse data left
} catch (e) {
this.emit(this._.e, e);
}
}
cb();
}
module.exports = { server: server, client: client };