From 191885b49e05fc378d25ae03a56d8f44f016149c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Mon, 4 Mar 2024 08:59:46 +0100 Subject: [PATCH] fix DevNullFilelikeObject and AutoflushFile --- satella/files.py | 77 ++++++++++++++++++++++++++------------------- tests/test_files.py | 11 +++---- 2 files changed, 50 insertions(+), 38 deletions(-) diff --git a/satella/files.py b/satella/files.py index 3d364284..dcc5edce 100644 --- a/satella/files.py +++ b/satella/files.py @@ -41,18 +41,19 @@ class DevNullFilelikeObject(io.FileIO): A /dev/null filelike object. For multiple uses. :param binary: is this a binary file - :param ignore_typing_issues: """ - __slots__ = 'is_closed', 'binary', 'ignore_typing_issues' - def __init__(self, binary: bool = False, ignore_typing_issues: bool = False): + def __init__(self, binary: bool = False): self.is_closed = False self.binary = binary - self.ignore_typing_issues = ignore_typing_issues @closed_devnull def tell(self) -> int: - """Return the current file offset""" + """ + Return the current file offset + + :return: the current file offset + """ return 0 @closed_devnull @@ -91,15 +92,13 @@ class DevNullFilelikeObject(io.FileIO): This will raise a RuntimeWarning warning upon writing invalid type. :raises ValueError: this object has been closed + :raises TypeError: eg. type set to binary and text provided to write :return: length of written content """ - if not self.ignore_typing_issues: - if isinstance(y, bytes) and not self.binary: - warnings.warn('Non binary data written to stream, but required binary', RuntimeWarning) - elif isinstance(y, str) and self.binary: - warnings.warn('Non binary data written to stream, but required binary', RuntimeWarning) - else: - warnings.warn('Non binary data written to stream, but required binary', RuntimeWarning) + if not isinstance(y, str) and not self.binary: + raise TypeError(f'Expected text data, but got {type(y)}') + elif not isinstance(y, bytes) and self.binary: + raise TypeError(f'Expected binary data, but got {type(y)}') return len(y) @closed_devnull @@ -392,6 +391,13 @@ def close_file_after(fun): return inner +def open_file(fun): + @wraps(fun) + def inner(self, *args, **kwargs): + return fun(self, self._open_file(), *args, **kwargs) + return inner + + class AutoflushFile(Proxy[io.FileIO]): """ A file that is supposed to be closed after each write command issued. @@ -427,7 +433,7 @@ class AutoflushFile(Proxy[io.FileIO]): with reraise_as(KeyError, ValueError, f'Unsupported mode "{mode}"'): mode = {'w': 'a', 'wb': 'ab', 'w+': 'a+', 'wb+': 'ab+', 'a': 'a', 'ab': 'ab'}[mode] - self.__dict__['con_args'] = (file, mode, *con_args) + self.__dict__['con_args'] = file, mode, *con_args fle = self._open_file() super().__init__(fle) @@ -436,24 +442,31 @@ class AutoflushFile(Proxy[io.FileIO]): @is_closed_getter @close_file_after - def seek(self, *args, **kwargs) -> int: - """Seek to a provided position within the file""" - fle = self._open_file() - v = fle.seek(*args, **kwargs) + @open_file + def seek(self, fle: AutoflushFile, offset: int, whence: int = os.SEEK_SET) -> int: + """ + Seek to a provided position within the file + + :param offset: offset to seek file + :param whence: how to count. Refer to documentation of file.seek() + + :return: current pointer + """ + v = fle.seek(offset, whence) self.__dict__['pointer'] = fle.tell() return v @is_closed_getter @close_file_after - def read(self, *args, **kwargs) -> tp.Union[str, bytes]: + @open_file + def read(self, fle: AutoflushFile, *args, **kwargs) -> tp.Union[str, bytes]: """ Read a file, returning the read-in data :return: data readed """ - file = self._open_file() - p = file.read(*args, **kwargs) - self.__dict__['pointer'] = file.tell() + p = fle.read(*args, **kwargs) + self.__dict__['pointer'] = fle.tell() return p def _get_file(self) -> tp.Optional[AutoflushFile]: @@ -461,9 +474,9 @@ class AutoflushFile(Proxy[io.FileIO]): @is_closed_getter @close_file_after - def readall(self) -> tp.Union[str, bytes]: + @open_file + def readall(self, fle) -> tp.Union[str, bytes]: """Read all contents into the file""" - file = self._open_file() return file.readall() def _open_file(self) -> io.FileIO: @@ -484,32 +497,32 @@ class AutoflushFile(Proxy[io.FileIO]): self.__dict__['_Proxy__obj'] = None @is_closed_getter - def close(self) -> None: + @open_file + @close_file_after + def close(self, fle) -> None: # pylint: disable=unused-argument """ Closes the file. """ - self._open_file() - self._close_file() self.__dict__['closed'] = True @is_closed_getter @close_file_after - def write(self, *args, **kwargs) -> int: + @open_file + def write(self, fle, *args, **kwargs) -> int: """ Write a particular value to the file, close it afterwards. :return: amount of bytes written """ - file = self._open_file() - val = file.write(*args, **kwargs) - self.__dict__['pointer'] = file.tell() + val = fle.write(*args, **kwargs) + self.__dict__['pointer'] = fle.tell() return val @is_closed_getter @close_file_after - def truncate(self, __size: tp.Optional[int] = None) -> int: + @open_file + def truncate(self, fle, __size: tp.Optional[int] = None) -> int: """Truncate file to __size starting bytes""" - fle = self._open_file() v = fle.truncate(__size) self.__dict__['pointer'] = fle.tell() return v diff --git a/tests/test_files.py b/tests/test_files.py index 5587299b..1d7d4606 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -70,10 +70,10 @@ class TestFiles(unittest.TestCase): def test_devnullfilelikeobject_2(self): null = DevNullFilelikeObject(binary=True) null.write(b'test') - null.write('ala') + self.assertRaises(TypeError, lambda: null.write('ala')) - null = DevNullFilelikeObject(ignore_typing_issues=True) - null.write(b'test') + null = DevNullFilelikeObject() + self.assertRaises(TypeError, lambda: null.write(b'test')) null.write('test') def test_devnullfilelikeobject_3(self): @@ -84,8 +84,8 @@ class TestFiles(unittest.TestCase): assert null.seekable() assert null.truncate(0) == 0 self.assertEqual(null.write(b'ala'), 3) - self.assertEqual(null.read(), '') - self.assertEqual(null.read(7), '') + self.assertEqual(null.read(), b'') + self.assertEqual(null.read(7), b'') null.flush() null.close() self.assertRaises(ValueError, lambda: null.write('test')) @@ -93,7 +93,6 @@ class TestFiles(unittest.TestCase): self.assertRaises(ValueError, lambda: null.read()) null.close() - def try_directory(self): os.system('mkdir test') self.assertRaises(FileNotFoundError, lambda: read_in_file('test')) -- GitLab