Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,8 @@ def factory(cls, endpoint, timeout, host_conn = None, *args, **kwargs):
conn.close()
raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout,
timeout=timeout)
elif conn.is_closed:
raise ConnectionShutdown("Connection to %s was closed by server" % conn.endpoint)
else:
return conn

Expand Down
11 changes: 8 additions & 3 deletions cassandra/io/libevreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def _cleanup(self):
for watcher in (conn._write_watcher, conn._read_watcher):
if watcher:
watcher.stop()
conn._socket.close()

self.notify() # wake the timer watcher

Expand Down Expand Up @@ -221,6 +222,8 @@ def _loop_will_run(self, prepare):
conn._read_watcher.stop()
# clear reference cycles from IO callback
del conn._read_watcher
conn._socket.close()
log.debug("Closed socket to %s", conn.endpoint)

changed = True

Expand All @@ -233,7 +236,7 @@ def _loop_will_run(self, prepare):

def _atexit_cleanup():
"""Cleanup function called by atexit that uses the current _global_loop value.

This wrapper ensures that cleanup receives the actual LibevLoop instance
instead of None, which was the value of _global_loop when the module was
imported.
Expand Down Expand Up @@ -308,8 +311,6 @@ def close(self):
log.debug("Closing connection (%s) to %s", id(self), self.endpoint)

_global_loop.connection_destroyed(self)
self._socket.close()
log.debug("Closed socket to %s", self.endpoint)

# don't leave in-progress operations hanging
if not self.is_defunct:
Expand All @@ -320,6 +321,8 @@ def close(self):
self.connected_event.set()

def handle_write(self, watcher, revents, errno=None):
if self.is_closed:
return
if revents & libev.EV_ERROR:
if errno:
exc = IOError(errno, os.strerror(errno))
Expand Down Expand Up @@ -361,6 +364,8 @@ def handle_write(self, watcher, revents, errno=None):
return

def handle_read(self, watcher, revents, errno=None):
if self.is_closed:
return
if revents & libev.EV_ERROR:
if errno:
exc = IOError(errno, os.strerror(errno))
Expand Down
Loading