About
This guide will teach you how to use BioMedICUS to process documents from a database or some other custom source. In this example we will use a sqlite database of documents.
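Throughout this guide the database is assumed to contain a single DOCUMENTS table with NAME and TEXT columns, matching the queries used below. If you want to build a small test database of your own, here is a minimal sketch using only the standard library (the file name and sample rows are arbitrary):

import sqlite3

# Build a tiny example database with the DOCUMENTS(NAME, TEXT) layout that the
# tutorial's queries expect.
con = sqlite3.connect('example.db3')
con.execute("CREATE TABLE IF NOT EXISTS DOCUMENTS (NAME TEXT PRIMARY KEY, TEXT TEXT)")
con.executemany(
    "INSERT OR REPLACE INTO DOCUMENTS (NAME, TEXT) VALUES (?, ?)",
    [
        ('note_1', 'Patient presents with a persistent cough.'),
        ('note_2', 'Follow-up visit for hypertension management.'),
    ],
)
con.commit()
con.close()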
Pre-Requisites
Before starting this tutorial, install BioMedICUS using these instructions.
This tutorial assumes we are in the active virtual environment from the installation guide.
First Steps
In this tutorial we will run BioMedICUS from a Python script. Start by creating a file named sql_pipeline.py in your favorite text editor or IDE.
Instantiating the Default Pipeline
We will start by creating an instance of the default BioMedICUS pipeline. This is done by parsing the pipeline’s options from the command line, here by passing parents=[default_pipeline.argument_parser()] when creating a new ArgumentParser. We will also add our own input_file argument, which will be the path to the input sqlite file.
import sqlite3
from argparse import ArgumentParser

from biomedicus_client.pipeline import default_pipeline
from mtap import Event, events_client

if __name__ == '__main__':
    parser = ArgumentParser(add_help=True, parents=[default_pipeline.argument_parser()])
    parser.add_argument('input_file')
    args = parser.parse_args()
    pipeline = default_pipeline.from_args(args)
    with events_client(pipeline.events_address) as events:
        pass
Creating a sqlite Document Source
Next, we will create a document source. Update the block starting with with events_client(pipeline.events_address) as events: to the following, replacing the pass statement:
with events_client(pipeline.events_address) as events:
    con = sqlite3.connect(args.input_file)
    cur = con.cursor()

    def source():
        for name, text in cur.execute("SELECT NAME, TEXT FROM DOCUMENTS"):
            with Event(event_id=name, client=events) as e:
                doc = e.create_document('plaintext', text)
                yield doc
This is a Python generator function which reads a document from the database, creates an MTAP Document object, and then yields that object. The pipeline will use this function to provide all the documents it needs to process.
We’ve pre-populated the SQL SELECT statement with the NAME and TEXT fields from the DOCUMENTS table. If you have a database in mind, you can substitute your own fields and table, or even parameterize them using the ArgumentParser, as sketched below.
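A minimal sketch of such parameterization follows. The --table, --name-column, and --text-column options are hypothetical additions for illustration, not part of the default pipeline’s own arguments:

# Add these before parser.parse_args().
parser.add_argument('--table', default='DOCUMENTS')
parser.add_argument('--name-column', default='NAME')
parser.add_argument('--text-column', default='TEXT')

# Identifiers cannot be bound as SQL parameters, so they are interpolated
# directly into the query; only do this with trusted argument values.
query = f"SELECT {args.name_column}, {args.text_column} FROM {args.table}"

def source():
    for name, text in cur.execute(query):
        with Event(event_id=name, client=events) as e:
            yield e.create_document('plaintext', text)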
Passing the Document Source to Pipeline
Finally, we will pass the source to the pipeline’s run_multithread method, which will run the pipeline on the documents. First, though, we use a SELECT statement with COUNT to count the number of documents, and pass that to the method via the total argument. This is completely optional and only serves to enable a nice progress bar. run_multithread returns timing information, which we print after processing has finished to see statistics about the different processors’ run times, and then we close our sqlite connection using con.close().
with events_client(pipeline.events_address) as events:
    ...

    def source():
        ...

    count, = next(cur.execute("SELECT COUNT(*) FROM DOCUMENTS"))
    times = pipeline.run_multithread(source(), total=count)
    times.print()
    con.close()
We use the Pipeline’s run_multithread method here. You can learn more about this method in the MTAP documentation.
We’re done with this file now. Save the changes to the file.
Final Script
The script in its final state is shown below:
from argparse import ArgumentParser
import sqlite3

from mtap import Event, events_client

from biomedicus_client import default_pipeline

if __name__ == '__main__':
    parser = ArgumentParser(add_help=True, parents=[default_pipeline.argument_parser()])
    parser.add_argument('input_file')
    args = parser.parse_args()
    pipeline = default_pipeline.from_args(args)
    with events_client(pipeline.events_address) as events:
        con = sqlite3.connect(args.input_file)
        cur = con.cursor()

        def source():
            for name, text in cur.execute("SELECT NAME, TEXT FROM DOCUMENTS"):
                with Event(event_id=name, client=events) as e:
                    doc = e.create_document('plaintext', text)
                    yield doc

        count, = next(cur.execute("SELECT COUNT(*) FROM DOCUMENTS"))
        times = pipeline.run_multithread(source(), total=count)
        times.print()
        con.close()
Running the Pipeline
Before we can run the pipeline script we just created, we first need to deploy the BioMedICUS processors. In one console window, inside a BioMedICUS virtual environment, run the following:
b9 deploy
Once the Done deploying all servers line shows up, open another console window and enter the BioMedICUS virtual environment. We will need a sqlite database containing some notes; I’ve made one available here. This file contains 15 de-identified free-text notes from the MTSamples website. Download the database file and place it in the working directory with your script. To process these 15 notes using the script we just created, run the following:
python sql_pipeline.py example.db3 --include-label-text
This will create a folder named output in the working directory and place 15 JSON files containing the serialized results of processing there. To view the serialized output you can use the following:
python -m json.tool output/97_98.json
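If you would rather inspect a result programmatically, here is a minimal sketch; the exact structure of the serialized JSON depends on the serializer used, so it is only assumed that the top level is a JSON object:

import json

# Load one of the serialized result files and list its top-level keys.
with open('output/97_98.json', 'r') as f:
    result = json.load(f)
print(list(result))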
Next Steps
Note that this method will work for more than just sqlite3 databases. Anything that can be placed in a for loop that iterates over text samples can be used instead of the sqlite cursor in this guide. For example, it could split a single file into multiple documents, use a different type of database, or even consume a queue of documents provided by some other source such as a service endpoint. A sketch of the first of these follows.
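A minimal sketch of such an alternative source, assuming a single plain-text file in which documents are separated by blank lines (the file name and separator are arbitrary choices for illustration), used in place of the sqlite-backed source() above:

def source():
    # Split one large file into documents on blank-line boundaries.
    with open('all_notes.txt', 'r') as f:
        chunks = f.read().split('\n\n')
    for i, text in enumerate(chunks):
        if not text.strip():
            continue
        with Event(event_id=f'note_{i}', client=events) as e:
            yield e.create_document('plaintext', text)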
This method for creating pipelines can also work in conjunction with RTF processing and with your own custom pipeline components.
Appendix A
Following are some alternative versions of the sql pipeline:
Default Pipeline + RTF
from argparse import ArgumentParser
import sqlite3

from mtap import Event, events_client

from biomedicus_client import default_pipeline

if __name__ == '__main__':
    parser = ArgumentParser(add_help=True, parents=[default_pipeline.argument_parser()])
    parser.add_argument('input_file')
    args = parser.parse_args()
    args.rtf = True  # Toggles --rtf flag always on.
    # Can also skip parsing arguments and programmatically create the pipeline,
    # see :func:`default_pipeline.create`.
    pipeline = default_pipeline.from_args(args)
    with events_client(pipeline.events_address) as events:
        con = sqlite3.connect(args.input_file)
        cur = con.cursor()

        def source():
            # Note: I recommend that RTF documents be stored as BLOBs, since most
            # databases do not support storing text in the standard Windows-1252
            # encoding of RTF documents. (RTF documents can actually use different
            # encodings, specified by a keyword like \ansicpg1252 at the beginning
            # of the document, but this is uncommon.)
            # If you are storing RTF documents, ensure that they are initially read
            # from file using the correct encoding [i.e. open('file.rtf', 'r', encoding='cp1252')]
            # before storing in the database, so that special characters are preserved.
            for name, text in cur.execute("SELECT NAME, TEXT FROM DOCUMENTS"):
                with Event(event_id=name, client=events) as e:
                    e.binaries['rtf'] = text
                    # or "e.binaries['rtf'] = text.encode('cp1252')" in the TEXT column case
                    yield e

        count, = next(cur.execute("SELECT COUNT(*) FROM DOCUMENTS"))
        # We add the params here since we're calling the pipeline with a source that
        # provides Events rather than Documents. This param tells DocumentProcessors
        # which document they need to process after the rtf converter creates it.
        times = pipeline.run_multithread(source(), params={'document_name': 'plaintext'}, total=count)
        times.print()
        con.close()
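The comments above assume the RTF files were stored in the database without any lossy encoding conversion. A minimal sketch of populating such a database from .rtf files on disk, assuming the same DOCUMENTS(NAME, TEXT) layout used throughout this guide (the directory name and file layout are arbitrary):

import pathlib
import sqlite3

con = sqlite3.connect('example_rtf.db3')
con.execute("CREATE TABLE IF NOT EXISTS DOCUMENTS (NAME TEXT PRIMARY KEY, TEXT BLOB)")
for path in pathlib.Path('rtf_notes').glob('*.rtf'):
    # Store the raw bytes as a BLOB so no encoding conversion happens. If you
    # store the documents as TEXT instead, decode them with the RTF encoding
    # first, e.g. path.read_text(encoding='cp1252'), so special characters
    # are preserved.
    con.execute(
        "INSERT OR REPLACE INTO DOCUMENTS (NAME, TEXT) VALUES (?, ?)",
        (path.stem, path.read_bytes()),
    )
con.commit()
con.close()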
RTF-Only Pipeline
from argparse import ArgumentParser
import sqlite3

from mtap import Event, events_client

from biomedicus_client import rtf_to_text

if __name__ == '__main__':
    parser = ArgumentParser(add_help=True, parents=[rtf_to_text.argument_parser()])
    parser.add_argument('input_file')
    args = parser.parse_args()
    args.rtf = True  # Toggles --rtf flag always on.
    # Can also skip parsing arguments and programmatically create the pipeline,
    # see :func:`rtf_to_text.create`.
    pipeline = rtf_to_text.from_args(args)
    with events_client(pipeline.events_address) as events:
        con = sqlite3.connect(args.input_file)
        cur = con.cursor()

        def source():
            # Note: I recommend that RTF documents be stored as BLOBs, since most
            # databases do not support storing text in the standard Windows-1252
            # encoding of RTF documents. (RTF documents can actually use different
            # encodings, specified by a keyword like \ansicpg1252 at the beginning
            # of the document, but this is uncommon.)
            # If you are storing RTF documents, ensure that they are initially read
            # from file using the correct encoding [i.e. open('file.rtf', 'r', encoding='cp1252')]
            # before storing in the database, so that special characters are preserved.
            for name, text in cur.execute("SELECT NAME, TEXT FROM DOCUMENTS"):
                with Event(event_id=name, client=events) as e:
                    e.binaries['rtf'] = text
                    # or "e.binaries['rtf'] = text.encode('cp1252')" in the TEXT column case
                    yield e

        count, = next(cur.execute("SELECT COUNT(*) FROM DOCUMENTS"))
        # We add the params here since we're calling the pipeline with a source that
        # provides Events rather than Documents. This param tells DocumentProcessors
        # which document they need to process after the rtf converter creates it.
        times = pipeline.run_multithread(source(), params={'document_name': 'plaintext'}, total=count)
        times.print()
        con.close()