Skip to content
Snippets Groups Projects
Commit a5393c9d authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

close behaviour cleaned up

parent 1ab12c58
No related branches found
No related tags found
No related merge requests found
...@@ -21,6 +21,14 @@ class SelectLoop(BaseThread): ...@@ -21,6 +21,14 @@ class SelectLoop(BaseThread):
if those calls throw ConnectionFailedException, they will be closed if those calls throw ConnectionFailedException, they will be closed
- when terminated, invokes on_cleanup() - when terminated, invokes on_cleanup()
- remaining client sockets are closed. Server socket is NOT CLOSED. - remaining client sockets are closed. Server socket is NOT CLOSED.
When a socket is closed, or fails, it is first closed, when it's on_close() method is invoked, and
in the end self.on_sock_closed() is called with the offending socket as argument.
This class runs out of the box, if you don't overload anything, but it won't do anything of interest,
just accept connections and receive data from sockets.
This class doesn't do out-of-band data.
""" """
def __init__(self, server_socket): def __init__(self, server_socket):
self.select_timeout = 5 #: timeout for select self.select_timeout = 5 #: timeout for select
...@@ -38,7 +46,7 @@ class SelectLoop(BaseThread): ...@@ -38,7 +46,7 @@ class SelectLoop(BaseThread):
def on_accept(self, socket, addr): def on_accept(self, socket, addr):
""" """
VIRTUAL Override this.
@param socket: raw socket object @param socket: raw socket object
@param addr: raw address tuple. @param addr: raw address tuple.
...@@ -48,15 +56,19 @@ class SelectLoop(BaseThread): ...@@ -48,15 +56,19 @@ class SelectLoop(BaseThread):
return BaseSocket(socket) return BaseSocket(socket)
def on_startup(self): def on_startup(self):
"""VIRTUAL. Called before the loop starts iterating, in new thread-context""" """Override this. Called before the loop starts iterating, in new thread-context"""
pass pass
def on_tick(self): def on_tick(self):
"""VIRTUAL. Called at each iteration""" """Override this. Called at each iteration"""
pass pass
def on_cleanup(self): def on_cleanup(self):
"""VIRTUAL. Called when the loop finishes.""" """Override this. Called when the loop finishes."""
pass
def on_sock_closed(self, sock):
"""VIRTUAL. Given socket is removed from the select layer, it has already been on_close()'d"""
pass pass
# Private variables # Private variables
...@@ -68,6 +80,8 @@ class SelectLoop(BaseThread): ...@@ -68,6 +80,8 @@ class SelectLoop(BaseThread):
if sock.has_expired() if sock.has_expired()
self.client_socket.remove(sock) self.client_socket.remove(sock)
sock.close() sock.close()
sock.on_close()
self.on_sock_closed(sock)
while True: # Accept foreign sockets while True: # Accept foreign sockets
try: try:
...@@ -88,18 +102,22 @@ class SelectLoop(BaseThread): ...@@ -88,18 +102,22 @@ class SelectLoop(BaseThread):
except SelectError: # is was this socket except SelectError: # is was this socket
cs.close() cs.close()
self.client_sockets.remove(cs) self.client_sockets.remove(cs)
cs.on_close()
self.on_sock_closed(cs)
return # repeat the loop return # repeat the loop
# dispatch on_read and on_write # dispatch on_read and on_write
for sock in ws: for sock in ws: # analyze sockets ready to be written
try: try:
sock.on_write() sock.on_write()
except ConnectionFailedException: except ConnectionFailedException:
sock.close() sock.close()
self.client_sockets.remove(sock) self.client_sockets.remove(sock)
sock.on_close()
self.on_sock_closed(sock)
return return
for sock in rs: for sock in rs: # analyze sockets ready to be read
if sock == self.server_socket: # accepting if sock == self.server_socket: # accepting
n_sock = self.on_accept(*sock.accept()) n_sock = self.on_accept(*sock.accept())
if n_sock != None: # socket returned if n_sock != None: # socket returned
...@@ -110,6 +128,8 @@ class SelectLoop(BaseThread): ...@@ -110,6 +128,8 @@ class SelectLoop(BaseThread):
except ConnectionFailedException: except ConnectionFailedException:
sock.close() sock.close()
self.client_sockets.remove(sock) self.client_sockets.remove(sock)
sock.on_close()
self.on_sock_closed(sock)
return return
def run(self): def run(self):
...@@ -119,5 +139,7 @@ class SelectLoop(BaseThread): ...@@ -119,5 +139,7 @@ class SelectLoop(BaseThread):
self.on_cleanup() self.on_cleanup()
for sock in self.client_sockets: # Close surviving client sockets for sock in self.client_sockets: # Close surviving client sockets
sock.close() sock.close()
sock.on_close()
self.on_sock_closed(sock)
...@@ -2,7 +2,7 @@ from satella.network.exceptions import DataUnavailableException, ConnectionFaile ...@@ -2,7 +2,7 @@ from satella.network.exceptions import DataUnavailableException, ConnectionFaile
from socket import error as SocketError from socket import error as SocketError
class BaseSocket(object): class BaseSocket(object):
"""Basic wrapper class around a socket""" """Basic wrapper class around a socket. NOT THREADSAFE"""
def __init__(self, socket): def __init__(self, socket):
""" """
@param socket: socket object to wrap around @param socket: socket object to wrap around
...@@ -11,35 +11,43 @@ class BaseSocket(object): ...@@ -11,35 +11,43 @@ class BaseSocket(object):
self.tx = bytearray() #: tx buffer self.tx = bytearray() #: tx buffer
self.rx = bytearray() #: rx buffer self.rx = bytearray() #: rx buffer
# Interface against select loop # STUFF THAT CAN BE OVERLOADED/EXTENDED
def has_expired(self): def has_expired(self):
"""Returns whether the socket should be forcibly closed due to inactivity""" """
Overload this.
@return bool - whether the socket should be forcibly closed due to inactivity
"""
return False return False
def on_read(self): def on_read(self):
"""Internal signal from select that this socket can be read. THROWS L{socket.error}""" """
Extend this, invoking inherited method at the beginning of your own.
Internal signal from select that this socket can be read. THROWS L{socket.error}
"""
try: try:
self.rx.extend(self.socket.recv(1024)) self.rx.extend(self.socket.recv(1024))
except SocketError: except SocketError:
raise ConnectionFailedException, 'recv failed' raise ConnectionFailedException, 'recv failed'
def on_write(self):
"""Internal signal from slect that this socket can be written. THROWS L{socket.error}"""
try:
dw = self.socket.send(self.tx)
except SocketError:
raise ConnectionFailedException, 'send failed'
del self.tx[:dw]
def wants_to_write(self):
"""Returns whether this socket wants to send data"""
return len(self.rx) > 0
def fileno(self): return self.socket.fileno() def send(self, data):
"""
Extend this, invoking inherited method at the end of your own with raw data to send.
Queues data to be sent. You can override it so that it receives and sends your
objects, which could represent protocol frames, or such.
"""
self.tx.extend(data)
def peek(self, ln): def peek(self, ln):
"""
Extend this. Returns data from the buffer without removing it
"""
if len(self.rx) < ln: if len(self.rx) < ln:
raise DataUnavailableException, 'Not enough data in buffer' raise DataUnavailableException, 'Not enough data in buffer'
return self.rx[:ln] return self.rx[:ln]
def read(self, ln): def read(self, ln):
"""
Extend this. Returns data from the buffer, removing it after
"""
if len(self.rx) < ln: if len(self.rx) < ln:
raise DataUnavailableException, 'Not enough data in buffer' raise DataUnavailableException, 'Not enough data in buffer'
k = self.rx[:ln] k = self.rx[:ln]
...@@ -50,9 +58,27 @@ class BaseSocket(object): ...@@ -50,9 +58,27 @@ class BaseSocket(object):
"""Returns whether a read request of ln bytes can be satisfied right away""" """Returns whether a read request of ln bytes can be satisfied right away"""
return len(self.rx) >= ln return len(self.rx) >= ln
def send(self, data): def on_close(self):
"""Queues data to be sent""" """Override this. Invoked when the socket is discarded by the select layer"""
self.tx.extend(data) pass
# Stuff that you should leave alone
def wants_to_write(self):
"""
Returns whether this socket wants to send data. Used by select to determine whether it should go
into the select loop
"""
return len(self.rx) > 0
def fileno(self): return self.socket.fileno()
def on_write(self):
"""Internal signal from slect that this socket can be written. THROWS L{socket.error}"""
try:
dw = self.socket.send(self.tx)
except SocketError:
raise ConnectionFailedException, 'send failed'
del self.tx[:dw]
def close(self): def close(self):
"""Closes the socket. NOEXCEPT""" """Closes the socket. NOEXCEPT"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment