Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
coolamqp
Manage
Activity
Members
Labels
Plan
Issues
0
Issue boards
Milestones
Code
Merge requests
0
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
public
coolamqp
Commits
abf7c9f9
Commit
abf7c9f9
authored
7 years ago
by
Piotr Maślanka
Browse files
Options
Downloads
Patches
Plain Diff
less long lines
parent
77656b38
No related branches found
No related tags found
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
MANIFEST.in
+1
-1
1 addition, 1 deletion
MANIFEST.in
coolamqp/attaches/consumer.py
+26
-13
26 additions, 13 deletions
coolamqp/attaches/consumer.py
with
27 additions
and
14 deletions
MANIFEST.in
+
1
−
1
View file @
abf7c9f9
include LICENSE
include LICENSE
include README.md
include README.md
include requirements.txt
include requirements.txt
\ No newline at end of file
This diff is collapsed.
Click to expand it.
coolamqp/attaches/consumer.py
+
26
−
13
View file @
abf7c9f9
...
@@ -287,7 +287,8 @@ class Consumer(Channeler):
...
@@ -287,7 +287,8 @@ class Consumer(Channeler):
def
on_delivery
(
self
,
sth
):
def
on_delivery
(
self
,
sth
):
"""
"""
Callback for delivery-related shit
Callback for delivery-related shit
:param sth: AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame
:param sth: AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or
AMQPBodyFrame
"""
"""
if
self
.
receiver
is
None
:
if
self
.
receiver
is
None
:
...
@@ -301,7 +302,8 @@ class Consumer(Channeler):
...
@@ -301,7 +302,8 @@ class Consumer(Channeler):
elif
isinstance
(
sth
,
AMQPHeaderFrame
):
elif
isinstance
(
sth
,
AMQPHeaderFrame
):
self
.
receiver
.
on_head
(
sth
)
self
.
receiver
.
on_head
(
sth
)
# No point in listening for more stuff, that's all the watches even listen for
# No point in listening for more stuff, that's all the watches
# even listen for
def
on_setup
(
self
,
payload
):
def
on_setup
(
self
,
payload
):
"""
Called with different kinds of frames - during setup
"""
"""
Called with different kinds of frames - during setup
"""
...
@@ -390,7 +392,8 @@ class Consumer(Channeler):
...
@@ -390,7 +392,8 @@ class Consumer(Channeler):
# Register watches for receiving shit
# Register watches for receiving shit
# this is multi-shot by default
# this is multi-shot by default
self
.
hb_watch
=
HeaderOrBodyWatch
(
self
.
channel_id
,
self
.
on_delivery
)
self
.
hb_watch
=
HeaderOrBodyWatch
(
self
.
channel_id
,
self
.
on_delivery
)
self
.
connection
.
watch
(
self
.
hb_watch
)
self
.
connection
.
watch
(
self
.
hb_watch
)
# multi-shot watches need manual cleanup!
# multi-shot watches need manual cleanup!
...
@@ -438,21 +441,28 @@ class MessageReceiver(object):
...
@@ -438,21 +441,28 @@ class MessageReceiver(object):
self
.
bdeliver
=
None
# payload of Basic-Deliver
self
.
bdeliver
=
None
# payload of Basic-Deliver
self
.
header
=
None
# AMQPHeaderFrame
self
.
header
=
None
# AMQPHeaderFrame
if
consumer
.
body_receive_mode
==
BodyReceiveMode
.
MEMORYVIEW
:
if
consumer
.
body_receive_mode
==
BodyReceiveMode
.
MEMORYVIEW
:
self
.
body
=
None
# None is an important sign - first piece of message
self
.
body
=
None
# None is an important sign - first piece of
# message
else
:
else
:
self
.
body
=
[]
# list of payloads
self
.
body
=
[]
# list of payloads
self
.
data_to_go
=
None
# set on receiving header, how much bytes we need yet
self
.
data_to_go
=
None
# set on receiving header, how much bytes we
# need yet
self
.
message_size
=
None
# in bytes, of currently received message
self
.
message_size
=
None
# in bytes, of currently received message
self
.
offset
=
0
# used only in MEMORYVIEW mode - pointer to self.body (which would be a buffer)
self
.
offset
=
0
# used only in MEMORYVIEW mode - pointer to self.body
# (which would be a buffer)
self
.
acks_pending
=
set
()
# list of things to ack/reject
self
.
acks_pending
=
set
()
# list of things to ack/reject
self
.
recv_mode
=
consumer
.
body_receive_mode
self
.
recv_mode
=
consumer
.
body_receive_mode
# if BYTES, pieces (as mvs) are received into .body and b''.join()ed at the end
# if BYTES, pieces (as mvs) are received into .body and b''.join()ed
# at the end
# if MEMORYVIEW:
# if MEMORYVIEW:
# upon first piece, if it's a single-frame message, it's returned at once
# upon first piece, if it's a single-frame message,
# if multiframe, self.body is made into a buffer and further are received into it
# it's returned at once
# if LIST_OF_MEMORYVIEW, pieces (as mvs) are stored into .body, and that's returned
# if multiframe, self.body is made into a buffer
# and further are received into it
# if LIST_OF_MEMORYVIEW, pieces (as mvs) are stored into .body, and
# that's returned
def
on_gone
(
self
):
def
on_gone
(
self
):
"""
Called by Consumer to inform upon discarding this receiver
"""
"""
Called by Consumer to inform upon discarding this receiver
"""
...
@@ -462,7 +472,8 @@ class MessageReceiver(object):
...
@@ -462,7 +472,8 @@ class MessageReceiver(object):
"""
"""
This crafts a constructor for confirming messages.
This crafts a constructor for confirming messages.
This should return a callable/0, whose calling will ACK or REJECT the message.
This should return a callable/0, whose calling will ACK or REJECT the
message.
Calling it multiple times should have no ill effect.
Calling it multiple times should have no ill effect.
If this receiver is long gone,
If this receiver is long gone,
...
@@ -496,7 +507,8 @@ class MessageReceiver(object):
...
@@ -496,7 +507,8 @@ class MessageReceiver(object):
self
.
state
=
2
self
.
state
=
2
if
self
.
header
.
body_size
==
0
:
if
self
.
header
.
body_size
==
0
:
# An empty message is no common guest. It won't have a BODY field though...
# An empty message is no common guest. It won't have a BODY field
# though...
self
.
on_body
(
EMPTY_MEMORYVIEW
)
# trigger it manually
self
.
on_body
(
EMPTY_MEMORYVIEW
)
# trigger it manually
def
on_basic_deliver
(
self
,
payload
):
def
on_basic_deliver
(
self
,
payload
):
...
@@ -546,7 +558,8 @@ class MessageReceiver(object):
...
@@ -546,7 +558,8 @@ class MessageReceiver(object):
# common case :)
# common case :)
body
=
self
.
body
[
0
].
tobytes
()
body
=
self
.
body
[
0
].
tobytes
()
else
:
else
:
# since b''.join() with list comprehension and .tobytes() would create
# since b''.join() with list comprehension and .tobytes()
# would create
# an extra copy of string
# an extra copy of string
bio
=
io
.
BytesIO
()
bio
=
io
.
BytesIO
()
for
mv
in
body
:
for
mv
in
body
:
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment