Skip to content
Snippets Groups Projects
Commit f0060c6d authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

Added bidirectional queues (pipes)

parent 4fc908ba
No related branches found
No related tags found
No related merge requests found
from threading import Thread, Lock, RLock from threading import Thread, Lock, RLock
from satella.threads.pipe import Pipe, create_pipes
class BaseThread(Thread): class BaseThread(Thread):
"""Thread with internal termination flag""" """Thread with internal termination flag"""
......
import Queue
class Pipe(Queue.Queue):
"""Represents a bidirectional inter-thread queue.
Essentially it is a Queue.Queue, so that .put elements
appear in the other queue.
This will contain cyclic references, so it should be
kinda persistent.
Call .close() to terminate that cyclic reference.
"""
Empty = Queue.Empty
def __init__(self):
Queue.Queue.__init__(self)
self.other_queue = None
def put(self, item, block=True, timeout=None):
Queue.Queue.put(self.other_queue, item, block, timeout)
def create_pipes():
"""Returns a pair (tuple) of Pipes connecting to each
other"""
a = Pipe()
b = Pipe()
a.other_queue = b
b.other_queue = a
return a, b
\ No newline at end of file
from monitor import MonitorTest from monitor import MonitorTest
from basethread import ThreadingTest from basethread import ThreadingTest
\ No newline at end of file from pipe import PipeTest
\ No newline at end of file
from satella.threads import create_pipes
from time import sleep
import unittest
class PipeTest(unittest.TestCase):
def test_pipe(self):
"""Tests basic pipe predicaments"""
a, b = create_pipes()
a.put('Lol')
self.assertEqual(b.get(), 'Lol')
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment