-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathjoin.test.ts
More file actions
139 lines (114 loc) · 3.49 KB
/
join.test.ts
File metadata and controls
139 lines (114 loc) · 3.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import { should } from 'chai'; should();
import group from '../../pin/group';
import source from '../../pin/source';
import fork from '../../pin/fork';
import map from '../../pin/map';
import join, { peekJoin, Join } from '../join';
describe('Join', () => {
it('should join all values from the same forked emission.', done => {
let res = <{x: number, y: number}[]>[];
let a = source();
let f = a.to(fork());
let j = join('x', 'y');
f.to(j.in('x'));
f.to(map((y, done) => setTimeout(() => done(y * 10), 20 - y * 5))).to(j.in('y'));
j.output.subscribe(v => {
res.push(v);
if (res.length == 2) {
res[0].should.eql({x: 2, y: 20});
res[1].should.eql({x: 1, y: 10});
done();
}
});
a.send(1);
a.send(2);
});
it('should work properly with chain fork/joins.', done => {
let res = <any[]>[];
let a = source();
let f1 = a.to(fork());
let f2 = f1.to(fork());
let j1 = join('xy', 'z');
let j2 = join('x', 'y');
f2.to(j2.in('x'));
f2.to(map((y, done) => setTimeout(() => done(y * 10), 20 - y * 5))).to(j2.in('y'));
j2.output.to(j1.in('xy'));
f1.to(map((z, done) => setTimeout(() => done(z * 100), z))).to(j1.in('z'));
j1.output.subscribe(v => {
res.push(v);
if (res.length == 2) {
res[0].should.eql({xy: {x: 2, y: 20}, z: 200});
res[1].should.eql({xy: {x: 1, y: 10}, z: 100});
done();
}
});
a.send(1);
a.send(2);
});
it('should not pop the fork tags when `pop=false` is set.', done => {
let res = <any[]>[];
let a = source();
let f = a.to(fork());
let j1 = join('xy', 'z');
let j2 = peekJoin('x', 'y');
f.to(j2.in('x'));
f.to(map((y, done) => setTimeout(() => done(y * 10), 20 - y * 5))).to(j2.in('y'));
j2.output.to(j1.in('xy'));
f.to(map((z, done) => setTimeout(() => done(z * 100), z))).to(j1.in('z'));
j1.output.subscribe(v => {
res.push(v);
if (res.length == 2) {
res[0].should.eql({xy: {x: 2, y: 20}, z: 200});
res[1].should.eql({xy: {x: 1, y: 10}, z: 100});
done();
}
});
a.send(1);
a.send(2);
});
it('should work re-emit cross-fork joins based on number of forks.', () => {
let res = <any[]>[];
let a = source();
let f1 = a.to(fork());
let f2 = a.to(fork());
let j = join('x', 'y');
group(f1, f2).to(j.in('x'));
f1.to(map((x: any) => x * 2)).to(j.in('y'));
f2.to(map((x: any) => x * 3)).to(j.in('y'));
j.output.subscribe(v => res.push(v));
a.send(1);
res[0].should.eql({x: 1, y: 2});
res[1].should.eql({x: 1, y: 3});
});
it('should properly join cross-forked emissions.', () => {
let res = <any[]>[];
let a = source();
let f1 = a.to(fork());
let f2 = a.to(fork());
let m = map((x: any) => x * 10);
let j1 = join('x', 'y');
let j2 = join('z', 'w');
f1.to(j1.in('x'));
group(f1, f2).to(m).to(j1.in('y'), j2.in('z'));
f2.to(j2.in('w'));
j1.output.subscribe(v => res.push(v));
j2.output.subscribe(v => res.push(v));
a.send(1);
res[0].should.eql({x: 1, y: 10});
res[1].should.eql({z: 10, w: 1});
});
});
describe('join()', () => {
it('should return a `Join` with `pop=true`', () => {
let j = join();
j.should.be.instanceof(Join);
j.pop.should.be.true;
});
});
describe('peekJoin()', () => {
it('should return a `Join` with `pop=false`', () => {
let j = peekJoin();
j.should.be.instanceof(Join);
j.pop.should.be.false;
});
});