Documentation
Overview
md.processor.throttle component provides task provider throttling API.
Warning
Current implementation is based on time wasting model that is not better solution. Consider to use sheduling model instead
Architecture overview
Usage example
Throttle provider
import typing
import md.processor
import md.processor.throttle
class Task(md.processor.TaskInterface):
def __init__(self, id_: int) -> None:
self.id = id_
class Provider(md.processor.ProviderInterface):
def provide(self) -> typing.Iterator[Task]:
for number in range(42):
yield Task(id_=number)
class Processor(md.processor.ProcessorInterface):
def process(self, task: Task) -> None:
print(f'Processing task: #{task.id!s}')
if __name__ == '__main__':
# arrange
provider = Provider()
processor = Processor()
# use case #1 : basic usage (without throttling usage at all)
worker = md.processor.Worker(
provider=provider,
processor=processor,
)
worker.run()
# use case #2 : use multithreading processing
provider = Provider()
throttle_provider = md.processor.throttle.Provider(
provider=provider,
rate=1/2, # not often than 2 tasks per second
)
worker = md.processor.Worker(
provider=throttle_provider,
processor=processor,
)
worker.run()
Thread safe provider
This library does not provide thread safe access to providers by default, in case when task provider is assumed to be shared and used in few threads, consider to wrap provider with thread safe provider from md.processor.threading component to prevent race-condition issue.