Documentation
Overview
The md.processor component defines task processing contracts.
Architecture overview
Installation
Usage example
Image format conversion
import typing
import subprocess
import md.processor
class ConvertException(md.processor.ProcessException):
    """Signals that image conversion (ImageMagick processing) has failed."""
class ConvertImageTask(md.processor.TaskInterface):
    """Describes one image conversion: which file to read and which file to write."""

    def __init__(self, source_path: str, target_path: str) -> None:
        # Both paths are stored as given; no validation or normalization happens here.
        self.source_path, self.target_path = source_path, target_path
class ConvertImageProcessor(md.processor.ProcessorInterface):
    """Processes a ``ConvertImageTask`` by running the external ``convert`` command."""

    def process(self, task: ConvertImageTask) -> None:
        """
        Run ``convert <source_path> <target_path>`` for the given task.

        :param task: task providing the ``source_path`` and ``target_path`` arguments.
        :raises ConvertException: when the command exits with a non-zero code;
            the exception argument is a dict with the keys ``command``, ``code``,
            ``stdout`` and ``stderr``.
        """
        command_parts = ['convert', task.source_path, task.target_path]

        # The arguments are passed as a list, so no shell is involved.
        child = subprocess.Popen(command_parts, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = child.communicate()

        if child.returncode == 0:
            return

        raise ConvertException({
            # Human-readable command line, built only for the failure report.
            'command': subprocess.list2cmdline(command_parts),
            'code': child.returncode,
            # The tool's output is not guaranteed to be valid UTF-8; undecodable bytes
            # are replaced so that the failure is still reported as ConvertException
            # instead of surfacing as an unrelated UnicodeDecodeError.
            'stdout': out.decode('utf-8', errors='replace'),
            'stderr': err.decode('utf-8', errors='replace'),
        })
if __name__ == '__main__':
    convert_image_processor = ConvertImageProcessor()

    # Use case #1: single task processing
    try:
        convert_image_processor.process(
            task=ConvertImageTask(
                source_path='../var/image.svg',
                target_path='../var/image.png',
            )
        )
    except ConvertException as e:
        # `exit()` is a helper that the `site` module injects for interactive use,
        # so SystemExit is raised directly. A string argument is printed to stderr
        # and results in exit status 1.
        raise SystemExit(f'Image conversion failed: {e}')
if __name__ == '__main__':
    # Use case #2: task processing with task provider
    class ConvertImageTaskProvider(md.processor.ProviderInterface):
        """Provides one conversion task per file in a directory that has the source extension."""

        def __init__(self, path: str, from_: str, to: str) -> None:
            """
            :param path: directory that is listed for source files.
            :param from_: extension of the source files, without the leading dot (e.g. ``png``).
            :param to: extension of the target files, without the leading dot (e.g. ``svg``).
            """
            self._path: str = path
            self._from: str = from_
            self._to: str = to

        def provide(self) -> typing.Iterator[ConvertImageTask]:
            """Yield a task for every matching file; the target is placed next to its source."""
            import os

            source_suffix = '.' + self._from
            target_suffix = '.' + self._to

            for file in os.listdir(self._path):
                # Match the extension together with its dot: a name such as `notpng`
                # must not be selected for `png`, otherwise its target path would be
                # equal to its source path.
                if not file.endswith(source_suffix):
                    continue

                # Plain slicing replaces the extension, so characters that are special
                # in regular expressions are harmless inside the extension.
                stem = file[:-len(source_suffix)]

                yield ConvertImageTask(
                    source_path=os.path.join(self._path, file),
                    target_path=os.path.join(self._path, stem + target_suffix),
                )

    convert_image_task_provider = ConvertImageTaskProvider(
        path='../var/',
        from_='png',
        to='svg',
    )

    for convert_image_task in convert_image_task_provider.provide():
        try:
            convert_image_processor.process(task=convert_image_task)
        except ConvertException as e:
            # `exit()` is a helper that the `site` module injects for interactive use,
            # so SystemExit is raised directly. A string argument is printed to stderr
            # and results in exit status 1.
            raise SystemExit(f'Image conversion failed: {e}')
if __name__ == '__main__':
    # Use case #3: same as #2, but using the `worker` concept
    worker = md.processor.Worker(
        provider=convert_image_task_provider,
        processor=convert_image_processor,
    )

    try:
        # NOTE(review): `run()` is assumed to be the worker entry point that feeds
        # the provided tasks to the processor; confirm against md.processor.Worker.
        worker.run()
    except ConvertException as e:
        # `exit()` is a helper that the `site` module injects for interactive use,
        # so SystemExit is raised directly. A string argument is printed to stderr
        # and results in exit status 1.
        raise SystemExit(f'Image conversion failed: {e}')
Thread-safe provider
Python does not provide thread-safe access to generators by default. When a task provider is shared between several threads, consider wrapping it with the thread-safe provider from the md.processor.threading component to prevent race conditions.