The patterns for implementing and running text analysis in the MTAP Framework.


A processor is a component which performs some kind of task on the data in an Event or a Document. MTAP provides functionality for deploying and running these processors.

In MTAP there are two types of processors:

  • EventProcessor
  • DocumentProcessor


Event processors perform processing on Event objects, potentially using multiple Document objects on that Event. Examples would be translating one language Document into another, cross-analyzing or creating references between more than one document, or summarizing a document.


from mtap import processor, EventProcessor

class ExampleProcessor(EventProcessor):
  def process(self, event, params):
    doc1 = event.documents['english']
    doc2 = event.create_document('spanish', translate(doc1.text))


import edu.umn.nlpie.mtap.common.*;
import edu.umn.nlpie.mtap.model.*;
import edu.umn.nlpie.mtap.processing.*;

public class ExampleProcessor extends EventProcessor {
  public void process(Event event,
                      JsonObject params,
                      JsonObjectBuilder result) {
    Document doc1 = event.getDocuments().get("english");
    Document doc2 = event.addDocument("spanish", translate(doc1.getText()));


Document processors perform processing on a specific Document object on an Event. Examples would be most standard NLP tasks like sentence segmentation, part of speech tagging, parsing, normalization, negation, or entity detection.

In this example, assuming that the “sentences” label index has been created by another processor, we can then access the sentences even running this processor on a different machine:


from mtap import DocumentProcessor, processor

class ExampleProcessor(DocumentProcessor):
  def process_document(self, document, params):
    for sentence in document.labels['sentences']:
      # do processing on sentence


import edu.umn.nlpie.mtap.common.*;
import edu.umn.nlpie.mtap.model.*;
import edu.umn.nlpie.mtap.processing.*;

public class ExampleProcessor extends EventProcessor {
  public void process(Document document,
                      JsonObject params,
                      JsonObjectBuilder result) {
    for (GenericLabel label : document.getLabelIndex("sentences")) {


Hosting Processors

In order to facilitate interoperability between components of different languages, as well as scalability and flexibility of the how components are deployed, MTAP uses a distributed system, microservice-inspired approach for processing.

Each component is wrapped by the MTAP framework and hosted using a standardized gRPC service definition.


from mtap import run_processor

processor = ExampleProcessor()


import edu.umn.nlpie.mtap.processing.ProcessorServerBuilder;
import edu.umn.nlpie.mtap.processing.ProcessorServerOptions;
import edu.umn.nlpie.mtap.common.Server;

import org.kohsuke.args4j.*;



class ExampleProcessor ... {
  public static void main(String[] args) {
    ProcessorServer.Builder builder = new ProcessorServer.Builder();
    CmdLineParser parser = new CmdLineParser(builder);
    try {
      Server server = ExampleProcessor());
    } catch (IOException e) {
      System.err.println("Failed to start server: " + e.getMessage());
    } catch (InterruptedException e) {
      System.err.println("Server interrupted.");
    } catch (CmdLineException e) {
      ProcessorServerOptions.printHelp(parser, HelloWorldExample.class, e, null);

In both of these languages a standardized argument parser is provided which allows customization of how the processors are hosted, for example the hostname and port of the processor service.


A pipeline is a class for calling one or more processors in succession. The pipeline handles all of the communication with the pipeline components.


pipeline = Pipeline(
    RemoteProcessor('processor1', address='localhost:10001'),
    RemoteProcessor('processor2', address='localhost:10002')
with events_client(address='localhost:10000') as client:
  with Event(event_id='1', client=client) as event:
    document = Document(document_name='plaintext', text=document_text)

The pipeline also has a dictionary-based specification, so a pipeline can be created by loading a serialized file. This allows sharing and customization of pipelines via configuration.

name: example-pipeline
    - name: processor1
      address: 'localhost:10001'
    - name: processor2
      address: 'localhost:10002'
pipeline = Pipeline.from_yaml("/path/to")
with events_client(address='localhost:10000') as client:
  with Event(event_id='1', client=client) as event:
    document = Document(document_name='plaintext', text=document_text)