Source code for mtap.processing._pipeline

#  Copyright 2021 Regents of the University of Minnesota.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import logging
import multiprocessing
import pathlib
import signal
import traceback
from abc import ABC, abstractmethod
from concurrent.futures import Future
from datetime import datetime
from logging.handlers import QueueHandler, QueueListener
from threading import Lock, Condition
from typing import (
    Optional,
    Dict,
    Any,
    Union,
    List,
    MutableSequence,
    Iterable,
    Callable,
    ContextManager,
    TYPE_CHECKING,
    overload,
)

from tqdm import tqdm

from mtap import EventsClient, Config
from mtap.data import Event
from mtap.processing import _base, _runners, _timing

if TYPE_CHECKING:
    import mtap
    from mtap import processing

logger = logging.getLogger('mtap.processing')


class RemoteProcessor(_base.ComponentDescriptor):
    """A configuration for a remote processor that the pipeline will connect to in order to
    perform processing.

    Args:
        processor_name (str): The identifier used for health checking and discovery.
        address (~typing.Optional[str]): Optionally an address to use, will use service discovery
            configuration to locate processors if this is None / omitted.
        component_id (~typing.Optional[str]): How the processor's results will be identified
            locally. Will be modified to be unique if it is not unique relative to other
            components in a pipeline.
        params (~typing.Optional[dict[str, ~typing.Any]]): An optional parameter dictionary that
            will be passed to the processor as parameters with every event or document processed.
            Values should be json-serializable.
        enable_proxy (bool): Whether the gRPC channel used to connect to the processor is allowed
            to use an HTTP proxy.

    Attributes:
        processor_name (str): The identifier used for health checking and discovery.
        address (~typing.Optional[str]): Optionally an address to use, will use service discovery
            configuration to locate processors if this is None / omitted.
        component_id (~typing.Optional[str]): How the processor's results will be identified
            locally. Will be modified to be unique if it is not unique relative to other
            components in a pipeline.
        params (~typing.Optional[dict[str, ~typing.Any]]): An optional parameter dictionary that
            will be passed to the processor as parameters with every event or document processed.
            Values should be json-serializable.
    """
    __slots__ = ('processor_name', 'address', 'component_id', 'params', 'enable_proxy')

    def __init__(self, processor_name: str, *,
                 address: Optional[str] = None,
                 component_id: Optional[str] = None,
                 params: Optional[Dict[str, Any]] = None,
                 enable_proxy: bool = False):
        self.processor_name = processor_name
        self.address = address
        self.component_id = component_id
        self.params = params
        self.enable_proxy = enable_proxy

    def create_pipeline_component(
            self,
            component_ids: Dict[str, int],
            client: 'mtap.EventsClient'
    ) -> 'processing.ProcessingComponent':
        component_id = self.component_id or self.processor_name
        component_id = _unique_component_id(component_ids, component_id)
        runner = _runners.RemoteRunner(processor_name=self.processor_name,
                                       component_id=component_id,
                                       address=self.address,
                                       params=self.params,
                                       enable_proxy=self.enable_proxy)
        return runner

    def __repr__(self):
        return "RemoteProcessor(processor_name={}, address={}, component_id={}, params={})".format(
            *map(repr, [self.processor_name, self.address, self.component_id, self.params])
        )


class LocalProcessor(_base.ComponentDescriptor):
    """A configuration of a locally-invoked processor.

    Args:
        proc (EventProcessor): The processor instance to run with the pipeline.
        component_id (~typing.Optional[str]): How the processor's results will be identified
            locally. Will be modified to be unique if it is not unique relative to other
            components in a pipeline.
        params (~typing.Optional[dict[str, ~typing.Any]]): An optional parameter dictionary that
            will be passed to the processor as parameters with every event or document processed.
            Values should be json-serializable.

    Attributes:
        proc (EventProcessor): The processor instance to run with the pipeline.
        component_id (~typing.Optional[str]): How the processor's results will be identified
            locally. Will be modified to be unique if it is not unique relative to other
            components in a pipeline.
        params (~typing.Optional[dict[str, ~typing.Any]]): An optional parameter dictionary that
            will be passed to the processor as parameters with every event or document processed.
            Values should be json-serializable.
    """
    __slots__ = ('proc', 'processor_name', 'component_id', 'params')

    def __init__(self, proc: 'mtap.EventProcessor', *,
                 component_id: Optional[str] = None,
                 params: Optional[Dict[str, Any]] = None):
        self.proc = proc
        self.processor_name = proc.metadata['name']
        self.component_id = component_id or self.processor_name
        self.params = params

    def create_pipeline_component(
            self,
            component_ids: Dict[str, int],
            client: 'Callable[[], mtap.EventsClient]'
    ) -> 'processing.ProcessingComponent':
        identifier = _unique_component_id(component_ids, self.component_id)
        runner = _runners.ProcessorRunner(proc=self.proc,
                                          processor_name=self.processor_name,
                                          component_id=identifier,
                                          params=self.params,
                                          client=client())
        return runner

    def __repr__(self):
        return 'LocalProcessor(proc={}, processor_name={}, component_id={}, params={})'.format(
            *map(repr, [
                self.proc, self.processor_name, self.component_id, self.params
            ]))


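# A sketch of mixing a locally-invoked processor with a remote one in a single pipeline.
# `my_processor` stands in for any mtap.EventProcessor instance you have constructed; the
# remote processor name and the addresses are illustrative, not part of this module:
#
#   pipeline = Pipeline(
#       LocalProcessor(my_processor, component_id='local-step'),
#       RemoteProcessor('example-remote-processor', address='localhost:50052'),
#       events_address='localhost:50051',
#   )

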
def _event_and_params(target, params):
    try:
        document_name = target.document_name
        params = dict(params or {})
        params['document_name'] = document_name
        event = target.event
    except AttributeError:
        event = target
    return event, params


def _cancel_callback(event, read_ahead, cd, close_events):
    def fn(future: Future):
        if close_events:
            event.close()
        read_ahead.task_completed()
        cd.count_down(future.exception() is not None)
    return fn


def _create_pipeline(name: Optional[str] = None,
                     events_address: Optional[str] = None,
                     events_client: Optional[EventsClient] = None,
                     mp_config: 'Optional[MpConfig]' = None,
                     *components: 'processing.ComponentDescriptor'):
    return Pipeline(*components, name=name, events_address=events_address,
                    events_client=events_client, mp_config=mp_config)


class MpConfig:
    """Configuration object for pipeline multiprocessing.

    Args:
        max_failures (~typing.Optional[int]): Override for the maximum number of failures before
            cancelling the pipeline run. Default is 0.
        show_progress (~typing.Optional[bool]): Override for whether progress should be displayed
            in the console. Default is true.
        workers (~typing.Optional[int]): Number of workers to concurrently process events through
            the pipeline. Default is 4.
        read_ahead (~typing.Optional[int]): The number of documents to read onto the events
            service(s) to queue for processing. Default is equal to the number of workers.
        close_events (~typing.Optional[bool]): Whether any events passed from the source to the
            pipeline should be closed when the pipeline is completed. Default is true.
        mp_start_method (~typing.Optional[str]): The start method for multiprocessing processes,
            see :meth:`multiprocessing.get_context`.
    """
    __slots__ = ("max_failures", "show_progress", "workers", "read_ahead", "close_events",
                 "mp_start_method")

    def __init__(self,
                 max_failures: Optional[int] = None,
                 show_progress: Optional[bool] = None,
                 workers: Optional[int] = None,
                 read_ahead: Optional[int] = None,
                 close_events: Optional[bool] = None,
                 mp_start_method: Optional[str] = None):
        self.max_failures = 0 if max_failures is None else max_failures
        if not type(self.max_failures) == int or self.max_failures < 0:
            raise ValueError("max_failures must be None or non-negative integer")
        self.show_progress = True if show_progress is None else show_progress
        if not type(self.show_progress) == bool:
            raise ValueError("show_progress must be boolean.")
        self.workers = 4 if workers is None else workers
        if not type(self.workers) == int or self.workers <= 0:
            raise ValueError("workers must be None or positive integer.")
        self.read_ahead = self.workers if read_ahead is None else read_ahead
        if not type(self.read_ahead) == int or self.read_ahead < 0:
            raise ValueError("read_ahead must be None or a non-negative integer")
        self.close_events = True if close_events is None else close_events
        if not type(self.close_events) == bool:
            raise ValueError("close_events must be None or a bool.")
        self.mp_start_method = "spawn" if mp_start_method is None else mp_start_method

    @staticmethod
    def from_configuration(conf: Dict) -> 'MpConfig':
        return MpConfig(**conf)


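# A sketch of overriding the multiprocessing defaults for a pipeline run; the values,
# processor name, and addresses shown are illustrative:
#
#   mp_config = MpConfig(workers=8, read_ahead=8, max_failures=5, show_progress=False)
#   # or, equivalently, from a plain dict (e.g. parsed out of a configuration file):
#   mp_config = MpConfig.from_configuration({'workers': 8, 'read_ahead': 8,
#                                            'max_failures': 5, 'show_progress': False})
#   pipeline = Pipeline(RemoteProcessor('example-processor', address='localhost:50052'),
#                       events_address='localhost:50051',
#                       mp_config=mp_config)

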
class Pipeline(MutableSequence['processing.ComponentDescriptor']):
    """An object which can be used to build and run a pipeline of remote and local processors.

    Pipelines are a :obj:`~typing.MutableSequence` containing one or more
    :obj:`~mtap.processing.pipeline.ComponentDescriptor`, so a pipeline can be modified after
    creation using that functionality.

    Args:
        *components (ComponentDescriptor): A list of component descriptors created using
            :class:`RemoteProcessor` or :class:`LocalProcessor`.

    Keyword Args:
        name (~typing.Optional[str]): An optional name for the pipeline, defaults to 'pipeline'.
        events_address (~typing.Optional[str]): An optional address of the events service to use.
        events_client (~typing.Optional[EventsClient]): An optional existing events client to use
            instead of creating a new one.
        mp_config (~typing.Optional[MpConfig]): An optional multiprocessing configuration for
            :meth:`run_multithread`.

    Attributes:
        name (str): The pipeline's name.

    Examples:
        Remote pipeline with name discovery:

        >>> with mtap.Pipeline(
        >>>         RemoteProcessor('processor-1-id'),
        >>>         RemoteProcessor('processor-2-id'),
        >>>         RemoteProcessor('processor-3-id')
        >>>     ) as pipeline:
        >>>     client = pipeline.events_client
        >>>     for txt in txts:
        >>>         with Event(client=client) as event:
        >>>             document = event.add_document('plaintext', txt)
        >>>             results = pipeline.run(document)

        Remote pipeline using addresses:

        >>> with mtap.Pipeline(
        >>>         RemoteProcessor('processor-1-name', address='localhost:50052'),
        >>>         RemoteProcessor('processor-2-id', address='localhost:50053'),
        >>>         RemoteProcessor('processor-3-id', address='localhost:50054'),
        >>>         events_address='localhost:50051'
        >>>     ) as pipeline:
        >>>     client = pipeline.events_client
        >>>     for txt in txts:
        >>>         with Event(client=client) as event:
        >>>             document = event.add_document('plaintext', txt)
        >>>             results = pipeline.run(document)

        Modifying a pipeline:

        >>> pipeline = Pipeline(RemoteProcessor('foo', address='localhost:50000'),
        >>>                     RemoteProcessor('bar', address='localhost:50000'))
        >>> pipeline
        Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50000', component_id=None, params=None))
        >>> pipeline.append(RemoteProcessor('baz', address='localhost:50001'))
        >>> pipeline
        Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='baz', address='localhost:50001', component_id=None, params=None))
        >>> del pipeline[1]
        >>> pipeline
        Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='baz', address='localhost:50001', component_id=None, params=None))
        >>> pipeline[1] = RemoteProcessor(processor_name='bar', address='localhost:50003')
        >>> pipeline
        Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50003', component_id=None, params=None))
        >>> pipeline += list(pipeline)  # Putting in a new list to prevent an infinite recursion
        >>> pipeline
        Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50003', component_id=None, params=None), RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50003', component_id=None, params=None))
    """
    __slots__ = ['_component_ids', 'name', '_component_descriptors', 'events_address',
                 'mp_config', '_created_events_client', '_provided_events_client',
                 'times_map', '__components', '_events_client']

    def __init__(self, *components: 'processing.ComponentDescriptor',
                 name: Optional[str] = None,
                 events_address: Optional[str] = None,
                 events_client: Optional[EventsClient] = None,
                 mp_config: Optional[MpConfig] = None):
        self._component_ids = {}
        self.name = name or 'pipeline'
        self._component_descriptors = list(components)
        self.events_address = events_address
        self._created_events_client = False
        self._provided_events_client = events_client
        self._events_client = events_client
        self.mp_config = mp_config or MpConfig()
        self.times_map = {}

    def __reduce__(self):
        return _create_pipeline, (self.name, self.events_address, self._provided_events_client,
                                  self.mp_config) + tuple(self._component_descriptors)

    @staticmethod
    def from_yaml_file(conf_path: Union[pathlib.Path, str]) -> 'Pipeline':
        """Creates a pipeline from a yaml pipeline configuration file.

        Args:
            conf_path (str or pathlib.Path): The path to the configuration file.

        Returns:
            Pipeline object from the configuration.
        """
        conf_path = pathlib.Path(conf_path)
        from yaml import load
        try:
            from yaml import CLoader as Loader
        except ImportError:
            from yaml import Loader
        with conf_path.open('rb') as f:
            conf = load(f, Loader=Loader)
        return Pipeline.load_configuration(conf)

    @staticmethod
    def load_configuration(conf: Dict) -> 'Pipeline':
        """Creates a pipeline from a pipeline configuration dictionary.

        Args:
            conf (Dict): The pipeline configuration dictionary.

        Returns:
            Pipeline created from the configuration.
        """
        bad_keys = [k for k in conf.keys()
                    if k not in ['name', 'events_addresses', 'events_address', 'components',
                                 'mp_config']]
        if len(bad_keys) > 0:
            raise ValueError('Unrecognized keys in pipeline configuration: {}'.format(bad_keys))
        name = conf.get('name', None)
        if 'events_address' in conf.keys() and 'events_addresses' in conf.keys():
            raise ValueError("Only one of 'events_address' and 'events_addresses' should be "
                             "specified.")
        events_address = conf.get('events_address', None) or conf.get('events_addresses', None)
        components = []
        conf_components = conf.get('components', [])
        for conf_component in conf_components:
            bad_keys = [k for k in conf_component.keys()
                        if k not in ['processor_id', 'name', 'component_id', 'address', 'params']]
            if len(bad_keys) > 0:
                raise ValueError('Unrecognized pipeline component key(s) {}'.format(bad_keys))
            if 'processor_id' in conf_component:
                logger.warning(
                    "The 'processor_id' field has been renamed to 'name' in pipeline "
                    "configurations. For now it is automatically migrated, but it may fail "
                    "in a future version.")
                conf_component['name'] = conf_component['processor_id']
            components.append(
                RemoteProcessor(
                    processor_name=conf_component['name'],
                    address=conf_component.get('address', None),
                    component_id=conf_component.get('component_id', None),
                    params=dict(conf_component.get('params', {}))
                )
            )
        mp_config = MpConfig.from_configuration(conf.get('mp_config', {}))
        return Pipeline(*components, name=name, events_address=events_address,
                        mp_config=mp_config)

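    # An illustrative sketch of the configuration dict accepted by `load_configuration`; an
    # equivalent YAML file can be passed to `from_yaml_file`. The pipeline name, processor
    # name, addresses, and params below are placeholders:
    #
    #   conf = {
    #       'name': 'example-pipeline',
    #       'events_address': 'localhost:50100',
    #       'mp_config': {'workers': 8, 'show_progress': True},
    #       'components': [
    #           {'name': 'example-processor',
    #            'address': 'localhost:50101',
    #            'params': {'key': 'value'}}
    #       ]
    #   }
    #   pipeline = Pipeline.load_configuration(conf)
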
    @property
    def events_client(self) -> EventsClient:
        if self._events_client is not None:
            return self._events_client
        self._created_events_client = True
        self._events_client = EventsClient(address=self.events_address)
        return self._events_client

    @events_client.setter
    def events_client(self, value: EventsClient):
        self._events_client = value

    @property
    def _components(self) -> 'List[processing.ProcessingComponent]':
        try:
            return self.__components
        except AttributeError:
            self.__components = [
                desc.create_pipeline_component(self._component_ids, lambda: self.events_client)
                for desc in self._component_descriptors
            ]
            return self.__components

    @_components.deleter
    def _components(self):
        for component in self.__components:
            component.close()
        del self.__components

    def run_multithread(
            self,
            source: Union[Iterable[Union['mtap.Document', 'mtap.Event']],
                          'processing.ProcessingSource'],
            *, params: Optional[Dict[str, Any]] = None,
            show_progress: Optional[bool] = None,
            total: Optional[int] = None,
            close_events: Optional[bool] = None,
            max_failures: Optional[int] = None,
            workers: Optional[int] = None,
            read_ahead: Optional[int] = None,
            mp_context=None,
            log_level: Optional[Union[str, int]] = None
    ):
        """Runs this pipeline on a source which provides multiple documents / events.

        Concurrency is per-event, with each event being handed to a worker which runs it through
        the pipeline.

        Args:
            source (~typing.Union[~typing.Iterable[~typing.Union[Event, Document]], ProcessingSource]):
                A generator of events or documents to process. This should be an
                :obj:`~typing.Iterable` of either :obj:`Event` or :obj:`Document` objects or a
                :obj:`~mtap.processing.ProcessingSource`.
            params (~typing.Optional[dict[str, ~typing.Any]]): Json object containing params
                specific to processing this event, the existing params dictionary defined in
                :func:`~PipelineBuilder.add_processor` will be updated with the contents of this
                dict.
            show_progress (~typing.Optional[bool]): Whether to print a progress bar using tqdm.
            total (~typing.Optional[int]): An optional argument indicating the total number of
                events / documents that will be provided by the iterable, for the progress bar.
            close_events (~typing.Optional[bool]): Whether the pipeline should close events after
                they have been fully processed through all components.
            max_failures (~typing.Optional[int]): The number of acceptable failures. Once this
                amount is exceeded processing will halt. Note that because of the nature of
                concurrency, processing may continue for a short amount of time before
                termination.
            workers (~typing.Optional[int]): The number of workers to process documents on.
            read_ahead (~typing.Optional[int]): The number of source documents to read ahead into
                memory before processing.
            mp_context (multiprocessing context, optional): An optional override for the
                multiprocessing context.
            log_level (~typing.Optional[~typing.Union[str, int]]): The log level for running the
                pipeline.

        Examples:
            >>> docs = list(pathlib.Path('abc/').glob('*.txt'))
            >>> def document_source():
            >>>     for path in docs:
            >>>         with path.open('r') as f:
            >>>             txt = f.read()
            >>>         with Event(event_id=path.name, client=pipeline.events_client) as event:
            >>>             doc = event.create_document('plaintext', txt)
            >>>             yield doc
            >>>
            >>> pipeline.run_multithread(document_source(), total=len(docs))
        """
        show_progress = (show_progress if show_progress is not None
                         else self.mp_config.show_progress)
        close_events = close_events if close_events is not None else self.mp_config.close_events
        max_failures = max_failures if max_failures is not None else self.mp_config.max_failures
        workers = workers if workers is not None else self.mp_config.workers
        mp_context = (multiprocessing.get_context(self.mp_config.mp_start_method)
                      if mp_context is None else mp_context)
        read_ahead = read_ahead if read_ahead is not None else self.mp_config.read_ahead
        log_level = 'INFO' if log_level is None else log_level
        with _PipelineMultiRunner(self, source, params, show_progress, total, close_events,
                                  max_failures, workers, read_ahead, mp_context,
                                  log_level) as runner:
            runner.run()

    def run(self, target: Union['mtap.Event', 'mtap.Document'], *,
            params: Optional[Dict[str, Any]] = None) -> 'processing.PipelineResult':
        """Processes the event/document using all the processors in the pipeline.

        Args:
            target (~typing.Union[Event, Document]): Either an event or a document to process.
            params (dict[str, ~typing.Any]): Json object containing params specific to processing
                this event, the existing params dictionary defined in
                :func:`~PipelineBuilder.add_processor` will be updated with the contents of this
                dict.

        Returns:
            PipelineResult: The results of all the processors in the pipeline.

        Examples:
            >>> e = mtap.Event()
            >>> document = mtap.Document('plaintext', text="...", event=e)
            >>> with Pipeline(...) as pipeline:
            >>>     pipeline.run(document)
            >>>     # is equivalent to pipeline.run(document.event, params={'document_name': document.document_name})

            The 'document_name' param is used to indicate to :obj:`~mtap.DocumentProcessor` which
            document on the event to process.
        """
        event, params = _event_and_params(target, params)
        event_id = event.event_id
        result = self._run_by_event_id(event_id, event.event_service_instance_id, params)
        self._add_result_times(result)
        for component_result in result.component_results:
            try:
                event.add_created_indices(component_result.created_indices)
            except AttributeError:
                pass
        return result

    def _run_by_event_id(self, event_id, event_service_instance_id, params):
        start = datetime.now()
        results = [component.call_process(event_id, event_service_instance_id, params)
                   for component in self._components]
        total = datetime.now() - start
        results = [_base.ProcessingResult(identifier=component.component_id,
                                          result_dict=result[0],
                                          timing_info=result[1],
                                          created_indices=result[2])
                   for component, result in zip(self._components, results)]
        logger.debug('Finished processing event_id: %s', event_id)
        return _base.PipelineResult(results, total)

    def _add_result_times(self, result):
        times = {}
        for component_id, _, component_times, _ in result.component_results:
            times.update({component_id + ':' + k: v for k, v in component_times.items()})
        times[self.name + 'total'] = result.elapsed_time
        _timing.add_times(self.times_map, times)

    @overload
    def processor_timer_stats(self) -> 'List[processing.AggregateTimingInfo]':
        """Returns the timing information for all processors.

        Returns:
            List[AggregateTimingInfo]: A list of timing info objects, one for each processor,
            in the same order that the processors were added to the pipeline.
        """
        ...

    @overload
    def processor_timer_stats(self, identifier: str) -> 'processing.AggregateTimingInfo':
        """Returns the timing info for one processor.

        Args:
            identifier (Optional[str]): The pipeline component_id for the processor to return
                timing info.

        Returns:
            AggregateTimingInfo: The timing info for the specified processor.
        """
        ...

    def processor_timer_stats(self, identifier=None):
        if identifier is not None:
            aggregates = _timing.create_timer_stats(self.times_map, identifier + ':')
            aggregates = {k[(len(identifier) + 1):]: v for k, v in aggregates.items()}
            return _base.AggregateTimingInfo(identifier=identifier, timing_info=aggregates)
        timing_infos = []
        for component in self._components:
            component_id = component.component_id
            aggregates = _timing.create_timer_stats(self.times_map, component_id + ':')
            aggregates = {k[(len(component_id) + 1):]: v for k, v in aggregates.items()}
            timing_infos.append(
                _base.AggregateTimingInfo(identifier=component_id, timing_info=aggregates))
        return timing_infos

    def pipeline_timer_stats(self) -> 'processing.AggregateTimingInfo':
        """The aggregated statistics for the global runtime of the pipeline.

        Returns:
            AggregateTimingInfo: The timing stats for the global runtime of the pipeline.
        """
        pipeline_id = self.name
        aggregates = _timing.create_timer_stats(self.times_map, pipeline_id)
        aggregates = {k[len(pipeline_id):]: v for k, v in aggregates.items()}
        return _base.AggregateTimingInfo(identifier=self.name, timing_info=aggregates)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Closes any open connections to remote processors.
        """
        for component in self._components:
            try:
                component.close()
            except AttributeError:
                pass
        if self._created_events_client:
            self._events_client.close()

    def as_processor(self) -> 'processing.EventProcessor':
        """Returns the pipeline as a processor.

        Returns:
            EventProcessor: An event processor that can be added to other pipelines or hosted.
        """
        return _PipelineProcessor(self._components)

    def print_times(self):
        """Prints all of the times collected during this pipeline using :func:`print`.
        """
        self.pipeline_timer_stats().print_times()
        for pipeline_timer in self.processor_timer_stats():
            pipeline_timer.print_times()

    def __getitem__(self, item):
        return self._component_descriptors[item]

    def __setitem__(self, key, value):
        self._clear_components()
        self._component_descriptors[key] = value

    def __delitem__(self, key):
        self._clear_components()
        del self._component_descriptors[key]

    def __len__(self):
        return len(self._component_descriptors)

    def _clear_components(self):
        try:
            del self._components
        except AttributeError:
            pass

    def insert(self, index, o) -> None:
        self._clear_components()
        self._component_descriptors.insert(index, o)

    def __repr__(self):
        return "Pipeline(" + ', '.join(
            [repr(component) for component in self._component_descriptors]) + ')'


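# A sketch of reading back the timing statistics a pipeline collects as it runs (assumes
# `pipeline` has already processed at least one event; the component id is illustrative):
#
#   pipeline.print_times()                          # print pipeline-wide and per-component times
#   per_component = pipeline.processor_timer_stats()            # one entry per component
#   one = pipeline.processor_timer_stats('example-component')   # stats for a single component
#   overall = pipeline.pipeline_timer_stats()       # aggregate stats for the whole pipeline run

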
class ProcessingSource(ContextManager, ABC):
    """Provides events or documents for the multi-threaded pipeline runner. Also has
    functionality for receiving results.
    """
    __slots__ = ('_total',)

    @property
    def total(self) -> Optional[int]:
        """The total number of documents this source will provide.

        Returns:
            int or None: Count of the total events, or None if not known.
        """
        try:
            return self._total
        except AttributeError:
            return None

    @total.setter
    def total(self, count: Optional[int]):
        self._total = count

    @abstractmethod
    def provide(self, consume: Callable[[Union['mtap.Document', 'mtap.Event']], None]):
        """The method which provides documents or events for the multi-threaded runner to
        process.

        Args:
            consume (~typing.Callable[[~typing.Union[Document, Event]], None]): The consumer
                method to pass documents or events to process.

        Examples:
            Example implementation for processing text documents in a directory.

            >>> ...
            >>> def provide(self, consume: Callable[[Union[Document, Event]], None]):
            >>>     for file in Path(".").glob("*.txt"):
            >>>         with file.open('r') as fio:
            >>>             txt = fio.read()
            >>>         event = Event()
            >>>         doc = event.create_document('plaintext', txt)
            >>>         consume(doc)
        """
        ...

    def receive_result(self, result: 'processing.PipelineResult', event: 'mtap.Event'):
        """Optional method: Asynchronous callback which returns the results of processing. This
        method is called on a processing worker thread. Default behavior is to do nothing.

        Args:
            result (PipelineResult): The result of processing using the pipeline.
            event (Event): The event processed.
        """
        pass

    def receive_failure(self, exc: 'processing.ProcessingError') -> bool:
        """Optional method: Asynchronous callback which receives exceptions for any failed
        documents.

        Args:
            exc (ProcessingError): The processing exception.

        Returns:
            bool: Whether the error should be suppressed and not count against maximum failures.
        """
        pass

    def close(self):
        """Optional method: called to clean up after processing is complete.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return None


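# A sketch of a custom source which reads lines from a file, creating one event per line, and
# logs failures instead of counting them against max_failures. The file path and document name
# are illustrative:
#
#   class LinesSource(ProcessingSource):
#       def __init__(self, client, path):
#           self.client = client
#           self.path = path
#
#       def provide(self, consume):
#           with open(self.path, 'r') as f:
#               for i, line in enumerate(f):
#                   with Event(event_id=str(i), client=self.client) as event:
#                       doc = event.create_document('plaintext', line)
#                       consume(doc)
#
#       def receive_failure(self, exc):
#           logger.warning('Failed to process a line: %s', exc)
#           return True  # suppress: do not count against max_failures

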
class _IterableProcessingSource(ProcessingSource):
    """Wraps an iterable in a ProcessingSource for the multi-thread processor.
    """
    __slots__ = ('it',)

    def __init__(self, source):
        # We use an iterator here so we can ensure that it gets closed on unexpected / early
        # termination and any caller context managers are exited before their client gets shut
        # down. Using a for-in loop we're not guaranteed that, which can cause zombie unclosed
        # events on the event service.
        self.it = iter(source)

    def provide(self, consume: Callable[[Union['mtap.Document', 'mtap.Event']], None]):
        while True:
            try:
                target = next(self.it)
            except StopIteration:
                break
            consume(target)

    def close(self):
        try:
            self.it.close()
        except AttributeError:
            pass


class _PipelineProcessor(_base.EventProcessor):
    __slots__ = ('_components',)

    def __init__(self, components: List['processing.ProcessingComponent']):
        self._components = components

    def process(self, event: 'mtap.Event', params: Dict[str, Any] = None):
        results = [component.call_process(event.event_id, event.event_service_instance_id, params)
                   for component in self._components]
        times = {}
        for _, component_times, _ in results:
            times.update(component_times)
        for k, v in times.items():
            _PipelineProcessor.current_context().add_time(k, v)
        return {'component_results': [result[0] for result in results]}


_mp_pipeline = None


def _mp_process_init(config, pipeline, queue, log_level):
    global _mp_pipeline

    # Set up multiprocess logging via a queue back to the listener in the parent process.
    h = logging.handlers.QueueHandler(queue)
    root = logging.getLogger()
    root.addHandler(h)
    root.setLevel(log_level)

    Config(config).enter_context()
    _mp_pipeline = pipeline

    def cleanup_pipeline(*_):
        _mp_pipeline.close()

    signal.signal(signal.SIGINT, cleanup_pipeline)


def _mp_process_event(event_id, event_service_instance_id, params):
    assert isinstance(_mp_pipeline, Pipeline)
    try:
        result = _mp_pipeline._run_by_event_id(event_id, event_service_instance_id, params)
    except Exception:
        return event_id, None, traceback.format_exc()
    return event_id, result, None


class _PipelineMultiRunner:
    __slots__ = ('pipeline', 'failures', 'max_failures', 'targets_cond', 'active_targets',
                 'close_events', 'max_targets', 'n_threads', 'progress_bar', 'source', 'params',
                 'pool', 'active_events', 'log_listener')

    def __init__(self, pipeline, source, params, progress, total, close_events, max_failures,
                 n_threads, read_ahead, mp_context, log_level):
        self.pipeline = pipeline
        self.failures = 0
        self.max_failures = max_failures
        self.targets_cond = Condition(Lock())
        self.active_targets = 0
        self.close_events = close_events

        if read_ahead is None:
            read_ahead = n_threads
        self.max_targets = n_threads + read_ahead
        self.n_threads = n_threads

        total = (source.total if hasattr(source, 'total') else None) or total
        self.progress_bar = tqdm(total=total, unit='event', smoothing=0.01) if progress else None

        if not isinstance(source, ProcessingSource):
            if not hasattr(source, '__iter__'):
                raise ValueError(
                    'The source needs to either be a ProcessingSource or an Iterable.')
            source = _IterableProcessingSource(source)
        self.source = source
        self.params = params

        logging_queue = mp_context.Queue(-1)
        handler = logging.StreamHandler()
        f = logging.Formatter(
            '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
        handler.setFormatter(f)
        handler.setLevel(log_level)
        self.log_listener = QueueListener(logging_queue, handler)
        self.log_listener.start()

        self.pool = mp_context.Pool(
            self.n_threads,
            initializer=_mp_process_init,
            initargs=(dict(Config()), pipeline, logging_queue, log_level)
        )
        self.active_events = {}

    def max_failures_reached(self):
        return self.failures > self.max_failures

    def increment_active_tasks(self):
        with self.targets_cond:
            self.active_targets += 1

    def task_completed(self):
        with self.targets_cond:
            if self.progress_bar is not None:
                self.progress_bar.update(1)
            self.active_targets -= 1
            self.targets_cond.notify()

    def tasks_done(self):
        return self.active_targets == 0

    def wait_tasks_completed(self):
        with self.targets_cond:
            self.targets_cond.wait_for(self.tasks_done)

    def read_ready(self):
        return self.active_targets < self.max_targets

    def wait_to_read(self):
        with self.targets_cond:
            self.targets_cond.wait_for(self.read_ready)

    def run(self):
        def result_handler(result):
            event_id, result, error = result
            event = self.active_events.pop(event_id)
            if result is not None:
                self.pipeline._add_result_times(result)
                self.source.receive_result(result, event)
            else:
                if not self.source.receive_failure(error):
                    logger.error('Error while processing event_id: %s\n%s', event_id, error)
                    self.failures += 1
            self.task_completed()
            if self.close_events:
                event.release_lease()

        def error_handler(error):
            logger.error('Unexpected error')
            raise error

        def consume(target):
            if self.max_failures_reached():
                raise ValueError('Max processing failures exceeded.')
            event, params = _event_and_params(target, self.params)
            try:
                event.lease()
                event_id = event.event_id
                self.active_events[event_id] = event
                self.pool.apply_async(_mp_process_event,
                                      args=(event_id, event.event_service_instance_id, params),
                                      callback=result_handler,
                                      error_callback=error_handler)
                self.increment_active_tasks()
            except BaseException as e:
                # Here we failed sometime between taking a new lease and adding the done
                # callback to the future, meaning the lease will never get freed.
                event.release_lease()
                raise e
            self.wait_to_read()

        try:
            with self.source:
                self.source.provide(consume)
            self.wait_tasks_completed()
        except KeyboardInterrupt:
            print('Pipeline terminated by user (KeyboardInterrupt).')
        if self.max_failures_reached():
            raise ValueError('Max processing failures exceeded.')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        self.pool.terminate()
        self.pool.join()
        if self.close_events:
            for event in self.active_events.values():
                event.close()
        self.log_listener.stop()


def _unique_component_id(component_ids, component_id):
    count = component_ids.get(component_id, 0)
    count += 1
    component_ids[component_id] = count
    if count > 1:
        component_id = component_id + '-' + str(count)
    return component_id


class FilesInDirectoryProcessingSource(ProcessingSource):
    """Processing source for pipelines which iterates over files in a directory.

    Args:
        client (mtap.EventsClient): The events client to send documents to.
        directory (pathlib.Path or str): The path to the directory of files to process.

    Keyword Args:
        document_name (str): The name of the document to create and add text to.
        extension_glob (str): The glob for which files to create events and documents for.
        count_total (bool): Whether to count the total number of documents before processing.
        errors (~typing.Optional[str]): The errors argument for :func:`open`.
    """
    __slots__ = ('client', 'input_directory', 'document_name', 'extension_glob', 'errors',
                 'total')

    def __init__(self, client: 'mtap.EventsClient',
                 directory: Union[pathlib.Path, str],
                 *, document_name: str = 'plaintext',
                 extension_glob: str = '*.txt',
                 count_total: bool = True,
                 errors: Optional[str] = None):
        self.client = client
        self.input_directory = pathlib.Path(directory)
        if not self.input_directory.is_dir():
            raise ValueError('Invalid / non-existing input directory: '
                             + str(self.input_directory))
        self.document_name = document_name
        self.extension_glob = extension_glob
        self.errors = errors
        if count_total:
            self.total = sum(1 for _ in self.input_directory.rglob(self.extension_glob))

    def provide(self, consume: Callable[[Union['mtap.Document', 'mtap.Event']], None]):
        for path in self.input_directory.rglob(self.extension_glob):
            with path.open('r', errors=self.errors) as f:
                txt = f.read()
            relative = str(path.relative_to(self.input_directory))
            with Event(event_id=relative, client=self.client, only_create_new=True) as e:
                doc = e.create_document(self.document_name, txt)
                consume(doc)
