md.message
The md.message component defines contracts for messages and for interaction with
message queues, and provides a few useful tools out of the box.
Architecture overview
Installation
Usage
Receive and process message
Example of implementation (see examples/retrieve-and-process-message.py):
import typing
import md.message
# Implement contracts:
class ExampleMessage(md.message.MessageInterface[str]):
    """Minimal message implementation whose payload is a plain string."""

    def __init__(self, payload: str) -> None:
        # Stored as given; read back through get_payload().
        self._payload: str = payload

    def get_payload(self) -> str:
        """Return the payload that was passed to the constructor."""
        return self._payload
class ExampleReceive(md.message.ReceiveInterface[ExampleMessage]):
    """Example receiver that produces a fixed series of 42 messages."""

    def receive(self) -> typing.Iterable[ExampleMessage]:
        """Lazily yield messages 'message #0' through 'message #41'."""
        yield from (ExampleMessage(f'message #{i}') for i in range(42))

    def accept(self, message: ExampleMessage) -> None:  # ack
        """Acknowledge ``message``; this example only prints a notice."""
        payload = message.get_payload()
        print('Receive: Message accepted: ' + payload)

    def reject(self, message: ExampleMessage) -> None:  # nack
        """Negatively acknowledge ``message``; this example only prints a notice."""
        payload = message.get_payload()
        print('Receive: Message rejected: ' + payload)
class ExampleHandle(md.message.HandleInterface[ExampleMessage]):
    """Example handler that deliberately fails on every second message."""

    def __init__(self) -> None:
        # Number of handle() calls made so far (successful or not).
        self.__counter = 0

    def handle(self, message: ExampleMessage) -> None:
        """Print the outcome for ``message``, failing on every second call.

        The 1st, 3rd, 5th, ... calls print a success line and return; the
        2nd, 4th, 6th, ... calls print a failure line and raise.

        Raises:
            RuntimeError: on every second call.
        """
        is_failure = self.__counter % 2 != 0
        try:
            if is_failure:
                print('Handle: Message processed (failure): ' + message.get_payload())
                raise RuntimeError
            print('Handle: Message processed (success): ' + message.get_payload())
        finally:
            # Count the call even when it raised, so outcomes keep alternating.
            self.__counter += 1
if __name__ == '__main__':
    # example 1: receive & handle message:
    # Wire the example receiver and handler into the application
    # provided by md.message.
    receive_message: md.message.ReceiveInterface[ExampleMessage] = ExampleReceive()
    handle_message: md.message.HandleInterface[ExampleMessage] = ExampleHandle()
    receive_application: md.message.ReceiveApplication[ExampleMessage] = md.message.ReceiveApplication(
        receive_message=receive_message,
        handle_message=handle_message,
    )
    # NOTE(review): presumably run() feeds each received message to the handler
    # and then accepts or rejects it - confirm against md.message.ReceiveApplication.
    receive_application.run()
Message send
Example of implementation (see examples/send-message.py):
import md.message
# Implement contracts:
class ExampleMessage(md.message.MessageInterface[str]):
    """Minimal message implementation carrying a string payload."""

    def __init__(self, payload: str) -> None:
        # Kept unchanged; get_payload() hands it back to the caller.
        self._payload: str = payload

    def get_payload(self) -> str:
        """Return the string given at construction time."""
        return self._payload
class ExampleSend(md.message.SendInterface[ExampleMessage]):
    """Example sender that prints each message instead of delivering it."""

    def send(self, message: ExampleMessage) -> None:
        """Print the payload of ``message`` prefixed with 'Message sent: '."""
        payload = message.get_payload()
        print('Message sent: ' + payload)
if __name__ == '__main__':
    # example 2: send message:
    # Typed against the interface, so any SendInterface implementation
    # could be substituted for ExampleSend here.
    send: md.message.SendInterface[ExampleMessage] = ExampleSend()
    send.send(
        message=ExampleMessage(payload='example message'),
    )