@@ -37,6 +37,79 @@ function RedisAdapter(mainLogger, gmeConfig) {
3737 TAGS : ':tags'
3838 } ;
3939
40+ function normalizeRedisOptions ( options ) {
41+ var redisOptions = Object . assign ( { } , options || { } ) ;
42+
43+ // Backward compatibility with legacy host/port style.
44+ if ( redisOptions . host || redisOptions . port ) {
45+ redisOptions . socket = Object . assign ( { } , redisOptions . socket || { } ) ;
46+ if ( redisOptions . host ) {
47+ redisOptions . socket . host = redisOptions . host ;
48+ delete redisOptions . host ;
49+ }
50+
51+ if ( redisOptions . port ) {
52+ redisOptions . socket . port = redisOptions . port ;
53+ delete redisOptions . port ;
54+ }
55+ }
56+
57+ return redisOptions ;
58+ }
59+
60+ function redisCommand ( command , args ) {
61+ function normalizeArg ( arg ) {
62+ return typeof arg === 'number' ? String ( arg ) : arg ;
63+ }
64+
65+ function toObjectFromFlatArray ( items ) {
66+ var i ,
67+ result = { } ;
68+
69+ for ( i = 0 ; i < items . length ; i += 2 ) {
70+ result [ items [ i ] ] = items [ i + 1 ] ;
71+ }
72+
73+ return result ;
74+ }
75+
76+ if ( ! self . client ) {
77+ return Q . reject ( new Error ( 'Database is not open.' ) ) ;
78+ }
79+
80+ args = args || [ ] ;
81+ command = command . toUpperCase ( ) ;
82+ return Q ( self . client . sendCommand ( [ command ] . concat ( args . map ( normalizeArg ) ) ) )
83+ . then ( function ( result ) {
84+ if ( command === 'HGETALL' ) {
85+ if ( ! result ) {
86+ return { } ;
87+ }
88+
89+ // RESP2 returns flat arrays for HGETALL.
90+ if ( Array . isArray ( result ) ) {
91+ return toObjectFromFlatArray ( result ) ;
92+ }
93+ }
94+
95+ return result ;
96+ } ) ;
97+ }
98+
99+ function hmsetFromObject ( key , object ) {
100+ var args = [ key ] ;
101+
102+ Object . keys ( object || { } ) . forEach ( function ( field ) {
103+ args . push ( field , object [ field ] ) ;
104+ } ) ;
105+
106+ if ( args . length === 1 ) {
107+ return Q ( ) ;
108+ }
109+
110+ return redisCommand ( 'HSET' , args ) ;
111+ }
112+
40113 function openDatabase ( callback ) {
41114 var client ;
42115 connectionCnt += 1 ;
@@ -46,26 +119,35 @@ function RedisAdapter(mainLogger, gmeConfig) {
46119 if ( self . client === null ) {
47120 logger . debug ( 'Connecting to database...' ) ;
48121 connectDeferred = Q . defer ( ) ;
49- client = redis . createClient ( gmeConfig . storage . database . options ) ;
122+ client = redis . createClient ( normalizeRedisOptions ( gmeConfig . storage . database . options ) ) ;
50123 client . on ( 'error' , function ( err ) {
51- self . client = null ;
52124 logger . error ( 'Redis client: ' , err ) ;
53125 } ) ;
54- client . on ( 'ready' , function ( ) {
55- self . client = client ;
56- disconnectDeferred = null ;
57- logger . debug ( 'Connected.' ) ;
58- connectDeferred . resolve ( ) ;
59- } ) ;
126+
127+ Q ( client . connect ( ) )
128+ . then ( function ( ) {
129+ self . client = client ;
130+ disconnectDeferred = null ;
131+ logger . debug ( 'Connected.' ) ;
132+ connectDeferred . resolve ( ) ;
133+ } )
134+ . catch ( function ( err ) {
135+ self . client = null ;
136+ connectDeferred . reject ( err ) ;
137+ } ) ;
60138 } else {
61139 logger . debug ( 'Count is 1 but redis is not null' ) ;
140+ connectDeferred = Q ( ) ;
62141 }
63142 } else {
64143 // we are already connected
65144 logger . debug ( 'Reusing redis connection.' ) ;
145+ if ( ! connectDeferred ) {
146+ connectDeferred = Q ( ) ;
147+ }
66148 }
67149
68- return connectDeferred . promise . nodeify ( callback ) ;
150+ return connectDeferred . promise ? connectDeferred . promise . nodeify ( callback ) : connectDeferred . nodeify ( callback ) ;
69151 }
70152
71153 function closeDatabase ( callback ) {
@@ -84,12 +166,19 @@ function RedisAdapter(mainLogger, gmeConfig) {
84166 if ( connectionCnt === 0 ) {
85167 if ( self . client ) {
86168 logger . debug ( 'Closing connection to redis...' ) ;
87- self . client . on ( 'end' , function ( ) {
88- self . client = null ;
89- logger . debug ( 'Closed.' ) ;
90- disconnectDeferred . resolve ( ) ;
91- } ) ;
92- self . client . quit ( ) ;
169+ Q ( self . client . quit ( ) )
170+ . catch ( function ( err ) {
171+ // If client is already closed we still consider this a successful shutdown.
172+ if ( err && err . name !== 'ClientClosedError' && err . message !== 'The client is closed' ) {
173+ throw err ;
174+ }
175+ } )
176+ . then ( function ( ) {
177+ self . client = null ;
178+ logger . debug ( 'Closed.' ) ;
179+ disconnectDeferred . resolve ( ) ;
180+ } )
181+ . catch ( disconnectDeferred . reject ) ;
93182 } else {
94183 disconnectDeferred . resolve ( ) ;
95184 }
@@ -104,10 +193,10 @@ function RedisAdapter(mainLogger, gmeConfig) {
104193 var deferred = Q . defer ( ) ;
105194
106195 if ( self . client ) {
107- Q . ninvoke ( self . client , 'del ', projectId ,
196+ redisCommand ( 'DEL ', [ projectId ,
108197 projectId + self . CONSTANTS . BRANCHES ,
109198 projectId + self . CONSTANTS . TAGS ,
110- projectId + self . CONSTANTS . COMMITS )
199+ projectId + self . CONSTANTS . COMMITS ] )
111200 . then ( function ( result ) {
112201 if ( result > 0 ) {
113202 deferred . resolve ( true ) ;
@@ -129,7 +218,7 @@ function RedisAdapter(mainLogger, gmeConfig) {
129218 logger . debug ( 'openProject' , projectId ) ;
130219
131220 if ( self . client ) {
132- Q . ninvoke ( self . client , 'exists ', projectId )
221+ redisCommand ( 'EXISTS ', [ projectId ] )
133222 . then ( function ( result ) {
134223 // 1 if the key exists.
135224 // 0 if the key does not exist.
@@ -155,7 +244,7 @@ function RedisAdapter(mainLogger, gmeConfig) {
155244 logger . debug ( 'createProject' , projectId ) ;
156245
157246 if ( self . client ) {
158- Q . ninvoke ( self . client , 'hsetnx ', projectId , '_id' , projectId )
247+ redisCommand ( 'HSETNX ', [ projectId , '_id' , projectId ] )
159248 . then ( function ( result ) {
160249 // 1 if field is a new field in the hash and value was set.
161250 // 0 if field already exists in the hash and the value was updated.
@@ -178,19 +267,19 @@ function RedisAdapter(mainLogger, gmeConfig) {
178267 var deferred = Q . defer ( ) ;
179268
180269 if ( self . client ) {
181- Q . ninvoke ( self . client , 'renamenx ', projectId , newProjectId )
270+ redisCommand ( 'RENAMENX ', [ projectId , newProjectId ] )
182271 . then ( function ( result ) {
183272 // 1 if key was renamed to newkey.
184273 // 0 if newkey already exists.
185274 if ( result === 1 ) {
186275 // Force rename for branches and commits.
187276 Q . allSettled ( [
188- Q . ninvoke ( self . client , 'rename ',
189- projectId + self . CONSTANTS . BRANCHES , newProjectId + self . CONSTANTS . BRANCHES ) ,
190- Q . ninvoke ( self . client , 'rename ',
191- projectId + self . CONSTANTS . COMMITS , newProjectId + self . CONSTANTS . COMMITS ) ,
192- Q . ninvoke ( self . client , 'rename ',
193- projectId + self . CONSTANTS . TAGS , newProjectId + self . CONSTANTS . TAGS )
277+ redisCommand ( 'RENAME ',
278+ [ projectId + self . CONSTANTS . BRANCHES , newProjectId + self . CONSTANTS . BRANCHES ] ) ,
279+ redisCommand ( 'RENAME ',
280+ [ projectId + self . CONSTANTS . COMMITS , newProjectId + self . CONSTANTS . COMMITS ] ) ,
281+ redisCommand ( 'RENAME ',
282+ [ projectId + self . CONSTANTS . TAGS , newProjectId + self . CONSTANTS . TAGS ] )
194283 ] )
195284 . then ( function ( /*result*/ ) {
196285 // Result may contain errors if no branches or commits were created,
@@ -228,26 +317,26 @@ function RedisAdapter(mainLogger, gmeConfig) {
228317 newProject = newProject_ ;
229318 // TODO: Is there a more efficient way of doing this?
230319 return Q . all ( [
231- Q . ninvoke ( self . client , 'hgetall ', projectId ) ,
232- Q . ninvoke ( self . client , 'hgetall ', projectId + self . CONSTANTS . BRANCHES ) ,
233- Q . ninvoke ( self . client , 'hgetall ', projectId + self . CONSTANTS . COMMITS ) ,
234- Q . ninvoke ( self . client , 'hgetall ', projectId + self . CONSTANTS . TAGS ) ,
320+ redisCommand ( 'HGETALL ', [ projectId ] ) ,
321+ redisCommand ( 'HGETALL ', [ projectId + self . CONSTANTS . BRANCHES ] ) ,
322+ redisCommand ( 'HGETALL ', [ projectId + self . CONSTANTS . COMMITS ] ) ,
323+ redisCommand ( 'HGETALL ', [ projectId + self . CONSTANTS . TAGS ] ) ,
235324 ] ) ;
236325 } )
237326 . then ( function ( result ) {
238- var promises = [ Q . ninvoke ( self . client , 'hmset' , newProjectId , result [ 0 ] ) ] ;
327+ var promises = [ hmsetFromObject ( newProjectId , result [ 0 ] ) ] ;
239328
240329 // Branches and Commits might not have been created for the source project
241330 if ( result [ 1 ] ) {
242- promises . push ( Q . ninvoke ( self . client , 'hmset' , newProjectId + self . CONSTANTS . BRANCHES , result [ 1 ] ) ) ;
331+ promises . push ( hmsetFromObject ( newProjectId + self . CONSTANTS . BRANCHES , result [ 1 ] ) ) ;
243332 }
244333
245334 if ( result [ 2 ] ) {
246- promises . push ( Q . ninvoke ( self . client , 'hmset' , newProjectId + self . CONSTANTS . COMMITS , result [ 2 ] ) ) ;
335+ promises . push ( hmsetFromObject ( newProjectId + self . CONSTANTS . COMMITS , result [ 2 ] ) ) ;
247336 }
248337
249338 if ( result [ 3 ] ) {
250- promises . push ( Q . ninvoke ( self . client , 'hmset' , newProjectId + self . CONSTANTS . TAGS , result [ 3 ] ) ) ;
339+ promises . push ( hmsetFromObject ( newProjectId + self . CONSTANTS . TAGS , result [ 3 ] ) ) ;
251340 }
252341
253342 return Q . all ( promises ) ;
@@ -266,6 +355,7 @@ function RedisAdapter(mainLogger, gmeConfig) {
266355 this . createProject = createProject ;
267356 this . renameProject = renameProject ;
268357 this . duplicateProject = duplicateProject ;
358+ this . redisCommand = redisCommand ;
269359}
270360
271361module . exports = RedisAdapter ;
0 commit comments