Documentation
Overview
The md.processor.threading component provides a thread-safe provider for task processing.
Architecture overview
Usage example
Thread-safe provider
import typing
import md.processor
import md.processor.threading
import md.python.threading
class Task(md.processor.TaskInterface):
    """Example task that carries nothing but an integer identifier."""

    def __init__(self, id_: int) -> None:
        """Remember ``id_`` on the instance as the ``id`` attribute."""
        self.id = id_
class Provider(md.processor.ProviderInterface):
    """Example provider that produces a fixed series of ``Task`` objects."""

    def provide(self) -> typing.Iterator[Task]:
        """Lazily yield 42 tasks whose ids run from 0 through 41."""
        # The generator expression keeps task creation lazy: each Task is
        # built only when the consumer asks for the next item.
        yield from (Task(id_=task_id) for task_id in range(42))
class Processor(md.processor.ProcessorInterface):
    """Example processor that reports each task it receives on stdout."""

    def process(self, task: Task) -> None:
        """Print a single line that names the task by its ``id``."""
        message = f'Processing task: #{task.id!s}'
        print(message)
if __name__ == '__main__':
    # Arrange: build the collaborators used by the use cases below.
    provider = Provider()
    processor = Processor()

    # Use case #1: plain single-threaded run, no threading involved.
    worker = md.processor.Worker(provider=provider, processor=processor)
    worker.run()

    # Use case #2: multithreaded processing.
    # A fresh provider is wrapped in the thread-safe provider from
    # md.processor.threading before it is handed to the worker.
    provider = Provider()
    thread_safe_provider = md.processor.threading.Provider(provider=provider)
    worker = md.processor.Worker(provider=thread_safe_provider, processor=processor)

    # NOTE(review): presumably starts 10 threads that each call worker.run and
    # blocks until they all finish; confirm against md.python.threading.
    md.python.threading.run_blocking(target=worker.run, threads=10)