Skip to content
Snippets Groups Projects
Commit b7393922 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

first commit

base socket and select mechanism added
parents
No related branches found
No related tags found
No related merge requests found
# Auto detect text files and perform LF normalization
* text=auto
# Custom for Visual Studio
*.cs diff=csharp
*.sln merge=union
*.csproj merge=union
*.vbproj merge=union
*.fsproj merge=union
*.dbproj merge=union
# Standard to msysgit
*.doc diff=astextplain
*.DOC diff=astextplain
*.docx diff=astextplain
*.DOCX diff=astextplain
*.dot diff=astextplain
*.DOT diff=astextplain
*.pdf diff=astextplain
*.PDF diff=astextplain
*.rtf diff=astextplain
*.RTF diff=astextplain
#################
## Eclipse
#################
*.pydevproject
.project
.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.classpath
.settings/
.loadpath
# External tool builders
.externalToolBuilders/
# Locally stored "Eclipse launch configurations"
*.launch
# CDT-specific
.cproject
# PDT-specific
.buildpath
#################
## Visual Studio
#################
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
# User-specific files
*.suo
*.user
*.sln.docstates
# Build results
[Dd]ebug/
[Rr]elease/
*_i.c
*_p.c
*.ilk
*.meta
*.obj
*.pch
*.pdb
*.pgc
*.pgd
*.rsp
*.sbr
*.tlb
*.tli
*.tlh
*.tmp
*.vspscc
.builds
*.dotCover
## TODO: If you have NuGet Package Restore enabled, uncomment this
#packages/
# Visual C++ cache files
ipch/
*.aps
*.ncb
*.opensdf
*.sdf
# Visual Studio profiler
*.psess
*.vsp
# ReSharper is a .NET coding add-in
_ReSharper*
# Installshield output folder
[Ee]xpress
# DocProject is a documentation generator add-in
DocProject/buildhelp/
DocProject/Help/*.HxT
DocProject/Help/*.HxC
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/Html2
DocProject/Help/html
# Click-Once directory
publish
# Others
[Bb]in
[Oo]bj
sql
TestResults
*.Cache
ClientBin
stylecop.*
~$*
*.dbmdl
Generated_Code #added for RIA/Silverlight projects
# Backup & report files from converting an old project file to a newer
# Visual Studio version. Backup files are not needed, because we have git ;-)
_UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
############
## Windows
############
# Windows image file caches
Thumbs.db
# Folder config file
Desktop.ini
#############
## Python
#############
*.py[co]
# Packages
*.egg
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
#Translations
*.mo
#Mr Developer
.mr.developer.cfg
# Mac crap
.DS_Store
class DataUnavailableException(Exception): pass
class ConnectionFailedException(Exception): pass
\ No newline at end of file
from select import select, error as SelectError
from threading import Thread
from Queue import Queue, Empty
from satella.network.socket import BaseSocket
from satella.network.exceptions import ConnectionFailedException
class SelectLoop(Thread):
"""
Thread that does a select loop.
In general, you are expected to subclass it and write methods corresponding to tasks. The loop works like this:
- when started, invokes on_startup()
- in infinite loop (unless terminate()d ):
* calls on_tick()
* closes timeouted sockets
* accepts foreign sockets into the loop (via self.send_socket mechanism)
* select's on the sockets, closing and removing failed ones as necessary
* dispatches on_read and on_write, accepts connections.
if those calls throw ConnectionFailedException, they will be closed
- when terminated, invokes on_cleanup()
- remaining client sockets are closed. Server socket is NOT CLOSED.
"""
def __init__(self, server_socket):
self.select_timeout = 5 #: timeout for select
self.server_socket = server_socket
self.client_sockets = [] #: satella.network.socket.BaseSocket descendants
self.external_accepts = Queue() #: synchronization element used to accept
#: sockets forcibly into the loop
def send_socket(self, sock):
"""Forces this loop to accept a socket as it's own client socket.
Can be safely executed from other thread contexts.
@type sock: L{satella.network.socket.BaseSocket} descendants"""
self.external_accepts.put(sock)
def on_accept(self, socket, addr):
"""
VIRTUAL
@param socket: raw socket object
@param addr: raw address tuple.
@return: new L{satella.network.socket.BaseSocket} or None, if socket is to be forgotten
(it can be returned later by send_socket)
"""
return BaseSocket(socket)
def on_startup(self):
"""VIRTUAL. Called before the loop starts iterating, in new thread-context"""
pass
def on_tick(self):
"""VIRTUAL. Called at each iteration"""
pass
def on_cleanup(self):
"""VIRTUAL. Called when the loop finishes."""
pass
# Private variables
def loop(self): # it's two separate procedures as it allows more fine-grained flow control via return
self.on_tick() # Call on_tick()
# Close timeouted sockets
for sock in self.client_sockets[:]:
if sock.has_expired()
self.client_socket.remove(sock)
sock.close()
while True: # Accept foreign sockets
try:
new_sock = self.external_accepts.get(False)
except Empty:
break
self.client_sockets.append(new_sock)
try: # select the sockets
rs, ws, xs = select(self.client_sockets + [self.server_socket],
[sock for sock in self.client_sockets if sock.wants_to_write()],
(),
self.select_timeout)
except SelectError: # some socket has died a horrible death
for cs in self.client_sockets[:]:
try:
select(cs, (), (), 0)
except SelectError: # is was this socket
cs.close()
self.client_sockets.remove(cs)
return # repeat the loop
# dispatch on_read and on_write
for sock in ws:
try:
sock.on_write()
except ConnectionFailedException:
sock.close()
self.client_sockets.remove(sock)
return
for sock in rs:
if sock == self.server_socket: # accepting
n_sock = self.on_accept(*sock.accept())
if n_sock != None: # socket returned
self.client_sockets.append(n_sock)
else: # just a civilian socket
try:
sock.on_read()
except ConnectionFailedException:
sock.close()
self.client_sockets.remove(sock)
return
def run(self):
self.on_startup()
while True:
self.loop()
self.on_cleanup()
for sock in self.client_sockets: # Close surviving client sockets
sock.close()
from satella.network.exceptions import DataUnavailableException, ConnectionFailedException
from socket import error as SocketError
class BaseSocket(object):
"""Basic wrapper class around a socket"""
def __init__(self, socket):
"""
@param socket: socket object to wrap around
"""
self.socket = socket #: socket object wrapped
self.tx = bytearray() #: tx buffer
self.rx = bytearray() #: rx buffer
# Interface against select loop
def has_expired(self):
"""Returns whether the socket should be forcibly closed due to inactivity"""
return False
def on_read(self):
"""Internal signal from select that this socket can be read. THROWS L{socket.error}"""
try:
self.rx.extend(self.socket.recv(1024))
except SocketError:
raise ConnectionFailedException, 'recv failed'
def on_write(self):
"""Internal signal from slect that this socket can be written. THROWS L{socket.error}"""
try:
dw = self.socket.send(self.tx)
except SocketError:
raise ConnectionFailedException, 'send failed'
del self.tx[:dw]
def wants_to_write(self):
"""Returns whether this socket wants to send data"""
return len(self.rx) > 0
def fileno(self):
return self.socket.fileno()
def read(self, ln):
if len(self.rx) < ln:
raise DataUnavailableException, 'Not enough data in buffer'
k = self.rx[:ln]
del self.rx[:ln]
return k
def has_data(self, ln):
"""Returns whether a read request of ln bytes can be satisfied right away"""
return len(self.rx) >= ln
def send(self, data):
"""Queues data to be sent"""
self.tx.extend(data)
def close(self):
"""Closes the socket. NOEXCEPT"""
try:
self.socket.close()
except:
pass
class BasicTimeoutTrackingSocket(BaseSocket):
"""Basic extension of BaseSocket that tracks timeouts and deletes them if no data was received during
a time period"""
def __init__(self, socket, expiration, *args, **kwargs):
"""If no data is received on the socket for expiration seconds, it will be closed due to inactivity"""
BaseSocket.__init__(self, socket, *args, **kwargs)
self._last_received = time()
self._expiration = expiration
def on_read(self):
BaseSocket.on_read(self)
self._last_received = time()
def has_expired(self):
return (time() - self._last_received) > self._expiration
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment