Skip to content

Documentation

Overview

md.processor.throttle component provides task provider throttling API.

Warning

Current implementation is based on time wasting model that is not better solution. Consider to use sheduling model instead

Architecture overview

Architecture overview

Usage example

Throttle provider

import typing

import md.processor
import md.processor.throttle


class Task(md.processor.TaskInterface):
    def __init__(self, id_: int) -> None:
        self.id = id_


class Provider(md.processor.ProviderInterface):
    def provide(self) -> typing.Iterator[Task]:
        for number in range(42):
            yield Task(id_=number)


class Processor(md.processor.ProcessorInterface):
    def process(self, task: Task) -> None:
        print(f'Processing task: #{task.id!s}')


if __name__ == '__main__':
    # arrange
    provider = Provider()
    processor = Processor()

    # use case #1 : basic usage (without throttling usage at all) 
    worker = md.processor.Worker(
        provider=provider,
        processor=processor,
    )
    worker.run()

    # use case #2 : use multithreading processing
    provider = Provider()
    throttle_provider = md.processor.throttle.Provider(
        provider=provider,
        rate=1/2,  # not often than 2 tasks per second
    )
    worker = md.processor.Worker(
        provider=throttle_provider,
        processor=processor,
    )
    worker.run()

Thread safe provider

This library does not provide thread safe access to providers by default, in case when task provider is assumed to be shared and used in few threads, consider to wrap provider with thread safe provider from md.processor.threading component to prevent race-condition issue.