mtap

Command-Line Utility

Running Events Service

usage:

python -m mtap events [-h] [--address ADDRESS] [--port PORT]
                         [--workers WORKERS] [--register] [--config CONFIG]

optional arguments:
 -h, --help            show this help message and exit
 --address ADDRESS, -a ADDRESS
                       the address to serve the service on
 --port PORT, -p PORT  the port to serve the service on
 --workers WORKERS, -w WORKERS
                       number of worker threads to handle requests
 --register, -r        whether to register the service with the configured
                       service discovery
 --config CONFIG, -c CONFIG
                       path to config file

API Documentation

Events service client, documents

class mtap.EventsClient(address, channel_factory=None, _pool=None, _channel=None)

A client object for interacting with the events service.

Normally, users should not need to call any of the methods on this object directly, as they are invoked by the globally distributed object classes Event, Document, and Labeler.

Keyword Arguments:
  • address (Optional[str]) – The events service target, e.g. ‘localhost:9090’, or omit/None to use service discovery.

  • stub (Optional[EventsStub]) – An existing events service client gRPC stub to use.

Examples

>>> with EventsClient(address='localhost:50000') as client, \
...      Event(event_id='1', client=client) as event:
...     document = event.create_document(document_name='plaintext',
...                                      text='The quick brown fox jumps over the lazy dog.')

class mtap.Event(*, event_id=None, event_service_instance_id=None, client=None, only_create_new=False, default_adapters=None)

An object for interacting with a specific event locally or on the events service.

The Event object functions as a map from string document names to Document objects that can be used to access document data from the events server.

Keyword Arguments:
  • event_id (Optional[str]) – A globally unique identifier for the event, or omit/None to generate a random UUID.

  • client (Optional[EventsClient]) – A client for an events service to push any changes to the event to.

  • only_create_new (bool) – If True, fails if the event already exists on the events service.

Examples

>>> with EventsClient() as c, Event(event_id='id', client=c) as event:
...     # use event
...     ...

property event_id

The globally unique identifier for this event.

Type:

str

property event_service_instance_id

The unique instance identifier for this event’s paired event service.

Type:

str

property documents

A mutable mapping of strings to Document objects that can be used to query and add documents to the event.

Type:

MutableMapping[str, Document]

property metadata

A mutable mapping of strings to strings that can be used to query and add metadata to the event.

Type:

MutableMapping[str, str]

property binaries

A mutable mapping of strings to bytes that can be used to query and add binary data to the event.

Type:

MutableMapping[str, bytes]

property created_indices

A mapping of document names to a list of the names of all the label indices that have been added to that document.

Type:

Dict[str, List[str]]

close()

Closes this event. Lets the events service know that this client is done with the event, allowing the service to clean up the event if no other clients have open leases to it.

create_document(document_name, text)

Adds a document to the event keyed by document_name and containing the specified text.

Parameters:
  • document_name (str) – The event-unique identifier for the document, example: ‘plaintext’.

  • text (str) – The content of the document. This is a required field. Document text is final and immutable, because changing the text would very likely invalidate any labels on the document.

Returns:

The added document.

Return type:

Document

Examples

>>> event = Event()
>>> document = event.create_document('plaintext', text="The text of the document.")

add_document(document)

Adds the document to this event, first uploading it to the events service if this event has a client connection to the events service.

Parameters:

document (Document) – The document to add to this event.

Examples

>>> event = Event()
>>> document = Document('plaintext', text="The text of the document.")
>>> event.add_document(document)

class mtap.Document(document_name, *, text=None, event=None, default_adapters=None)

An object for interacting with text and labels stored on an Event.

Documents are keyed by their name, and pipelines can store different pieces of related text on a single processing event using multiple documents. For example, one document could store text in one language and another its translation, or one document could store the RTF or HTML encoding and another the parsed plaintext.

Both the document text and any added label indices are immutable. This is to enable parallelization and distribution of processing, and to prevent changes to the dependency graph of label indices and text, which can make debugging difficult.

Parameters:

document_name (str) – The document name identifier.

Keyword Arguments:
  • text (Optional[str]) – The document text. Can be omitted if this is an existing document whose text will be retrieved from the events service.

  • event (Optional[Event]) – The parent event of this document. If the event has a client, then that client will be used to share changes to this document with all other clients of the Events service. In that case, text should only be specified if it is the known existing text of the document.

Examples

Local document:

>>> document = Document('plaintext', text='Some document text.')

Existing distributed object:

>>> with EventsClient(address='localhost:8080') as client, \
...      Event(event_id='1', client=client) as event:
...     document = event.documents['plaintext']
...     document.text
'Some document text fetched from the server.'

New distributed object:

>>> with EventsClient(address='localhost:8080') as client, \
...      Event(event_id='1', client=client) as event:
...     document = Document('plaintext', text='Some document text.')
...     event.add_document(document)

or

>>> with EventsClient(address='localhost:8080') as client, \
...      Event(event_id='1', client=client) as event:
...     document = event.create_document('plaintext', text='Some document text.')
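
The relationship between create_document() and add_document(), and the immutability of document text, can be sketched locally without an events service. SketchEvent and SketchDocument below are hypothetical stand-ins, not mtap classes. They have no client, labels, or adapters. The sketch models create_document() as constructing a document and passing it to add_document(), and exposes text through a read-only property.

```python
from typing import Dict, Optional


class SketchDocument:
    """Hypothetical local document: the name and text are read-only once set."""

    def __init__(self, document_name: str, text: str) -> None:
        self._document_name = document_name
        self._text = text
        self.event: Optional["SketchEvent"] = None  # set when added to an event

    @property
    def document_name(self) -> str:
        return self._document_name

    @property
    def text(self) -> str:
        # No setter is defined, so `document.text = ...` raises AttributeError.
        return self._text


class SketchEvent:
    """Hypothetical local event mapping document names to documents."""

    def __init__(self) -> None:
        self.documents: Dict[str, SketchDocument] = {}

    def add_document(self, document: SketchDocument) -> None:
        # Link the document to this event and key it by its name.
        document.event = self
        self.documents[document.document_name] = document

    def create_document(self, document_name: str, text: str) -> SketchDocument:
        # Shorthand for constructing a document and then adding it.
        document = SketchDocument(document_name, text)
        self.add_document(document)
        return document


event = SketchEvent()
document = event.create_document("plaintext", text="Some document text.")
print(event.documents["plaintext"].text)  # Some document text.
```

Assigning to document.text on the sketch raises AttributeError, mirroring the rule that document text is final once set.
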

property event

The parent event of this document.

Type:

Event

property document_name

The unique identifier for this document on the event.

Type:

str

property text

The document text.

Type:

str

property created_indices

A list of all of the label index names that have been created on this document, either locally using a labeler or by remote pipeline components invoked on this document.

Type:

List[str]

get_label_index(label_index_name)

Gets the document’s label index with the specified key.

If the label index is not cached locally and the document has an event with a client, it is fetched from the events service. Uses a label adapter, if one has been specified, to unmarshal the labels from the proto message.

Parameters:

label_index_name (str) – The name of the label index to get.

Returns:

The requested label index.

Return type:

LabelIndex

get_labeler(label_index_name, *, distinct=None)

Creates a function that can be used to add labels to a label index.

Parameters:
  • label_index_name (str) – A document-unique identifier for the label index to be created.

  • distinct (Optional[bool]) – Optional. If using generic labels, whether to use distinct (non-overlapping) or non-distinct generic labels. Defaults to False.

Returns:

A callable that, when used with the ‘with’ statement, automatically uploads any added labels to the server when the block exits.

Return type:

Labeler

Examples

>>> with document.get_labeler('sentences', distinct=True) as labeler:
...     labeler(0, 25, sentence_type='STANDARD')
...     sentence = labeler(26, 34)
...     sentence.sentence_type = 'FRAGMENT'

add_labels(label_index_name, labels, *, distinct=None, label_adapter=None)

Adds the sequence of labels directly as a new label index, without using a labeler.

Parameters:
  • label_index_name (str) – The name of the label index.

  • labels (Sequence[Label]) – The labels to add.

  • distinct (Optional[bool]) – Whether the index is distinct or non-distinct.

  • label_adapter (mtap.label_adapters.ProtoLabelAdapter) – A label adapter to use.

Returns:

The new label index created from the labels.

Return type:

LabelIndex

class mtap.data.Labeler(client, document, label_index_name, label_adapter)

Object provided by get_labeler() which is responsible for adding labels to a label index on a document.

__call__(*args, **kwargs)

Calls the constructor for the label type and adds the created label to the list of labels to be uploaded.

Parameters:
  • args – Arguments passed to the label type’s constructor.

  • kwargs – Keyword arguments passed to the label type’s constructor.

Returns:

The object that was created by the label type’s constructor.

Return type:

Label

Examples

>>> labeler(0, 25, some_field='some_value', x=3)
GenericLabel(start_index=0, end_index=25, some_field='some_value', x=3)

done()

Finalizes the label index and uploads the added labels to the events service.

Normally called automatically on exit from a context manager block, but can be manually invoked if the labeler is not used in a context manager block.
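
The context-manager behaviour of a labeler can be illustrated with a small stand-in that does not talk to an events service. SketchLabeler and its upload callback are hypothetical names used only for this sketch, and its labels are plain dictionaries. Labels are queued as they are created. done() then sorts them by location and hands them to the callback exactly once, whether it is triggered by leaving the with block or called manually.

```python
from typing import Any, Callable, Dict, List

Label = Dict[str, Any]


class SketchLabeler:
    """Hypothetical labeler: collects labels, then uploads them once on done()."""

    def __init__(self, upload: Callable[[List[Label]], None]) -> None:
        self._upload = upload
        self._labels: List[Label] = []
        self._done = False

    def __call__(self, start_index: int, end_index: int, **fields: Any) -> Label:
        # Create the label and queue it for upload; the caller may keep editing it.
        label = {"start_index": start_index, "end_index": end_index, **fields}
        self._labels.append(label)
        return label

    def done(self) -> None:
        # Finalize only once: sort by location, then hand the labels to `upload`.
        if self._done:
            return
        self._done = True
        self._labels.sort(key=lambda lbl: (lbl["start_index"], lbl["end_index"]))
        self._upload(self._labels)

    def __enter__(self) -> "SketchLabeler":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Leaving the `with` block finalizes the index.
        self.done()


uploaded: List[List[Label]] = []
with SketchLabeler(uploaded.append) as labeler:
    labeler(0, 25, sentence_type="STANDARD")
    sentence = labeler(26, 34)
    sentence["sentence_type"] = "FRAGMENT"
```

Calling labeler.done() again after the block is a no-op in this sketch, so labels are never uploaded twice.
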

class mtap.data.LabelIndexInfo(index_name, type)

Information about a label index contained on a document.

index_name

The name of the label index.

Type:

str

type

The type of the label index.

Type:

LabelIndexType

class mtap.data.LabelIndexType(value)

An enum of the types of serialized labels that a label index can contain.

UNKNOWN = 0

Label index not set or type not known.

GENERIC = 1

JSON/generic label index.

CUSTOM = 2

Other/custom protobuf label index.

Labels

class mtap.data.Label

An abstract base class for a label of attributes on text.

abstract property document

The parent document this label appears on.

Type:

Document

abstract property label_index_name

The label index this label appears on.

Type:

str

abstract property identifier

The index of the label within its label index.

Type:

int

abstract property start_index

The index of the first character of the text covered by this label.

Type:

int

abstract property end_index

The index after the last character of the text covered by this label.

Type:

int

property location

A tuple of (start_index, end_index) used to perform sorting and comparison first based on start_index, then based on end_index.

Type:

Location

property text

The slice of document text covered by this label. Retrieved from the events server if it is not cached locally.

Type:

str

abstract shallow_fields_equal(other)

Tests whether the fields on this label, and the locations of its references, are the same as those on another label.

Parameters:

other – The other label to test.

Returns:

True if all of the fields are equal and the references are at the same locations.

class mtap.data.Location(start_index, end_index)

A location in text, a tuple of (start_index, end_index).

Used to perform comparison of labels based on their locations.

Parameters:
  • start_index (float) – The start index inclusive of the location in text.

  • end_index (float) – The end index exclusive of the location in text.

start_index

The start index inclusive of the location in text.

Type:

float

end_index

The end index exclusive of the location in text.

Type:

float

covers(other)

Whether the span of text covered by this location completely contains the span of text covered by the other label or location.

Parameters:

other (Union[Location, Label]) – A location or label to compare against.

Returns:

True if other is completely covered, False otherwise.

Return type:

bool

relative_to(location)

Takes this location, which shares an origin with location, and creates a copy whose indices are relative to the start of location instead.

Parameters:

location (int or Location or Label) – A location to relativize this location to.

Returns:

A copy with updated indices.

Return type:

Location

Examples

>>> sentence = Location(10, 20)
>>> token = Location(10, 15)
>>> token.relative_to(sentence)
Location(start_index=0, end_index=5)

offset_by(location)

Creates a location by offsetting this location by an integer or by the start_index of a location/label. This derelativizes the location, undoing relative_to().

Parameters:

location (int or Location or Label) – A location to offset this location by.

Returns:

A copy with updated indices.

Return type:

Location

Examples

>>> sentence = Location(10, 20)
>>> token_in_sentence = Location(0, 5)
>>> token_in_sentence.offset_by(sentence)
Location(start_index=10, end_index=15)
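
A location is an ordered (start_index, end_index) pair, so the arithmetic behind covers(), relative_to() and offset_by() can be sketched with a NamedTuple. This is an illustrative stand-in assuming plain numeric indices, not mtap's Location class. For brevity, it accepts only numbers or other locations, not labels.

```python
from typing import NamedTuple, Union


class Location(NamedTuple):
    """Stand-in (start_index, end_index) pair; tuple ordering sorts by start, then end."""

    start_index: float
    end_index: float

    def covers(self, other: "Location") -> bool:
        # True when `other` lies completely within this span.
        return self.start_index <= other.start_index and other.end_index <= self.end_index

    def relative_to(self, location: Union[float, "Location"]) -> "Location":
        # Make the start of `location` (or a plain number) the new origin.
        origin = location.start_index if isinstance(location, Location) else location
        return Location(self.start_index - origin, self.end_index - origin)

    def offset_by(self, location: Union[float, "Location"]) -> "Location":
        # Inverse of relative_to(): shift back into the outer coordinate system.
        offset = location.start_index if isinstance(location, Location) else location
        return Location(self.start_index + offset, self.end_index + offset)


sentence = Location(10, 20)
token = Location(10, 15)
print(token.relative_to(sentence))         # Location(start_index=0, end_index=5)
print(Location(0, 5).offset_by(sentence))  # Location(start_index=10, end_index=15)
print(sentence.covers(token))              # True
```

Because NamedTuple inherits tuple ordering, sorting these stand-in locations orders them by start_index and then end_index, matching the ordering described for the location property.
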

mtap.label(start_index, end_index, *, document=None, **kwargs)

An alias for GenericLabel.

Parameters:
  • start_index (int) – The index of the first character in text to be included in the label.

  • end_index (int) – The index after the last character in text to be included in the label.

  • document (Optional[Document]) – The parent document of the label. This is set automatically if the label is created via a labeler.

  • **kwargs – Any other fields that should be added to the label. Values must be JSON-serializable.

class mtap.GenericLabel(start_index, end_index, *, identifier=None, document=None, label_index_name=None, fields=None, reference_field_ids=None, **kwargs)

Default implementation of the Label class which uses a dictionary to store attributes.

Suitable for the majority of label use cases.

Parameters:
  • start_index (int) – The index of the first character in text to be included in the label.

  • end_index (int) – The index after the last character in text to be included in the label.

Keyword Arguments:
  • document (Optional[Document]) – The parent document of the label. This is set automatically if the label is created via a labeler.

  • **kwargs – Any other fields that should be added to the label. Values must be JSON-serializable.

Examples

>>> pos_tag = pos_tag_labeler(0, 5)
>>> pos_tag.tag = 'NNS'
>>> pos_tag.tag
'NNS'
>>> pos_tag2 = pos_tag_labeler(6, 10, tag='VB')
>>> pos_tag2.tag
'VB'
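
The idea of a label that stores arbitrary attributes in a dictionary can be sketched in plain Python. DictLabel below is hypothetical and is not mtap.GenericLabel. It has no document, identifier, or reference support. It shows only how attribute reads and writes can be routed to a field dictionary, with an eager check that values are JSON-serializable.

```python
import json
from typing import Any


class DictLabel:
    """Hypothetical label storing arbitrary fields in a dict (not mtap.GenericLabel)."""

    _SPAN = ("start_index", "end_index")

    def __init__(self, start_index: int, end_index: int, **fields: Any) -> None:
        object.__setattr__(self, "start_index", start_index)
        object.__setattr__(self, "end_index", end_index)
        object.__setattr__(self, "fields", {})
        for name, value in fields.items():
            setattr(self, name, value)

    def __getattr__(self, name: str) -> Any:
        # Called only when normal lookup fails: fall back to the field dict.
        if name == "fields":  # avoid recursion before __init__ has run
            raise AttributeError(name)
        try:
            return self.fields[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value: Any) -> None:
        if name in self._SPAN:
            object.__setattr__(self, name, value)
            return
        json.dumps(value)  # raises TypeError early if the value is not JSON-serializable
        self.fields[name] = value

    def __repr__(self) -> str:
        extra = "".join(f", {k}={v!r}" for k, v in self.fields.items())
        return f"DictLabel({self.start_index}, {self.end_index}{extra})"


pos_tag = DictLabel(0, 5)
pos_tag.tag = "NNS"
print(pos_tag.tag)                 # NNS
print(DictLabel(6, 10, tag="VB"))  # DictLabel(6, 10, tag='VB')
```
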

property document

The parent document this label appears on.

Type:

Document

property label_index_name

The label index this label appears on.

Type:

str

property identifier

The index of the label within its label index.

Type:

int

property start_index

The index of the first character of the text covered by this label.

Type:

int

property end_index

The index after the last character of the text covered by this label.

Type:

int

shallow_fields_equal(other)

Tests whether the fields on this label, and the locations of its references, are the same as those on another label.

Parameters:

other – The other label to test.

Returns:

True if all of the fields are equal and the references are at the same locations.

Label Indices

mtap.label_index(labels, distinct=False, adapter=None)

Creates a label index from labels.

Parameters:
  • labels (List[L]) – Zero or more labels to create a label index from.

  • distinct (bool) – Whether the label index is distinct or not.

  • adapter (ProtoLabelAdapter) – The label adapter for these labels.

Returns:

The newly created label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index
label_index([GenericLabel(0, 5, x=1), GenericLabel(0, 10, x=2), GenericLabel(5, 10, x=3),
             GenericLabel(5, 15, x=5), GenericLabel(7, 10, x=4), GenericLabel(10, 15, x=6)], distinct=False)

class mtap.data.LabelIndex

An immutable Sequence of labels ordered by their location in text. By default sorts by ascending start_index and then by ascending end_index.

abstract property distinct

Whether this label index is distinct, i.e. all of the labels in it are non-overlapping.

Type:

bool

abstract filter(fn)

Filters the label index according to a filter function.

This function is less efficient for filtering based on indices than inside(), covering(), etc., which use a binary search method on the sorted index.

Parameters:

fn (Callable[[Label], bool]) – A filter function that returns True if the label should be included and False if it should not.

Returns:

A view of this label index.

Return type:

LabelIndex

abstract at(x, end=None)

Returns the labels at the specified location in text.

Parameters:
  • x (Union[Label, Location, float]) – A label or location, or start index if end is specified.

  • end (Optional[float]) – The exclusive end index of the location in text if it has not been specified by a label.

Returns:

A view of this label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 10, x=1),
...                      label(0, 10, x=2),
...                      label(6, 20, x=3)])
>>> index.at(0, 10)
label_index([GenericLabel(0, 10, x=1), GenericLabel(0, 10, x=2)], distinct=False)

abstract covering(x, end=None)

A label index containing all labels that cover / contain the specified location in text.

Parameters:
  • x (Union[Label, Location, float]) – A label or location, or start index if end is specified.

  • end (Optional[float]) – The exclusive end index of the location in text if it has not been specified by a label.

Returns:

A view of this label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.covering(5, 10)
label_index([GenericLabel(0, 10, x=2), GenericLabel(5, 10, x=3), GenericLabel(5, 15, x=5)], distinct=False)

abstract inside(x, end=None)

A label index containing all labels that are inside the specified location in text.

Parameters:
  • x (Union[Label, Location, float]) – A label or location, or start index if end is specified.

  • end (Optional[float]) – The exclusive end index of the location in text if it has not been specified by a label.

Returns:

A view of this label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.inside(5, 10)
label_index([GenericLabel(5, 10, x=3), GenericLabel(7, 10, x=4)], distinct=False)

abstract beginning_inside(x, end=None)

A label index containing all labels whose start index is inside the specified location in text.

Parameters:
  • x (Union[Label, Location, float]) – A label or location, or start index if end is specified.

  • end (Optional[float]) – The exclusive end index of the location in text if it has not been specified by a label.

Returns:

A view of this label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.beginning_inside(6, 11)
label_index([GenericLabel(7, 10, x=4), GenericLabel(10, 15, x=6)], distinct=False)

abstract overlapping(x, end=None)

Returns all labels that overlap the specified location in text.

Parameters:
  • x (Union[Label, Location, float]) – A label or location, or start index if end is specified.

  • end (Optional[float]) – The exclusive end index of the location in text if it has not been specified by a label.

Returns:

A view of this label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.overlapping(6, 10)
label_index([GenericLabel(0, 10, x=2), GenericLabel(5, 10, x=3), GenericLabel(5, 15, x=5),
             GenericLabel(7, 10, x=4)], distinct=False)

before(x)

A label index containing all labels that are before a label’s location in text or an index in text.

Parameters:

x (Union[Label, Location, float]) – A label or location whose start_index will be used, or a float index in text.

Returns:

A view of this label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.before(6)
label_index([GenericLabel(0, 5, x=1)], distinct=False)

after(x)

A label index containing all labels that are after a label’s location in text or an index in text.

Parameters:

x (Union[Label, Location, float]) – A label or location whose end_index will be used, or a float index in text.

Returns:

A view of this label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.after(6)
label_index([GenericLabel(7, 10, x=4), GenericLabel(10, 15, x=6)], distinct=False)

abstract ascending()

This label index sorted according to ascending start and end index.

Returns:

A view of this label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index == index.ascending()
True

abstract descending()

This label index sorted according to descending start index and then descending end index, i.e. the reverse of ascending().

Returns:

A view of this label index.

Return type:

LabelIndex

Examples

>>> from mtap import label_index, label
>>> index = label_index([label(0, 5, x=1),
...                      label(0, 10, x=2),
...                      label(5, 10, x=3),
...                      label(7, 10, x=4),
...                      label(5, 15, x=5),
...                      label(10, 15, x=6)])
>>> index.descending()
label_index([GenericLabel(10, 15, x=6), GenericLabel(7, 10, x=4), GenericLabel(5, 15, x=5),
             GenericLabel(5, 10, x=3), GenericLabel(0, 10, x=2), GenericLabel(0, 5, x=1)], distinct=False)
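
The index queries above are described as using binary search on the sorted index. The sketch below models that idea without mtap. Labels are (start_index, end_index, x) tuples in a sorted list, and the helpers are hypothetical plain functions named after the methods. The standard bisect module narrows each query before a final filter. The sketch reproduces the results of the examples above, but mtap's own implementation may differ, for instance in how ties or zero-length labels are treated.

```python
from bisect import bisect_left, bisect_right
from typing import List, Tuple

# Each label is modelled as (start_index, end_index, x). Keeping the list sorted
# orders labels by start_index, then end_index, as described for LabelIndex.
Label = Tuple[int, int, int]
INF = float("inf")


def _first_start_at_or_after(labels: List[Label], index: float) -> int:
    # (index,) sorts before every tuple whose first element equals index.
    return bisect_left(labels, (index,))


def _first_start_after(labels: List[Label], index: float) -> int:
    # (index, INF) sorts after every tuple whose first element equals index.
    return bisect_right(labels, (index, INF))


def at(labels, start, end):
    same_start = labels[_first_start_at_or_after(labels, start):_first_start_after(labels, start)]
    return [lbl for lbl in same_start if lbl[1] == end]


def covering(labels, start, end):
    # Only labels starting at or before `start` can cover [start, end).
    return [lbl for lbl in labels[:_first_start_after(labels, start)] if lbl[1] >= end]


def inside(labels, start, end):
    lo, hi = _first_start_at_or_after(labels, start), _first_start_after(labels, end)
    return [lbl for lbl in labels[lo:hi] if lbl[1] <= end]


def beginning_inside(labels, start, end):
    # A pure slice: start <= start_index < end.
    return labels[_first_start_at_or_after(labels, start):_first_start_at_or_after(labels, end)]


def overlapping(labels, start, end):
    return [lbl for lbl in labels[:_first_start_at_or_after(labels, end)] if lbl[1] > start]


def before(labels, index):
    return [lbl for lbl in labels[:_first_start_after(labels, index)] if lbl[1] <= index]


def after(labels, index):
    return labels[_first_start_at_or_after(labels, index):]


def descending(labels):
    return labels[::-1]


LABELS: List[Label] = sorted([(0, 5, 1), (0, 10, 2), (5, 10, 3),
                              (7, 10, 4), (5, 15, 5), (10, 15, 6)])
print([x for _, _, x in covering(LABELS, 5, 10)])  # [2, 3, 5]
```

Only the slice boundaries come from binary search. The end_index checks are still linear in the candidate slice, which is why calling filter() over the whole index is the slower option for location-based queries.
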

Custom Label Types

class mtap.data.ProtoLabelAdapter

Responsible for marshalling and unmarshalling of label objects to and from proto messages.

abstract create_label(*args, **kwargs)

Called by labelers to create labels.

Should include the positional arguments start_index and end_index, because those are required properties of labels.

Parameters:
  • args – Arbitrary args used to create the label.

  • kwargs – Arbitrary keyword args used to create the label.

Returns:

An object of the label type.

Return type:

Label

abstract create_index_from_response(response)

Creates a LabelIndex from the response from an events service.

Parameters:

response (mtap.api.v1.events_pb2.GetLabelsResponse) – The response protobuf message from the events service.

Returns:

A label index containing all the labels from the events service.

Return type:

LabelIndex[L]

abstract create_index(labels)

Creates a LabelIndex from an iterable of label objects.

Parameters:

labels (Iterable[L]) – Labels to put in index.

Returns:

A label index containing all of the labels in the list.

Return type:

LabelIndex[L]

abstract add_to_message(labels, request)

Adds a list of labels to a request to the events service to add labels.

Parameters:
  • labels (Iterable[L]) – The list of labels that need to be sent to the server.

  • request (mtap.api.v1.events_pb2.AddLabelsRequest) – The request proto message to add the labels to.

abstract pack(index, *, include_label_text=False)

Prepares to serialize a label index by transforming it into a representation that can be dumped to JSON, YAML, pickle, etc.

Parameters:
  • index – The index itself.

  • include_label_text – Whether to include each label’s text in the serialized representation (for informational purposes).

Returns:

A dictionary representation of the label index.

abstract unpack(packed, label_index_name, *, document=None)

Takes a packed, serializable object and turns it into a full label index.

Parameters:
  • packed (Any) – The packed representation.

  • label_index_name (str) – The index name of the label index.

  • document (Document) – The document this label index occurs on.

Returns:

The label index.

Return type:

LabelIndex[L]

store_references(labels)

Takes all the references for the labels and turns them into static references.

Parameters:

labels (Sequence[L]) – The labels to store the references on.
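
To make the pack()/unpack() contract concrete, here is a sketch of an adapter for labels stored as plain dictionaries. DictLabelAdapter, its packed layout ({'labels': [...]}) and the document_text keyword are all hypothetical. The sketch uses document_text instead of include_label_text because it has no Document object. It illustrates only two points: packing should yield data that survives a JSON round trip, and unpacking restores an index ordered by location.

```python
import json
from typing import Any, Dict, List, Optional

Label = Dict[str, Any]


class DictLabelAdapter:
    """Hypothetical adapter for labels stored as plain dicts (pack/unpack only)."""

    def pack(self, index: List[Label], *, document_text: Optional[str] = None) -> Dict[str, Any]:
        packed = []
        for label in index:
            entry = dict(label)
            if document_text is not None:
                # Informational copy of the covered text; ignored by unpack().
                entry["_text"] = document_text[label["start_index"]:label["end_index"]]
            packed.append(entry)
        return {"labels": packed}

    def unpack(self, packed: Dict[str, Any], label_index_name: str) -> Dict[str, Any]:
        labels = [{k: v for k, v in entry.items() if k != "_text"}
                  for entry in packed["labels"]]
        # Restore the index ordering: by start_index, then end_index.
        labels.sort(key=lambda lbl: (lbl["start_index"], lbl["end_index"]))
        return {"name": label_index_name, "labels": labels}


text = "The quick brown fox."
index = [{"start_index": 4, "end_index": 9, "pos": "JJ"},
         {"start_index": 0, "end_index": 3, "pos": "DT"}]
adapter = DictLabelAdapter()
wire = json.dumps(adapter.pack(index, document_text=text))  # must be JSON-serializable
restored = adapter.unpack(json.loads(wire), "pos_tags")
```
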

Creating Processors

class mtap.processing.Processor(*args, **kwargs)

Mixin used by all processor abstract base classes that provides the ability to update serving status and use timers.

update_serving_status(status)

Updates the serving status of the processor for health checking.

Parameters:

status (str) – One of “SERVING”, “NOT_SERVING”, “UNKNOWN”.

property custom_label_adapters

Used to provide non-standard proto label adapters for specific index names.

static started_stopwatch(key)

An object that can be used to time aspects of processing. The stopwatch will be started at creation.

Parameters:

key (str) – The key to store the time under.

Returns:

An object that is used to do the timing.

Return type:

Stopwatch

Examples

>>> # In a process method
>>> with self.started_stopwatch('key'):
...     # do work
...     ...

static unstarted_stopwatch(key)

An object that can be used to time aspects of processing. The stopwatch will be stopped at creation.

Parameters:

key (str) – The key to store the time under.

Returns:

An object that is used to do the timing.

Return type:

Stopwatch

Examples

>>> # In a process method
>>> with self.unstarted_stopwatch('key') as stopwatch:
...     for _ in range(10):
...         # work you don't want timed
...         ...
...         stopwatch.start()
...         # work you do want timed
...         ...
...         stopwatch.stop()

class mtap.EventProcessor(*args, **kwargs)

Abstract base class for an event processor.

Examples

>>> class ExampleProcessor(EventProcessor):
...     def process(self, event, params):
...         # do work on the event
...         ...

abstract process(event, params)

Performs processing on an event, implemented by the subclass.

Parameters:
  • event (Event) – The event object to be processed.

  • params (Dict[str, Any]) – Processing parameters. A dictionary of strings mapped to json-serializable values.

Returns:

An arbitrary dictionary of strings mapped to json-serializable values which will be returned to the caller, even remotely.

Return type:

Optional[Dict[str, Any]]

default_adapters()

Can be overridden to return a mapping from label index names to adapters that will then be used in any documents or events provided to this processor’s process methods.

Returns:

The mapping (dict-like) of label index names to ProtoLabelAdapters to be used for those indices.

Return type:

Mapping[str, ProtoLabelAdapter]

close()

Can be overridden for cleaning up anything that needs to be cleaned up. Will be called by the framework after it’s done with the processor.

class mtap.DocumentProcessor(*args, **kwargs)

Abstract base class for a document processor.

Examples

>>> class ExampleProcessor(mtap.DocumentProcessor):
...     def process_document(self, document, params):
...         # do processing on document
...         ...

>>> class ExampleProcessor(mtap.DocumentProcessor):
...     def process_document(self, document, params):
...         with self.started_stopwatch('key'):
...             # use stopwatch on something
...             ...

abstract process_document(document, params)

Performs processing of a document on an event, implemented by the subclass.

Parameters:
  • document (Document) – The document object to be processed.

  • params (Dict[str, Any]) – Processing parameters. A dictionary of strings mapped to json-serializable values.

Returns:

An arbitrary dictionary of strings mapped to json-serializable values that will be returned to the caller of the processor.

Return type:

Dict[str, Any]

close()

Can be overridden for cleaning up anything that needs to be cleaned up. Will be called by the framework after it’s done with the processor.
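
A document processor overrides process_document(), not process(), so a minimal subclass can be sketched as follows. SketchDocumentProcessor is a hypothetical stand-in for mtap.DocumentProcessor, and the document is a plain string rather than a Document. The letter counting mirrors the behaviour described for the example processor in the processor() decorator documentation.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class SketchDocumentProcessor(ABC):
    """Hypothetical stand-in for a document processor base class."""

    @abstractmethod
    def process_document(self, document: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Subclasses implement the per-document work here."""

    def close(self) -> None:
        """Optional clean-up hook; does nothing by default."""


class LetterCounter(SketchDocumentProcessor):
    """Counts occurrences of 'a' and 'b' in the document text."""

    def process_document(self, document: str, params: Dict[str, Any]) -> Dict[str, Any]:
        if not params.get("do_work", True):
            return {}
        return {"a": document.count("a"), "b": document.count("b")}


processor = LetterCounter()
print(processor.process_document("abracadabra", {"do_work": True}))  # {'a': 5, 'b': 2}
processor.close()
```

Because process_document() is abstract, instantiating the base class directly raises TypeError. This catches a subclass that overrides the wrong method name.
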

class mtap.processing.Stopwatch(context=None, key=None)

A class for timing runtime of components and returning the total runtime with the processor’s results.

duration

The amount of time elapsed for this timer.

Type:

timedelta

Examples

>>> # in an EventProcessor or DocumentProcessor process method call
>>> with self.started_stopwatch('key'):
...     timed_routine()

>>> # in an EventProcessor or DocumentProcessor process method call
>>> with self.unstarted_stopwatch('key') as stopwatch:
...     for _ in range(10):
...         # work you don't want timed
...         ...
...         stopwatch.start()
...         # work you want timed
...         ...
...         stopwatch.stop()

start()

Starts the timer.

stop()

Stops/pauses the timer.
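
The start/stop/duration behaviour can be illustrated with a small accumulating timer. SketchStopwatch is hypothetical, not mtap.processing.Stopwatch. It has no processor context or key, and it accepts an injectable clock so the example is deterministic. Each stop() adds the time since the last start() to duration, so only the bracketed work is counted.

```python
import time
from datetime import timedelta
from typing import Callable, Optional


class SketchStopwatch:
    """Hypothetical pausable timer that accumulates elapsed time in `duration`."""

    def __init__(self, clock: Callable[[], float] = time.perf_counter) -> None:
        self._clock = clock
        self._started_at: Optional[float] = None
        self.duration = timedelta()

    def start(self) -> None:
        # Ignore start() while already running.
        if self._started_at is None:
            self._started_at = self._clock()

    def stop(self) -> None:
        # Add the interval since the last start(); ignore stop() while paused.
        if self._started_at is not None:
            self.duration += timedelta(seconds=self._clock() - self._started_at)
            self._started_at = None

    def __enter__(self) -> "SketchStopwatch":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.stop()  # a running stopwatch is stopped when the block exits


# A fake clock makes the result deterministic: only 0.0-1.5 and 10.0-10.25 count.
ticks = iter([0.0, 1.5, 10.0, 10.25])
with SketchStopwatch(clock=lambda: next(ticks)) as stopwatch:
    stopwatch.start()
    stopwatch.stop()  # untimed work would happen between here and the next start()
    stopwatch.start()
    stopwatch.stop()
print(stopwatch.duration)  # 0:00:01.750000
```

In these terms, a started stopwatch is one whose start() is called at creation. An unstarted stopwatch waits for the first explicit start().
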

Processor Description Decorators

mtap.processor(name, *, human_name=None, description=None, parameters=None, inputs=None, outputs=None, **additional_metadata)

Decorator that attaches a service name and metadata to a processor, which can then be used for runtime reflection of how the processor works.

Parameters:

name (str) –

Identifying service name both for launching via command line and for service registration.

Should be a mix of alphanumeric characters and dashes so that it satisfies the DNS name requirements of service discovery tools like Consul. Can be overridden at runtime via the identifier option on processor_parser().

Keyword Arguments:
  • human_name (Optional[str]) – An optional human-readable name for the processor.

  • description (Optional[str]) – A short description of the processor and what it does.

  • parameters (Optional[List[ParameterDescription]]) – The processor’s parameters.

  • inputs (Optional[List[str]]) –

    String identifiers for the output from a processor that this processor uses as an input.

    Takes the format “[processor-name]/[output]”. Examples would be “tagger/pos_tags” or “sentence-detector/sentences”.

  • outputs (Optional[List[LabelDescription]]) – The label indices this processor outputs.

  • **additional_metadata (Any) – Any other data that should be added to the processor’s metadata. It should be serializable to YAML and JSON.

Returns:

A decorator to be applied to subclasses of EventProcessor or DocumentProcessor. This decorator attaches the metadata so it can be reflected at runtime.

Examples

>>> from mtap import processor
>>> from mtap.processing import EventProcessor
>>> @processor('example-text-converter')
>>> class TextConverter(EventProcessor):
>>>     ...

or

>>> from mtap import processor
>>> from mtap.processing import DocumentProcessor
>>> @processor('example-sentence-detector')
>>> class SentenceDetector(DocumentProcessor):
>>>     ...

From our own example processor:

>>> from mtap import processor
>>> from mtap.processing import DocumentProcessor
>>> from mtap.processing.descriptions import label_property, labels, parameter
>>> @processor('mtap-example-processor-python',
>>>            human_name="Python Example Processor",
>>>            description="counts the number of times the letters a and b occur in a document",
>>>            parameters=[
>>>                parameter('do_work', required=True, data_type='bool',
>>>                          description="Whether the processor should do anything.")
>>>            ],
>>>            outputs=[
>>>                labels('mtap.examples.letter_counts',
>>>                       properties=[label_property('letter', data_type='str'),
>>>                                   label_property('count', data_type='int')])
>>>            ])
>>> class ExampleProcessor(DocumentProcessor):
>>>     ...
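The decorator's job is to store metadata where it can be read back at runtime, and its name argument must be DNS-friendly. Below is a minimal sketch of that idea in plain Python, not MTAP's implementation. The names describe_processor and processor_metadata are hypothetical, and the exact name pattern (letters, digits and dashes, starting and ending with a letter or digit, at most 63 characters per DNS label) is an assumption.

```python
import re
from typing import Any, Callable

# Assumption: letters, digits, and dashes, starting and ending with a letter or
# digit, at most 63 characters (the DNS label length limit).
_SERVICE_NAME = re.compile(r"[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?")


def describe_processor(name: str, **metadata: Any) -> Callable[[type], type]:
    """Hypothetical decorator that validates `name` and attaches metadata to a class."""
    if not _SERVICE_NAME.fullmatch(name):
        raise ValueError(f"not a DNS-friendly service name: {name!r}")

    def decorator(cls: type) -> type:
        # Stored on the class itself, so it can be read back at runtime without
        # creating an instance.
        cls.processor_metadata = {"name": name, **metadata}
        return cls

    return decorator


@describe_processor("example-sentence-detector", human_name="Sentence Detector")
class SentenceDetector:
    ...
```

Validating eagerly, when the decorator is created, means a bad name fails at import time rather than when the service registers.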
mtap.processing.descriptions.parameter(name, description, data_type, required)[source]
mtap.processing.descriptions.labels(name, *, reference=None, optional=False, name_from_parameter=None, description=None, properties=None)[source]

A description for a label type.

Parameters:

name (str) – The label index name.

Keyword Arguments:
  • reference (Optional[str]) – If this is an output of another processor, that processor’s name followed by a slash and the default output name of the index. Example: “sentence-detector/sentences”.

  • optional (bool) – Whether this label index is an optional input or output.

  • name_from_parameter (Optional[str]) – If the label index gets its name from a processor parameter, the name of the parameter.

  • description (Optional[str]) – A short description of the label index.

  • properties (Optional[List[label_property]]) – The properties of the labels in the label index.

name

The label index name.

Type:

str

reference

If this is an output of another processor, that processor’s name followed by a slash and the default output name of the index. Example: “sentence-detector/sentences”.

Type:

Optional[str]

optional

Whether this label index is an optional input or output.

Type:

bool

name_from_parameter

If the label index gets its name from a processor parameter, the name of the parameter.

Type:

Optional[str]

description

A short description of the label index.

Type:

Optional[str]

properties

The properties of the labels in the label index.

Type:

Optional[List[label_property]]

mtap.processing.descriptions.label_property(name, *, nullable=False, description=None, data_type=None)[source]

Creates a description for a property on a label.

Parameters:

name (str) – The property’s name.

Keyword Arguments:
  • description (Optional[str]) – A short description of the property.

  • data_type (Optional[str]) – The data type of the property: str, int, float, or bool, or List[T] or Mapping[T1, T2] of those types.

  • nullable (bool) – Whether the property can have a valid value of null.
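The data_type grammar above (a primitive, or a List or Mapping of primitives) is small enough to check mechanically. This is a hedged sketch: is_valid_data_type is a hypothetical helper, the primitive set is assumed to include int (the decorator example above uses data_type='int'), and nested containers such as List[List[str]] are assumed not to be allowed.

```python
import re

# Assumption: the primitive set, including int (the decorator example uses data_type='int').
PRIMITIVES = {"str", "int", "float", "bool"}
_LIST = re.compile(r"List\[\s*(\w+)\s*\]")
_MAPPING = re.compile(r"Mapping\[\s*(\w+)\s*,\s*(\w+)\s*\]")


def is_valid_data_type(data_type: str) -> bool:
    """Checks a data_type string against the documented grammar (no nesting)."""
    text = data_type.strip()
    if text in PRIMITIVES:
        return True
    match = _LIST.fullmatch(text)
    if match:
        return match.group(1) in PRIMITIVES
    match = _MAPPING.fullmatch(text)
    if match:
        # Both the key type and the value type must be primitives.
        return all(group in PRIMITIVES for group in match.groups())
    return False
```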

Running Services

mtap.processor_parser()[source]

An ArgumentParser that can be used to parse the settings for run_processor().

Returns:

A parser containing server settings.

Return type:

ArgumentParser

Examples

Using this as a parent parser:

>>> from argparse import ArgumentParser
>>> from mtap import processor_parser, run_processor
>>> parser = ArgumentParser(parents=[processor_parser()])
>>> parser.add_argument('--my-arg-1')
>>> parser.add_argument('--my-arg-2')
>>> args = parser.parse_args()
>>> processor = MyProcessor(args.my_arg_1, args.my_arg_2)
>>> run_processor(processor, options=args)
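Parent parsers are a standard argparse feature, so the mechanics can be shown without MTAP installed. In the sketch below, sketch_server_parser is hypothetical and models only the -p port option seen in the run_processor examples, not processor_parser()'s real option set. With MTAP, the resulting namespace would be handed to run_processor through its options keyword.

```python
from argparse import ArgumentParser


def sketch_server_parser() -> ArgumentParser:
    """Hypothetical stand-in for processor_parser(); only --port/-p is modeled."""
    # A parser used in `parents=` should be built with add_help=False; otherwise
    # its -h/--help conflicts with the child parser's own -h/--help.
    parent = ArgumentParser(add_help=False)
    parent.add_argument("--port", "-p", type=int, default=0)
    return parent


parser = ArgumentParser(parents=[sketch_server_parser()])
parser.add_argument("--my-arg-1")
parser.add_argument("--my-arg-2")
# Dashes in option names become underscores on the parsed namespace.
args = parser.parse_args(["-p", "8080", "--my-arg-1", "a", "--my-arg-2", "b"])
```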
mtap.run_processor(proc, *, mp=False, options=None, args=None, mp_context=None)[source]

Runs the processor as a gRPC service, blocking until an interrupt signal is received.

Parameters:
  • proc (EventProcessor) – The processor to host.

  • mp (bool) – If true, creates instances of proc on multiple forked processes to process events. This is useful if the processor is computationally intensive and would be limited by the Python GIL in a single process.

  • options (Optional[Namespace]) – The parsed arguments from the parser returned by processor_parser().

  • args (Optional[Sequence[str]]) – Arguments to parse server settings from if options was not supplied.

  • mp_context – A multiprocessing context that is passed to the process pool executor when mp is True.

Examples

Will automatically parse arguments:

>>> run_processor(MyProcessor())

Manual arguments:

>>> run_processor(MyProcessor(), args=['-p', '8080'])
class mtap.EventsServer(host, *, port=0, register=False, workers=10, sid=None, write_address=False, config=None)[source]

Server which hosts events.

Parameters:
  • host (str) – The address / hostname / IP to host the server on.

  • port (int) – The port to host the server on.

  • register (bool) – Whether to register the service with service discovery.

  • workers (int) – The number of workers that should handle requests.

  • write_address (bool) – Whether to write the events service address to a file.

  • config (mtap.Config) – An optional mtap config.

property port

The port the events service is bound to.

Type:

int

start()[source]

Starts the service.

stop(*, grace=None)[source]

De-registers (if registered with service discovery) the service and immediately stops accepting requests, completely stopping the service after a specified grace time.

During the grace period the server will continue to process existing requests, but it will not accept any new requests. This function is idempotent: if it is called multiple times, the server shuts down when the soonest grace period expires, and the returned shutdown event is set for all calls to this function.

Keyword Arguments:

grace (Optional[float]) – The grace period in seconds that the server should continue processing requests before shutdown.

Returns:

A shutdown event for the server.

Return type:

threading.Event
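The stop() semantics above (reject new requests immediately, finish after the soonest grace period, return one shared threading.Event) can be modeled with the standard library. This is a hypothetical sketch, not the server's code: GracefulStopper and its accepting flag are invented names, and grace=None is assumed to mean stopping immediately, as with gRPC's Server.stop.

```python
import threading
import time
from typing import Optional


class GracefulStopper:
    """Hypothetical model of idempotent stop(): the soonest grace period wins."""

    def __init__(self) -> None:
        self.accepting = True
        self._stopped = threading.Event()  # shared by every stop() call
        self._lock = threading.Lock()
        self._deadline: Optional[float] = None
        self._timer: Optional[threading.Timer] = None

    def stop(self, *, grace: Optional[float] = None) -> threading.Event:
        with self._lock:
            self.accepting = False  # new requests are rejected immediately
            delay = 0.0 if grace is None else grace
            deadline = time.monotonic() + delay
            # Only reschedule if this call would finish sooner than a previous one.
            if self._deadline is None or deadline < self._deadline:
                self._deadline = deadline
                if self._timer is not None:
                    self._timer.cancel()
                self._timer = threading.Timer(delay, self._stopped.set)
                self._timer.daemon = True
                self._timer.start()
            return self._stopped
```

A later call with a longer grace period leaves the earlier deadline in place, so repeated calls can only bring shutdown forward.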

class mtap.ProcessorServer(runner, host, port=0, *, sid=None, register=False, workers=None, write_address=False, config=None)[source]

Hosts an MTAP processor as a service.

Parameters:
  • proc (EventProcessor) – The event processor to host.

  • host (str) – The address / hostname / IP to host the server on.

  • port (int) – The port to host the server on, or 0 to use a random port.

Keyword Arguments:
  • register (Optional[bool]) – Whether to register the processor with service discovery.

  • events_address (Optional[str]) – The address of the events server, or omitted / None if the events service should be discovered.

  • processor_name (Optional[str]) – The identifier to register the processor under, if omitted the processor name will be used.

  • workers (Optional[int]) – The number of workers that should handle requests. Defaults to 10.

  • params (Optional[Mapping[str, Any]]) – A set of default parameters that will be passed to the processor every time it runs.

  • grpc_enable_http_proxy (bool) – Whether the gRPC channel to the events service should use the http_proxy or https_proxy environment variables.

property port

Port the hosted server is bound to.

Type:

int

start()[source]

Starts the service.

stop(*, grace=None)[source]

De-registers (if registered with service discovery) the service and immediately stops accepting requests, completely stopping the service after a specified grace time.

During the grace period the server will continue to process existing requests, but it will not accept any new requests. This function is idempotent: if it is called multiple times, the server shuts down when the soonest grace period expires, and the returned shutdown event is set for all calls to this function.

Keyword Arguments:

grace (Optional[float]) – The grace period in seconds that the server should continue processing requests before shutdown.

Returns:

A shutdown event for the server.

Return type:

threading.Event

Running a pipeline

class mtap.Pipeline(*components, name=None, events_address=None, events_client=None, mp_config=None)[source]

An object which can be used to build and run a pipeline of remote and local processors.

Pipelines are a MutableSequence containing one or more ComponentDescriptor objects, so a pipeline can be modified after creation using the standard sequence operations.

Parameters:

*components (ComponentDescriptor) – A list of component descriptors created using RemoteProcessor or LocalProcessor.

Keyword Arguments:
  • name (Optional[str]) – An optional name for the pipeline, defaults to ‘pipeline’.

  • config (Optional[Config]) – An optional config override.

name

The pipeline’s name.

Type:

str

Examples

Remote pipeline with name discovery:

>>> import mtap
>>> from mtap import Event, RemoteProcessor
>>> with mtap.Pipeline(
>>>         RemoteProcessor('processor-1-id'),
>>>         RemoteProcessor('processor-2-id'),
>>>         RemoteProcessor('processor-3-id')
>>>     ) as pipeline:
>>>     client = pipeline.events_client
>>>     for txt in txts:
>>>         with Event(client=client) as event:
>>>             document = event.create_document('plaintext', txt)
>>>             results = pipeline.run(document)

Remote pipeline using addresses:

>>> with mtap.Pipeline(
>>>         RemoteProcessor('processor-1-id', address='localhost:50052'),
>>>         RemoteProcessor('processor-2-id', address='localhost:50053'),
>>>         RemoteProcessor('processor-3-id', address='localhost:50054'),
>>>         events_address='localhost:50051'
>>>     ) as pipeline:
>>>     client = pipeline.events_client
>>>     for txt in txts:
>>>         with Event(client=client) as event:
>>>             document = event.create_document('plaintext', txt)
>>>             results = pipeline.run(document)

Modifying a pipeline:

>>> pipeline = Pipeline(RemoteProcessor('foo', address='localhost:50000'),
>>>                     RemoteProcessor('bar', address='localhost:50000'))
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None),
         RemoteProcessor(processor_name='bar', address='localhost:50000', component_id=None, params=None))
>>> pipeline.append(RemoteProcessor('baz', address='localhost:50001'))
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None),
         RemoteProcessor(processor_name='bar', address='localhost:50000', component_id=None, params=None),
         RemoteProcessor(processor_name='baz', address='localhost:50001', component_id=None, params=None))
>>> del pipeline[1]
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None),
         RemoteProcessor(processor_name='baz', address='localhost:50001', component_id=None, params=None))
>>> pipeline[1] = RemoteProcessor(processor_name='bar', address='localhost:50003')
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None),
         RemoteProcessor(processor_name='bar', address='localhost:50003', component_id=None, params=None))
>>> pipeline += list(pipeline)  # Putting in a new list to prevent an infinite recursion
>>> pipeline
Pipeline(RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None),
         RemoteProcessor(processor_name='bar', address='localhost:50003', component_id=None, params=None),
         RemoteProcessor(processor_name='foo', address='localhost:50000', component_id=None, params=None),
         RemoteProcessor(processor_name='bar', address='localhost:50003', component_id=None, params=None))
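Because Pipeline is documented as a MutableSequence, the operations in the example above (append, del, item assignment, +=) come from collections.abc once five methods are defined. The stand-alone sketch below demonstrates this with plain strings in place of ComponentDescriptor objects; SketchPipeline is hypothetical and not how Pipeline is implemented.

```python
from collections.abc import MutableSequence
from typing import Any, List


class SketchPipeline(MutableSequence):
    """Hypothetical list-backed pipeline; MutableSequence supplies append, +=, etc."""

    def __init__(self, *components: Any) -> None:
        self._components: List[Any] = list(components)

    # The five abstract methods MutableSequence requires:
    def __getitem__(self, index):
        return self._components[index]

    def __setitem__(self, index, value):
        self._components[index] = value

    def __delitem__(self, index):
        del self._components[index]

    def __len__(self) -> int:
        return len(self._components)

    def insert(self, index: int, value: Any) -> None:
        self._components.insert(index, value)

    def __repr__(self) -> str:
        return f"SketchPipeline({', '.join(map(repr, self._components))})"


pipeline = SketchPipeline("foo", "bar")
pipeline.append("baz")          # inherited mixin method, built on insert()
del pipeline[1]
pipeline[1] = "bar"
pipeline += list(pipeline)      # copy first, mirroring the example above
```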
static from_yaml_file(conf_path)[source]

Creates a pipeline from a yaml pipeline configuration file.

Parameters:

conf_path (str or pathlib.Path) – The path to the configuration file.

Returns:

Pipeline object from the configuration.

static load_configuration(conf)[source]

Creates a pipeline from a pipeline configuration dictionary.

Parameters:

conf (Dict) – The pipeline configuration dictionary.

Returns:

Pipeline created from the configuration.

run_multithread(source, *, params=None, show_progress=None, total=None, close_events=None, max_failures=None, workers=None, read_ahead=None, mp_context=None, log_level=None)[source]

Runs this pipeline on a source which provides multiple documents / events.

Concurrency is per-event, with each event being provided a thread which runs it through the pipeline.

Parameters:
  • source (Union[Iterable[Union[Event, Document]], ProcessingSource]) – A generator of events or documents to process. This should be an Iterable of either Event or Document objects or a ProcessingSource.

  • params (Optional[dict[str, Any]]) – JSON object containing params specific to processing this event. The existing params dictionary defined in add_processor() will be updated with the contents of this dict.

  • show_progress (Optional[bool]) – Whether to print a progress bar using tqdm.

  • total (Optional[int]) – An optional argument indicating the total number of events / documents that will be provided by the iterable, for the progress bar.

  • close_events (Optional[bool]) – Whether the pipeline should close events after they have been fully processed through all components.

  • max_failures (Optional[int]) – The number of acceptable failures. Once this number is exceeded, processing will halt. Note that because processing is concurrent, it may continue for a short time before termination.

  • workers (Optional[int]) – The number of threads to process documents on.

  • read_ahead (Optional[int]) – The number of source documents to read ahead into memory before processing.

  • mp_context (multiprocessing context, optional) – An optional override for the multiprocessing context.

  • log_level (Optional[Union[str, int]]) – The log level for running the pipeline.

Examples

>>> import pathlib
>>> from mtap import Event
>>> docs = list(pathlib.Path('abc/').glob('*.txt'))
>>> def document_source():
>>>     for path in docs:
>>>         with path.open('r') as f:
>>>             txt = f.read()
>>>         with Event(event_id=path.name, client=pipeline.events_client) as event:
>>>             doc = event.create_document('plaintext', txt)
>>>             yield doc
>>>
>>> pipeline.run_multithread(document_source(), total=len(docs))
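The per-event concurrency and max_failures behavior described above can be approximated with concurrent.futures. This is a simplified, hypothetical sketch (run_concurrently and TooManyFailures are invented names). It submits every item eagerly, so it does not model read_ahead, progress bars, or closing events.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class TooManyFailures(RuntimeError):
    """Raised once the number of failures exceeds max_failures."""


def run_concurrently(
    items: Iterable[T],
    process: Callable[[T], R],
    *,
    workers: int = 4,
    max_failures: int = 0,
) -> Tuple[List[R], List[BaseException]]:
    results: List[R] = []
    failures: List[BaseException] = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # One task per item: concurrency is per item, like per-event threads.
        futures = [pool.submit(process, item) for item in items]
        for future in as_completed(futures):
            exc = future.exception()
            if exc is None:
                results.append(future.result())
                continue
            failures.append(exc)
            if len(failures) > max_failures:
                # Cancel queued work; tasks already running still finish when the
                # executor shuts down, so processing may briefly continue.
                for pending in futures:
                    pending.cancel()
                raise TooManyFailures(f"{len(failures)} failures") from exc
    return results, failures
```

The brief overrun after the limit is hit is the same effect the max_failures note describes: work already in flight is allowed to complete.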
run(target, *, params=None)[source]

Processes the event/document using all the processors in the pipeline.

Parameters:
  • target (Union[Event, Document]) – Either an event or a document to process.

  • params (dict[str, Any]) – JSON object containing params specific to processing this event. The existing params dictionary defined in add_processor() will be updated with the contents of this dict.

Returns:

The results of all the processors in the pipeline.

Return type:

list[ProcessingResult]

Examples

>>> e = mtap.Event()
>>> document = mtap.Document('plaintext', text="...", event=e)
>>> with Pipeline(...) as pipeline:
>>>     pipeline.run(document)
>>>     # is equivalent to pipeline.run(document.event, params={'document_name': document.document_name})

The ‘document_name’ param tells a DocumentProcessor which document on the event to process.
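The params rules above (run-time params update a component's existing params, and running a document implies document_name) amount to a dictionary merge. This sketch is an assumption about precedence, in particular that the document_name taken from the document wins over any document_name passed in params; effective_params is a hypothetical helper.

```python
from typing import Any, Dict, Mapping, Optional


def effective_params(
    component_params: Optional[Mapping[str, Any]],
    run_params: Optional[Mapping[str, Any]] = None,
    document_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical merge: per-run params override the component's defaults."""
    merged: Dict[str, Any] = dict(component_params or {})  # copy, never mutate defaults
    merged.update(run_params or {})
    if document_name is not None:
        # Running a Document is treated as running its event with document_name set.
        merged["document_name"] = document_name
    return merged


defaults = {"do_work": True, "threshold": 0.5}
params = effective_params(defaults, {"threshold": 0.9}, document_name="plaintext")
```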

pipeline_timer_stats()[source]

The aggregated statistics for the global runtime of the pipeline.

Returns:

The timing stats for the global runtime of the pipeline.

Return type:

AggregateTimingInfo

close()[source]

Closes any open connections to remote processors.

as_processor()[source]

Returns the pipeline as a processor.

Returns:

An event processor that can be added to other pipelines or hosted.

Return type:

EventProcessor

print_times()[source]

Prints all of the times collected while running this pipeline using print().

class mtap.processing.ComponentDescriptor[source]

A configuration which describes either a local or remote pipeline component and what the pipeline needs to do to call the component.

class mtap.RemoteProcessor(processor_name, *, address=None, component_id=None, params=None, enable_proxy=False)[source]

A configuration for a remote processor that the pipeline will connect to in order to perform processing.

Parameters:
  • processor_name (str) – The identifier used for health checking and discovery.

  • address (Optional[str]) – An optional address to use. If this is None / omitted, the service discovery configuration is used to locate the processor.

  • component_id (Optional[str]) – How the processor’s results will be identified locally. Will be modified to be unique if it is not unique relative to other components in a pipeline.

  • params (Optional[Dict[str, Any]]) – An optional parameter dictionary that will be passed to the processor as parameters with every event or document processed. Values should be json-serializable.

processor_name

The identifier used for health checking and discovery.

Type:

str

address

An optional address to use. If this is None / omitted, the service discovery configuration is used to locate the processor.

Type:

Optional[str]

component_id

How the processor’s results will be identified locally. Will be modified to be unique if it is not unique relative to other components in a pipeline.

Type:

Optional[str]

params

An optional parameter dictionary that will be passed to the processor as parameters with every event or document processed. Values should be json-serializable.

Type:

Optional[Dict[str, Any]]
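The component_id entries state that duplicate IDs are modified to be unique, but not how. The following sketch uses a numeric-suffix scheme purely as an assumption; make_unique_ids is a hypothetical helper, and MTAP's actual renaming may differ.

```python
from typing import Iterable, List, Set


def make_unique_ids(ids: Iterable[str]) -> List[str]:
    """Hypothetical renaming: append -1, -2, ... to repeated component IDs."""
    used: Set[str] = set()
    unique: List[str] = []
    for component_id in ids:
        candidate = component_id
        suffix = 1
        # Keep trying suffixes until the candidate does not collide with any
        # ID already assigned (including IDs that were themselves suffixed).
        while candidate in used:
            candidate = f"{component_id}-{suffix}"
            suffix += 1
        used.add(candidate)
        unique.append(candidate)
    return unique
```

Checking against every assigned ID, not just the original names, avoids producing a collision when a pipeline already contains an ID like "tagger-1".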

class mtap.LocalProcessor(proc, *, component_id=None, params=None)[source]

A configuration of a locally-invoked processor.

Parameters:
  • proc (EventProcessor) – The processor instance to run with the pipeline.

  • component_id (Optional[str]) – How the processor’s results will be identified locally. Will be modified to be unique if it is not unique relative to other components in a pipeline.

  • params (Optional[Dict[str, Any]]) – An optional parameter dictionary that will be passed to the processor as parameters with every event or document processed. Values should be json-serializable.

proc

The processor instance to run with the pipeline.

Type:

EventProcessor

component_id

How the processor’s results will be identified locally. Will be modified to be unique if it is not unique relative to other components in a pipeline.

Type:

Optional[str]

params

An optional parameter dictionary that will be passed to the processor as parameters with every event or document processed. Values should be json-serializable.

Type:

Optional[Dict[str, Any]]

class mtap.processing.ProcessingSource[source]

Provides events or documents for the multi-threaded pipeline runner. Also has functionality for receiving results.

property total

The total number of documents this source will provide.

Returns:

An int count of the total events, or None if it is not known.

abstract provide(consume)[source]

Provides documents or events to the multi-threaded pipeline runner.

Parameters:

consume (Callable[[Union[Document, Event]], None]) – The consumer method to pass documents or events to for processing.

Examples

Example implementation for processing text documents in a directory.

>>> from pathlib import Path
>>> from typing import Callable, Union
>>> from mtap import Document, Event
>>> ...
>>> def provide(self, consume: Callable[[Union[Document, Event]], None]):
>>>     for file in Path(".").glob("*.txt"):
>>>         with file.open('r') as fio:
>>>             txt = fio.read()
>>>         event = Event()
>>>         doc = event.create_document('plaintext', txt)
>>>         consume(doc)
receive_result(result, event)[source]

Optional method: asynchronous callback which receives the results of processing. This method is called on a processing worker thread. The default behavior is to do nothing.

Parameters:
  • result (PipelineResult) – The result of processing using the pipeline.

  • event (Event) – The event processed.

receive_failure(exc)[source]

Optional method: Asynchronous callback which receives exceptions for any failed documents.

Parameters:

exc (ProcessingError) – The processing exception.

Returns:

Whether the error should be suppressed and not count against maximum failures.

Return type:

bool

close()[source]

Optional method: called to clean up after processing is complete.

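The ProcessingSource callbacks above fit together as a small protocol: provide() pushes items into consume, results and failures are reported back, and close() runs at the end. The sketch below mirrors that protocol in plain Python under stated assumptions. SketchSource, drive, TextSource, and count_ab are hypothetical, the driver is synchronous rather than multi-threaded, and it assumes suppressed failures (receive_failure returning True) do not count toward max_failures, as the docs describe.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, List, Optional, Tuple


class SketchSource(ABC):
    """Hypothetical mirror of the ProcessingSource callbacks (not the mtap class)."""

    @property
    def total(self) -> Optional[int]:
        return None  # unknown by default

    @abstractmethod
    def provide(self, consume: Callable[[Any], None]) -> None:
        """Pass each item to `consume`."""

    def receive_result(self, result: Any, item: Any) -> None:
        pass  # default: ignore results

    def receive_failure(self, exc: Exception) -> bool:
        return False  # default: do not suppress, so the failure counts

    def close(self) -> None:
        pass


def drive(source: SketchSource, process: Callable[[Any], Any], max_failures: int = 0) -> int:
    """Synchronously runs `process` on every provided item; returns counted failures."""
    counted = 0

    def consume(item: Any) -> None:
        nonlocal counted
        try:
            result = process(item)
        except Exception as exc:
            # Suppressed failures do not count toward max_failures.
            if not source.receive_failure(exc):
                counted += 1
                if counted > max_failures:
                    raise RuntimeError("maximum failures exceeded") from exc
        else:
            source.receive_result(result, item)

    try:
        source.provide(consume)
    finally:
        source.close()  # always clean up, even after a halt
    return counted


class TextSource(SketchSource):
    """Example source over in-memory strings that suppresses ValueErrors."""

    def __init__(self, texts: List[str]) -> None:
        self.texts = texts
        self.results: List[Tuple[str, Any]] = []
        self.closed = False

    @property
    def total(self) -> Optional[int]:
        return len(self.texts)

    def provide(self, consume: Callable[[Any], None]) -> None:
        for text in self.texts:
            consume(text)

    def receive_result(self, result: Any, item: Any) -> None:
        self.results.append((item, result))

    def receive_failure(self, exc: Exception) -> bool:
        return isinstance(exc, ValueError)

    def close(self) -> None:
        self.closed = True


def count_ab(text: str) -> int:
    if not text:
        raise ValueError("empty document")
    return text.count("a") + text.count("b")


source = TextSource(["abc", "", "bbb"])
failures = drive(source, count_ab)
```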

class mtap.processing.PipelineResult(component_results, elapsed_time)[source]

The result of processing an event or document in a pipeline.

Parameters:
  • component_results (List[ProcessingResult]) – The processing results for each individual component.

  • elapsed_time (timedelta) – The elapsed time for the entire pipeline.

component_results

The processing results for each individual component.

Type:

List[ProcessingResult]

elapsed_time

The elapsed time for the entire pipeline.

Type:

timedelta

component_result(identifier)[source]

Returns the component result for a specific identifier.

Parameters:

identifier – The processor’s identifier in the pipeline.

Returns:

The result for the specified processor.

Return type:

ProcessingResult

class mtap.processing.ProcessingResult(identifier, result_dict, timing_info, created_indices)[source]

The result of processing one document or event.

identifier

The ID of the processor with respect to the pipeline.

Type:

str

result_dict

The JSON object returned by the processor as its results.

Type:

Dict

timing_info

A dictionary of the times taken processing this document.

Type:

Dict

created_indices

Any indices that have been added to documents by this processor.

Type:

Dict[str, List[str]]

class mtap.processing.TimerStats(mean, std, min, max, sum)[source]

Statistics about a specific keyed measured duration recorded by a Stopwatch.

mean

The sample mean of all measured durations.

Type:

timedelta

std

The sample standard deviation of all measured durations.

Type:

timedelta

min

The minimum of all measured durations.

Type:

timedelta

max

The maximum of all measured durations.

Type:

timedelta

sum

The sum of all measured durations.

Type:

timedelta
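The TimerStats fields can be computed from a list of timedelta measurements with the standard statistics module. A sketch under assumptions: summarize and SketchTimerStats are hypothetical names, and because the docs call std the sample standard deviation, it uses statistics.stdev (n - 1 denominator), reporting zero spread for a single measurement.

```python
import statistics
from datetime import timedelta
from typing import NamedTuple, Sequence


class SketchTimerStats(NamedTuple):
    mean: timedelta
    std: timedelta
    min: timedelta
    max: timedelta
    sum: timedelta


def summarize(durations: Sequence[timedelta]) -> SketchTimerStats:
    """Aggregates measured durations (hypothetical helper, not MTAP's code)."""
    if not durations:
        raise ValueError("at least one duration is required")
    seconds = [d.total_seconds() for d in durations]
    # statistics.stdev is the sample (n - 1) standard deviation and needs two
    # points; a single measurement is reported with zero spread (assumption).
    std = statistics.stdev(seconds) if len(seconds) > 1 else 0.0
    return SketchTimerStats(
        mean=timedelta(seconds=statistics.mean(seconds)),
        std=timedelta(seconds=std),
        min=min(durations),
        max=max(durations),
        sum=sum(durations, timedelta()),
    )
```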

class mtap.processing.AggregateTimingInfo(identifier, timing_info)[source]

Collection of all the timing info for a specific processor.

identifier

The ID of the processor with respect to the pipeline.

Type:

str

timing_info

A map from all the timer keys for the processor to the aggregated duration statistics.

Type:

dict[str, TimerStats]

print_times()[source]

Prints the aggregate timing info for all processing components using print.

static csv_header()[source]

Returns the header for CSV formatted timing data.

Returns:

str

timing_csv()[source]

Returns the timing data formatted as CSV, generating each row as a string.

Returns:

Generator[str]
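csv_header() and timing_csv() describe a header-plus-rows layout produced as strings. A hedged sketch with the standard csv module, where HEADER's columns, the seconds unit, and the helper names are all assumptions: the generator yields one row per timer key, so a large timing table never needs to be held as a single string.

```python
import csv
import io
from typing import Iterator, Mapping, Sequence

# Hypothetical column layout; the real csv_header() may differ.
HEADER = ["component", "key", "mean", "std", "min", "max", "sum"]


def csv_line(fields: Sequence[object]) -> str:
    """Formats one row with the csv module so quoting and escaping are handled."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerow(fields)
    return buffer.getvalue()


def timing_rows(component: str, stats: Mapping[str, Sequence[float]]) -> Iterator[str]:
    """Yields one CSV row per timer key: (mean, std, min, max, sum) in seconds."""
    for key, values in stats.items():
        yield csv_line([component, key, *values])
```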

Configuration

class mtap.Config(*args)[source]

The MTAP configuration dictionary.

By default, configuration is loaded from one of the following locations, in order of priority:

  • A file at the path of the ‘--config’ parameter passed into main methods.

  • A file at the path of the ‘MTAP_CONFIG’ environment variable.

  • $PWD/mtapConfig.yml

  • $HOME/.mtap/mtapConfig.yml

  • /etc/mtap/mtapConfig.yml
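A sketch of that lookup, assuming the first existing file in the list wins (the docs give the order, but not whether lower-priority files are merged in). locate_config and its parameters are hypothetical; the exists hook makes the order testable without touching the file system.

```python
import os
from pathlib import Path
from typing import Callable, List, Mapping, Optional


def locate_config(
    cli_path: Optional[str] = None,
    environ: Mapping[str, str] = os.environ,
    cwd: Optional[Path] = None,
    home: Optional[Path] = None,
    exists: Callable[[Path], bool] = Path.is_file,
) -> Optional[Path]:
    """Returns the first existing candidate, in the documented priority order."""
    candidates: List[Path] = []
    if cli_path:
        candidates.append(Path(cli_path))  # the --config parameter
    if environ.get("MTAP_CONFIG"):
        candidates.append(Path(environ["MTAP_CONFIG"]))
    candidates.append((cwd or Path.cwd()) / "mtapConfig.yml")
    candidates.append((home or Path.home()) / ".mtap" / "mtapConfig.yml")
    candidates.append(Path("/etc/mtap/mtapConfig.yml"))
    for candidate in candidates:
        if exists(candidate):
            return candidate
    return None  # no file found; caller falls back to defaults (assumption)
```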

MTAP components use a global shared configuration object. By entering the context of a config object using “with”, all of the MTAP functions called on that thread will use that config object.

Examples

>>> import mtap
>>> with mtap.Config() as config:
>>>     config['key'] = 'value'
>>>     # other MTAP methods in this
>>>     # block will use the updated config object.
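The per-thread behavior described above can be modeled with threading.local and a stack of active configs. This is a hypothetical sketch (ScopedConfig, current_config, and GLOBAL_CONFIG are invented names), not MTAP's Config class; it only shows why a config entered on one thread is not visible to another.

```python
import threading
from typing import Any

_state = threading.local()  # each thread sees its own stack of active configs


class ScopedConfig(dict):
    """Hypothetical dict that is the calling thread's current config inside `with`."""

    def __enter__(self) -> "ScopedConfig":
        if not hasattr(_state, "stack"):
            _state.stack = []
        _state.stack.append(self)
        return self

    def __exit__(self, *exc_info: Any) -> bool:
        _state.stack.pop()
        return False


GLOBAL_CONFIG = ScopedConfig(source="defaults")


def current_config() -> ScopedConfig:
    """Innermost config entered on this thread, else the global one."""
    stack = getattr(_state, "stack", None)
    return stack[-1] if stack else GLOBAL_CONFIG


seen = []
with ScopedConfig(key="value") as config:
    # A different thread does not see the config entered on this thread.
    worker = threading.Thread(target=lambda: seen.append(current_config()))
    worker.start()
    worker.join()
    inside = current_config()
outside = current_config()
```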
update_from_yaml(path)[source]

Updates the configuration by loading and collapsing all of the structures in a yaml file.

Parameters:

path – The path to the yaml file to load.