md.message.rabbitmq.pika
md.message.rabbitmq.pika component provides RabbitMQ message queue implementation of md.message contracts as adapter on top of pika client and provides few useful tools out from box.
Architecture overview
Installation
Usage example
Replicate queue message
Example below reuses connection to rabbitmq to read queue-to-read
queue
in two threads and copy data from it to queue-to-write
queue.
#!/usr/bin/env python3
import queue
import ssl
import threading
import json
import signal
import pika
import pika.channel
import md.message
import md.message.rabbitmq.pika
class MessageHandler(md.message.HandleInterface):
def __init__(self, send_queue: md.message.SendInterface) -> None:
self._send_queue = send_queue
def handle(self, message: md.message.MessageInterface) -> None:
payload = message.get_payload()
assert isinstance(payload, bytes)
data = json.loads(payload.decode('utf-8'))
print(data)
self._send_queue.send(message=message)
def main() -> int:
rmq_user = 'user'
rmq_password = 'password'
rmq_host = 'server.tld'
rmq_vhost = 'server.tld'
rmq_port = 5671
workers_count = 2
rmq_read_queue = 'queue-to-read'
rmq_exchange = 'queue-to-write'
rmq_routing_key = 'queue-to-write'
# arrange pika
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_options = pika.SSLOptions(context)
credentials = pika.PlainCredentials(username=rmq_user, password=rmq_password)
connection_parameters = pika.ConnectionParameters(
host=rmq_host,
credentials=credentials,
ssl_options=ssl_options,
virtual_host=rmq_vhost,
port=rmq_port,
)
connection = pika.BlockingConnection(connection_parameters)
# arrange internals
internal_queue = queue.Queue()
consumer = md.message.rabbitmq.pika.Consumer(
connection=connection,
internal_queue=internal_queue,
queue_name=rmq_read_queue,
prefetch_count=workers_count,
)
message_queue_receive = md.message.rabbitmq.pika.Receive(consumer=consumer, internal_queue=internal_queue)
message_queue_send = md.message.rabbitmq.pika.Send(connection=connection, exchange=rmq_exchange, routing_key=rmq_routing_key)
message_handler = MessageHandler(send_queue=message_queue_send)
application = md.message.ReceiveApplication(
receive_message=message_queue_receive,
handle_message=message_handler,
retry_exception=None
)
# act
signal.signal(signal.SIGINT, lambda: message_queue_receive.stop()) # 2
signal.signal(signal.SIGTERM, lambda: message_queue_receive.stop()) # 15
thread_set = set()
try:
message_queue_receive.start()
for _ in range(workers_count):
thread = threading.Thread(target=application.run, daemon=True)
thread_set.add(thread)
thread.start()
for thread in thread_set:
thread.join()
message_queue_receive.stop()
finally:
if connection.is_open:
connection.close()
return 0
if __name__ == '__main__':
main()