Skip to content
Snippets Groups Projects
Commit fd974694 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

read stdout without overflowing the buffer

parent 86fd76fe
No related branches found
No related tags found
No related merge requests found
processes processes
========= =========
Note that this function will consume stdout as soon as it's
available, so that you don't need to worry about
the buffer overflowing and such.
.. autofunction:: satella.processes.call_and_return_stdout .. autofunction:: satella.processes.call_and_return_stdout
import subprocess import subprocess
import typing as tp import typing as tp
import threading
from .exceptions import ProcessFailed from .exceptions import ProcessFailed
def read_nowait(process: subprocess.Popen, output_list: tp.List[str]):
try:
while process.stdout.readable():
line = process.stdout.readline()
output_list.append(line)
except (IOError, OSError):
pass
def call_and_return_stdout(args: tp.Union[str, tp.List[str]], def call_and_return_stdout(args: tp.Union[str, tp.List[str]],
timeout: tp.Optional[int] = None,
expected_return_code: int = 0, **kwargs) -> tp.Union[bytes, str]: expected_return_code: int = 0, **kwargs) -> tp.Union[bytes, str]:
""" """
Call a process and return it's stdout. Call a process and return it's stdout.
Everything in kwargs will be passed to subprocess.Popen
:param args: arguments to run the program with. If passed a string, it will be split on space. :param args: arguments to run the program with. If passed a string, it will be split on space.
:param timeout: amount of seconds to wait for the process result. If process does not complete
within this time, it will be sent a SIGKILL
:param expected_return_code: an expected return code of this process. 0 is the default. If process :param expected_return_code: an expected return code of this process. 0 is the default. If process
returns anything else, ProcessFailed will be raise returns anything else, ProcessFailed will be raise
:param ProcessFailed: process' result code was different from the requested :param ProcessFailed: process' result code was different from the requested
...@@ -18,12 +33,26 @@ def call_and_return_stdout(args: tp.Union[str, tp.List[str]], ...@@ -18,12 +33,26 @@ def call_and_return_stdout(args: tp.Union[str, tp.List[str]],
args = args.split(' ') args = args.split(' ')
kwargs['capture_output'] = True kwargs['capture_output'] = True
kwargs['stdout'] = subprocess.PIPE
stdout_list = []
proc = subprocess.Popen(args, **kwargs)
reader_thread = threading.Thread(target=read_nowait, args=(proc, stdout_list), daemon=True)
reader_thread.start()
proc = subprocess.run(args, **kwargs) try:
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
if proc.returncode != expected_return_code: if proc.returncode != expected_return_code:
raise ProcessFailed(proc.returncode) raise ProcessFailed(proc.returncode)
else: else:
return proc.stdout if kwargs.get('encoding', None) is None:
return b''.join(stdout_list)
else:
return ''.join(stdout_list)
...@@ -8,3 +8,6 @@ class TestProcesses(unittest.TestCase): ...@@ -8,3 +8,6 @@ class TestProcesses(unittest.TestCase):
def test_return_stdout(self): def test_return_stdout(self):
output = call_and_return_stdout('cat /proc/meminfo', shell=True, encoding='utf8') output = call_and_return_stdout('cat /proc/meminfo', shell=True, encoding='utf8')
self.assertIn('MemTotal', output) self.assertIn('MemTotal', output)
output = call_and_return_stdout('cat /proc/meminfo', shell=True)
self.assertIn(b'MemTotal', output)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment