Skip to content

Commit e0e2870

Browse files
author
Tony Crisci
committed
move connection creation out of index
1 parent f5eb70f commit e0e2870

2 files changed

Lines changed: 132 additions & 130 deletions

File tree

index.js

Lines changed: 1 addition & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -1,138 +1,9 @@
1-
const EventEmitter = require('events').EventEmitter;
2-
const net = require('net');
3-
41
const constants = require('./lib/constants');
5-
const message = require('./lib/message');
6-
const clientHandshake = require('./lib/handshake');
72
const MessageBus = require('./lib/bus');
8-
const {getDbusAddressFromFs} = require('./lib/address-x11');
93
const errors = require('./lib/errors');
104
const variant = require('./lib/service/variant');
115
const iface = require('./lib/service/interface');
12-
13-
function createStream(opts) {
14-
let { busAddress } = opts;
15-
16-
// XXX according to the dbus spec, we should start a new server if the bus
17-
// address cannot be found.
18-
if (!busAddress) {
19-
busAddress = process.env.DBUS_SESSION_BUS_ADDRESS;
20-
}
21-
if (!busAddress) {
22-
busAddress = getDbusAddressFromFs();
23-
}
24-
25-
let addresses = busAddress.split(';');
26-
for (let i = 0; i < addresses.length; ++i) {
27-
let address = addresses[i];
28-
let familyParams = address.split(':');
29-
let family = familyParams[0];
30-
let params = {};
31-
familyParams[1].split(',').map(function(p) {
32-
let keyVal = p.split('=');
33-
params[keyVal[0]] = keyVal[1];
34-
});
35-
36-
try {
37-
switch (family.toLowerCase()) {
38-
case 'tcp':
39-
throw new Error('tcp dbus connections are not supported');
40-
case 'unix':
41-
if (params.socket) {
42-
return net.createConnection(params.socket);
43-
}
44-
if (params.abstract) {
45-
let abs = require('abstract-socket');
46-
return abs.connect('\u0000' + params.abstract);
47-
}
48-
if (params.path) {
49-
return net.createConnection(params.path);
50-
}
51-
throw new Error(
52-
"not enough parameters for 'unix' connection - you need to specify 'socket' or 'abstract' or 'path' parameter"
53-
);
54-
case 'unixexec':
55-
let eventStream = require('event-stream');
56-
let spawn = require('child_process').spawn;
57-
let args = [];
58-
for (let n = 1; params['arg' + n]; n++) args.push(params['arg' + n]);
59-
let child = spawn(params.path, args);
60-
61-
return eventStream.duplex(child.stdin, child.stdout);
62-
default:
63-
throw new Error('unknown address type:' + family);
64-
}
65-
} catch (e) {
66-
if (i < addresses.length - 1) {
67-
console.warn(e.message);
68-
continue;
69-
} else {
70-
throw e;
71-
}
72-
}
73-
}
74-
}
75-
76-
function createConnection(opts) {
77-
let self = new EventEmitter();
78-
opts = opts || {};
79-
let stream = (self.stream = createStream(opts));
80-
stream.setNoDelay();
81-
82-
stream.on('error', function(err) {
83-
// forward network and stream errors
84-
self.emit('error', err);
85-
});
86-
87-
stream.on('end', function() {
88-
self.emit('end');
89-
self.message = function() {
90-
self.emit('error', new Error('Tried to write a message to a closed stream'));
91-
};
92-
});
93-
94-
self.end = function() {
95-
stream.end();
96-
return self;
97-
};
98-
99-
clientHandshake(stream, opts, function(error, guid) {
100-
if (error) {
101-
return self.emit('error', error);
102-
}
103-
self.guid = guid;
104-
self.emit('connect');
105-
message.unmarshalMessages(
106-
stream,
107-
function(message) {
108-
self.emit('message', message);
109-
},
110-
opts
111-
);
112-
});
113-
114-
self._messages = [];
115-
116-
// pre-connect version, buffers all messages. replaced after connect
117-
self.message = function(msg) {
118-
self._messages.push(msg);
119-
};
120-
121-
self.once('connect', function() {
122-
self.state = 'connected';
123-
for (let i = 0; i < self._messages.length; ++i) {
124-
stream.write(message.marshall(self._messages[i]));
125-
}
126-
self._messages.length = 0;
127-
128-
// no need to buffer once connected
129-
self.message = function(msg) {
130-
stream.write(message.marshall(msg));
131-
};
132-
});
133-
134-
return self;
135-
}
6+
const createConnection = require('./lib/connection.js');
1367

1378
let createClient = function(params) {
1389
let connection = createConnection(params || {});

lib/connection.js

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
const EventEmitter = require('events').EventEmitter;
2+
const net = require('net');
3+
const message = require('./message');
4+
const clientHandshake = require('./handshake');
5+
const {getDbusAddressFromFs} = require('./address-x11');
6+
7+
function createStream(opts) {
8+
let { busAddress } = opts;
9+
10+
// XXX according to the dbus spec, we should start a new server if the bus
11+
// address cannot be found.
12+
if (!busAddress) {
13+
busAddress = process.env.DBUS_SESSION_BUS_ADDRESS;
14+
}
15+
if (!busAddress) {
16+
busAddress = getDbusAddressFromFs();
17+
}
18+
19+
let addresses = busAddress.split(';');
20+
for (let i = 0; i < addresses.length; ++i) {
21+
let address = addresses[i];
22+
let familyParams = address.split(':');
23+
let family = familyParams[0];
24+
let params = {};
25+
familyParams[1].split(',').map(function(p) {
26+
let keyVal = p.split('=');
27+
params[keyVal[0]] = keyVal[1];
28+
});
29+
30+
try {
31+
switch (family.toLowerCase()) {
32+
case 'tcp':
33+
throw new Error('tcp dbus connections are not supported');
34+
case 'unix':
35+
if (params.socket) {
36+
return net.createConnection(params.socket);
37+
}
38+
if (params.abstract) {
39+
let abs = require('abstract-socket');
40+
return abs.connect('\u0000' + params.abstract);
41+
}
42+
if (params.path) {
43+
return net.createConnection(params.path);
44+
}
45+
throw new Error(
46+
"not enough parameters for 'unix' connection - you need to specify 'socket' or 'abstract' or 'path' parameter"
47+
);
48+
case 'unixexec':
49+
let eventStream = require('event-stream');
50+
let spawn = require('child_process').spawn;
51+
let args = [];
52+
for (let n = 1; params['arg' + n]; n++) args.push(params['arg' + n]);
53+
let child = spawn(params.path, args);
54+
55+
return eventStream.duplex(child.stdin, child.stdout);
56+
default:
57+
throw new Error('unknown address type:' + family);
58+
}
59+
} catch (e) {
60+
if (i < addresses.length - 1) {
61+
console.warn(e.message);
62+
continue;
63+
} else {
64+
throw e;
65+
}
66+
}
67+
}
68+
}
69+
70+
function createConnection(opts) {
71+
let self = new EventEmitter();
72+
opts = opts || {};
73+
let stream = (self.stream = createStream(opts));
74+
stream.setNoDelay();
75+
76+
stream.on('error', function(err) {
77+
// forward network and stream errors
78+
self.emit('error', err);
79+
});
80+
81+
stream.on('end', function() {
82+
self.emit('end');
83+
self.message = function() {
84+
self.emit('error', new Error('Tried to write a message to a closed stream'));
85+
};
86+
});
87+
88+
self.end = function() {
89+
stream.end();
90+
return self;
91+
};
92+
93+
clientHandshake(stream, opts, function(error, guid) {
94+
if (error) {
95+
return self.emit('error', error);
96+
}
97+
self.guid = guid;
98+
self.emit('connect');
99+
message.unmarshalMessages(
100+
stream,
101+
function(message) {
102+
self.emit('message', message);
103+
},
104+
opts
105+
);
106+
});
107+
108+
self._messages = [];
109+
110+
// pre-connect version, buffers all messages. replaced after connect
111+
self.message = function(msg) {
112+
self._messages.push(msg);
113+
};
114+
115+
self.once('connect', function() {
116+
self.state = 'connected';
117+
for (let i = 0; i < self._messages.length; ++i) {
118+
stream.write(message.marshall(self._messages[i]));
119+
}
120+
self._messages.length = 0;
121+
122+
// no need to buffer once connected
123+
self.message = function(msg) {
124+
stream.write(message.marshall(msg));
125+
};
126+
});
127+
128+
return self;
129+
}
130+
131+
module.exports = createConnection;

0 commit comments

Comments
 (0)