-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpubnub-stream.js
More file actions
144 lines (121 loc) · 5.13 KB
/
pubnub-stream.js
File metadata and controls
144 lines (121 loc) · 5.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/**
 * Store shared configuration on the PubNub function object.
 *
 * Every own enumerable key of `setup` is copied onto `PubNub`, where
 * `PubNub.subscribe` and `PubNub.publish` read it as a fallback
 * (for example `PubNub.subscribeKey`, `PubNub.channel`, `PubNub.origin`).
 *
 * @param {Object} [setup={}] - Configuration values to copy onto `PubNub`.
 *     Omitting the argument is a no-op.
 * @returns {undefined}
 */
const PubNub = (setup = {}) => {
    for (const key of Object.keys(setup)) PubNub[key] = setup[key];
};
(async ()=>{
'use strict';
// Fallback values, used only when neither the call's `setup` object
// nor the shared `PubNub` configuration provides one.

// Host that serves the stream and publish endpoints
const defaultOrigin = 'ps.pndsn.com';
// Channel name
const defaultChannel = 'pubnub';
// Publish / subscribe keys
const defaultPubkey = 'demo';
const defaultSubkey = 'demo';
// Client identifier derived from the load time in epoch milliseconds
const defaultUUID = `uuid-${Date.now()}`;
/**
 * Open (or reuse) the streaming subscription for a channel.
 *
 * All channels ever requested are kept in `PubNub.channels` and served by a
 * single stream. Asking for a new channel stops the current stream and opens
 * a new one covering every channel; asking for a channel that is already
 * covered by a live stream returns the existing subscription.
 *
 * The stream is read as newline-delimited JSON. Each line is expected to be
 * an array whose element [0] is an array of messages and whose element [1]
 * is the timetoken used to resume after a reconnect.
 *
 * @param {Object}   [setup={}]
 * @param {string}   [setup.subkey]    - Subscribe key (falls back to `PubNub.subscribeKey`).
 * @param {string}   [setup.channel]   - Channel name (falls back to `PubNub.channel`).
 * @param {string}   [setup.origin]    - API host (falls back to `PubNub.origin`).
 * @param {Function} [setup.messages]  - Callback invoked with every message.
 * @param {string}   [setup.filter]    - Filter expression sent as `filter-expr`.
 * @param {string}   [setup.authkey]   - Auth key sent as `auth`.
 * @param {string}   [setup.timetoken] - Starting timetoken; `setup.timetoken`
 *     is overwritten with the latest timetoken as messages arrive.
 * @returns {Function} Async generator function. Calling it yields messages
 *     received from that moment on (`for await (const m of subscription())`).
 *     It also carries `messages(fn)` to replace the callback,
 *     `unsubscribe()` to stop the stream and end all iterators, and the
 *     boolean `active`.
 */
const subscribe = PubNub.subscribe = (setup={}) => {
    const subkey  = setup.subkey  || PubNub.subscribeKey || defaultSubkey;
    const channel = setup.channel || PubNub.channel      || defaultChannel;
    const origin  = setup.origin  || PubNub.origin       || defaultOrigin;
    const filter  = setup.filter  || PubNub.filter       || '';
    const authkey = setup.authkey || PubNub.authKey      || '';
    let messages  = setup.messages || PubNub.messages    || (a => a);
    let timetoken = setup.timetoken || '0';

    const filterExp = filter ? `&filter-expr=${encodeURIComponent(filter)}` : '';
    const params = `auth=${encodeURIComponent(authkey)}${filterExp}`;

    const controller = new AbortController();
    const signal = controller.signal;
    // One inbox ({queue, wake}) per running async iterator
    const inboxes = new Set();
    let subscribed = true;
    let decoder = null;
    let reader = null;
    let buffer = '';

    // Prepare Channel List
    if (!PubNub.channels) PubNub.channels = [];
    if (!PubNub.channels.includes(channel)) {
        PubNub.channels.push(channel);
        PubNub.channels.sort();
        // Reset stream for changing subscriptions
        if (PubNub.subscription) {
            PubNub.subscription.unsubscribe();
            PubNub.subscription = null;
        }
    }
    else if (PubNub.subscription && PubNub.subscription.active !== false) {
        // Already subscribed to this channel on a live stream
        return PubNub.subscription;
    }
    // Otherwise the channel is listed but its stream was stopped (or never
    // started), so fall through and open a fresh one.

    // Start Stream
    startStream();

    // Connect, then read until the connection ends. Never rejects.
    async function startStream() {
        const channels = PubNub.channels.map(name => encodeURIComponent(name)).join(',');
        const uri = `https://${origin}/stream/${subkey}/${channels}/0/${timetoken}`;
        let response = null;

        // A new connection starts clean: drop any half-received line and any
        // partial multi-byte sequence held by the previous decoder.
        buffer = '';
        decoder = new TextDecoder();

        try { response = await fetch(`${uri}?${params}`, {signal}); }
        catch (error) { return continueStream(1000); }

        if (!response.ok) {
            // Back off instead of parsing an error body and reconnecting at once.
            // Cancelling is best-effort; a failure here changes nothing.
            if (response.body && response.body.cancel) response.body.cancel().catch(() => {});
            return continueStream(1000);
        }

        try { reader = response.body.getReader(); }
        catch (error) { return continueStream(1000); }

        return readStream();
    }

    // Schedule a reconnect unless the subscription was stopped
    function continueStream(delay) {
        if (!subscribed) return;
        setTimeout(() => { if (subscribed) startStream(); }, delay || 1);
    }

    // Pump chunks from the reader until the stream ends or fails
    async function readStream() {
        for (;;) {
            let chunk = null;
            // Rejects when aborted by unsubscribe() or when the connection drops
            try { chunk = await reader.read(); }
            catch (error) { break; }

            // `stream: true` keeps an incomplete multi-byte character inside
            // the decoder until the following chunk completes it.
            buffer += decoder.decode(chunk.value || new Uint8Array(), {stream: !chunk.done});
            consumeBuffer();
            if (chunk.done) break;
        }
        continueStream();
    }

    // Dispatch every complete line in the buffer and keep an unfinished tail
    function consumeBuffer() {
        const lines = buffer.split('\n');
        // Only the text after the last newline can still be incomplete
        const tail = lines.pop();
        for (const line of lines) dispatch(line);
        // The tail may already be a whole document; if not, wait for more data
        buffer = dispatch(tail) ? '' : tail;
    }

    // Parse one line and deliver its messages.
    // Returns false only when the text is not (yet) valid JSON.
    function dispatch(text) {
        if (!text) return true;
        let envelope = null;
        try { envelope = JSON.parse(text); }
        catch (error) { return false; }
        // Valid JSON that is not an envelope is consumed and ignored
        if (!Array.isArray(envelope)) return true;
        if (envelope[1]) setup.timetoken = timetoken = envelope[1];
        if (Array.isArray(envelope[0])) envelope[0].forEach(emit);
        return true;
    }

    // Send one message to the callback and to every running iterator
    function emit(message) {
        // A throwing callback must not stop the stream or cause redelivery
        try { messages(message); }
        catch (error) { console.error(error); }
        for (const inbox of inboxes) {
            inbox.queue.push(message);
            notify(inbox);
        }
    }

    // Resume an iterator that is waiting for its next message
    function notify(inbox) {
        const wake = inbox.wake;
        inbox.wake = null;
        if (wake) wake();
    }

    // Subscription Structure
    // Each iterator owns a queue so several messages arriving together are
    // all delivered in order. The queue exists from the first next() call
    // until the iterator finishes or is returned.
    async function* subscription() {
        const inbox = { queue: [], wake: null };
        inboxes.add(inbox);
        try {
            while (subscribed) {
                if (inbox.queue.length) yield inbox.queue.shift();
                else await new Promise(resolve => inbox.wake = resolve);
            }
        }
        finally {
            inboxes.delete(inbox);
        }
    }
    subscription.active = true;
    subscription.messages = receiver => messages = setup.messages = receiver;
    subscription.unsubscribe = () => {
        subscribed = false;
        subscription.active = false;
        controller.abort();
        // Wake waiting iterators so they observe the stop and finish
        inboxes.forEach(notify);
    };
    return (PubNub.subscription = subscription);
};
/**
 * Publish one message to a channel with an HTTP POST.
 *
 * Each option falls back to the shared `PubNub` configuration and then to
 * the file's defaults.
 *
 * @param {Object} [setup={}]
 * @param {string} [setup.pubkey]   - Publish key (falls back to `PubNub.publishKey`).
 * @param {string} [setup.subkey]   - Subscribe key (falls back to `PubNub.subscribeKey`).
 * @param {string} [setup.channel]  - Target channel (falls back to `PubNub.channel`).
 * @param {string} [setup.uuid]     - Client identifier sent as `uuid` (falls back to `PubNub.uuid`).
 * @param {string} [setup.origin]   - API host (falls back to `PubNub.origin`).
 * @param {string} [setup.authkey]  - Auth key sent as `auth` (falls back to `PubNub.authKey`).
 * @param {*}      [setup.message]  - JSON-serializable payload. Only a missing
 *     (null/undefined) message is replaced by 'missing-message'; `0`, `false`
 *     and `''` are published as given.
 * @param {Object} [setup.metadata] - Sent JSON-encoded as `meta` (falls back to `PubNub.metadata`).
 * @returns {Promise<Response|false>} The fetch response, or `false` when the
 *     request could not be made.
 */
const publish = PubNub.publish = async (setup={}) => {
    const pubkey   = setup.pubkey   || PubNub.publishKey   || defaultPubkey;
    const subkey   = setup.subkey   || PubNub.subscribeKey || defaultSubkey;
    const channel  = setup.channel  || PubNub.channel      || defaultChannel;
    const uuid     = setup.uuid     || PubNub.uuid         || defaultUUID;
    const origin   = setup.origin   || PubNub.origin       || defaultOrigin;
    const authkey  = setup.authkey  || PubNub.authKey      || '';
    const metadata = setup.metadata || PubNub.metadata     || {};

    // Falsy payloads such as 0, false and '' are legitimate messages
    const hasMessage = setup.message !== undefined && setup.message !== null;
    const message = hasMessage ? setup.message : 'missing-message';

    // The channel is a path segment: encode it so '/', '?', '#' or spaces
    // in a channel name cannot change the request target.
    const uri = `https://${origin}/publish/${pubkey}/${subkey}/0/${encodeURIComponent(channel)}/0`;
    const params = [
        `auth=${encodeURIComponent(authkey)}`,
        `meta=${encodeURIComponent(JSON.stringify(metadata))}`,
        `uuid=${encodeURIComponent(uuid)}`,
    ].join('&');
    const payload = { method: 'POST', body: JSON.stringify(message) };

    // Best effort by design: callers get `false` instead of an exception
    try { return await fetch(`${uri}?${params}`, payload) }
    catch(e) { return false }
};
})();