diff --git a/README.md b/README.md index 34c0332..66fc9f9 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,27 @@ Sessionstore is a node.js module for multiple databases. It can be very useful i }) })); -## Connecting to elasticsearch +## Connecting to elasticsearch (https://github.com/elastic/elasticsearch) + + var sessionstore = require('sessionstore'); + + var express = require('express'); + var app = express(); + + app.use(express.session({ + store: sessionstore.createSessionStore({ + type: '@elasticsearch', + node: 'localhost:9200', // optional + prefix: '', // optional + index: 'express', // optional + typeName: 'session', // optional + pingInterval: 1000, // optional + timeout: 10000 // optional + }) + })); + + +## Connecting to elasticsearch legacy (https://github.com/elastic/elasticsearch-js-legacy) var sessionstore = require('sessionstore'); diff --git a/lib/databases/@elasticsearch.js b/lib/databases/@elasticsearch.js new file mode 100644 index 0000000..632bde2 --- /dev/null +++ b/lib/databases/@elasticsearch.js @@ -0,0 +1,222 @@ +var util = require('util'), + Session = require('../sessionInterface'), + use = require('../use'), + _ = require('lodash'), + // async = require('async'), + jsondate = require('jsondate'), + elasticsearch = use('@elastic/elasticsearch'); + + +var ElasticSearchSessionStore = function (options) { + options = options || {}; + Session.Store.call(this, options); + + var defaults = { + index: 'express', + typeName: 'session', + prefix: '', + ttl: 60 * 60 * 24 * 14, // 14 days + pingInterval: 1000 + }; + + _.defaults(options, defaults); + + if (!options.nodes && !options.node) { + options.node = 'http://localhost:9200'; + } + + this.options = options; + + this.index = this.options.index; + this.typeName = this.options.typeName; +}; + +util.inherits(ElasticSearchSessionStore, Session.Store); + +_.extend(ElasticSearchSessionStore.prototype, { + + connect: function (callback) { + var self = this; + + this.isConnected = false; + + this.client = new elasticsearch.Client(this.options); + + var callbacked = false; + this.closeCalled = false; + + var interval = setInterval(function () { + if (self.closeCalled) { + clearInterval(interval); + } + + self.client.ping(function (err) { + if (err) { + if (self.isConnected) { + self.isConnected = false; + self.emit('disconnect'); + } + if (callback && !callbacked) { + callbacked = true; + callback(err, self); + } + return; + } + + if (!self.isConnected) { + // Github issue #39 - recover after temp ping error. + if (callbacked) { + // Already callbacked, so only restore isConnected state. + self.isConnected = true; + self.emit('connect'); + } else { + // Not callbacked yet, so perform init logic and handle isConnected state. + self.client.indices.create({ + index: self.index + }, function(err) { + if (err && err.message.toLowerCase().indexOf('already') >= 0) { + err = null; + } + if (err) { + if (callback && !callbacked) { + callbacked = true; + callback(err, self); + } + return; + } + + self.client.indices.putMapping({ + index: self.index, + type: self.typeName, + body: { + session: { + _ttl: { enabled: true, default: '14d' } + } + } + }, function(err) { + if (err) { + if (callback && !callbacked) { + callbacked = true; + callback(err, self); + } + return; + } + + self.isConnected = true; + self.emit('connect'); + if (callback && !callbacked) { + callbacked = true; + callback(err, self); + } + }); + }); + } + } + }); + }, this.options.pingInterval); + }, + + disconnect: function (callback) { + this.closeCalled = true; + if (this.client) this.client.close(); + if (callback) callback(null); + }, + + set: function (sid, sess, callback) { + var ttl = this.options.ttl * 1000; + if (sess && sess.cookie && sess.cookie.expires) { + ttl = (new Date(sess.cookie.expires)).getTime() - Date.now(); + sess.expiresAt = new Date(sess.cookie.expires); + } else { + sess.expiresAt = new Date(Date.now() + this.options.ttl * 1000); + } + + // sess._version = sess._version || 1; + // sess._version++; + + var self = this; + + this.client.exists({ + index: this.index, + type: this.typeName, + id: this.options.prefix + sid + }, function (err, exists) { + var methodName = undefined; + if (exists.statusCode === 404) { + methodName = 'create'; + } + self.client.index({ + index: self.index, + type: self.typeName, + id: self.options.prefix + sid, + opType: methodName, + // version: sess._version > 2 ? sess._version - 1 : undefined, + //ttl: ttl + 'ms', + body: sess, + refresh: true + }, function (err, res) { + if (err && (err.message.toLowerCase().indexOf('version') >= 0)) { + return callback(new Error('ConcurrencyError: Session was updated by someone else!')); + } + callback(err, res); + }); + }); + }, + + //touch: function (sid, sess, callback) { + // this.set(sid, sess, callback); + //}, + + get: function (sid, callback) { + this.client.get({ + index: this.index, + type: this.typeName, + id: this.options.prefix + sid + }, function (err, res) { + if (err && err.body && err.body.found === false) { + err = null; + } + if (err) return callback(err); + if (typeof res == 'undefined' || typeof res.body == 'undefined'){ + return callback(null, null); + } + if (res.body._source) { + var sess = jsondate.parse(JSON.stringify(res.body._source)); + if (sess.expiresAt && sess.expiresAt.getTime() > Date.now()) { + delete sess.expiresAt; + return callback(null, sess); + } + } + callback(null, null); + }); + }, + + destroy: function (sid, callback) { + this.client.delete({ + index: this.index, + type: this.typeName, + id: this.options.prefix + sid + }, function (err, res) { + if (err && err.message.toLowerCase().indexOf('not found') >= 0) { + err = null; + } + if (callback) callback(err); + }); + }, + + clear: function (callback) { + var self = this; + this.client.indices.exists({index: this.index}, function (err, result) { + if (result){ + self.client.indices.delete({index: self.index}, function (err) { + if (callback) callback(err); + }); + } else { + if (callback) callback(err); + } + }); + } + +}); + +module.exports = ElasticSearchSessionStore; diff --git a/package.json b/package.json index ac9f094..187068f 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "tolerance": "1.0.0" }, "devDependencies": { + "@elastic/elasticsearch": "^7.5.1", "cradle": ">=0.2.7", "elasticsearch": ">= 10.0.0", "eslint": ">=1.0.0", diff --git a/test/sessionStoreTest.js b/test/sessionStoreTest.js index 594fcaa..0f59901 100644 --- a/test/sessionStoreTest.js +++ b/test/sessionStoreTest.js @@ -62,7 +62,7 @@ describe('SessionStore', function() { describe('with options containing a type property with the value of', function() { - var types = ['inmemory', 'mongodb', 'tingodb', 'redis', 'memcached', 'couchdb']; + var types = ['inmemory', 'mongodb', 'tingodb', 'redis', 'memcached', 'couchdb', '@elasticsearch']; types.forEach(function(type) {