diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 32458a3dc5cfee988707459ade8dedde9af33cd2..5f04e5a858ada42094fd10aeefff01113946214a 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -130,6 +130,7 @@ class Publisher(Channeler, Synchronized): ]) # todo optimize it - if there's only one frame it can with previous send + # no frames will be sent if body.length == 0 for body in bodies: self.connection.send([AMQPBodyFrame(self.channel_id, body)]) @@ -219,6 +220,12 @@ class Publisher(Channeler, Synchronized): else: raise Exception(u'Invalid mode') + def on_operational(self, operational): + state = {True: u'up', False: u'down'}[operational] + mode = {Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[self.mode] + + logger.info('Publisher %s is %s', mode, state) + def on_setup(self, payload): # Assert that mode is OK @@ -230,6 +237,8 @@ class Publisher(Channeler, Synchronized): self.critically_failed = True return + + if isinstance(payload, ChannelOpenOk): # Ok, if this has a mode different from MODE_NOACK, we need to additionally set up # the functionality. @@ -243,6 +252,7 @@ class Publisher(Channeler, Synchronized): elif self.mode == Publisher.MODE_CNPUB: # Because only in this case it makes sense to check for MODE_CNPUB + if isinstance(payload, ConfirmSelectOk): # A-OK! Boot it. self.state = ST_ONLINE diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 49b990044f7224495cb0ff5a7cc9befa21f9ab8d..a734a8c8244f18c566ec22abfcd08522e8682d76 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -23,6 +23,7 @@ class Callable(object): Add a bunch of callables to one list, and just invoke'm. INTERNAL USE ONLY """ + def __init__(self, oneshots=False): """:param oneshots: if True, callables will be called and discarded""" self.callables = [] @@ -125,19 +126,20 @@ class Exchange(object): This is hashable. """ - direct = None # the direct exchange + direct = None # the direct exchange def __init__(self, name=u'', type=b'direct', durable=True, auto_delete=False): self.name = name if isinstance(type, six.text_type): type = type.encode('utf8') warnings.warn(u'type should be a binary type') - self.type = type # must be bytes + self.type = type # must be bytes self.durable = durable self.auto_delete = auto_delete def __repr__(self): - return u'Exchange(%s, %s, %s, %s)' % (repr(self.name), repr(self.type), repr(self.durable), repr(self.auto_delete)) + return u'Exchange(%s, %s, %s, %s)' % ( + repr(self.name), repr(self.type), repr(self.durable), repr(self.auto_delete)) def __hash__(self): return self.name.__hash__() @@ -168,7 +170,7 @@ class Queue(object): :param exclusive: Is this queue exclusive? :param auto_delete: Is this queue auto_delete ? """ - self.name = name.encode('utf8') #: public, this is of type bytes ALWAYS + self.name = name.encode('utf8') #: public, this is of type bytes ALWAYS # if name is '', this will be filled in with broker-generated name upon declaration self.durable = durable self.exchange = exchange @@ -177,7 +179,7 @@ class Queue(object): self.anonymous = name == '' # if this queue is anonymous, it must be regenerated upon reconnect - self.consumer_tag = name if name != '' else uuid.uuid4().hex # consumer tag to use in AMQP comms + self.consumer_tag = name if name != '' else uuid.uuid4().hex # consumer tag to use in AMQP comms def __eq__(self, other): return (self.name == other.name) and (type(self) == type(other)) @@ -195,7 +197,6 @@ class Future(concurrent.futures.Future): """ __slots__ = ('lock', 'completed', 'successfully', '_result', 'running', 'callables', 'cancelled') - def __init__(self): self.lock = threading.Lock() self.lock.acquire() @@ -335,7 +336,7 @@ class NodeDefinition(object): if u':' in self.host: host, port = self.host.split(u':', 1) self.port = int(port) - # else get that port from kwargs + # else get that port from kwargs else: raise ValueError(u'What did you exactly pass?') @@ -355,9 +356,10 @@ class NodeDefinition(object): :return: NodeDefinition instance :raise ValueError: invalid string """ - - return NodeDefinition(host=host, port=5672, user=user, password=passw, virtual_host=virtual_host, heartbeat=kwargs.get('heartbeat', None)) + return NodeDefinition(host=host, port=5672, user=user, password=passw, virtual_host=virtual_host, + heartbeat=kwargs.get('heartbeat', None)) def __str__(self): - return six.text_type(b'amqp://%s:%s@%s/%s'.encode('utf8') % (self.host, self.port, self.user, self.virtual_host)) \ No newline at end of file + return six.text_type( + b'amqp://%s:%s@%s/%s'.encode('utf8') % (self.host, self.port, self.user, self.virtual_host))