mtap
Command-Line Utility
Running Events Service
usage:
python -m mtap events [-h] [--address ADDRESS] [--port PORT]
                      [--workers WORKERS] [--register] [--config CONFIG]
optional arguments:
  -h, --help            show this help message and exit
  --address ADDRESS, -a ADDRESS
                        the address to serve the service on
  --port PORT, -p PORT  the port to serve the service on
  --workers WORKERS, -w WORKERS
                        number of worker threads to handle requests
  --register, -r        whether to register the service with the configured
                        service discovery
  --config CONFIG, -c CONFIG
                        path to config file
API Documentation
Events service client, documents
- class mtap.EventsClient(address, channel_factory=None, _pool=None, _channel=None)
A client object for interacting with the events service.
Normally, users shouldn’t have to use any of the methods on this object, as they are invoked by the globally distributed object classes of Event, Document, and Labeler.
- Keyword Arguments:
Examples
>>> with EventsClient(address='localhost:50000') as client, \
...         Event(event_id='1', client=client) as event:
...     document = event.create_document(document_name='plaintext',
...                                      text='The quick brown fox jumps over the lazy dog.')
- class mtap.Event(*, event_id=None, event_service_instance_id=None, client=None, only_create_new=False, default_adapters=None)
An object for interacting with a specific event locally or on the events service.
The Event object functions as a map from string document names to Document objects that can be used to access document data from the events server.
- Keyword Arguments:
event_id (Optional[str]) – A globally unique identifier for the event; omit or pass None for a random UUID.
client (Optional[EventsClient]) – A client for an events service to push any changes to the event to.
only_create_new (bool) – If true, fails if the event already exists on the events service.
Examples
>>> with EventsClient() as c, Event(event_id='id', client=c) as event:
...     # use event
...     ...
- property event_service_instance_id
The unique instance identifier for this event’s paired event service.
- Type:
- property documents
A mutable mapping of strings to Document objects that can be used to query and add documents to the event.
- Type:
- property metadata
A mutable mapping of strings to strings that can be used to query and add metadata to the event.
- Type:
- property binaries
A mutable mapping of strings to bytes that can be used to query and add binary data to the event.
- Type:
- property created_indices
A mapping of document names to a list of the names of all the label indices that have been added to that document.
- close()
Closes this event. Lets the event service know that we are done with the event, allowing it to clean up the event if no other clients have open leases to it.
- create_document(document_name, text)
Adds a document to the event keyed by document_name and containing the specified text.
- Parameters:
- Returns:
The added document.
- Return type:
Examples
>>> event = Event()
>>> document = event.create_document('plaintext', text="The text of the document.")
- add_document(document)
Adds the document to this event, first uploading it to the events service if this event has a client connection to the events service.
- Parameters:
document (Document) – The document to add to this event.
Examples
>>> event = Event()
>>> document = Document('plaintext', text="The text of the document.")
>>> event.add_document(document)
- class mtap.Document(document_name, *, text=None, event=None, default_adapters=None)
An object for interacting with text and labels stored on an Event.
Documents are keyed by their name, and pipelines can store different pieces of related text on a single processing event using multiple documents. An example would be storing the text of one language on one document and a translation on another, or storing the RTF or HTML encoding on one document and the parsed plaintext on another.
Both the document text and any added label indices are immutable. This is to enable parallelization and distribution of processing, and to prevent changes to the dependency graph of label indices and text, which can make debugging difficult.
- Parameters:
document_name (str) – The document name identifier.
- Keyword Arguments:
text (Optional[str]) – The document text, can be omitted if this is an existing document and text needs to be retrieved from the events service.
event (Optional[Event]) – The parent event of this document. If the event has a client, then that client will be used to share changes to this document with all other clients of the Events service. In that case, text should only be specified if it is the known existing text of the document.
Examples
Local document:
>>> document = Document('plaintext', text='Some document text.')
Existing distributed object:
>>> with EventsClient(address='localhost:8080') as client, \
...         Event(event_id='1', client=client) as event:
...     document = event.documents['plaintext']
...     document.text
'Some document text fetched from the server.'
New distributed object:
>>> with EventsClient(address='localhost:8080') as client, \
...         Event(event_id='1', client=client) as event:
...     document = Document('plaintext', text='Some document text.')
...     event.add_document(document)
or
>>> with EventsClient(address='localhost:8080') as client, \
...         Event(event_id='1', client=client) as event:
...     document = event.create_document('plaintext', text='Some document text.')
- property created_indices
A list of all of the label index names that have been created on this document using a labeler either locally or by remote pipeline components invoked on this document.
- get_label_index(label_index_name)
Gets the document’s label index with the specified key.
If the document has an event with a client and the label index is not cached locally, it will be fetched from the events service. Uses the label_adapter argument to perform unmarshalling from the proto message if specified.
- Parameters:
label_index_name (str) – The name of the label index to get.
- Returns:
The requested label index.
- Return type:
- get_labeler(label_index_name, *, distinct=None)
Creates a function that can be used to add labels to a label index.
- Parameters:
- Returns:
A callable that, when used with the ‘with’ keyword, automatically uploads any added labels to the server.
- Return type:
Examples
>>> with document.get_labeler('sentences', distinct=True) as labeler:
...     labeler(0, 25, sentence_type='STANDARD')
...     sentence = labeler(26, 34)
...     sentence.sentence_type = 'FRAGMENT'
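The labeler pattern described above (create labels by calling the labeler inside a with block, then have them uploaded when the block exits) can be illustrated without mtap. This is a standalone sketch, not mtap's implementation: SketchLabel, SketchLabeler, and the plain dictionary standing in for the events service are all hypothetical, and skipping the upload when the block raises is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class SketchLabel:
    """Hypothetical stand-in for a label: a span plus arbitrary fields."""
    start_index: int
    end_index: int
    fields: Dict[str, Any] = field(default_factory=dict)


class SketchLabeler:
    """Hypothetical labeler: buffers labels, then 'uploads' them on exit."""

    def __init__(self, index_name: str, store: Dict[str, List[SketchLabel]]):
        self.index_name = index_name
        self.store = store  # stands in for the events service
        self.pending: List[SketchLabel] = []

    def __call__(self, start_index: int, end_index: int, **fields: Any) -> SketchLabel:
        # Create the label and remember it for the upload at exit.
        label = SketchLabel(start_index, end_index, dict(fields))
        self.pending.append(label)
        return label

    def __enter__(self) -> "SketchLabeler":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Assumption: only publish when the block finished without an error.
        if exc_type is None:
            self.store[self.index_name] = sorted(
                self.pending, key=lambda l: (l.start_index, l.end_index))


server: Dict[str, List[SketchLabel]] = {}
with SketchLabeler('sentences', server) as labeler:
    labeler(26, 34, sentence_type='FRAGMENT')
    labeler(0, 25, sentence_type='STANDARD')

print([(l.start_index, l.end_index) for l in server['sentences']])
```

Buffering until exit means the stored index is written once and never modified afterwards, which fits the immutability of label indices described for Document.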
- class mtap.data.Labeler(client, document, label_index_name, label_adapter)
Object provided by get_labeler() which is responsible for adding labels to a label index on a document.
- __call__(*args, **kwargs)
Calls the constructor for the label type, adding the new label to the list of labels to be uploaded.
- Parameters:
args – Arguments passed to the label type’s constructor.
kwargs – Keyword arguments passed to the label type’s constructor.
- Returns:
The object that was created by the label type’s constructor.
- Return type:
Examples
>>> labeler(0, 25, some_field='some_value', x=3)
GenericLabel(start_index=0, end_index=25, some_field='some_value', x=3)
- class mtap.data.LabelIndexInfo(index_name, type)
Information about a label index contained on a document.
- type
The type of the label index.
- Type:
- class mtap.data.LabelIndexType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
The type of serialized labels contained in the label index.
- UNKNOWN = 0
Label index not set or type not known.
- GENERIC = 1
JSON / Generic Label index
- CUSTOM = 2
Other / custom protobuf label index
Labels
- class mtap.data.Label
An abstract base class for a label of attributes on text.
- abstract property start_index
The index of the first character of the text covered by this label.
- Type:
- abstract property end_index
The index after the last character of the text covered by this label.
- Type:
- property location
A tuple of (start_index, end_index) used to perform sorting and comparison first based on start_index, then based on end_index.
- Type:
- property text
The slice of document text covered by this label. Will retrieve from events server if it is not cached locally.
- Type:
- class mtap.data.Location(start_index, end_index)
A location in text, a tuple of (start_index, end_index).
Used to perform comparison of labels based on their locations.
- Parameters:
- covers(other)
Whether the span of text covered by this label completely overlaps the span of text covered by the other label or location.
- relative_to(location)
Takes this location, which is relative to the same origin as location, and makes it relative to location.
- Parameters:
location (int or Location or Label) – A location to relativize this location to.
- Returns:
A copy with updated indices.
- Return type:
Examples
>>> sentence = Location(10, 20)
>>> token = Location(10, 15)
>>> token.relative_to(sentence)
Location(start_index=0, end_index=5)
- offset_by(location)
Creates a location by offsetting this location by an integer or the start_index of a location / label. Derelativizes this location.
- Parameters:
location (int or Location or Label) – A location to offset this location by.
- Returns:
A copy with updated indices.
- Return type:
Examples
>>> sentence = Location(10, 20)
>>> token_in_sentence = Location(0, 5)
>>> token_in_sentence.offset_by(sentence)
Location(start_index=10, end_index=15)
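The arithmetic behind covers, relative_to, and offset_by can be reproduced with a plain named tuple. The sketch below is a standalone illustration that mirrors the examples above; SketchLocation is a hypothetical name and this is not mtap's Location class.

```python
from typing import NamedTuple, Union


class SketchLocation(NamedTuple):
    """Hypothetical stand-in for a (start_index, end_index) location."""
    start_index: int
    end_index: int

    def covers(self, other: "SketchLocation") -> bool:
        # True when `other` lies entirely within this span.
        return (self.start_index <= other.start_index
                and self.end_index >= other.end_index)

    def relative_to(self, location: Union[int, "SketchLocation"]) -> "SketchLocation":
        # Subtract the origin so indices count from the start of `location`.
        origin = location if isinstance(location, int) else location.start_index
        return SketchLocation(self.start_index - origin, self.end_index - origin)

    def offset_by(self, location: Union[int, "SketchLocation"]) -> "SketchLocation":
        # Inverse of relative_to: add the origin back.
        origin = location if isinstance(location, int) else location.start_index
        return SketchLocation(self.start_index + origin, self.end_index + origin)


sentence = SketchLocation(10, 20)
token = SketchLocation(10, 15)
relative = token.relative_to(sentence)
print(relative, relative.offset_by(sentence), sentence.covers(token))
```

Because tuples compare element by element, sorting a list of these locations orders them by start_index first and end_index second, which is the ordering described for the location property.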
- mtap.label(start_index, end_index, *, document=None, **kwargs)
An alias for GenericLabel.
- Parameters:
start_index (int) – The index of the first character in text to be included in the label.
end_index (int) – The index after the last character in text to be included in the label.
document (Optional[Document]) – The parent document of the label. This will be automatically set if the label is created via a labeler.
**kwargs – Any other fields that should be added to the label; values must be JSON-serializable.
- class mtap.GenericLabel(start_index, end_index, *, identifier=None, document=None, label_index_name=None, fields=None, reference_field_ids=None, **kwargs)
Default implementation of the Label class which uses a dictionary to store attributes.
Will be suitable for the majority of use cases for labels.
- Parameters:
- Keyword Arguments:
Examples
>>> pos_tag = pos_tag_labeler(0, 5)
>>> pos_tag.tag = 'NNS'
>>> pos_tag.tag
'NNS'
>>> pos_tag2 = pos_tag_labeler(6, 10, tag='VB')
>>> pos_tag2.tag
'VB'
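One way a label class can expose arbitrary fields as attributes while storing them in a dictionary is to route attribute access through that dictionary. The following is a minimal standalone sketch of that general technique; DictBackedLabel is a hypothetical name and the code makes no claim about how GenericLabel is actually written.

```python
import json


class DictBackedLabel:
    """Hypothetical label that keeps every field in one dictionary."""

    def __init__(self, start_index, end_index, **fields):
        # Bypass our own __setattr__ so the dictionary is a real attribute.
        object.__setattr__(self, 'fields', {
            'start_index': start_index, 'end_index': end_index, **fields})

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        fields = self.__dict__.get('fields', {})
        try:
            return fields[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        # Every assignment becomes a dictionary entry.
        self.fields[name] = value


pos_tag = DictBackedLabel(0, 5)
pos_tag.tag = 'NNS'
print(pos_tag.tag, json.dumps(pos_tag.fields, sort_keys=True))
```

Keeping the fields in a single dictionary is what makes the JSON-serializability requirement on field values easy to satisfy: the dictionary can be dumped directly.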
Label Indices
- mtap.label_index(labels, distinct=False, adapter=None)
Creates a label index from labels.
- Parameters:
labels (List[L]) – Zero or more labels to create a label index from.
distinct (bool) – Whether the label index is distinct or not.
adapter (ProtoLabelAdapter) – The label adapter for these labels.
- Returns:
The newly created label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index
label_index([GenericLabel(0, 5, x=1), GenericLabel(0, 10, x=2), GenericLabel(5, 10, x=3), GenericLabel(5, 15, x=5), GenericLabel(7, 10, x=4), GenericLabel(10, 15, x=6)], distinct=False)
- class mtap.data.LabelIndex
An immutable Sequence of labels ordered by their location in text. By default sorts by ascending start_index and then by ascending end_index.
- abstract property distinct
Whether this label index is distinct, i.e. all of the labels in it are non-overlapping.
- Type:
- abstract filter(fn)
Filters the label index according to a filter function.
This function is less efficient for filtering based on indices than inside(), covering(), etc., which use a binary search method on the sorted index.
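To illustrate why a binary search on a sorted index beats a linear filter for location queries, the sketch below implements an "inside" query both ways over plain (start, end) tuples. It is a standalone illustration using the standard bisect module; the function names are hypothetical and this is not mtap's implementation.

```python
from bisect import bisect_left

# Spans sorted by (start, end), the same ordering the label index uses.
spans = sorted([(0, 5), (0, 10), (5, 10), (7, 10), (5, 15), (10, 15)])


def inside_by_filter(spans, start, end):
    # Linear scan: checks every span.
    return [s for s in spans if start <= s[0] and s[1] <= end]


def inside_by_bisect(spans, start, end):
    # (start,) sorts before every (start, x), so this finds the first span
    # whose start index is >= start.
    lo = bisect_left(spans, (start,))
    # First span whose start index is > end (indices are integers).
    hi = bisect_left(spans, (end + 1,))
    # Only the narrowed window still needs its end index checked.
    return [s for s in spans[lo:hi] if s[1] <= end]


print(inside_by_bisect(spans, 5, 10))
```

The binary search narrows the candidates in logarithmic time, so only spans starting within the query range are examined instead of the whole index.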
- abstract at(x, end=None)
Returns the labels at the specified location in text.
- Parameters:
- Returns:
A view of this label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 10, x=1),
...                      label(0, 10, x=2),
...                      label(6, 20, x=3)])
>>> index.at(0, 10)
label_index([GenericLabel(0, 10, x=1), GenericLabel(0, 10, x=2)], distinct=False)
- abstract covering(x, end=None)
A label index containing all labels that cover / contain the specified location in text.
- Parameters:
- Returns:
A view of this label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.covering(5, 10)
label_index([GenericLabel(0, 10, x=2), GenericLabel(5, 10, x=3), GenericLabel(5, 15, x=5)], distinct=False)
- abstract inside(x, end=None)
A label index containing all labels that are inside the specified location in text.
- Parameters:
- Returns:
A view of this label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.inside(5, 10)
label_index([GenericLabel(5, 10, x=3), GenericLabel(7, 10, x=4)], distinct=False)
- abstract beginning_inside(x, end=None)
A label index containing all labels whose begin index is inside the specified location in text.
- Parameters:
- Returns:
A view of this label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.beginning_inside(6, 11)
label_index([GenericLabel(7, 10, x=4), GenericLabel(10, 15, x=6)], distinct=False)
- abstract overlapping(x, end=None)
Returns all labels that overlap the specified location in text.
- Parameters:
- Returns:
A view of this label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.overlapping(6, 10)
label_index([GenericLabel(0, 10, x=2), GenericLabel(5, 10, x=3), GenericLabel(5, 15, x=5), GenericLabel(7, 10, x=4)], distinct=False)
- before(x)
A label index containing all labels that are before a label’s location in text or an index in text.
- Parameters:
x (Union[Label, Location, float]) – A label or location whose start_index will be used, or a float index in text.
- Returns:
A view of this label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.before(6)
label_index([GenericLabel(0, 5, x=1)], distinct=False)
- after(x)
A label index containing all labels that are after a label’s location in text or an index in text.
- Parameters:
x (Union[Label, Location, float]) – A label or location whose end_index will be used, or a float index in text.
- Returns:
A view of this label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.after(6)
label_index([GenericLabel(7, 10, x=4), GenericLabel(10, 15, x=6)], distinct=False)
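The location queries above can be summarized as simple predicates on (start, end) pairs. The sketch below is a standalone illustration over plain tuples; the inequalities are inferred from the example outputs in this section, so boundary behavior in cases those examples do not exercise is an assumption, and none of these functions are mtap's own.

```python
# Same six spans as in the examples, sorted by (start, end).
spans = sorted([(0, 5), (0, 10), (5, 10), (7, 10), (5, 15), (10, 15)])


def covering(spans, start, end):
    # Spans that contain the whole query range.
    return [s for s in spans if s[0] <= start and s[1] >= end]


def overlapping(spans, start, end):
    # Spans that share at least one character with the query range.
    return [s for s in spans if s[0] < end and s[1] > start]


def beginning_inside(spans, start, end):
    # Spans whose start index falls in [start, end).
    return [s for s in spans if start <= s[0] < end]


def before(spans, x):
    # Spans that end at or before index x.
    return [s for s in spans if s[1] <= x]


def after(spans, x):
    # Spans that start at or after index x.
    return [s for s in spans if s[0] >= x]


print(covering(spans, 5, 10))
print(overlapping(spans, 6, 10))
```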
- abstract ascending()
This label index sorted according to ascending start and end index.
- Returns:
A view of this label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index == index.ascending()
True
- abstract descending()
This label index sorted according to descending start index and ascending end index.
- Returns:
A view of this label index.
- Return type:
Examples
>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.descending()
label_index([GenericLabel(10, 15, x=6), GenericLabel(7, 10, x=4), GenericLabel(5, 15, x=5), GenericLabel(5, 10, x=3), GenericLabel(0, 10, x=2), GenericLabel(0, 5, x=1)], distinct=False)
Custom Label Types
- class mtap.data.ProtoLabelAdapter
Responsible for marshalling and unmarshalling of label objects to and from proto messages.
- abstract create_label(*args, **kwargs)
Called by labelers to create labels.
Should include the positional arguments start_index and end_index, because those are required properties of labels.
- Parameters:
args – Arbitrary args used to create the label.
kwargs – Arbitrary keyword args used to create the label.
- Returns:
An object of the label type.
- Return type:
- abstract create_index_from_response(response)
Creates a LabelIndex from the response from an events service.
- Parameters:
response (mtap.api.v1.events_pb2.GetLabelsResponse) – The response protobuf message from the events service.
- Returns:
A label index containing all the labels from the events service.
- Return type:
LabelIndex[L]
- abstract create_index(labels)
Creates a LabelIndex from an iterable of label objects.
- Parameters:
labels (Iterable[L]) – Labels to put in index.
- Returns:
A label index containing all of the labels in the list.
- Return type:
LabelIndex[L]
- abstract add_to_message(labels, request)
Adds a list of labels to a request to the event service to add labels.
- Parameters:
labels (Iterable[L]) – The list of labels that need to be sent to the server.
request (mtap.api.v1.events_pb2.AddLabelsRequest) – The request proto message to add the labels to.
- abstract pack(index, *, include_label_text=False)
Prepares to serialize a label index by transforming the label index into a representation that can be dumped to JSON, YAML, pickle, etc.
- Parameters:
index – The index itself.
include_label_text – Whether to include the label’s text in the serialized representation (for informational purposes).
- Returns:
A dictionary representation of the label index.
- abstract unpack(packed, label_index_name, *, document=None)
Takes a packed, serializable object and turns it into a full label index.
- Parameters:
- Returns:
The label index.
- Return type:
LabelIndex[L]
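The pack / unpack pair described above is a round trip between a label index and a plain, serializable structure. The sketch below shows that idea for spans stored as (start_index, end_index, fields) tuples. It is a standalone illustration: the dictionary layout, the '_text' key, and the function signatures are hypothetical and are not the format mtap's adapters produce.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

Span = Tuple[int, int, Dict[str, Any]]  # (start_index, end_index, fields)


def pack(index: List[Span], text: Optional[str] = None,
         include_label_text: bool = False) -> Dict[str, Any]:
    """Turns spans into a dictionary that json.dumps can handle."""
    labels = []
    for start, end, fields in index:
        entry = {'start_index': start, 'end_index': end, 'fields': dict(fields)}
        if include_label_text and text is not None:
            entry['_text'] = text[start:end]  # informational only
        labels.append(entry)
    return {'labels': labels}


def unpack(packed: Dict[str, Any]) -> List[Span]:
    """Rebuilds the sorted list of spans, ignoring informational keys."""
    return sorted(
        ((e['start_index'], e['end_index'], e['fields']) for e in packed['labels']),
        key=lambda s: (s[0], s[1]))


text = 'The quick'
index = [(0, 3, {'tag': 'DT'}), (4, 9, {'tag': 'JJ'})]
packed = pack(index, text, include_label_text=True)
restored = unpack(json.loads(json.dumps(packed)))
print(restored == index)
```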
Creating Processors
- class mtap.processing.Processor(*args, **kwargs)
Mixin used by all processor abstract base classes that provides the ability to update serving status and use timers.
- update_serving_status(status)
Updates the serving status of the processor for health checking.
- Parameters:
status (str) – One of “SERVING”, “NOT_SERVING”, “UNKNOWN”.
- property custom_label_adapters
Used to provide non-standard proto label adapters for specific index names.
Returns:
- static started_stopwatch(key)
An object that can be used to time aspects of processing. The stopwatch will be started at creation.
- Parameters:
key (str) – The key to store the time under.
- Returns:
An object that is used to do the timing.
- Return type:
Examples
>>> # In a process method
>>> with self.started_stopwatch('key'):
...     # do work
...     ...
- static unstarted_stopwatch(key)
An object that can be used to time aspects of processing. The stopwatch will be stopped at creation.
- Parameters:
key (str) – The key to store the time under.
- Returns:
An object that is used to do the timing.
- Return type:
Examples
>>> # In a process method
>>> with self.unstarted_stopwatch('key') as stopwatch:
...     for _ in range(10):
...         # work you don't want timed
...         ...
...         stopwatch.start()
...         # work you do want timed
...         ...
...         stopwatch.stop()
- class mtap.EventProcessor(*args, **kwargs)
Abstract base class for an event processor.
Examples
>>> class ExampleProcessor(EventProcessor):
...     def process(self, event, params):
...         # do work on the event
...         ...
- abstract process(event, params)
Performs processing on an event, implemented by the subclass.
- Parameters:
- Returns:
An arbitrary dictionary of strings mapped to json-serializable values which will be returned to the caller, even remotely.
- Return type:
- default_adapters()
Can be overridden to return a mapping from label index names to adapters that will then be used in any documents or events provided to this processor’s process methods.
- Returns:
The mapping (dict-like) of label index names to ProtoLabelAdapters to be used for those indices.
- Return type:
Mapping[str, ProtoLabelAdapter]
- class mtap.DocumentProcessor(*args, **kwargs)
Abstract base class for a document processor.
Examples
>>> class ExampleProcessor(mtap.DocumentProcessor):
...     def process(self, document, params):
...         # do processing on document
...         ...
>>> class ExampleProcessor(mtap.DocumentProcessor):
...     def process(self, document, params):
...         with self.started_stopwatch('key'):
...             # use stopwatch on something
...             ...
- class mtap.processing.Stopwatch(context=None, key=None)
A class for timing runtime of components and returning the total runtime with the processor’s results.
Examples
>>> # in an EventProcessor or DocumentProcessor process method call
>>> with self.started_stopwatch('key'):
...     timed_routine()
>>> # in an EventProcessor or DocumentProcessor process method call
>>> with self.unstarted_stopwatch('key') as stopwatch:
...     for _ in range(10):
...         # work you don't want timed
...         ...
...         stopwatch.start()
...         # work you want timed
...         ...
...         stopwatch.stop()
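The start / stop behavior shown above, where only the work between start() and stop() counts toward the total, can be sketched with a small context manager. This is a standalone illustration built on time.perf_counter; SketchStopwatch and the times dictionary that receives the result are hypothetical and do not reflect how mtap's Stopwatch reports its timings.

```python
import time


class SketchStopwatch:
    """Hypothetical stopwatch that accumulates time across start/stop pairs."""

    def __init__(self, key, times, start=False):
        self.key = key
        self.times = times      # dict that receives the total on exit
        self.elapsed = 0.0
        self._started_at = None
        if start:
            self.start()

    def start(self):
        if self._started_at is None:
            self._started_at = time.perf_counter()

    def stop(self):
        if self._started_at is not None:
            self.elapsed += time.perf_counter() - self._started_at
            self._started_at = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Stop if still running, then record the accumulated total.
        self.stop()
        self.times[self.key] = self.elapsed


times = {}
wall_start = time.perf_counter()
with SketchStopwatch('timed', times) as stopwatch:
    for _ in range(3):
        time.sleep(0.01)   # work that is not timed
        stopwatch.start()
        time.sleep(0.01)   # work that is timed
        stopwatch.stop()
wall = time.perf_counter() - wall_start
print(times['timed'] < wall)
```

Passing start=True gives the "started at creation" variant, so a single with block times everything inside it.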
Processor Description Decorators
- mtap.processor(name, *, human_name=None, description=None, parameters=None, inputs=None, outputs=None, **additional_metadata)
Decorator which attaches a service name and metadata to a processor, which can then be used for runtime reflection of how the processor works.
- Parameters:
name (str) –
Identifying service name both for launching via command line and for service registration.
Should be a mix of alphanumeric characters and dashes so that it plays nice with the DNS name requirements of service discovery tools like Consul. Can be overridden at runtime via the identifier option on processor_parser().
- Keyword Arguments:
human_name (Optional[str]) – An optional human name for the processor.
description (Optional[str]) – A short description of the processor and what it does.
parameters (Optional[List[ParameterDescription]]) – The processor’s parameters.
inputs (Optional[List[str]]) –
String identifiers for the output from a processor that this processor uses as an input.
Takes the format “[processor-name]/[output]”. Examples would be “tagger/pos_tags” or “sentence-detector/sentences”.
outputs (Optional[List[LabelDescription]]) – The label indices this processor outputs.
**additional_metadata (Any) – Any other data that should be added to the processor’s metadata, should be serializable to yaml and json.
- Returns:
A decorator to be applied to instances of EventProcessor or DocumentProcessor. This decorator attaches the metadata so it can be reflected at runtime.
Examples
>>> from mtap.processing import EventProcessor
>>> @processor('example-text-converter')
... class TextConverter(EventProcessor):
...     ...
or
>>> from mtap.processing import DocumentProcessor
>>> @processor('example-sentence-detector')
... class SentenceDetector(DocumentProcessor):
...     ...
From our own example processor:
>>> from mtap.processing import DocumentProcessor
>>> @processor('mtap-example-processor-python',
...            human_name="Python Example Processor",
...            description="counts the number of times the letters a and b occur in a document",
...            parameters=[
...                parameter('do_work', required=True, data_type='bool',
...                          description="Whether the processor should do anything.")
...            ],
...            outputs=[
...                labels('mtap.examples.letter_counts',
...                       properties=[label_property('letter', data_type='str'),
...                                   label_property('count', data_type='int')])
...            ])
... class ExampleProcessor(DocumentProcessor):
...     ...
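A decorator that attaches metadata to a class for later reflection, as described above, follows a common Python pattern. The sketch below shows that pattern in isolation; sketch_processor and the metadata attribute name are hypothetical, and this is not how mtap stores processor metadata.

```python
def sketch_processor(name, *, human_name=None, description=None,
                     **additional_metadata):
    """Hypothetical decorator factory that records metadata on a class."""
    def decorator(cls):
        # Attach the metadata so it can be inspected at runtime.
        cls.metadata = {
            'name': name,
            'human_name': human_name,
            'description': description,
            **additional_metadata,
        }
        return cls
    return decorator


@sketch_processor('example-sentence-detector',
                  description='Splits text into sentences.',
                  language='en')
class SentenceDetector:
    def process(self, document, params):
        return {}


print(SentenceDetector.metadata['name'])
```

Because the decorator returns the original class unchanged apart from the new attribute, instances behave exactly as they would without it.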
- mtap.processing.descriptions.labels(name, *, reference=None, optional=False, name_from_parameter=None, description=None, properties=None)
A description for a label type.
- Parameters:
name (str) – The label index name.
- Keyword Arguments:
reference (Optional[str]) – If this is an output of another processor, that processor’s name followed by a slash and the default output name of the index go here. Example: “sentence-detector/sentences”.
optional (bool) – Whether this label index is an optional input or output.
name_from_parameter (Optional[str]) – If the label index gets its name from a processor parameter, the name of the parameter.
description (Optional[str]) – A short description of the label index.
properties (Optional[List[label_property]]) – The properties of the labels in the label index.
- mtap.processing.descriptions.reference
If this is an output of another processor, that processor’s name followed by a slash and the default output name of the index go here. Example: “sentence-detector/sentences”.
- mtap.processing.descriptions.optional
Whether this label index is an optional input or output.
- Type:
- mtap.processing.descriptions.name_from_parameter
If the label index gets its name from a processor parameter, the name of the parameter.
- mtap.processing.descriptions.description
A short description of the label index.
Running Services
- mtap.processor_parser()
An ArgumentParser that can be used to parse the settings for run_processor().
- Returns:
A parser containing server settings.
- Return type:
Examples
Using this as a parent parser:
>>> from argparse import ArgumentParser
>>> parser = ArgumentParser(parents=[processor_parser()])
>>> parser.add_argument('--my-arg-1')
>>> parser.add_argument('--my-arg-2')
>>> args = parser.parse_args()
>>> processor = MyProcessor(args.my_arg_1, args.my_arg_2)
>>> run_processor(processor, options=args)
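The parent-parser mechanism used above is a standard argparse feature and can be tried without mtap. In the sketch below, sketch_processor_parser is a hypothetical stand-in for processor_parser(), and the --host and --port options are assumptions made for illustration, not the options mtap actually defines.

```python
from argparse import ArgumentParser


def sketch_processor_parser():
    # add_help=False is needed on a parent parser so that -h / --help
    # is not defined twice when the child parser is built.
    parser = ArgumentParser(add_help=False)
    parser.add_argument('--host', default='127.0.0.1')
    parser.add_argument('--port', '-p', type=int, default=0)
    return parser


# The child parser inherits every argument from its parents.
parser = ArgumentParser(parents=[sketch_processor_parser()])
parser.add_argument('--my-arg-1')
args = parser.parse_args(['-p', '8080', '--my-arg-1', 'value'])
print(args.port, args.my_arg_1)
```

The resulting namespace contains both the inherited server settings and the custom arguments, which is why a single parsed object can be used for constructing the processor and for hosting it.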
- mtap.run_processor(proc, *, mp=False, options=None, args=None, mp_context=None)
Runs the processor as a gRPC service, blocking until an interrupt signal is received.
- Parameters:
proc (EventProcessor) – The processor to host.
mp (bool) – If true, will create instances of proc on multiple forked processes to process events. This is useful if the processor is computationally intensive and would run into Python GIL issues on a single process.
options (Optional[Namespace]) – The parsed arguments from the parser returned by processor_parser().
args (Optional[Sequence[str]]) – Arguments to parse server settings from if options was not supplied.
mp_context – A multiprocessing context that gets passed to the process pool executor in the case of mp = True.
Examples
Will automatically parse arguments:
>>> run_processor(MyProcessor())
Manual arguments:
>>> run_processor(MyProcessor(), args=['-p', '8080'])
- class mtap.EventsServer(host, *, port=0, register=False, workers=10, sid=None, write_address=False, config=None)
Server which hosts events.
- Parameters:
host (str) – The address / hostname / IP to host the server on.
port (int) – The port to host the server on.
register (bool) – Whether to register the service with service discovery.
workers (int) – The number of workers that should handle requests.
write_address (bool) – Whether to write the events service address to a file.
config (mtap.Config) – An optional mtap config.
- stop(*, grace=None)
De-registers (if registered with service discovery) the service and immediately stops accepting requests, completely stopping the service after a specified grace time.
During the grace period the server will continue to process existing requests, but it will not accept any new requests. This function is idempotent: multiple calls will shut down the server after the soonest grace period expires, calling the shutdown event for all calls to this function.
- class mtap.ProcessorServer(runner, host, port=0, *, sid=None, register=False, workers=None, write_address=False, config=None)
Hosts an MTAP processor as a service.
- Parameters:
proc (EventProcessor) – The event processor to host.
host (str) – The address / hostname / IP to host the server on.
port (int) – The port to host the server on, or 0 to use a random port.
- Keyword Arguments:
register (Optional[bool]) – Whether to register the processor with service discovery.
events_address (Optional[str]) – The address of the events server, or omitted / None if the events service should be discovered.
processor_name (Optional[str]) – The identifier to register the processor under, if omitted the processor name will be used.
workers (Optional[int]) – The number of workers that should handle requests. Defaults to 10.
params (Optional[Mapping[str, Any]]) – A set of default parameters that will be passed to the processor every time it runs.
grpc_enable_http_proxy (bool) – Enables or disables the grpc channel to the event service using http_proxy or https_proxy environment variables.
- stop(*, grace=None)
De-registers (if registered with service discovery) the service and immediately stops accepting requests, completely stopping the service after a specified grace time.
During the grace period the server will continue to process existing requests, but it will not accept any new requests. This function is idempotent: multiple calls will shut down the server after the soonest grace period expires, calling the shutdown event for all calls to this function.
Running a pipeline
- class mtap.Pipeline(*components, name=None, events_address=None, events_client=None, mp_config=None)
An object which can be used to build and run a pipeline of remote and local processors.
Pipelines are a MutableSequence containing one or more ComponentDescriptor objects; a pipeline can be modified after creation using this functionality.
- Parameters:
*components (ComponentDescriptor) – A list of component descriptors created using RemoteProcessor or LocalProcessor.
- Keyword Arguments:
Examples
Remote pipeline with name discovery:
>>> with mtap.Pipeline(
...     RemoteProcessor('processor-1-id'),
...     RemoteProcessor('processor-2-id'),
...     RemoteProcessor('processor-3-id')
... ) as pipeline:
...     client = pipeline.events_client
...     for txt in txts:
...         with Event(client=client) as event:
...             document = event.create_document('plaintext', txt)
...             results = pipeline.run(document)
Remote pipeline using addresses:
>>> with mtap.Pipeline(
...     RemoteProcessor('processor-1-name', address='localhost:50052'),
...     RemoteProcessor('processor-2-id', address='localhost:50053'),
...     RemoteProcessor('processor-3-id', address='localhost:50054'),
...     events_address='localhost:50051'
... ) as pipeline:
...     client = pipeline.events_client
...     for txt in txts:
...         with Event(client=client) as event:
...             document = event.create_document('plaintext', txt)
...             results = pipeline.run(document)
Modifying a pipeline:
>>> pipeline = Pipeline(RemoteProcessor('foo', address='localhost:50000'),
...                     RemoteProcessor('bar', address='localhost:50000'))
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50000', component_id=None, params=None))
>>> pipeline.append(RemoteProcessor('baz', address='localhost:50001'))
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='baz', address='localhost:50001', component_id=None, params=None))
>>> del pipeline[1]
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='baz', address='localhost:50001', component_id=None, params=None))
>>> pipeline[1] = RemoteProcessor(processor_name='bar', address='localhost:50003')
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50003', component_id=None, params=None))
>>> pipeline += list(pipeline)  # Putting in a new list to prevent an infinite recursion
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50003', component_id=None, params=None), RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None), RemoteProcessor(processor_name='bar', address='localhost:50003', component_id=None, params=None))
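The list-like behavior shown above (append, del, item assignment, +=) comes for free once a class implements the five abstract methods of collections.abc.MutableSequence. The sketch below demonstrates that with plain strings as components. SketchPipeline is a hypothetical name, and this only illustrates the MutableSequence protocol, not mtap's Pipeline class.

```python
from collections.abc import MutableSequence


class SketchPipeline(MutableSequence):
    """Hypothetical pipeline that stores its components in a list."""

    def __init__(self, *components):
        self._components = list(components)

    # The five methods MutableSequence requires:
    def __getitem__(self, index):
        return self._components[index]

    def __setitem__(self, index, value):
        self._components[index] = value

    def __delitem__(self, index):
        del self._components[index]

    def __len__(self):
        return len(self._components)

    def insert(self, index, value):
        self._components.insert(index, value)

    def __repr__(self):
        return 'SketchPipeline(' + ', '.join(map(repr, self._components)) + ')'


pipeline = SketchPipeline('foo', 'bar')
pipeline.append('baz')        # mixin method provided by MutableSequence
del pipeline[1]
pipeline[1] = 'bar'
pipeline += list(pipeline)    # += is also a mixin, built on extend()
print(pipeline)
```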
- static from_yaml_file(conf_path)[source]¶
Creates a pipeline from a yaml pipeline configuration file.
- Parameters:
conf_path (str or pathlib.Path) – The path to the configuration file.
- Returns:
Pipeline object from the configuration.
- static load_configuration(conf)[source]¶
Creates a pipeline from a pipeline configuration dictionary.
- Parameters:
conf (Dict) – The pipeline configuration dictionary.
- Returns:
Pipeline created from the configuration.
- run_multithread(source, *, params=None, show_progress=None, total=None, close_events=None, max_failures=None, workers=None, read_ahead=None, mp_context=None, log_level=None)[source]¶
Runs this pipeline on a source which provides multiple documents / events.
Concurrency is per-event, with each event being provided a thread which runs it through the pipeline.
- Parameters:
source (Union[Iterable[Union[Event, Document]], ProcessingSource]) – A generator of events or documents to process. This should be an Iterable of either Event or Document objects or a ProcessingSource.
params (Optional[dict[str, Any]]) – JSON object containing params specific to processing this event. The existing params dictionary defined in add_processor() will be updated with the contents of this dict.
show_progress (Optional[bool]) – Whether to print a progress bar using tqdm.
total (Optional[int]) – An optional argument indicating the total number of events / documents that will be provided by the iterable, for the progress bar.
close_events (Optional[bool]) – Whether the pipeline should close events after they have been fully processed through all components.
max_failures (Optional[int]) – The number of acceptable failures. Once this amount is exceeded, processing will halt. Note that because of the nature of concurrency, processing may continue for a short amount of time before termination.
workers (Optional[int]) – The number of threads to process documents on.
read_ahead (Optional[int]) – The number of source documents to read ahead into memory before processing.
mp_context (multiprocessing context, optional) – An optional override for the multiprocessing context.
log_level (Optional[Union[str, int]]) – The log_level for running the pipeline.
Examples
>>> docs = list(pathlib.Path('abc/').glob('*.txt'))
>>> def document_source():
>>>     for path in docs:
>>>         with path.open('r') as f:
>>>             txt = f.read()
>>>         with Event(event_id=path.name, client=pipeline.events_client) as event:
>>>             doc = event.create_document('plaintext', txt)
>>>             yield doc
>>>
>>> pipeline.run_multithread(document_source(), total=len(docs))
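The per-event concurrency and the max_failures cutoff described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not MTAP's implementation: run_concurrently, process, and the failure-counting logic are hypothetical names and simplifications introduced here.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_concurrently(items, process, *, workers=4, max_failures=0):
    """Hypothetical stand-in: run ``process`` on every item using a thread pool.

    Stops collecting results once more than ``max_failures`` items have failed.
    Returns a tuple of (results, failure_count).
    """
    results = []
    failures = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(process, item) for item in items]
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception:
                failures += 1
                if failures > max_failures:
                    # Tasks already running may still finish; tasks that
                    # have not started yet are cancelled.
                    for pending in futures:
                        pending.cancel()
                    break
    return results, failures
```

Because running tasks cannot be interrupted, a few items may still complete after the cutoff, which mirrors the note on max_failures above.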
- run(target, *, params=None)[source]¶
Processes the event/document using all the processors in the pipeline.
- Parameters:
- Returns:
The results of all the processors in the pipeline.
- Return type:
Examples
>>> e = mtap.Event()
>>> document = mtap.Document('plaintext', text="...", event=e)
>>> with Pipeline(...) as pipeline:
>>>     pipeline.run(document)
>>>     # is equivalent to pipeline.run(document.event, params={'document_name': document.document_name})
The ‘document_name’ param is used to indicate to DocumentProcessor which document on the event to process.
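As a rough illustration of how a processor might use the 'document_name' param, the sketch below models an event as a plain dictionary of named documents. The function process_document and the dictionary layout are hypothetical stand-ins, not MTAP's classes.

```python
def process_document(event, params):
    """Hypothetical document processor: look up the target document by name."""
    document_name = params['document_name']
    text = event[document_name]
    # A trivial "result": count whitespace-separated tokens in the chosen document.
    return {'document_name': document_name, 'tokens': len(text.split())}


# An event modelled as a mapping from document names to text (an assumption).
event = {
    'plaintext': 'The quick brown fox jumps over the lazy dog.',
    'title': 'Pangram',
}

result = process_document(event, {'document_name': 'plaintext'})
```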
- pipeline_timer_stats()[source]¶
The aggregated statistics for the global runtime of the pipeline.
- Returns:
The timing stats for the global runtime of the pipeline.
- Return type:
- class mtap.processing.ComponentDescriptor[source]¶
A configuration which describes either a local or remote pipeline component and what the pipeline needs to do to call the component.
- class mtap.RemoteProcessor(processor_name, *, address=None, component_id=None, params=None, enable_proxy=False)[source]¶
A configuration for a remote processor that the pipeline will connect to in order to perform processing.
- Parameters:
processor_name (str) – The identifier used for health checking and discovery.
address (Optional[str]) – An optional address to use. If this is None or omitted, the service discovery configuration is used to locate processors.
component_id (Optional[str]) – How the processor’s results will be identified locally. Will be modified to be unique if it is not unique relative to other components in a pipeline.
params (Optional[dict[str, Any]]) – An optional parameter dictionary that will be passed to the processor as parameters with every event or document processed. Values should be json-serializable.
- address¶
An optional address to use. If this is None or omitted, the service discovery configuration is used to locate processors.
- component_id¶
How the processor’s results will be identified locally. Will be modified to be unique if it is not unique relative to other components in a pipeline.
- class mtap.LocalProcessor(proc, *, component_id=None, params=None)[source]¶
A configuration of a locally-invoked processor.
- Parameters:
proc (EventProcessor) – The processor instance to run with the pipeline.
component_id (str) – How the processor’s results will be identified locally. Will be modified to be unique if it is not unique relative to other components in a pipeline.
params (Optional[dict[str, Any]]) – An optional parameter dictionary that will be passed to the processor as parameters with every event or document processed. Values should be json-serializable.
- proc¶
The processor instance to run with the pipeline.
- Type:
- component_id¶
How the processor’s results will be identified locally. Will be modified to be unique if it is not unique relative to other components in a pipeline.
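Both RemoteProcessor and LocalProcessor note that a duplicate component_id is modified to be unique. The renaming scheme is not specified here, so the sketch below assumes a numeric suffix purely for illustration. The helper unique_component_ids is hypothetical and not part of MTAP.

```python
def unique_component_ids(component_ids):
    """Return ids in order, appending '-N' to repeats (an assumed scheme)."""
    seen = set()
    unique = []
    for component_id in component_ids:
        candidate = component_id
        suffix = 1
        # Keep bumping the suffix until the candidate has not been used yet.
        while candidate in seen:
            suffix += 1
            candidate = f'{component_id}-{suffix}'
        seen.add(candidate)
        unique.append(candidate)
    return unique
```

Setting component_id explicitly for each component avoids relying on whatever renaming the pipeline applies.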
- class mtap.processing.ProcessingSource[source]¶
Provides events or documents for the multi-threaded pipeline runner. Also has functionality for receiving results.
- property total¶
The total number of documents this source will provide.
- Returns:
int or None – the count of the total events, or None if not known.
- abstract provide(consume)[source]¶
Provides documents or events to the multi-threaded pipeline runner by passing each one to the consume callable.
- Parameters:
consume (Callable[[Union[Document, Event]], None]) – The consumer method to pass documents or events to for processing.
Examples
Example implementation for processing text documents in a directory.
>>> ...
>>> def provide(self, consume: Callable[[Union[Document, Event]], None]):
>>>     for file in Path(".").glob("*.txt"):
>>>         with file.open('r') as fio:
>>>             txt = fio.read()
>>>         event = Event()
>>>         doc = event.create_document('plaintext', txt)
>>>         consume(doc)
- receive_result(result, event)[source]¶
Optional method: Asynchronous callback which receives the results of processing. This method is called on a processing worker thread. Default behavior is to do nothing.
- Parameters:
result (PipelineResult) – The result of processing using the pipeline.
event (Event) – The event processed.
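The relationship between provide and receive_result can be sketched with a small stand-in that mirrors the shape of this interface. Everything here (SourceSketch, ListSource, drive) is hypothetical, and the runner is single-threaded for simplicity, whereas the real callback runs on a worker thread.

```python
from abc import ABC, abstractmethod


class SourceSketch(ABC):
    """Hypothetical stand-in mirroring the shape of ProcessingSource."""

    total = None  # number of items, or None when unknown

    @abstractmethod
    def provide(self, consume):
        """Call ``consume(item)`` once for each item to process."""

    def receive_result(self, result, item):
        """Optional callback; does nothing by default."""


class ListSource(SourceSketch):
    """Provides items from an in-memory list and records the results."""

    def __init__(self, texts):
        self.texts = list(texts)
        self.total = len(self.texts)
        self.received = []

    def provide(self, consume):
        for text in self.texts:
            consume(text)

    def receive_result(self, result, item):
        self.received.append((item, result))


def drive(source, process):
    """Toy single-threaded runner: process each provided item, then report back."""
    source.provide(lambda item: source.receive_result(process(item), item))


source = ListSource(['a b', 'c d e'])
drive(source, lambda text: len(text.split()))
```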
- class mtap.processing.PipelineResult(component_results, elapsed_time)[source]¶
The result of processing an event or document in a pipeline.
- Parameters:
component_results (List[ProcessingResult]) – The processing results for each individual component.
elapsed_time (timedelta) – The elapsed time for the entire pipeline.
- component_results¶
The processing results for each individual component.
- Type:
List[ProcessingResult]
- class mtap.processing.ProcessingResult(identifier, result_dict, timing_info, created_indices)[source]¶
The result of processing one document or event.
- result_dict¶
The json object returned by the processor as its results.
- Type:
Dict
- timing_info¶
A dictionary of the times taken processing this document.
- Type:
Dict
- class mtap.processing.TimerStats(mean, std, min, max, sum)[source]¶
Statistics about a specific keyed measured duration recorded by a Stopwatch.
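To make the five TimerStats fields concrete, the sketch below computes the same kind of summary from a list of durations. It is a minimal illustration, not MTAP's code: StatsSketch and summarize are hypothetical names, and the use of the population standard deviation is an assumption, since the estimator is not stated here.

```python
import statistics
from datetime import timedelta
from typing import NamedTuple


class StatsSketch(NamedTuple):
    """Hypothetical record with the same field names as TimerStats."""
    mean: timedelta
    std: timedelta
    min: timedelta
    max: timedelta
    sum: timedelta


def summarize(durations):
    """Aggregate a non-empty list of timedelta durations."""
    seconds = [d.total_seconds() for d in durations]
    return StatsSketch(
        mean=timedelta(seconds=statistics.fmean(seconds)),
        # Population standard deviation (an assumption, see the note above).
        std=timedelta(seconds=statistics.pstdev(seconds)),
        min=min(durations),
        max=max(durations),
        sum=sum(durations, timedelta()),
    )


stats = summarize([timedelta(seconds=1), timedelta(seconds=2), timedelta(seconds=3)])
```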
Configuration¶
- class mtap.Config(*args)[source]¶
The MTAP configuration dictionary.
By default, configuration is loaded from the first available of the following locations, in order of priority:
A file at the path of the ‘--config’ parameter passed into main methods.
A file at the path of the ‘MTAP_CONFIG’ environment variable.
$PWD/mtapConfig.yml
$HOME/.mtap/mtapConfig.yml
/etc/mtap/mtapConfig.yml
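The lookup order above can be expressed as a short sketch. It assumes that the first existing file wins, and it is not MTAP's loader: candidate_config_paths and first_existing are hypothetical helpers introduced for illustration.

```python
import os
from pathlib import Path


def candidate_config_paths(cli_config=None, environ=None):
    """List candidate config paths, highest priority first (a sketch)."""
    environ = os.environ if environ is None else environ
    candidates = []
    if cli_config:
        # Path given through the --config command-line parameter.
        candidates.append(Path(cli_config))
    if environ.get('MTAP_CONFIG'):
        # Path given through the MTAP_CONFIG environment variable.
        candidates.append(Path(environ['MTAP_CONFIG']))
    candidates.append(Path.cwd() / 'mtapConfig.yml')
    candidates.append(Path.home() / '.mtap' / 'mtapConfig.yml')
    candidates.append(Path('/etc/mtap/mtapConfig.yml'))
    return candidates


def first_existing(paths):
    """Return the first path that exists as a file, or None."""
    return next((p for p in paths if p.is_file()), None)
```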
MTAP components use a globally shared configuration object. Entering the context of a config object using “with” causes all of the MTAP functions called on that thread to make use of that config object.
Examples
>>> with mtap.Config() as config:
>>>     config['key'] = 'value'
>>>     # other MTAP methods in this
>>>     # block will use the updated config object.
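One way a per-thread configuration context like this can work is with thread-local storage. The following sketch shows the general technique only and makes no claim about how mtap.Config is implemented. ConfigSketch, current_config, and the module-level defaults are hypothetical.

```python
import threading

_context = threading.local()          # holds the active config for each thread
_global_config = {'key': 'default'}   # shared fallback configuration


class ConfigSketch(dict):
    """Hypothetical config dictionary usable as a context manager."""

    def __enter__(self):
        # Remember whatever was active so that nested contexts restore correctly.
        self._previous = getattr(_context, 'config', None)
        _context.config = self
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        _context.config = self._previous


def current_config():
    """Return the config active on this thread, or the global fallback."""
    config = getattr(_context, 'config', None)
    return _global_config if config is None else config
```

In this sketch, a thread that never enters a context keeps seeing the global fallback, even while another thread has its own config active.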