Skip to content

Documentation

Overview

md.processor.threading component provides thread safe provider for task processing.

Architecture overview

Architecture overview

Usage example

Thread safe provider

import typing

import md.processor
import md.processor.threading
import md.python.threading


class Task(md.processor.TaskInterface):
    def __init__(self, id_: int) -> None:
        self.id = id_


class Provider(md.processor.ProviderInterface):
    def provide(self) -> typing.Iterator[Task]:
        for number in range(42):
            yield Task(id_=number)


class Processor(md.processor.ProcessorInterface):
    def process(self, task: Task) -> None:
        print(f'Processing task: #{task.id!s}')


if __name__ == '__main__':
    # arrange
    provider = Provider()
    processor = Processor()

    # use case #1 : basic usage (without threading usage at all) 
    worker = md.processor.Worker(
        provider=provider,
        processor=processor,
    )

    worker.run()

    # use case #2 : use multithreading processing
    provider = Provider()
    thread_safe_provider = md.processor.threading.Provider(
        provider=provider,
    )
    worker = md.processor.Worker(
        provider=thread_safe_provider,
        processor=processor,
    )
    md.python.threading.run_blocking(target=worker.run, threads=10)

Multithread processing

import md.processor.threading

md.python.threading.run_blocking(target=worker.run_compute_file_hash, threads=10, kwargs=kwargs)