satella package¶
Subpackages¶
- satella.cassandra package
- satella.coding package
- Subpackages
- satella.coding.concurrent package
- Subpackages
- Submodules
- satella.coding.concurrent.atomic module
- satella.coding.concurrent.callablegroup module
- satella.coding.concurrent.functions module
- satella.coding.concurrent.id_allocator module
- satella.coding.concurrent.list_processor module
- satella.coding.concurrent.locked_dataset module
- satella.coding.concurrent.locked_structure module
- satella.coding.concurrent.monitor module
- satella.coding.concurrent.queue module
- satella.coding.concurrent.sync module
- satella.coding.concurrent.thread module
- satella.coding.concurrent.thread_collection module
- satella.coding.concurrent.timer module
- satella.coding.concurrent.value module
- Module contents
- satella.coding.decorators package
- satella.coding.resources package
- satella.coding.sequences package
- satella.coding.structures package
- Subpackages
- Submodules
- satella.coding.structures.hashable_objects module
- satella.coding.structures.immutable module
- satella.coding.structures.lru module
- satella.coding.structures.proxy module
- satella.coding.structures.push_iterable module
- satella.coding.structures.queues module
- satella.coding.structures.ranking module
- satella.coding.structures.singleton module
- satella.coding.structures.sorted_list module
- satella.coding.structures.sparse_matrix module
- satella.coding.structures.syncable_droppable module
- satella.coding.structures.tuples module
- satella.coding.structures.typednamedtuple module
- satella.coding.structures.zip_dict module
- Module contents
- satella.coding.transforms package
- Submodules
- satella.coding.transforms.base64 module
- satella.coding.transforms.interpol module
- satella.coding.transforms.jsonify module
- satella.coding.transforms.merge_list module
- satella.coding.transforms.merger module
- satella.coding.transforms.misc module
- satella.coding.transforms.percentile module
- satella.coding.transforms.predicates module
- satella.coding.transforms.words module
- Module contents
- satella.coding.concurrent package
- Submodules
- satella.coding.algos module
- satella.coding.ctxt_managers module
- satella.coding.deep_compare module
- satella.coding.deleters module
- satella.coding.environment module
- satella.coding.expect_exception module
- satella.coding.fun_static module
- satella.coding.generators module
- satella.coding.iterators module
- satella.coding.metaclasses module
- satella.coding.misc module
- satella.coding.optionals module
- satella.coding.overloading module
- satella.coding.predicates module
- satella.coding.recast_exceptions module
- satella.coding.typing module
- Module contents
- Subpackages
- satella.configuration package
- Subpackages
- satella.configuration.schema package
- satella.configuration.sources package
- Submodules
- satella.configuration.sources.base module
- satella.configuration.sources.derivative module
- satella.configuration.sources.envvars module
- satella.configuration.sources.file module
- satella.configuration.sources.format module
- satella.configuration.sources.from_dict module
- satella.configuration.sources.object_from module
- Module contents
- Module contents
- Subpackages
- satella.debug package
- satella.exception_handling package
- satella.instrumentation package
- satella.opentracing package
- satella.os package
- satella.time package
Submodules¶
satella.dao module¶
- class satella.dao.Loadable(load_lazy=False)¶
Bases:
object
Any class that can be loaded lazily.
It’s keyword argument, load_lazy is expected to control lazy loading. If set to True, DB will be hit as a part of this object’s constructor.
If False, you will need to load it on-demand via
must_be_loaded()
decorator.- Parameters:
load_lazy (bool) –
- abstract refresh(load_from=None)¶
Ask the database about this object, or load it from provided serialized representation.
Override me, calling me in a super method.
- Parameters:
load_from – serialized object. If not given, the DB will be asked for it
- Return type:
None
satella.db module¶
- class satella.db.transaction(connection_or_getter, close_the_connection_after=False, log_exception=True)¶
Bases:
object
A context manager for wrapping a transaction and getting a cursor from the Python DB API 2.
Use it as a context manager. commit and rollback will be automatically called for you.
Use like:
>>> with transaction(conn) as cur: >>> cur.execute('DROP DATABASE')
Leaving the context manager will automatically close the cursor for you.
- Parameters:
connection_or_getter – the connection object to use, or a callable, that called with zero arguments will provide us with a connection
close_the_connection_after (bool) – whether the connection should be closed after use
log_exception (bool) – whether to log an exception if it happens
- property connection¶
satella.exceptions module¶
- exception satella.exceptions.AlreadyAllocated(*args, **kwargs)¶
Bases:
ResourceLocked
Given ID has been already marked as allocated
- exception satella.exceptions.BaseSatellaError(*args, **kwargs)¶
Bases:
CustomException
“Base class for all Satella exceptions
- exception satella.exceptions.BaseSatellaException(*args, **kwargs)¶
Bases:
BaseSatellaError
- exception satella.exceptions.CodedCustomException(message='', code=None, *args, **kwargs)¶
Bases:
CustomException
An exception with the property that every CodedCustomException having the code of it’s child instance will be considered it’s child by isinstance.
I.e. following is true:
>>> class MyCustomError(CodedCustomException): >>> code = 5 >>> a = CodedCustomException() >>> a.code = 5 >>> assert isinstance(a, MyCustomError)
Note that you however cannot use your child classes, ie. in following way:
>>> try: >>> e = CodedCustomException() >>> e.code = 5 >>> except MyCustomError: >>> ...
due to Python limitations. See https://mail.python.org/pipermail/python-ideas/2015-November/037104.html to learn more about the issue.
Note that the exception you’re checking needs to be included in the class hierarchy. Here’s the snippet that illustrates this:
>>> class MyException(CodedCustomException): >>> pass >>> class MyCode5(MyException): >>> code = 5 >>> class DifferentHierarchy(CodedCustomException): >>> pass >>> class MyCode5Diff(DifferentHierarchy): >>> code = 5 >>> a = MyException() >> a.code = 5 >>> assert not isinstance(a, MyCode5Diff)
- class satella.exceptions.CodedCustomExceptionMetaclass¶
Bases:
type
Metaclass implementing the isinstance check for coded custom exceptions
- code: Optional[Any] = None¶
- exception satella.exceptions.ConfigurationError(*args, **kwargs)¶
Bases:
BaseSatellaError
,ValueError
A generic error during configuration
- exception satella.exceptions.ConfigurationMisconfiguredError(*args, **kwargs)¶
Bases:
ConfigurationError
Configuration was improperly passed to Satella
- exception satella.exceptions.ConfigurationSchemaError(*args, **kwargs)¶
Bases:
ConfigurationError
Schema mismatch to what was seen
- exception satella.exceptions.ConfigurationValidationError(msg, value=None, **kwargs)¶
Bases:
ConfigurationSchemaError
A validator failed
- Parameters:
value – value found
- exception satella.exceptions.CustomException(*args, **kwargs)¶
Bases:
Exception
” Base class for your custom exceptions. It will:
Accept any number of arguments
Provide faithful __repr__ and a reasonable __str__
It passed all arguments that your exception received via super(). Just remember to actually pass these arguments in your inheriting classes!
- exception satella.exceptions.Empty(*args, **kwargs)¶
Bases:
BaseSatellaError
,Empty
The queue was empty
- exception satella.exceptions.ImpossibleError¶
Bases:
BaseException
For these cases where your execution flow goes some place, that should be impossible for it to reach.
This is a BaseException, since it should be propagated upwards as soon as possible!
- exception satella.exceptions.MetricAlreadyExists(msg, name, requested_type, existing_type)¶
Bases:
BaseSatellaError
Metric with given name already exists, but with a different type
- exception satella.exceptions.NotEnoughBytes(*args, **kwargs)¶
Bases:
BaseSatellaError
Not enough bytes in the parser remain to satisfy this request
- exception satella.exceptions.PreconditionError(*args, **kwargs)¶
Bases:
BaseSatellaError
,ValueError
A precondition was not met for the argument
- exception satella.exceptions.ProcessFailed(rc, stdout_so_far)¶
Bases:
BaseSatellaError
,OSError
A process finished with other result code than it was requested
- Parameters:
rc (int) – return code of the process
stdout_so_far (Union[bytes, str]) – process’ stdout gathered so far
- exception satella.exceptions.ResourceLocked(*args, **kwargs)¶
Bases:
ResourceLockingError
Given resource has been already locked
- exception satella.exceptions.ResourceLockingError(*args, **kwargs)¶
Bases:
BaseSatellaError
Base class for resource locking issues
- exception satella.exceptions.ResourceNotLocked(*args, **kwargs)¶
Bases:
ResourceLockingError
Locking given resource is needed in order to access it
- exception satella.exceptions.WouldWaitMore(*args, **kwargs)¶
Bases:
ResourceLockingError
,TimeoutError
wait()’s timeout has expired
satella.files module¶
- class satella.files.AutoflushFile(file, mode, *con_args, **con_kwargs)¶
Bases:
Proxy
[FileIO
]A file that is supposed to be closed after each write command issued.
The file will be open only when there’s an action to do on it called.
Best for appending so that other processes can read.
Use like:
>>> f = AutoflushFile('test.txt', 'rb+', encoding='utf-8') >>> f.write('test') >>> with open('test.txt', 'a+', encoding='utf-8') as fin: >>> assert fin.read() == 'test'
- Parameters:
file (str) – path to the file
mode (str) – mode to open the file with. Allowed values are w, wb, w+, a+, wb+, ab+, a, ab. w+ and wb+ will truncate the file. Effective mode will be chosen by the class whatever just makes sense.
- Raises:
ValueError – invalid mode chosen
- close()¶
Closes the file.
- Return type:
None
- read(*args, **kwargs)¶
Read a file, returning the read-in data
- Returns:
data readed
- Return type:
Union[str, bytes]
- readall()¶
Read all contents into the file
- Return type:
Union[str, bytes]
- seek(offset, whence=0)¶
Seek to a provided position within the file
- Parameters:
offset (int) – offset to seek file
whence (int) – how to count. Refer to documentation of file.seek()
- Returns:
current pointer
- Return type:
int
- truncate(_size=None)¶
Truncate file to __size starting bytes
- Parameters:
_size (Optional[int]) –
- Return type:
int
- write(*args, **kwargs)¶
Write a particular value to the file, close it afterwards.
- Returns:
amount of bytes written
- Return type:
int
- class satella.files.DevNullFilelikeObject(binary=False)¶
Bases:
FileIO
A /dev/null filelike object. For multiple uses.
- Parameters:
binary (bool) – is this a binary file
- close()¶
Close this stream. Further write()s and flush()es will raise a ValueError. No-op if invoked multiple times
- Return type:
None
- flush()¶
- Raises:
ValueError – when this object has been closed
- Return type:
None
- read(byte_count=None)¶
- Raises:
ValueError – this object has been closed
io.UnsupportedOperation – since reading from this is forbidden
- Parameters:
byte_count (Optional[int]) –
- Return type:
Union[str, bytes]
- seek(v)¶
Seek to a particular file offset
- Parameters:
v (int) –
- Return type:
int
- seekable()¶
Is this file seekable?
- Return type:
bool
- tell()¶
Return the current file offset
- Returns:
the current file offset
- Return type:
int
- truncate(_DevNullFilelikeObject__size=None)¶
Truncate file to __size starting bytes
- Parameters:
_DevNullFilelikeObject__size (Optional[int]) –
- Return type:
int
- writable()¶
Is this object writable
- Return type:
bool
- write(y)¶
Discard any amount of bytes.
This will raise a RuntimeWarning warning upon writing invalid type.
- Raises:
ValueError – this object has been closed
TypeError – eg. type set to binary and text provided to write
- Returns:
length of written content
- Parameters:
y (Union[str, bytes]) –
- Return type:
int
- satella.files.find_files(path, wildcard='(.*)', prefix_with=None, scan_subdirectories=True, apply_wildcard_to_entire_path=False, prefix_with_path=True)¶
Look at given path’s files and all subdirectories and return an iterator of file names (paths included) that conform to given wildcard.
Note that wildcard is only applied to the file name if apply_wildcard_to_entire_path is False, else the wildcard is applied to entire path (including the application of prefix_with!).
Files will be additionally prefixed with path, but only if prefix_with_path is True
Warning
Note that this will try to match only the start of the path. For a complete match remember to put a $ at the end of the string!
- Parameters:
path (str) – path to look into.
wildcard (str) – a regular expression to match
prefix_with (Optional[str]) – an optional path component to prefix before the filename with os.path.join
scan_subdirectories (bool) – whether to scan subdirectories
apply_wildcard_to_entire_path (bool) – whether to take the entire relative path into account when checking wildcard
prefix_with_path (bool) – whether to add path to the resulting path
- Returns:
paths with the files. They will be relative paths, relative to path
- Return type:
Iterator[str]
- class satella.files.jump_to_directory(path, mode=511)¶
Bases:
object
This will temporarily change current working directory. Note however is doesn’t proof you against deliberately changing the working directory by the user.
Non existing directories will be created.
- Variables:
path – (str) target path
prev_path – (str) path that was here before this was called.
- Parameters:
path (tp.Optional[str]) –
- path¶
- prev_path¶
- satella.files.make_noncolliding_name(path, exists_checker=<function exists>)¶
Try to make a noncolliding name in such a way that .1, .2, .3, and so on will be appended to the file name right before the extension (yielding test.1.txt) or at the end of the file name if the extension isn’t present
- Parameters:
path (str) – path of the file that has not to exist
exists_checker (Callable[[str], bool]) – a callable to check with if the file exists
- Returns:
name mutated in such a way that exists_checker returned False on it
- Return type:
str
- satella.files.read_in_file(path, encoding=None, default=<class 'satella.files._NOTSET'>)¶
Opens a file for reading, reads it in, converts to given encoding (or returns as bytes if not given), and closes it.
- Parameters:
path (str) – path of file to read
encoding (Optional[str]) – optional encoding. If default, this will be returned as bytes
default (Optional[Union[bytes, str]]) – value to return when the file does not exist. Default (None) will raise a FileNotFoundError
- Returns:
file content, either decoded as a str, or not as bytes
- Raises:
FileNotFoundError – file did not exist and default was not set
- Return type:
Union[bytes, str]
- satella.files.read_lines(path, delete_empty_lines=True, encoding='utf-8')¶
Read lines from a particular file, removing end-of-line characters and optionally empty lines. Additionally whitespaces (and end-of-line characters) will be removed from both ends of each line.
- Parameters:
path (str) – path of file to read
delete_empty_lines (bool) – set to False if empty lines are not to be removed
encoding (str) – encoding to read the file with
- Returns:
each line as a separate entry
- Return type:
List[str]
- satella.files.read_re_sub_and_write(path, pattern, repl)¶
Read a text file, treat with re.sub and write the contents.
Note that this is not thread or multiprocess safe.
- Parameters:
path (str) – path of file to treat
pattern (Union[compile, str]) – regexp compiled pattern or a string, a pattern to match the file contents
repl (Callable[[Any], str]) – string or a callable(re.Match)->str to replace the contents
- Return type:
None
- satella.files.split(path)¶
An exact reverse of os.path.join
Is is true that
>>> os.path.join(split(a)) == a
- Parameters:
path (str) –
- Return type:
List[str]
- satella.files.try_unlink(path)¶
A syntactic sugar for:
>>> try: >>> os.unlink(path) >>> return True >>> except FileNotFoundError: >>> return False
Note that if path is a directory, rmtree from shlex will be called on it, and any OSErrors will report the deletion as False
- Parameters:
path (str) – path of file to delete
- Returns:
whether the deletion happened
- Return type:
bool
- satella.files.write_out_file_if_different(path, data, encoding=None)¶
Syntactic sugar for
>>> try: >>> if read_in_file(path, encoding) != data: >>> write_to_file(path, data, encoding) >>> return True >>> else: >>> return False >>> except OSError: >>> write_to_file(path, data, encoding) >>> return True
- Parameters:
path (str) – Path to put the file under
data (Union[bytes, str]) – Data to write. Must be bytes if no encoding is given, str otherwise
encoding (Optional[str]) – Encoding. Default is None, which means no encoding (bytes will be written)
- Returns:
if write has happened
- Return type:
bool
- satella.files.write_to_file(path, data, encoding=None)¶
Write provided content as a file, applying given encoding (or data is bytes, if none given)
- Parameters:
path (str) – Path to put the file under
data (Union[bytes, str]) – Data to write. Must be bytes if no encoding is given, str otherwise
encoding (Optional[str]) – Encoding. Default is None, which means no encoding (bytes will be written)
- Return type:
None
satella.imports module¶
- satella.imports.import_class(path)¶
Import a class identified with given module path and class name
- Parameters:
path (str) – path, eg. subprocess.Popen
- Returns:
imported class
- Return type:
type
- satella.imports.import_from(path, package_prefix, all_, locals_, recursive=True, fail_on_attributerror=True, create_all=True, skip_single_underscores=True, skip_not_having_all=False)¶
Import everything from a given module. Append these module’s all to.
This will examine __all__ of given module (if it has any, else it will just import everything from it, which is probably a bad practice and will heavily pollute the namespace.
As a side effect, this will equip all of your packages with __all__.
- Parameters:
path (List[str]) – module’s __path__
package_prefix (str) – package prefix to import from. Use __name__
all – module’s __all__ to append to
recursive (bool) – whether to import packages as well
fail_on_attributerror (bool) – whether to fail if a module reports something in their __all__ that is physically not there (ie. getattr() raised AttributeError
locals – module’s locals, obtain them by calling locals() in importing module’s context
create_all (bool) – whether to create artificial __all__’s for modules that don’t have them
skip_single_underscores (bool) – whether to refrain from importing things that are preceded with a single underscore. Pertains to modules, as well as items
skip_not_having_all (bool) – skip module’s not having an __all__ entry
all_ (List[str]) –
locals_ (Dict[str, Any]) –
- Raises:
AttributeError – module’s __all__ contained entry that was not in this module
- Return type:
None
satella.json module¶
- class satella.json.JSONAble¶
Bases:
object
- abstract to_json()¶
Return a JSON-able representation of this object
- Return type:
Jsonable
- class satella.json.JSONAbleDataObject(**kwargs)¶
Bases:
object
A data-class that supports conversion of its classes to JSON
Define like this:
>>> class CultureContext(JSONAbleDataObject): >>> language: str >>> timezone: str >>> units: str = 'metric'
Note that type annotation is mandatory and default values are supported. Being data value objects, these are eq-able and hashable.
And use like this:
>>> a = CultureContext(language='pl', timezone='Europe/Warsaw') >>> assert a.to_json() == {'language': 'pl', 'timezone': 'Europe/Warsaw', 'units': 'metric'} >>> assert CultureContext.from_json(a.to_json) == a
- Raises:
ValueError – a non-default value was not provided
- classmethod from_json(jsonable)¶
- Return type:
- to_json()¶
Convert self to JSONable value
- Return type:
Dict
- class satella.json.JSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)¶
Bases:
JSONEncoder
This encoder will encode everything!
enums will be dumped to their value.
Constructor for JSONEncoder, with sensible defaults.
If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is True, such items are simply skipped.
If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.
If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause an RecursionError). Otherwise, no such check takes place.
If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.
If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.
If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.
If specified, separators should be an (item_separator, key_separator) tuple. The default is (’, ‘, ‘: ‘) if indent is
None
and (‘,’, ‘: ‘) otherwise. To get the most compact JSON representation, you should specify (‘,’, ‘:’) to eliminate whitespace.If specified, default is a function that gets called for objects that can’t otherwise be serialized. It should return a JSON encodable version of the object or raise a
TypeError
.- default(o)¶
Implement this method in a subclass such that it returns a serializable object for
o
, or calls the base implementation (to raise aTypeError
).For example, to support arbitrary iterators, you could implement default like this:
def default(self, o): try: iterable = iter(o) except TypeError: pass else: return list(iterable) # Let the base class default method raise the TypeError return JSONEncoder.default(self, o)
- Parameters:
o (Any) –
- Return type:
Jsonable
- satella.json.json_encode(x)¶
Convert an object to JSON. Will properly handle subclasses of JSONAble
- Parameters:
x (Any) – object to convert
- Return type:
str
- satella.json.read_json_from_file(path)¶
Load a JSON from a provided file, as UTF-8 encoded plain text.
- Parameters:
path (str) – path to the file
- Returns:
JSON content
- Raises:
ValueError – the file contained an invalid JSON
OSError – the file was not readable or did not exist
- Return type:
- satella.json.write_json_to_file(path, value, **kwargs)¶
Write out a JSON to a file as UTF-8 encoded plain text.
This will use Satella’s
JSONEncoder
internally.- Parameters:
path (str) – path to the file
value (JSONAble) – JSON-able content
kwargs – Legacy argument do not use it, will raise a warning upon non-empty. This never did anything.
- Return type:
None
- satella.json.write_json_to_file_if_different(path, value, encoding='utf-8', **kwargs)¶
Read JSON from a file. Write out a JSON to a file if it’s value is different, as UTF-8 encoded plain text.
This will use Satella’s
JSONEncoder
internally.- Parameters:
path (str) – path to the file
value (JSONAble) – JSON-able content
encoding (str) – encoding to use while parsing the contents of the file
kwargs – will be passed to ujson/json dumps
- Returns:
whether the write actually happened
- Return type:
bool
satella.parsing module¶
- class satella.parsing.BinaryParser(b_stream, offset=0, length=None)¶
Bases:
object
A class that allows parsing binary streams easily.
This supports __len__ to return the amount of bytes in the stream, and __bytes__ to return the bytes.
This is a zero-copy solution, and
get_parser()
will be zero copy as well.- Parameters:
b_stream (Union[bytes, bytearray]) – an object that allows subscripts to span ranges, which will return items parsable by struct
offset (int) – initial offset into the stream
length (Optional[int]) – optional maximum length of byte count
- Raises:
NotEnoughBytes – offset larger than stream length
- Variables:
pointer – pointer to the next bytes. Can be read and modified at will to preserve the earlier state of the BinaryParser.
- assert_has_bytes(n)¶
Assert that we have at least n bytes to consume.
This does not advance the pointer.
- Parameters:
n (int) – amount of bytes to consume
- Raises:
NotEnoughBytes – not enough bytes remain in the stream!
- Return type:
None
- get_bytes(n)¶
Return this many bytes
- Parameters:
n (int) – amount of bytes to return
- Returns:
bytes returned
- Raises:
NotEnoughBytes – not enough bytes remain in the stream!
- Return type:
bytes
- get_parser(length)¶
Return a subclassed binary parser providing a window to another binary parser’s data.
This will advance the pointer by length bytes
- Parameters:
length (int) – amount of bytes to view
- Returns:
a BinaryParser
- Raises:
NotEnoughBytes – not enough bytes remain in the stream!
- Return type:
- get_remaining_bytes()¶
Return the remaining bytes.
This will not advance the pointer
- Return type:
Union[bytes, bytearray]
- get_remaining_bytes_count()¶
Return the amount of bytes remaining. This will not advance the pointer
- Return type:
int
- get_struct(st)¶
Try to obtain as many bytes as this struct requires and return them parsed.
This must be a single-character struct!
This will advance the pointer by size of st. Struct objects will be served from internal instance-specific cache.
- Parameters:
st (Union[str, Struct]) – a single-character struct.Struct or a single character struct specification
- Returns:
a value returned from it
- Raises:
NotEnoughBytes – not enough bytes remain in the stream!
AssertionError – struct was not a single element one!
- Return type:
Union[int, float]
- get_structs(st)¶
Try to obtain as many bytes as this struct requires and return them parsed.
This will advance the pointer by size of st. Struct objects will be served from internal instance-specific cache.
- Parameters:
st (Union[str, Struct]) – a struct.Struct or a multi character struct specification
- Returns:
a tuple of un-parsed values
- Raises:
NotEnoughBytes – not enough bytes remain in the stream!
- Return type:
Tuple[Union[int, float], …]
- reset()¶
Reset the internal pointer to starting value :return:
- Return type:
None
- skip(n)¶
Advance the pointer by n bytes
- Parameters:
n (int) – bytes to advance
- Raises:
NotEnoughBytes – not enough bytes remain in the stream!
- Return type:
None
satella.posix module¶
- class satella.posix.PIDFileLock(pid_file, base_dir='/var/run')¶
Bases:
object
Acquire a PID lock file.
Usage:
>>> with PIDFileLock('my_service.pid'): >>> ... rest of code ..
Any alternatively
>>> pid_lock = PIDFileLock('my_service.pid') >>> pid_lock.acquire() >>> ... >>> pid_lock.release()
The constructor doesn’t throw, __enter__ or acquire() does, one of:
- ResourceLocked - lock is already held. This has two attributes - pid (int), the PID of holder,
and is_alive (bool) - whether the holder is an alive process
Initialize a PID lock file object
- Parameters:
pid_file – rest of path
base_dir – base lock directory
- acquire()¶
Acquire the PID lock
- Raises:
ResourceLocked – if lock if held
- Return type:
None
- file_no¶
- path¶
- release()¶
Free the lock :raises RuntimeError: lock not acquired
- Return type:
None
- satella.posix.daemonize(exit_via=<built-in function exit>, redirect_std_to_devnull=True, uid=None, gid=None)¶
Make this process into a daemon.
This entails:
umask 0
forks twice
be the child of init
becomes session leader
changes root directory to /
closes stdin, stdout, stderr
(option) redirects stdin, stdout, stderr to /dev/null
Refer - “Advanced Programming in the UNIX Environment” 13.3
- Parameters:
exit_via (Callable) – callable used to terminate process
redirect_std_to_devnull (bool) – whether to redirect stdin, stdout and stderr to /dev/null
uid (Optional[int]) – User to set (via seteuid). Default - this won’t be done. You can pass either user name as string or UID.
gid (Optional[int]) – Same as UID, but for groups. These will be resolved too.
- Raises:
KeyError – uid/gid was passed as string, but getpwnam() failed
OSError – platform is Windows
- satella.posix.hang_until_sig(extra_signals=None, sleep_interval=2)¶
Will hang until this process receives SIGTERM or SIGINT. If you pass extra signal IDs (signal.SIG*) with extra_signals, then also on those signals this call will release.
Periodic sleeping with polling was chosen as the approach of choice, as pause() seemed to work a bit shakily multi-platform.
- Parameters:
extra_signals (Optional[Sequence[int]]) – a list of extra signals to listen to
sleep_interval (float) – amount of time to sleep between checking for termination condition.
- Return type:
None
- satella.posix.is_running_as_root()¶
Is this process running as root?
Checks whether effective UID is 0
- Returns:
bool
- Raises:
OSError – called on Windows!
- Return type:
bool
- satella.posix.suicide(kill_entire_pg=True)¶
Kill self.
- Parameters:
kill_entire_pg (bool) – whether to kill entire PG if a session leader. Won’t work on Windows.
- Return type:
None
satella.processes module¶
- satella.processes.call_and_return_stdout(args, timeout=None, encoding=None, expected_return_code=None, **kwargs)¶
Call a process and return it’s stdout.
Everything in kwargs will be passed to subprocess.Popen
A bytes object will be returned if encoding is not defined, else stdout will be decoded according to specified encoding.
Deprecated since version Use:
subprocess.check_output
instead.- Parameters:
args (Union[str, List[str]]) – arguments to run the program with. Can be either a string or a list of strings.
timeout (Optional[Union[str, int]]) – amount of seconds to wait for the process result. If process does not complete within this time, it will be sent a SIGKILL. Can be also a time string. If left at default, ie. None, timeout won’t be considered at all.
encoding (Optional[str]) – encoding with which to decode stdout. If none is passed, it will be returned as a bytes object
expected_return_code (Optional[int]) – an expected return code of this process. 0 is the default. If process returns anything else, ProcessFailed will be raise. If left default (None) return code won’t be checked at all
- Raises:
ProcessFailed – process’ result code was different from the requested
TimeoutError – timeout was specified and the process didn’t complete
- Return type:
Union[bytes, str]
- satella.processes.read_nowait(process, output_list)¶
This spawns a thread to read given process’ stdout and append it to a list, in order to prevent buffer filling up completely.
To retrieve entire stdout after process finishes do
>>> ''.join(list)
This thread will terminate automatically after the process closes it’s stdout or finishes.
- Parameters:
process (Popen) –
output_list (List[str]) –
satella.random module¶
- satella.random.random_binary(length)¶
Return a random bytes string of given length.
An attempt will be made to utilize /dev/random, if exists
- Parameters:
length (int) – length of string to generate
- Return type:
bytes
- satella.random.random_word(length, choice='abcdefghijklmnopqrstuvwxyz', join_fun=<function <lambda>>)¶
Build and return a random word of provided length.
The word will be built by calling join_fun with length of arguments picked at random from choice.
Best used with strings. Provide a word length, a string to choose from as choice (defaults to string.ascii_lowercase). Will return by default a string (which coincidentally happens to be a sequence of strings, albeit one-character ones).
- Parameters:
length (int) – length of the word
choice (Sequence[T]) – a range of characters to use. By default is string.ascii_lowercase
join_fun (Callable[[List[T]], T]) – an argument to be called with a list of randomly picked values. Defaults to ‘’.join(args), so your T must be a string. If you’re passing a different type, remember to alter this function because the default one expects strings!
- Returns:
a random word
- Return type:
Sequence[T]
- satella.random.shuffle_together(*args)¶
args, being sequences of equal length, will be permuted in such a way that their indices will still correspond to each other.
So given:
>>> a = [1, 2, 3] >>> b = ['a', 'b', 'c'] >>> c = shuffle_together(a, b)
Might equal
>>> c == [[3, 1, 2], ['c', 'a', 'b']]
- Parameters:
args (Sequence) –
- Return type:
List[List]
satella.warnings module¶
- exception satella.warnings.ExperimentalWarning¶
Bases:
FutureWarning
This feature is experimental!
- satella.warnings.mark_temporarily_disabled(reason='')¶
A decorator to mark a function unusable due to some forthcoming changes.
A call to the function will raise NotImplementedError
and a mention of this function in code will mark a function disabled for whatever reason.
Usage:
>>> @mark_temporarily_disabled('Due to analysis schema changes') >>> def serialize_report(analysis): >>> ...
- Parameters:
reason (str) –