# Copyright 2019 Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import concurrent.futures.thread as thread
import logging
import signal
import threading
import traceback
import typing
import uuid
from logging.handlers import QueueListener
from typing import Any, Dict, Tuple, Sequence, Optional, Union
import google.protobuf.empty_pb2 as empty_pb2
import grpc
import grpc_health.v1.health as health
import grpc_health.v1.health_pb2_grpc as health_pb2_grpc
from mtap import _config
from mtap import _structs
from mtap import data
from mtap import utilities
from mtap.api.v1 import processing_pb2, processing_pb2_grpc
from mtap.processing import _base, _runners, _timing
if typing.TYPE_CHECKING:
import mtap
logger = logging.getLogger('mtap.processing')
[docs]def run_processor(proc: 'mtap.EventProcessor',
*,
mp: bool = False,
options: Optional[argparse.Namespace] = None,
args: Optional[Sequence[str]] = None,
mp_context=None):
"""Runs the processor as a GRPC service, blocking until an interrupt signal is received.
Args:
proc (EventProcessor): The processor to host.
mp (bool): If true, will create instances of ``proc`` on multiple forked processes to
process events. This is useful if the processor is computationally intensive and would
run into Python GIL issues on a single process.
options (~typing.Optional[~argparse.Namespace]): The parsed arguments from the parser
returned by :func:`processor_parser`.
args (~typing.Optional[~typing.Sequence[str]]): Arguments to parse server settings from if
``namespace`` was not supplied.
mp_context: A multiprocessing context that gets passed to the process pool executor in the
case of mp = True.
Examples:
Will automatically parse arguments:
>>> run_processor(MyProcessor())
Manual arguments:
>>> run_processor(MyProcessor(), args=['-p', '8080'])
"""
from mtap import EventProcessor
if not isinstance(proc, EventProcessor):
raise ValueError("Processor must be instance of EventProcessor class.")
if options is None:
processors_parser = argparse.ArgumentParser(parents=[processor_parser()])
processors_parser.add_help = True
options = processors_parser.parse_args(args)
if options.log_level:
logging.basicConfig(level=getattr(logging, options.log_level))
events_addresses = []
if options.events_addresses is not None:
events_addresses.extend(options.events_addresses.split(','))
with _config.Config() as c:
if options.mtap_config is not None:
c.update_from_yaml(options.mtap_config)
# instantiate runner
name = options.name or proc.metadata['name']
sid = options.sid
enable_http_proxy = options.grpc_enable_http_proxy
if enable_http_proxy is not None:
c['grpc.events_channel_options.grpc.enable_http_proxy'] = enable_http_proxy
if mp:
runner = MpProcessorRunner(proc=proc,
workers=options.workers,
events_address=events_addresses,
processor_name=name,
mp_context=mp_context,
log_level=options.log_level)
else:
client = data.EventsClient(address=events_addresses)
runner = _runners.ProcessorRunner(proc, client=client,
processor_name=name,
params=None)
server = ProcessorServer(runner=runner,
sid=sid,
host=options.host,
port=options.port,
register=options.register,
workers=options.workers,
write_address=options.write_address)
e = threading.Event()
def do_stop(*_):
e.set()
signal.signal(signal.SIGINT, do_stop)
signal.signal(signal.SIGTERM, do_stop)
server.start()
try:
e.wait()
except KeyboardInterrupt:
pass
server.stop()
_mp_processor = ... # EventProcessor
_mp_client = ... # EventClient
def _mp_initialize(proc: 'mtap.EventProcessor', events_address, config, log_queue):
global _mp_processor
global _mp_client
h = logging.handlers.QueueHandler(log_queue)
root = logging.getLogger()
root.addHandler(h)
root.setLevel(logging.DEBUG)
_config.Config(config)
_mp_processor = proc
_mp_client = data.EventsClient(address=events_address)
def _mp_call_process(event_id, event_service_instance_id, params):
global _mp_processor
global _mp_client
with _base.Processor.enter_context() as c, \
data.Event(event_id=event_id, event_service_instance_id=event_service_instance_id,
client=_mp_client) as event:
with _base.Processor.started_stopwatch('process_method'):
result = _mp_processor.process(event, params)
return result, c.times, event.created_indices
class MpProcessorRunner:
__slots__ = ('pool', 'metadata', 'processor_name', 'component_id', 'log_listener')
def __init__(self,
proc: 'mtap.EventProcessor',
processor_name: str,
component_id: 'Optional[str]' = None,
workers: 'Optional[int]' = 8,
events_address: 'Optional[Union[str, Sequence[str]]]' = None,
mp_context=None,
log_level=None):
if mp_context is None:
import multiprocessing as mp
mp_context = mp.get_context('spawn')
config = _config.Config()
log_queue = mp_context.Queue(-1)
handler = logging.StreamHandler()
log_level = 'INFO' if log_level is None else log_level
handler.setLevel(log_level)
self.log_listener = QueueListener(log_queue, handler)
self.log_listener.start()
self.pool = mp_context.Pool(workers,
initializer=_mp_initialize,
initargs=(proc, events_address, dict(config), log_queue))
self.metadata = proc.metadata
self.processor_name = processor_name
self.component_id = component_id or processor_name
def call_process(self,
event_id: str,
event_service_instance_id: str,
params: Optional[Dict[str, Any]]) -> Tuple[Dict, Dict, Dict]:
p = dict()
if params is not None:
p.update(params)
return self.pool.apply(_mp_call_process, args=(event_id, event_service_instance_id, p))
def close(self):
self.pool.terminate()
self.pool.join()
self.log_listener.stop()
[docs]def processor_parser() -> argparse.ArgumentParser:
"""An :class:`~argparse.ArgumentParser` that can be used to parse the settings for
:func:`run_processor`.
Returns:
~argparse.ArgumentParser: A parser containing server settings.
Examples:
Using this as a parent parser:
>>> parser = ArgumentParser(parents=[processor_parser()])
>>> parser.add_argument('--my-arg-1')
>>> parser.add_argument('--my-arg-2')
>>> args = parser.parse_args()
>>> processor = MyProcessor(args.my_arg_1, args.my_arg_2)
>>> run_processor(processor, args)
"""
processors_parser = argparse.ArgumentParser(add_help=False)
processors_parser.add_argument('--host', '--address', '-a', default="127.0.0.1", metavar="HOST",
help='the address to serve the service on')
processors_parser.add_argument('--port', '-p', type=int, default=0, metavar="PORT",
help='the port to serve the service on')
processors_parser.add_argument('--workers', '-w', type=int, default=10,
help='number of worker threads to handle requests')
processors_parser.add_argument('--register', '-r', action='store_true',
help='whether to register the service with the configured '
'service discovery')
processors_parser.add_argument("--mtap-config", default=None,
help="path to MTAP config file")
processors_parser.add_argument('--events-addresses', '--events-address', '--events', '-e',
default=None,
help='address of the events service to use, '
'omit to use discovery')
processors_parser.add_argument('--name', '-n',
help="Optional override service name, defaults to the processor annotation")
processors_parser.add_argument('--sid',
help="A unique identifier for this instance of the processor service.")
processors_parser.add_argument('--write-address', action='store_true',
help='If set, will write the server address ')
processors_parser.add_argument('--log-level', type=str, default='INFO',
help="Sets the python log level.")
processors_parser.add_argument('--grpc-enable-http-proxy', action='store_true',
help="If set, will enable usage of http_proxy by grpc.")
processors_parser.add_argument('--mp-spawn-method',
choices=['spawn', 'fork', 'forkserver', None],
help="A multiprocessing spawn method to use.")
return processors_parser
def _label_index_meta_to_proto(d, message):
message.name = d['name'] or ''
message.name_from_parameter = d['name_from_parameter'] or ''
message.optional = d.get('optional', False)
message.description = d['description'] or ''
for property_meta in d['properties']:
p = message.properties.add()
p.name = property_meta['name'] or ''
p.description = property_meta['description'] or ''
p.data_type = property_meta['data_type'] or ''
p.nullable = property_meta['nullable']
class _ProcessorServicer(processing_pb2_grpc.ProcessorServicer):
def __init__(self,
config: 'mtap.Config',
address: str,
sid: str,
runner: '_base.ProcessingComponent',
health_servicer: health.HealthServicer,
register: bool = False):
self.config = config
self.address = address
self.sid = sid
self._runner = runner
self.register = register
self.health_servicer = health_servicer
self._times_map = {}
self._deregister = None
self.processed = 0
self.failure_count = 0
def start(self, port: int):
self.health_servicer.set(self._runner, 'SERVING')
if self.register:
from mtap._discovery import Discovery
service_registration = Discovery(config=self.config)
self._deregister = service_registration.register_processor_service(self._runner.processor_name,
self.sid,
self.address,
port,
'v1')
def shutdown(self):
self.health_servicer.set(self._runner.processor_name, 'NOT_SERVING')
if self._deregister is not None:
self._deregister()
self._runner.close()
def Process(self, request, context=None):
event_id = request.event_id
event_service_instance_id = request.event_service_instance_id
logger.debug(
'%s received process request on event: (%s, %s)',
self._runner.processor_name, event_id, event_service_instance_id)
params = {}
_structs.copy_struct_to_dict(request.params, params)
try:
response = processing_pb2.ProcessResponse()
result, times, added_indices = self._runner.call_process(
event_id,
event_service_instance_id,
params
)
if result is not None:
_structs.copy_dict_to_struct(result, response.result, [])
_timing.add_times(self._times_map, times)
for k, l in times.items():
response.timing_info[k].FromTimedelta(l)
for document_name, l in added_indices.items():
for index_name in l:
created_index = response.created_indices.add()
created_index.document_name = document_name
created_index.index_name = index_name
return response
except Exception as e:
logger.error(str(e))
logger.error(traceback.format_exc())
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return empty_pb2.Empty()
def GetStats(self, request, context):
r = processing_pb2.GetStatsResponse(processed=self.processed,
failures=self.failure_count)
for k, v in _timing.create_timer_stats(self._times_map, self._runner.component_id).items():
ts = r.timing_stats[k]
ts.mean.FromTimedelta(v.mean)
ts.std.FromTimedelta(v.std)
ts.max.FromTimedelta(v.max)
ts.min.FromTimedelta(v.min)
ts.sum.FromTimedelta(v.sum)
return r
def GetInfo(self, request, context):
response = processing_pb2.GetInfoResponse()
_structs.copy_dict_to_struct(self._runner.metadata, response.metadata)
return response
[docs]class ProcessorServer:
"""Host a MTAP processor as a service.
Args:
proc (EventProcessor): The event processor to host.
host (str): The address / hostname / IP to host the server on.
port (int): The port to host the server on, or 0 to use a random port.
Keyword Args:
register (~typing.Optional[bool]): Whether to register the processor with service discovery.
events_address (~typing.Optional[str]):
The address of the events server, or omitted / None if the events service should be
discovered.
processor_name (~typing.Optional[str]):
The identifier to register the processor under, if omitted the processor name will be
used.
workers (~typing.Optional[int]):
The number of workers that should handle requests. Defaults to 10.
params (~typing.Optional[~typing.Mapping[str, ~typing.Any]):
A set of default parameters that will be passed to the processor every time it runs.
grpc_enable_http_proxy (bool):
Enables or disables the grpc channel to the event service using http_proxy or
https_proxy environment variables.
"""
def __init__(self,
runner: 'mtap.processing.ProcessingComponent',
host: str,
port: int = 0,
*,
sid: Optional[None] = None,
register: bool = False,
workers: Optional[int] = None,
write_address: bool = False,
config: 'Optional[mtap.Config]' = None):
self.host = host
self.processor_name = runner.processor_name
self.sid = sid or str(uuid.uuid4())
self.write_address = write_address
if config is None:
config = _config.Config()
self._health_servicer = health.HealthServicer()
self._health_servicer.set('', 'SERVING')
self._health_servicer.set(self.processor_name, 'SERVING')
self._servicer = _ProcessorServicer(
config=config,
address=host,
sid=self.sid,
runner=runner,
health_servicer=self._health_servicer,
register=register
)
workers = workers or 10
thread_pool = thread.ThreadPoolExecutor(max_workers=workers)
options = config.get("grpc.processor_options", {})
self._server = grpc.server(thread_pool, options=list(options.items()))
health_pb2_grpc.add_HealthServicer_to_server(self._health_servicer, self._server)
processing_pb2_grpc.add_ProcessorServicer_to_server(self._servicer, self._server)
self._port = self._server.add_insecure_port("{}:{}".format(self.host, port))
if port != 0 and self._port != port:
raise ValueError(f"Unable to bind on port {port}, likely in use.")
self._stopped_event = threading.Event()
self._address_file = None
@property
def port(self) -> int:
"""int: Port the hosted server is bound to.
"""
return self._port
[docs] def start(self):
"""Starts the service.
"""
self._server.start()
self._servicer.start(self.port)
if self.write_address:
self._address_file = utilities.write_address_file('{}:{}'.format(self.host, self.port), self.sid)
logger.info('Started processor server with name: "%s" on address: "%s:%d"',
self.processor_name, self.host, self.port)
[docs] def stop(self, *, grace: Optional[float] = None) -> threading.Event:
"""De-registers (if registered with service discovery) the service and immediately stops
accepting requests, completely stopping the service after a specified `grace` time.
During the grace period the server will continue to process existing requests, but it will
not accept any new requests. This function is idempotent, multiple calls will shutdown
the server after the soonest grace to expire, calling the shutdown event for all calls to
this function.
Keyword Args:
grace (~typing.Optional[float]):
The grace period that the server should continue processing requests for shutdown.
Returns:
threading.Event: A shutdown event for the server.
"""
print('Shutting down processor server with name: "{}" on address: "{}:{}"'.format(
self.processor_name, self.host, self.port))
if self._address_file is not None:
self._address_file.unlink()
self._servicer.shutdown()
return self._server.stop(grace=grace)