import logging import os import platform import signal import sys from satella.instrumentation import install_dump_frames_on from satella.instrumentation.memory import MemoryPressureManager, CustomCondition, All, Any, \ dump_memory_on, get_size import time import unittest logger = logging.getLogger(__name__) class OnDemandCondition(CustomCondition): def __init__(self): self.value = False super().__init__(lambda: self.value) def can_fire(self, *args) -> bool: return self.value class TestMemory(unittest.TestCase): @unittest.skipIf(platform.python_implementation(), 'This will not work on PyPy') def test_get_size_dict(self): a = {'aba': 'aba'} class Aba: def __init__(self): self.aba = 'aba' self.assertGreater(get_size(a), 6) self.assertGreater(get_size(Aba()), 6) @unittest.skipIf(sys.platform == 'win32', 'testable only on unices') def test_install_dump_on(self): install_dump_frames_on(signal.SIGUSR1) os.kill(os.getpid(), signal.SIGUSR1) @unittest.skipIf(platform.python_implementation() == 'PyPy', 'does not work on PyPy') def test_get_size(self): a = 'a' * 1024 self.assertGreaterEqual(get_size(a), 1024) @unittest.skipIf(platform.python_implementation() != 'PyPy', 'requires PyPy') def test_get_size_runtime_error(self): a = 'a' * 1024 self.assertRaises(RuntimeError, lambda: get_size(a)) def test_dump_memory(self): dump_memory_on() def test_memory(self): odc = OnDemandCondition() a = {'memory': False, 'calls': 0, 'improved': False, 'times_entered_1': 0, 'level_2_engaged': False, 'level_2_confirmed': False, 'cancelled': 0, 'mem_normal': 0} class ObjectToCleanup: def __init__(self): self.cleaned_up = False def cleanup(self): self.cleaned_up = True obj1 = ObjectToCleanup() obj2 = ObjectToCleanup() cc = CustomCondition(lambda: a['level_2_engaged']) MemoryPressureManager(None, [odc, All(cc, Any(cc, cc))], 2) MemoryPressureManager.cleanup_on_entered(1, obj1) MemoryPressureManager.cleanup_on_entered(2, obj2) def memory_normal(): nonlocal a a['mem_normal'] += 1 def cancel(): nonlocal a a['cancelled'] += 1 MemoryPressureManager.register_on_memory_normal(memory_normal) cc = MemoryPressureManager.register_on_entered_severity(1)(cancel) @MemoryPressureManager.register_on_entered_severity(2) def call_on_level_2(): a['level_2_confirmed'] = True @MemoryPressureManager.register_on_remaining_in_severity(1) def call_on_memory_still(): a['calls'] += 1 @MemoryPressureManager.register_on_entered_severity(1) def call_on_no_memory(): a['memory'] = True a['times_entered_1'] += 1 @MemoryPressureManager.register_on_left_severity(1) def call_improved(): a['improved'] = True self.assertFalse(a['memory']) self.assertFalse(a['improved']) self.assertEqual(a['mem_normal'], 0) time.sleep(3) odc.value = True time.sleep(5) self.assertEqual(a['cancelled'], 1) cc.cancel() self.assertTrue(a['memory']) self.assertFalse(a['improved']) self.assertGreater(a['calls'], 0) self.assertTrue(obj1.cleaned_up) self.assertEqual(a['times_entered_1'], 1) del obj1 odc.value = False time.sleep(3) self.assertTrue(a['improved']) self.assertEqual(a['times_entered_1'], 1) self.assertTrue(a['memory']) self.assertEqual(a['mem_normal'], 1) a['level_2_engaged'] = True time.sleep(3) self.assertLessEqual(len(MemoryPressureManager().objects_to_cleanup_on_entered[1]), 1) self.assertEqual(MemoryPressureManager().severity_level, 2) self.assertEqual(a['cancelled'], 1) self.assertEqual(a['times_entered_1'], 2) self.assertTrue(a['level_2_confirmed']) self.assertEqual(a['mem_normal'], 1) self.assertTrue(obj2.cleaned_up)