Skip to content
Snippets Groups Projects
Commit 0e803194 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

test hang_until_sig

parent 06f4314b
No related branches found
No related tags found
No related merge requests found
.git/
.idea/
...@@ -5,3 +5,5 @@ ...@@ -5,3 +5,5 @@
within given timeout within given timeout
* added power support to `AtomicNumber` * added power support to `AtomicNumber`
* slight refactor in `AtomicNumber` * slight refactor in `AtomicNumber`
* removed `end_on_keyboard_interrupt` from `hang_until_sig`
* unit test for `hang_until_sig` at last
FROM python:3.8
ADD requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt
RUN pip install nose2 mock coverage
ADD satella /app/satella
ADD tests /app/tests
WORKDIR /app
CMD ["coverage", "run", "-m", "nose2", "-vv"]
...@@ -34,3 +34,9 @@ is available for the brave souls that do decide to use this library. ...@@ -34,3 +34,9 @@ is available for the brave souls that do decide to use this library.
See [LICENSE](LICENSE) for text of the license. This library may contain See [LICENSE](LICENSE) for text of the license. This library may contain
code taken from elsewhere on the internets, so this is copyright (c) respective authors. code taken from elsewhere on the internets, so this is copyright (c) respective authors.
# Running unit tests
Just build and run the attached
[Dockerfile](Dockerfile).
These tests run on Python 3.8
\ No newline at end of file
...@@ -30,3 +30,8 @@ time_us ...@@ -30,3 +30,8 @@ time_us
Syntactic sugar for `int(time.time()*1000000)` Syntactic sugar for `int(time.time()*1000000)`
.. autofunction:: satella.time.time_us .. autofunction:: satella.time.time_us
sleep
-----
.. autofunction:: satella.time.sleep
__version__ = '2.8.13_a6' __version__ = '2.8.13_a7'
import signal import signal
import time import time
import logging
import typing as tp import typing as tp
logger = logging.getLogger(__name__)
end = False end = False
def __sighandler(a, b): def __sighandler(a, b):
logger.warning('Handling a signal')
global end global end
end = True end = True
def hang_until_sig(extra_signals: tp.Optional[tp.List[int]] = None, def hang_until_sig(extra_signals: tp.Optional[tp.List[int]] = None) -> None:
end_on_keyboard_interrupt: bool = True) -> None:
""" """
Will hang until this process receives SIGTERM or SIGINT. Will hang until this process receives SIGTERM or SIGINT.
If you pass extra signal IDs (signal.SIG*) with extra_signals, If you pass extra signal IDs (signal.SIG*) with extra_signals,
then also on those signals this call will release. then also on those signals this call will release.
:param extra_signals: a list of extra signals to listen to :param extra_signals: a list of extra signals to listen to
:param end_on_keyboard_interrupt: whether to consider receiving a KeyboardInterrupt as
a signal to finish
""" """
global end global end
extra_signals = extra_signals or [] extra_signals = extra_signals or ()
signal.signal(signal.SIGTERM, __sighandler) signal.signal(signal.SIGTERM, __sighandler)
signal.signal(signal.SIGINT, __sighandler) signal.signal(signal.SIGINT, __sighandler)
...@@ -31,10 +31,6 @@ def hang_until_sig(extra_signals: tp.Optional[tp.List[int]] = None, ...@@ -31,10 +31,6 @@ def hang_until_sig(extra_signals: tp.Optional[tp.List[int]] = None,
signal.signal(s, __sighandler) signal.signal(s, __sighandler)
while not end: while not end:
try: time.sleep(0.5)
time.sleep(0.5)
except KeyboardInterrupt:
if end_on_keyboard_interrupt:
end = True
end = False # reset for next use end = False # reset for next use
...@@ -3,9 +3,35 @@ import inspect ...@@ -3,9 +3,35 @@ import inspect
import time import time
import typing as tp import typing as tp
from concurrent.futures import Future from concurrent.futures import Future
from functools import wraps from satella.coding.decorators import wraps
__all__ = ['measure', 'time_as_int', 'time_ms'] __all__ = ['measure', 'time_as_int', 'time_ms', 'sleep']
def sleep(x: float, abort_on_interrupt: bool = False) -> bool:
"""
Sleep for given interval.
This won't be interrupted by KeyboardInterrupted, and always will sleep for given time interval.
This will return at once if x is negative
:param x: the interval to wait
:param abort_on_interrupt: whether to abort at once when KeyboardInterrupt is seen
:returns: whether the function has completed its sleep naturally. False is seen on
aborts thanks to KeyboardInterrupt only if abort_on_interrupt is True
"""
if x < 0:
return
with measure() as measurement:
while measurement() < x:
try:
time.sleep(x - measurement())
except KeyboardInterrupt:
if abort_on_interrupt:
return False
pass
return True
def time_as_int() -> int: def time_as_int() -> int:
......
...@@ -3,8 +3,8 @@ import signal ...@@ -3,8 +3,8 @@ import signal
import sys import sys
import threading import threading
import unittest import unittest
import time
from satella import time
from satella.os import hang_until_sig from satella.os import hang_until_sig
...@@ -12,10 +12,10 @@ class TestHangUntilSig(unittest.TestCase): ...@@ -12,10 +12,10 @@ class TestHangUntilSig(unittest.TestCase):
@unittest.skipIf('win' in sys.platform, 'Cannot test on Windows') @unittest.skipIf('win' in sys.platform, 'Cannot test on Windows')
def test_hang_until_sig(self): def test_hang_until_sig(self):
def send_sig(): def send_sig():
time.sleep(1) time.sleep(1)
os.kill(os.getpid(), signal.SIGINT) os.kill(0, signal.SIGTERM)
threading.Thread(target=send_sig).start() threading.Thread(target=send_sig).start()
hang_until_sig() hang_until_sig()
self.assertEqual(True, False)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment