# Source code for mtap.deployment

#  Copyright 2020 Regents of the University of Minnesota.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""Module for deploying a set of processing services and the events server all at once.

Examples:
    See python/mtap/examples/exampleDeploymentConfiguration.yml for an example of the yaml deployment configuration
    which can be loaded using :py:meth:`~mtap.deployment.Deployment.from_yaml_file`

    An example configuration

    >>> deploy = Deployment(
    >>>     GlobalSettings(host='0.0.0.0'),
    >>>     EventsDeployment(port=10100, workers=8),
    >>>     SharedProcessorConfig(workers=8, jvm_args=['-Xms32m', '-Xmx8g'], classpath='blah.jar'),
    >>>     ProcessorDeployment(implementation='python',
    >>>                         entry_point='mtap.examples.example_processor',
    >>>                         instances=4,
    >>>                         port=10101,
    >>>                         workers=4),
    >>>     ProcessorDeployment(implementation='java',
    >>>                         entry_point='edu.umn.nlpie.mtap.WordOccurrencesExampleProcessor',
    >>>                         port=10105)
    >>> )
    >>> deploy.run_servers_and_wait()



"""
import argparse
import logging
import os
import pathlib
import shutil
import signal
import subprocess
import sys
import threading
import time
import uuid
from contextlib import contextmanager
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import grpc

from mtap import utilities, _config

# Public API of this module.
__all__ = [
    'Deployment', 'GlobalSettings', 'SharedProcessorConfig', 'EventsDeployment', 'ServiceDeployment',
    'ProcessorDeployment', 'main', 'deployment_parser', 'ServiceDeploymentException',
]

# Module-level logger for deployment progress / debug messages.
logger = logging.getLogger('mtap.deployment')

# The current interpreter executable, used to launch Python events/processor subprocesses.
PYTHON_EXE = sys.executable


def _get_java() -> str:
    try:
        return str(pathlib.Path(os.environ['JAVA_HOME']) / 'bin' / 'java')
    except KeyError:
        return 'java'


# Resolved once at import time; honors JAVA_HOME when set, otherwise relies on `java` being on the PATH.
JAVA_EXE = _get_java()


def _listen(process: subprocess.Popen) -> int:
    for line in process.stdout:
        print(line.decode(), end='', flush=True)
    return process.wait()


class ServiceDeploymentException(Exception):
    """Exception raised in the case of a service failing to launch.

    Note: the previous explicit ``__init__`` merely forwarded ``*args`` and also
    accepted ``**kwargs`` that ``Exception.__init__`` rejects at runtime; it has
    been removed in favor of the inherited constructor, which behaves identically
    for positional arguments.
    """
class GlobalSettings:
    """Settings shared by the events service and all processor services.

    Keyword Args:
        host (~typing.Optional[str]): The global host override, forces all services to use a specific host name.
        mtap_config (~typing.Optional[str]): The path to an MTAP config file to load for all services.
        log_level (~typing.Optional[str]): A python logging level to pass to all services.
        register (~typing.Optional[bool]): Whether services should register with service discovery.

    Attributes:
        host (~typing.Optional[str]): The global host override, forces all services to use a specific host name.
        mtap_config (~typing.Optional[str]): The path to an MTAP config file to load for all services.
        log_level (~typing.Optional[str]): A python logging level to pass to all services.
        register (~typing.Optional[bool]): Whether services should register with service discovery.
    """

    def __init__(self, *,
                 host: Optional[str] = None,
                 mtap_config: Optional[str] = None,
                 log_level: Optional[str] = None,
                 register: Optional[bool] = None):
        self.host = host
        self.mtap_config = mtap_config
        self.log_level = log_level
        self.register = register

    @staticmethod
    def from_conf(conf: Optional[Dict]) -> 'GlobalSettings':
        """Creates a global settings object from a configuration dictionary.

        Keyword Args:
            conf (~typing.Optional[~typing.Dict]): The configuration dictionary.

        Returns:
            GlobalSettings: The global settings object.
        """
        settings = conf if conf is not None else {}
        return GlobalSettings(
            host=settings.get('host'),
            mtap_config=settings.get('mtap_config'),
            log_level=settings.get('log_level'),
            register=settings.get('register'),
        )
class SharedProcessorConfig:
    """Configuration that is shared between multiple processor services.

    Keyword Args:
        events_addresses (~typing.Optional[~typing.List[str]]): An optional GRPC-compatible target for the events
            service to be used by all processors.
        workers (~typing.Optional[int]): The default number of worker threads which will perform processing.
        additional_args (~typing.Optional[~typing.List[str]]): a list of additional arguments that should be
            appended to every processor.
        jvm_args (~typing.Optional[~typing.List[str]]): a list of JVM arguments for all java processors.
        java_classpath (~typing.Optional[str]): A classpath string that will be passed to all java processors.
        java_additional_args (~typing.Optional[~typing.List[str]]): a list of additional arguments appended to
            every java processor.
        startup_timeout (~typing.Optional[int]): The default startup timeout for processors, in seconds.
            Defaults to 30 when unspecified.
        mp_spawn_method (~typing.Optional[str]): A :meth:`multiprocessing.get_context` argument to create the
            multiprocessing context.

    Attributes:
        events_addresses (~typing.Optional[~typing.List[str]]): An optional GRPC-compatible target for the events
            service to be used by all processors.
        workers (~typing.Optional[int]): The default number of worker threads which will perform processing.
        additional_args (~typing.Optional[~typing.List[str]]): a list of additional arguments that should be
            appended to every processor.
        jvm_args (~typing.Optional[~typing.List[str]]): a list of JVM arguments for all java processors.
        java_classpath (~typing.Optional[str]): A classpath string that will be passed to all java processors.
        java_additional_args (~typing.Optional[~typing.List[str]]): a list of additional arguments appended to
            every java processor.
        startup_timeout (int): The default startup timeout for processors, in seconds.
        mp_spawn_method (~typing.Optional[str]): A :meth:`multiprocessing.get_context` argument to create the
            multiprocessing context.
    """

    def __init__(self, *,
                 events_addresses: Optional[List[str]] = None,
                 workers: Optional[int] = None,
                 additional_args: Optional[List[str]] = None,
                 jvm_args: Optional[List[str]] = None,
                 java_classpath: Optional[str] = None,
                 java_additional_args: Optional[List[str]] = None,
                 startup_timeout: Optional[int] = None,
                 mp_spawn_method: Optional[str] = None):
        self.events_addresses = events_addresses
        self.workers = workers
        self.additional_args = additional_args
        self.jvm_args = jvm_args
        self.java_classpath = java_classpath
        self.java_additional_args = java_additional_args
        # Falls back to a 30 second timeout whenever no (truthy) timeout is configured.
        self.startup_timeout = startup_timeout or 30
        self.mp_spawn_method = mp_spawn_method

    @staticmethod
    def from_conf(conf: Optional[Dict]) -> 'SharedProcessorConfig':
        """Builds a configuration from a dictionary representation.

        Args:
            conf (~typing.Optional[~typing.Dict]): The configuration dictionary.

        Returns:
            SharedProcessorConfig object.
        """
        conf = conf or {}
        return SharedProcessorConfig(**conf)
class ServiceDeployment:
    """Shared configuration for services, both events and processors.

    Keyword Args:
        workers (~typing.Optional[int]): The number of workers.
        register (~typing.Optional[bool]): Whether to use service discovery.
        mtap_config (~typing.Optional[str]): A path to the mtap configuration.
        log_level (~typing.Optional[str]): The log level.

    Attributes:
        workers (~typing.Optional[int]): The number of workers.
        register (~typing.Optional[bool]): Whether to use service discovery.
        mtap_config (~typing.Optional[str]): A path to the mtap configuration.
        log_level (~typing.Optional[str]): The log level.
    """

    def __init__(self, *,
                 workers: Optional[int],
                 register: Optional[bool],
                 mtap_config: Optional[str],
                 log_level: Optional[str]):
        self.workers = workers
        self.register = register
        self.mtap_config = mtap_config
        self.log_level = log_level

    def _service_args(self,
                      host: Optional[str] = None,
                      port: Optional[int] = None,
                      # Fix: this is a single identifier string (it is placed directly after "--sid"),
                      # not a list; the annotation previously claimed Optional[List[str]].
                      unique_service_identifier: Optional[str] = None,
                      register_default: Optional[bool] = None,
                      global_host: Optional[str] = None,
                      workers_default: Optional[int] = None,
                      mtap_config_default: Optional[str] = None,
                      log_level_default: Optional[str] = None) -> Tuple[List[str], str]:
        """Builds the command-line arguments shared by every MTAP service launch.

        Per-service settings take precedence over the ``*_default`` fallbacks,
        except ``global_host`` which overrides ``host`` when set.

        Returns:
            Tuple of (argument list, service instance identifier).
        """
        call = []
        # The global host override takes precedence over the per-service host.
        host = global_host or host
        if host is not None:
            call.extend(['--host', str(host)])
        if port is not None:
            call.extend(['--port', str(port)])
        if self.register or register_default:
            call.append('--register')
        # Generate a fresh identifier when none was supplied; it keys the address file
        # the service writes on startup (see --write-address below).
        sid = unique_service_identifier or str(uuid.uuid4())
        call.extend(["--sid", sid])
        workers = self.workers or workers_default
        if workers is not None:
            call.extend(['--workers', str(workers)])
        mtap_config = self.mtap_config or mtap_config_default
        if mtap_config is not None:
            call.extend(['--mtap-config', mtap_config])
        log_level = self.log_level or log_level_default
        if log_level is not None:
            call.extend(['--log-level', log_level])
        call.append('--write-address')
        return call, sid
[docs]class EventsDeployment: """Deployment configuration for the events service. Keyword Args: enabled (bool): Whether an events service should be created. addresses (~typing.Optional[~typing.Sequence[str]]): The host address of the events service. workers (~typing.Optional[int]): The number of worker threads the events service should use. register (~typing.Optional[bool]): Whether to register the events service with discovery. mtap_config (~typing.Optional[str]): Path to an mtap configuration file. log_level (~typing.Optional[str]): The log level for the events service. Attributes: enabled (bool): Whether an events service should be created. addresses (~typing.Optional[~typing.Sequence[str]]): The host address of the events service. service_deployment (ServiceDeployment): The service deployment settings (workers, registration, config, logging). """ def __init__(self, *, enabled: bool = True, addresses: Optional[Sequence[str]] = None, workers: Optional[int] = None, register: Optional[bool] = None, mtap_config: Optional[str] = None, log_level: Optional[str] = None): self.enabled = enabled self.addresses = addresses self.service_deployment = ServiceDeployment(workers=workers, register=register, mtap_config=mtap_config, log_level=log_level) def create_calls(self, global_settings: GlobalSettings) -> Iterable[Tuple[List[str], str]]: for address in self.addresses: host = None port = None if address: splits = address.split(':') if len(splits) == 2: host, port = splits if host == '': host = None else: host = splits[0] call = [PYTHON_EXE, '-m', 'mtap', 'events'] service_args, sid = self.service_deployment._service_args( host=host, port=port, register_default=global_settings.register, global_host=global_settings.host, mtap_config_default=global_settings.mtap_config, log_level_default=global_settings.log_level ) call.extend(service_args) yield call, sid
[docs] @staticmethod def from_conf(conf: Optional[Dict]) -> 'EventsDeployment': """Creates the EventsDeployment configuration option from a configuration dictionary. Args: conf (~typing.Optional[~typing.Dict]): The configuration dictionary Returns: EventsDeployment or None from the configuration dictionary. """ conf = conf or {} enabled = conf.get('enabled') if enabled is None: enabled = False address = conf.get('address', None) or conf.get('addresses', None) if address is None: addresses = [] elif isinstance(address, str): addresses = [address] elif isinstance(address, Iterable): addresses = list(address) else: raise ValueError('Unrecognized type of address: ' + type(address)) return EventsDeployment(enabled=enabled, addresses=addresses, workers=conf.get('workers'), register=conf.get('register'), mtap_config=conf.get('mtap_config'))
[docs]class ProcessorDeployment: """Deployment configuration for an MTAP processor. Used to construct the command for launching the processor. The processor should be a Java Class with a main method or a Python module with a main block. It should accept the standard MTAP processor deployment arguments and launch an MTAP processor using :func:`mtap.run_processor` or the equivalent Java method. Args: implementation (str): Either "java" or "python". entry_point (str): Either the java main class, or the python main module. Keyword Args: enabled (~typing.Optional[bool]): Whether the processor should be launched as part of deployment. Default is `True` if `None`. instances (~typing.Optional[int]): The number of instances of the processor to launch. Default is `1` if `None`. host (~typing.Optional[str]): The listening host for the processor service. port (~typing.Optional[int]): The listening port for the processor service. workers (~typing.Optional[int]): The number of worker threads per instance. register (~typing.Optional[bool]): Whether the processor should register with the discovery service specified in the MTAP configuration mtap_config (~typing.Optional[str]): Path to the MTAP configuration file. log_level (~typing.Optional[str]): The log level for the processor. identifier (~typing.Optional[str]): An optional identifier override to use for registration. pre_args (~typing.Optional[~typing.List[str]]): Arguments that occur prior to the MTAP service arguments (like host, port, etc). additional_args (~typing.Optional[~typing.List[str]]): Arguments that occur after the MTAP service arguments. startup_timeout (~typing.Optional[int]): Optional override startup timeout. mp_spawn_method (~typing.Optional[str]): A :meth:`multiprocessing.get_context` argument to create the multiprocessing context. Attributes: implementation (str): Either "java" or "python". entry_point (str): Either the java main class, or the python main module. 
enabled (bool): Whether the processor should be launched as part of deployment. instances (int): The number of instances of the processor to launch. host (~typing.Optional[str]): The listening host for the processor service. port (~typing.Optional[int]): The listening port for the processor service. service_deployment (ServiceDeployment): The service deployment settings (workers, registration, config, logging). pre_args (~typing.Optional[~typing.List[str]]): Arguments that occur prior to the MTAP service arguments (like host, port, etc). additional_args (~typing.Optional[~typing.List[str]]): Arguments that occur after the MTAP service arguments. startup_timeout (~typing.Optional[int]): Optional override startup timeout. mp_spawn_method (~typing.Optional[str]): A :meth:`multiprocessing.get_context` argument to create the multiprocessing context. """ def __init__(self, implementation: str, entry_point: str, *, enabled: Optional[bool] = None, instances: Optional[int] = None, host: Optional[str] = None, port: Optional[int] = None, workers: Optional[int] = None, register: Optional[bool] = None, mtap_config: Optional[str] = None, log_level: Optional[str] = None, name: Optional[str] = None, unique_service_identifier: Optional[List[str]] = None, pre_args: Optional[List[str]] = None, additional_args: Optional[List[str]] = None, startup_timeout: Optional[int] = None, mp_spawn_method: Optional[str] = None): self.implementation = implementation self.entry_point = entry_point self.enabled = enabled if enabled is not None else True self.instances = instances or 1 if not isinstance(self.instances, int) or self.instances < 1: raise ValueError("Instances must be strictly positive integer.") self.name = name self.unique_service_identifier = unique_service_identifier self.pre_args = pre_args self.additional_args = additional_args self.host = host self.port = port self.service_deployment = ServiceDeployment(workers=workers, register=register, mtap_config=mtap_config, 
log_level=log_level) self.startup_timeout = startup_timeout self.mp_spawn_method = mp_spawn_method
[docs] @staticmethod def from_conf(conf: Dict) -> 'ProcessorDeployment': """Creates an MTAP processor deployment configuration from a configuration dictionary. Args: conf (~typing.Dict): The configuration dictionary. Returns: ProcessorDeployment object that can be used to constuct the call for the processor. """ return ProcessorDeployment(**conf)
def create_calls(self, global_settings: GlobalSettings, shared_config: SharedProcessorConfig) -> Iterable[Tuple[List[str], str]]: if isinstance(self.port, list): ports = self.port elif self.port is None: ports = [None] * self.instances else: ports = list(range(self.port, self.port + self.instances)) for port in ports: if self.implementation == 'python': call = [PYTHON_EXE, '-m', self.entry_point] mp_spawn_method = shared_config.mp_spawn_method if self.mp_spawn_method is not None: mp_spawn_method = self.mp_spawn_method if mp_spawn_method is not None: call.extend(['--mp-spawn-method', mp_spawn_method]) elif self.implementation == 'java': call = [str(JAVA_EXE)] if shared_config.jvm_args is not None: call.extend(shared_config.jvm_args) if shared_config.java_classpath is not None: call.extend(['-cp', shared_config.java_classpath]) call.append(self.entry_point) else: raise ValueError('Unrecognized implementation: ' + self.implementation) if self.pre_args is not None: call.extend(self.pre_args) service_args, sid = self.service_deployment._service_args( host=self.host, port=port, unique_service_identifier=self.unique_service_identifier, register_default=global_settings.register, global_host=global_settings.host, mtap_config_default=global_settings.mtap_config, log_level_default=global_settings.log_level, workers_default=shared_config.workers ) call.extend(service_args) if self.name is not None: call.extend(['--name', self.name]) events_addresses = shared_config.events_addresses if events_addresses is not None: call.extend(['--events', ','.join(events_addresses)]) if self.additional_args is not None: call.extend(self.additional_args) if shared_config.additional_args is not None: call.extend(shared_config.additional_args) if self.implementation == 'java' and shared_config.java_additional_args is not None: call.extend(shared_config.java_additional_args) yield call, sid
class Deployment:
    """An automatic deployment configuration which launches a configurable set of MTAP services.

    Args:
        global_settings (~typing.Optional[GlobalSettings]): Settings shared among all services.
        events_deployment (~typing.Optional[EventsDeployment]): Deployment settings for the events service.
        shared_processor_config (~typing.Optional[SharedProcessorConfig]): Shared configuration settings for all
            processors.
        *processors (ProcessorDeployment): Configurations for individual processors.

    Attributes:
        global_settings (~typing.Optional[GlobalSettings]): Settings shared among all services.
        events_deployment (~typing.Optional[EventsDeployment]): Deployment settings for the events service.
        shared_processor_config (~typing.Optional[SharedProcessorConfig]): Shared configuration settings for all
            processors.
        processors (~typing.List[ProcessorDeployment]): Configurations for individual processors.
    """

    def __init__(self,
                 global_settings: Optional[GlobalSettings] = None,
                 events_deployment: Optional[EventsDeployment] = None,
                 shared_processor_config: Optional[SharedProcessorConfig] = None,
                 *processors: ProcessorDeployment):
        self.global_settings = global_settings
        self.events_deployment = events_deployment
        self.shared_processor_config = shared_processor_config
        self.processors = list(processors)
        # (Popen, listener-thread) pairs for every subprocess launched; used for cleanup in shutdown().
        self._processor_listeners: List[Tuple[subprocess.Popen, threading.Thread]] = []

    @staticmethod
    def load_configuration(conf: Dict) -> 'Deployment':
        """Creates a deployment object from a configuration dictionary.

        Args:
            conf (~typing.Dict): The configuration dictionary.

        Returns:
            Deployment object created.
        """
        global_settings = GlobalSettings.from_conf(conf.get('global'))
        events = EventsDeployment.from_conf(conf.get('events_service'))
        shared_processor_config = SharedProcessorConfig.from_conf(conf.get('shared_processor_config'))
        processors_list = conf.get('processors', [])
        processors = [ProcessorDeployment.from_conf(c) for c in processors_list]
        return Deployment(global_settings, events, shared_processor_config, *processors)

    @staticmethod
    def from_yaml_file(conf_path: Union[pathlib.Path, str]) -> 'Deployment':
        """Loads a deployment configuration from a yaml file.

        Args:
            conf_path (str or pathlib.Path): The path to the yaml configuration file.

        Returns:
            Deployment object created from the configuration.
        """
        conf_path = pathlib.Path(conf_path)
        from yaml import load
        try:
            # Prefer the C-accelerated loader when PyYAML was built against libyaml.
            from yaml import CLoader as Loader
        except ImportError:
            from yaml import Loader
        with conf_path.open('rb') as f:
            conf = load(f, Loader=Loader)
        return Deployment.load_configuration(conf)

    @contextmanager
    def run_servers(self):
        """A context manager that starts all the configured services in subprocesses and returns.

        Note: this is a context manager — a bare call creates the manager without launching anything;
        it must be entered with ``with``.

        Raises:
            ServiceDeploymentException: If one or more of the services fails to launch.

        Examples:

        >>> deploy = Deployment.from_yaml_file('deploy_config.yml')
        >>> with deploy.run_servers():
        >>>     # do something that requires the servers.
        >>> # servers are automatically shutdown / terminated when the block is exited
        """
        try:
            self._do_launch_all_processors()
            yield self
        finally:
            # Always tear down subprocesses, even when launching failed part-way through.
            self.shutdown()

    def run_servers_and_wait(self):
        """Starts the specified servers and blocks until KeyboardInterrupt, SIGINT, or SIGTERM are received."""
        e = threading.Event()
        # Translate termination signals into the event so the `with` block exits and cleanup runs.
        signal.signal(signal.SIGINT, lambda *_: e.set())
        signal.signal(signal.SIGTERM, lambda *_: e.set())
        with self.run_servers():
            try:
                e.wait()
            except KeyboardInterrupt:
                pass

    def _do_launch_all_processors(self):
        # Launches the events service(s) first (if enabled), then every enabled processor.
        c = _config.Config()
        enable_proxy = c.get('grpc.enable_proxy', False)
        events_addresses = []
        # deploy events service
        if self.events_deployment.enabled:
            for call, sid in self.events_deployment.create_calls(self.global_settings):
                # Start new session here because otherwise subprocesses get hit with signals meant for parent
                events_address = self._start_subprocess(call, "events", sid, 30, enable_proxy)
                events_addresses.append(events_address)

        # attach deployed events service addresses if it's not already specified or will be picked up by
        # service discovery.
        if (not self.global_settings.register
                and not self.events_deployment.service_deployment.register
                and self.shared_processor_config.events_addresses is None):
            self.shared_processor_config.events_addresses = events_addresses

        # deploy processors
        for processor_deployment in self.processors:
            if processor_deployment.enabled:
                for call, sid in processor_deployment.create_calls(self.global_settings,
                                                                   self.shared_processor_config):
                    logger.debug('Launching processor with call: %s', call)
                    # Start new session here because otherwise subprocesses get hit with signals meant for parent
                    # Per-processor timeout overrides the shared default.
                    startup_timeout = (processor_deployment.startup_timeout
                                       or self.shared_processor_config.startup_timeout)
                    self._start_subprocess(call, processor_deployment.entry_point, sid, startup_timeout,
                                           enable_proxy)
        print('Done deploying all servers.', flush=True)

    def shutdown(self):
        """Shuts down (terminates, then kills if unresponsive) every subprocess this deployment launched."""
        print("Shutting down all processors")
        # NOTE(review): exceptions are collected here but never re-raised or reported — confirm whether
        # shutdown should surface them.
        excs = []
        for p, listener in self._processor_listeners:
            try:
                p.terminate()
                listener.join(timeout=15.0)
                if listener.is_alive():
                    # Graceful terminate did not end the process within 15s; escalate to SIGKILL.
                    print(f'Unsuccessfully terminated processor {p.args}... killing.')
                    p.kill()
                    listener.join()
            except Exception as e:
                print(f"Failed to properly shutdown processor {p.args}")
                excs.append(e)

    def _start_subprocess(self,
                          call: List[str],
                          name: Any,
                          sid: str,
                          startup_timeout: int,
                          enable_proxy: bool = False) -> str:
        # starts process and listener, stores for later cleanup, returns address.
        p = subprocess.Popen(call, start_new_session=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # Mirror the child's output on a background thread so the pipe never fills up and blocks it.
        listener = threading.Thread(target=_listen, args=(p,))
        listener.start()
        self._processor_listeners.append((p, listener))  # Adding early so it gets cleaned up on failure
        address = None
        # Poll once a second for the address file the service writes (--write-address), up to the timeout.
        for i in range(startup_timeout):
            try:
                address = utilities.read_address(sid)
                break
            except FileNotFoundError:
                time.sleep(1)
        if address is None:
            raise ServiceDeploymentException(f'Failed to launch, timed out waiting: {name}')
        # The address file existing is not enough — also wait for the gRPC channel to become ready.
        with grpc.insecure_channel(address, options=[('grpc.enable_http_proxy', enable_proxy)]) as channel:
            future = grpc.channel_ready_future(channel)
            try:
                future.result(timeout=startup_timeout)
            except grpc.FutureTimeoutError:
                raise ServiceDeploymentException(f'Failed to launch, unresponsive: {name}')
        return address
def main(args: Optional[Sequence[str]] = None, conf: Optional[argparse.Namespace] = None):
    """Deployment command-line entry point.

    Args:
        args (~typing.Optional[~typing.Sequence[str]]): Command-line arguments to parse; defaults to
            ``sys.argv`` when ``conf`` is not supplied.
        conf (~typing.Optional[~argparse.Namespace]): A pre-parsed namespace; when given, ``args`` is ignored.
    """
    if conf is None:
        conf = deployment_parser().parse_args(args)
    if conf.log_level is not None:
        logging.basicConfig(level=conf.log_level)
    if conf.mode == 'run_servers':
        deployment = Deployment.from_yaml_file(conf.deploy_config)
        # Fix: ``run_servers()`` is a @contextmanager — the previous bare call merely created the
        # context manager without entering it, so no services were ever launched. Launch and block
        # until interrupted instead.
        deployment.run_servers_and_wait()
    if conf.mode == 'write_example':
        example = pathlib.Path(__file__).parent / "examples" / "exampleDeploymentConfiguration.yml"
        shutil.copyfile(str(example), "exampleDeploymentConfiguration.yml")
        print('Writing "exampleDeploymentConfiguration.yml" to ' + str(pathlib.Path.cwd()))
def deployment_parser() -> argparse.ArgumentParser:
    """Creates a parser for configuration that can be passed to the deployment main method.

    Returns:
        ~argparse.ArgumentParser: The argument parser object that will create a namespace that can be
        passed to :func:`main`.
    """
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('--log-level', metavar='LEVEL',
                        help="The log level to use for the deployment script.")
    modes = parser.add_subparsers(title='mode')

    run_servers_mode = modes.add_parser('run_servers')
    run_servers_mode.set_defaults(mode='run_servers')
    run_servers_mode.add_argument('deploy_config', metavar='CONFIG_FILE', type=pathlib.Path,
                                  help="A path to the deployment configuration to deploy.")

    write_example_mode = modes.add_parser('write_example')
    write_example_mode.set_defaults(mode='write_example')
    return parser
# Allows running this module directly as the deployment CLI.
if __name__ == '__main__':
    main()