diff --git a/cassandra/connection.py b/cassandra/connection.py index f07160e385..eae018649b 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -986,6 +986,8 @@ def factory(cls, endpoint, timeout, host_conn = None, *args, **kwargs): conn.close() raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout, timeout=timeout) + elif conn.is_closed: + raise ConnectionShutdown("Connection to %s was closed by server" % conn.endpoint) else: return conn diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 3da809931f..6cceb6c6bc 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -124,6 +124,7 @@ def _cleanup(self): for watcher in (conn._write_watcher, conn._read_watcher): if watcher: watcher.stop() + conn._socket.close() self.notify() # wake the timer watcher @@ -221,6 +222,8 @@ def _loop_will_run(self, prepare): conn._read_watcher.stop() # clear reference cycles from IO callback del conn._read_watcher + conn._socket.close() + log.debug("Closed socket to %s", conn.endpoint) changed = True @@ -233,7 +236,7 @@ def _loop_will_run(self, prepare): def _atexit_cleanup(): """Cleanup function called by atexit that uses the current _global_loop value. - + This wrapper ensures that cleanup receives the actual LibevLoop instance instead of None, which was the value of _global_loop when the module was imported. @@ -308,8 +311,6 @@ def close(self): log.debug("Closing connection (%s) to %s", id(self), self.endpoint) _global_loop.connection_destroyed(self) - self._socket.close() - log.debug("Closed socket to %s", self.endpoint) # don't leave in-progress operations hanging if not self.is_defunct: @@ -320,6 +321,8 @@ def close(self): self.connected_event.set() def handle_write(self, watcher, revents, errno=None): + if self.is_closed: + return if revents & libev.EV_ERROR: if errno: exc = IOError(errno, os.strerror(errno)) @@ -361,6 +364,8 @@ def handle_write(self, watcher, revents, errno=None): return def handle_read(self, watcher, revents, errno=None): + if self.is_closed: + return if revents & libev.EV_ERROR: if errno: exc = IOError(errno, os.strerror(errno))