diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 1e24203dc64daf9d50e7966070aa87d6a2c3e953..664c6925a4ad9e8910f32bebd4a31a5fd92f5764 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -468,6 +468,7 @@ class MessageReceiver(object): self.body.append(payload) self.data_to_go -= len(payload) assert self.data_to_go >= 0 + if self.data_to_go == 0: ack_expected = not self.consumer.no_ack @@ -496,5 +497,5 @@ class MessageReceiver(object): self.state = 0 - # at this point it's safe to clear the body - self.body = [] + # at this point it's safe to clear the body + self.body = [] diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 669eeacee4fdeed0a787d9741eb510942a1243b2..ad569257683452eb347f5215748535741915636c 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -4,6 +4,7 @@ Test things """ from __future__ import print_function, absolute_import, division import six +import os import unittest import time, logging, threading, monotonic from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage, Exchange @@ -33,6 +34,19 @@ class TestA(unittest.TestCase): fut.result() con.cancel() + def test_very_long_messages(self): + con, fut = self.c.consume(Queue(u'hello', exclusive=True)) + fut.result() + + data = six.binary_type(os.urandom(20*1024*1024+1423)) + + self.c.publish(Message(data), routing_key=b'hello', confirm=True).result() + +# rmsg = self.c.drain(3) +# rmsg.ack() + +# self.assertEquals(rmsg.body, data) + def test_actually_waits(self): a = monotonic.monotonic()