Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,421 changes: 377 additions & 2,044 deletions package-lock.json

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"homepage": "https://github.com/webgme/webgme-engine#README.md",
"dependencies": {
"@azure/msal-node": "^2.9.2",
"@socket.io/redis-adapter": "^8.3.0",
"adm-zip": "^0.5.10",
"agentkeepalive": "^4.5.0",
"archiver": "^7",
Expand All @@ -55,7 +56,7 @@
"browserify": "^17.0.0",
"buffer-equal-constant-time": "^1.0.1",
"chance": "^1.1.11",
"commander": "14.0.3",
"commander": "^14.0.3",
"compression": "^1.7.4",
"connect-multiparty": "^2.2.0",
"content-disposition": "^1",
Expand All @@ -76,12 +77,11 @@
"pug": "^3.0.0",
"q": "^1.5.1",
"raml2html": "^7.8.0",
"redis": "^3.1.2",
"redis": "^5.12.1",
"requirejs": "^2.3.6",
"requirejs-text": "^2.0.16",
"socket.io": "^2.5.0",
"socket.io-client": "^2.5.0",
"socket.io-redis": "^5.4.0",
"socket.io": "^4",
"socket.io-client": "^4",
"superagent": "^10.3.0",
"underscore": "^1.13.6",
"webgme-ot": "0.0.16",
Expand All @@ -94,7 +94,6 @@
"esprima": "^4.0.0",
"mocha": "^11",
"msgpack-js": "^0.3.0",
"nyc": "^15.1.0",
"pkginfo": "^0.4.1",
"rimraf": "^6"
}
Expand Down
158 changes: 124 additions & 34 deletions src/server/storage/datastores/redisadapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,79 @@ function RedisAdapter(mainLogger, gmeConfig) {
TAGS: ':tags'
};

function normalizeRedisOptions(options) {
var redisOptions = Object.assign({}, options || {});

// Backward compatibility with legacy host/port style.
if (redisOptions.host || redisOptions.port) {
redisOptions.socket = Object.assign({}, redisOptions.socket || {});
if (redisOptions.host) {
redisOptions.socket.host = redisOptions.host;
delete redisOptions.host;
}

if (redisOptions.port) {
redisOptions.socket.port = redisOptions.port;
delete redisOptions.port;
}
}

return redisOptions;
}

function redisCommand(command, args) {
function normalizeArg(arg) {
return typeof arg === 'number' ? String(arg) : arg;
}

function toObjectFromFlatArray(items) {
var i,
result = {};

for (i = 0; i < items.length; i += 2) {
result[items[i]] = items[i + 1];
}

return result;
}

if (!self.client) {
return Q.reject(new Error('Database is not open.'));
}

args = args || [];
command = command.toUpperCase();
return Q(self.client.sendCommand([command].concat(args.map(normalizeArg))))
.then(function (result) {
if (command === 'HGETALL') {
if (!result) {
return {};
}

// RESP2 returns flat arrays for HGETALL.
if (Array.isArray(result)) {
return toObjectFromFlatArray(result);
}
}

return result;
});
}

function hmsetFromObject(key, object) {
var args = [key];

Object.keys(object || {}).forEach(function (field) {
args.push(field, object[field]);
});

if (args.length === 1) {
return Q();
}

return redisCommand('HSET', args);
}

function openDatabase(callback) {
var client;
connectionCnt += 1;
Expand All @@ -46,26 +119,35 @@ function RedisAdapter(mainLogger, gmeConfig) {
if (self.client === null) {
logger.debug('Connecting to database...');
connectDeferred = Q.defer();
client = redis.createClient(gmeConfig.storage.database.options);
client = redis.createClient(normalizeRedisOptions(gmeConfig.storage.database.options));
client.on('error', function (err) {
self.client = null;
logger.error('Redis client: ', err);
});
client.on('ready', function () {
self.client = client;
disconnectDeferred = null;
logger.debug('Connected.');
connectDeferred.resolve();
});

Q(client.connect())
.then(function () {
self.client = client;
disconnectDeferred = null;
logger.debug('Connected.');
connectDeferred.resolve();
})
.catch(function (err) {
self.client = null;
connectDeferred.reject(err);
});
} else {
logger.debug('Count is 1 but redis is not null');
connectDeferred = Q();
}
} else {
// we are already connected
logger.debug('Reusing redis connection.');
if (!connectDeferred) {
connectDeferred = Q();
}
}

return connectDeferred.promise.nodeify(callback);
return connectDeferred.promise ? connectDeferred.promise.nodeify(callback) : connectDeferred.nodeify(callback);
}

function closeDatabase(callback) {
Expand All @@ -84,12 +166,19 @@ function RedisAdapter(mainLogger, gmeConfig) {
if (connectionCnt === 0) {
if (self.client) {
logger.debug('Closing connection to redis...');
self.client.on('end', function () {
self.client = null;
logger.debug('Closed.');
disconnectDeferred.resolve();
});
self.client.quit();
Q(self.client.quit())
.catch(function (err) {
// If client is already closed we still consider this a successful shutdown.
if (err && err.name !== 'ClientClosedError' && err.message !== 'The client is closed') {
throw err;
}
})
.then(function () {
self.client = null;
logger.debug('Closed.');
disconnectDeferred.resolve();
})
.catch(disconnectDeferred.reject);
} else {
disconnectDeferred.resolve();
}
Expand All @@ -104,10 +193,10 @@ function RedisAdapter(mainLogger, gmeConfig) {
var deferred = Q.defer();

if (self.client) {
Q.ninvoke(self.client, 'del', projectId,
redisCommand('DEL', [projectId,
projectId + self.CONSTANTS.BRANCHES,
projectId + self.CONSTANTS.TAGS,
projectId + self.CONSTANTS.COMMITS)
projectId + self.CONSTANTS.COMMITS])
.then(function (result) {
if (result > 0) {
deferred.resolve(true);
Expand All @@ -129,7 +218,7 @@ function RedisAdapter(mainLogger, gmeConfig) {
logger.debug('openProject', projectId);

if (self.client) {
Q.ninvoke(self.client, 'exists', projectId)
redisCommand('EXISTS', [projectId])
.then(function (result) {
// 1 if the key exists.
// 0 if the key does not exist.
Expand All @@ -155,7 +244,7 @@ function RedisAdapter(mainLogger, gmeConfig) {
logger.debug('createProject', projectId);

if (self.client) {
Q.ninvoke(self.client, 'hsetnx', projectId, '_id', projectId)
redisCommand('HSETNX', [projectId, '_id', projectId])
.then(function (result) {
// 1 if field is a new field in the hash and value was set.
// 0 if field already exists in the hash and the value was updated.
Expand All @@ -178,19 +267,19 @@ function RedisAdapter(mainLogger, gmeConfig) {
var deferred = Q.defer();

if (self.client) {
Q.ninvoke(self.client, 'renamenx', projectId, newProjectId)
redisCommand('RENAMENX', [projectId, newProjectId])
.then(function (result) {
// 1 if key was renamed to newkey.
// 0 if newkey already exists.
if (result === 1) {
// Force rename for branches and commits.
Q.allSettled([
Q.ninvoke(self.client, 'rename',
projectId + self.CONSTANTS.BRANCHES, newProjectId + self.CONSTANTS.BRANCHES),
Q.ninvoke(self.client, 'rename',
projectId + self.CONSTANTS.COMMITS, newProjectId + self.CONSTANTS.COMMITS),
Q.ninvoke(self.client, 'rename',
projectId + self.CONSTANTS.TAGS, newProjectId + self.CONSTANTS.TAGS)
redisCommand('RENAME',
[projectId + self.CONSTANTS.BRANCHES, newProjectId + self.CONSTANTS.BRANCHES]),
redisCommand('RENAME',
[projectId + self.CONSTANTS.COMMITS, newProjectId + self.CONSTANTS.COMMITS]),
redisCommand('RENAME',
[projectId + self.CONSTANTS.TAGS, newProjectId + self.CONSTANTS.TAGS])
])
.then(function (/*result*/) {
// Result may contain errors if no branches or commits were created,
Expand Down Expand Up @@ -228,26 +317,26 @@ function RedisAdapter(mainLogger, gmeConfig) {
newProject = newProject_;
// TODO: Is there a more efficient way of doing this?
return Q.all([
Q.ninvoke(self.client, 'hgetall', projectId),
Q.ninvoke(self.client, 'hgetall', projectId + self.CONSTANTS.BRANCHES),
Q.ninvoke(self.client, 'hgetall', projectId + self.CONSTANTS.COMMITS),
Q.ninvoke(self.client, 'hgetall', projectId + self.CONSTANTS.TAGS),
redisCommand('HGETALL', [projectId]),
redisCommand('HGETALL', [projectId + self.CONSTANTS.BRANCHES]),
redisCommand('HGETALL', [projectId + self.CONSTANTS.COMMITS]),
redisCommand('HGETALL', [projectId + self.CONSTANTS.TAGS]),
]);
})
.then(function (result) {
var promises = [Q.ninvoke(self.client, 'hmset', newProjectId, result[0])];
var promises = [hmsetFromObject(newProjectId, result[0])];

// Branches and Commits might not have been created for the source project
if (result[1]) {
promises.push(Q.ninvoke(self.client, 'hmset', newProjectId + self.CONSTANTS.BRANCHES, result[1]));
promises.push(hmsetFromObject(newProjectId + self.CONSTANTS.BRANCHES, result[1]));
}

if (result[2]) {
promises.push(Q.ninvoke(self.client, 'hmset', newProjectId + self.CONSTANTS.COMMITS, result[2]));
promises.push(hmsetFromObject(newProjectId + self.CONSTANTS.COMMITS, result[2]));
}

if (result[3]) {
promises.push(Q.ninvoke(self.client, 'hmset', newProjectId + self.CONSTANTS.TAGS, result[3]));
promises.push(hmsetFromObject(newProjectId + self.CONSTANTS.TAGS, result[3]));
}

return Q.all(promises);
Expand All @@ -266,6 +355,7 @@ function RedisAdapter(mainLogger, gmeConfig) {
this.createProject = createProject;
this.renameProject = renameProject;
this.duplicateProject = duplicateProject;
this.redisCommand = redisCommand;
}

module.exports = RedisAdapter;
Loading