Source code for mtap.data._events

#  Copyright 2020 Regents of the University of Minnesota.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""Events service client API and wrapper classes.

"""

import collections
import threading
import uuid
from typing import (
    Iterator,
    List,
    Dict,
    MutableMapping,
    Generic,
    TypeVar,
    NamedTuple,
    ContextManager,
    Iterable,
    Optional,
    Sequence,
    Union,
    Mapping,
    TYPE_CHECKING,
    Callable,
)

import grpc
from grpc_health.v1 import health_pb2_grpc, health_pb2

from mtap import _config, _discovery, constants
from mtap.api.v1 import events_pb2, events_pb2_grpc
from mtap.data import _base, _labels, _label_adapters

if TYPE_CHECKING:
    import mtap
    import mtap.data as data

L = TypeVar('L', bound='data.Label')


[docs]class Event: """An object for interacting with a specific event locally or on the events service. The Event object functions as a map from string document names to :obj:`Document` objects that can be used to access document data from the events server. Keyword Args: event_id (~typing.Optional[str]): A globally-unique identifier for the event, or omit / none for a random UUID. client (~typing.Optional[EventsClient]): A client for an events service to push any changes to the event to. only_create_new (bool): Fails if the event already exists on the events service. Examples: >>> with EventsClient() as c, Event(event_id='id', client=c) as event: >>> # use event >>> ... """ __slots__ = ('_event_id', '_client', '_lock', 'default_adapters', '_documents', '_metadata', '_binaries', '_event_service_instance_id') def __init__(self, *, event_id: Optional[str] = None, event_service_instance_id: Optional[str] = None, client: Optional['mtap.EventsClient'] = None, only_create_new: bool = False, default_adapters: Optional[Mapping[str, 'data.ProtoLabelAdapter']] = None): self._event_id = event_id or str(uuid.uuid4()) self._event_service_instance_id = event_service_instance_id self._lock = threading.RLock() self.default_adapters = default_adapters or {} self._client = client if self._client is not None: self._client = client.get_local_instance(self._event_service_instance_id) self._event_service_instance_id = self._client.instance_id self._client.open_event(self._event_id, only_create_new=only_create_new) @property def client(self) -> Optional['mtap.EventsClient']: return self._client @property def event_id(self) -> str: """str: The globally unique identifier for this event.""" return self._event_id @property def event_service_instance_id(self) -> str: """str: The unique instance identifier for this event's paired event service.""" return self._event_service_instance_id @property def documents(self) -> MutableMapping[str, 'mtap.Document']: """~typing.MutableMapping[str, Document]: A mutable mapping of strings to :obj:`Document` objects that can be used to query and add documents to the event.""" try: return self._documents except AttributeError: self._documents = _Documents(self, self.client) return self._documents @property def metadata(self) -> MutableMapping[str, str]: """~typing.MutableMapping[str, str]: A mutable mapping of strings to strings that can be used to query and add metadata to the event.""" try: return self._metadata except AttributeError: self._metadata = _Metadata(self, self.client) return self._metadata @property def binaries(self) -> MutableMapping[str, bytes]: """~typing.MutableMapping[str, str]: A mutable mapping of strings to bytes that can be used to query and add binary data to the event.""" try: return self._binaries except AttributeError: self._binaries = _Binaries(self, self.client) return self._binaries @property def created_indices(self) -> Dict[str, List[str]]: """~typing.Dict[str, ~typing.List[str]]: A mapping of document names to a list of the names of all the label indices that have been added to that document""" return {document_name: document.created_indices for document_name, document in self.documents.items()}
[docs] def close(self): """Closes this event. Lets the event service know that we are done with the event, allowing to clean up the event if no other clients have open leases to it.""" if self.client is not None: self.release_lease()
[docs] def create_document(self, document_name: str, text: str) -> 'mtap.Document': """Adds a document to the event keyed by `document_name` and containing the specified `text`. Args: document_name (str): The event-unique identifier for the document, example: 'plaintext'. text (str): The content of the document. This is a required field, document text is final and immutable, as changing the text would very likely invalidate any labels on the document. Returns: Document: The added document. Examples: >>> event = Event() >>> document = event.create_document('plaintext', text="The text of the document.") """ if not isinstance(text, str): raise ValueError('text is not string.') document = Document(document_name, text=text, event=self) self.documents[document_name] = document return document
[docs] def add_document(self, document: 'mtap.Document'): """Adds the document to this event, first uploading to events service if this event has a client connection to the events service. Args: document (~mtap.Document): The document to add to this event. Examples: >>> event = Event() >>> document = Document('plaintext', text="The text of the document.") >>> event.add_document(document) """ if document._event is not None: raise ValueError('The document already exists on an event') document._event = self document._client = self.client document._event_id = self.event_id self.documents[document.document_name] = document
def __enter__(self) -> 'Event': return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def add_created_indices(self, created_indices): for k, v in created_indices.items(): try: doc = self._documents[k] doc.add_created_indices(v) except KeyError: pass def lease(self): if self.client is not None: self.client.open_event(self.event_id, only_create_new=False) def release_lease(self): if self.client is not None: self.client.close_event(self.event_id)
class _WaitingIndex(NamedTuple('_WaitingIndex', [('index_name', str), ('labels', List['mtap.GenericLabel']), ('waiting_on', List[int])])): """A label index that is waiting on some labels to be staticized before uploading. """
[docs]class Document: """An object for interacting with text and labels stored on an :class:`Event`. Documents are keyed by their name, and pipelines can store different pieces of related text on a single processing event using multiple documents. An example would be storing the text of one language on one document, and a translation on another, or storing the rtf or html encoding on one document, and the parsed plaintext on another document. Both the document text and any added label indices are immutable. This is to enable parallelization and distribution of processing, and to prevent changes to the dependency graph of label indices and text, which can make debugging difficult. Args: document_name (str): The document name identifier. Keyword Args: text (~typing.Optional[str]): The document text, can be omitted if this is an existing document and text needs to be retrieved from the events service. event (~typing.Optional[Event]): The parent event of this document. If the event has a client, then that client will be used to share changes to this document with all other clients of the Events service. In that case, text should only be specified if it is the known existing text of the document. Examples: Local document: >>> document = Document('plaintext', text='Some document text.') Existing distributed object: >>> with EventsClient(address='localhost:8080') as client, \\ >>> Event(event_id='1', client=client) as event: >>> document = event.documents['plaintext'] >>> document.text 'Some document text fetched from the server.' New distributed object: >>> with EventsClient(address='localhost:8080') as client, \\ >>> Event(event_id='1', client=client) as event: >>> document = Document('plaintext', text='Some document text.') >>> event.add_document(document) or >>> with EventsClient(address='localhost:8080') as client, \\ >>> Event(event_id='1', client=client) as event: >>> document = event.create_document('plaintext', text='Some document text.') """ __slots__ = ('_document_name', '_event', '_client', '_event_id', '_text', '_labelers', '_created_indices', '_waiting_indices', 'default_adapters', '_labels') def __init__(self, document_name: str, *, text: Optional[str] = None, event: Optional[Event] = None, default_adapters: Optional[Mapping[str, 'data.ProtoLabelAdapter']] = None): if not isinstance(document_name, str): raise TypeError('Document name is not string.') self._document_name = document_name self._event = event self._client = None self._event_id = None if event is not None: self._client = event.client self._event_id = event.event_id self._text = text self._labelers = [] self._created_indices = [] self._waiting_indices = [] self.default_adapters = default_adapters or {} if self._client is None and text is None: raise ValueError('Document without text or an event with a client to fetch the text ' 'from.') @property def event(self) -> Event: """Event: The parent event of this document.""" return self._event @property def document_name(self) -> str: """str: The unique identifier for this document on the event.""" return self._document_name @property def text(self): """str: The document text.""" if self._text is None and self._client is not None: self._text = self._client.get_document_text(self._event_id, self._document_name) return self._text @property def created_indices(self) -> List[str]: """~typing.List[str]: A list of all of the label index names that have created on this document using a labeler either locally or by remote pipeline components invoked on this document.""" return list(self._created_indices) @property def labels(self) -> Mapping[str, 'data.LabelIndex']: try: return self._labels except AttributeError: self._labels = _LabelIndices(self) return self._labels
[docs] def get_label_index(self, label_index_name: str) -> 'data.LabelIndex[Union[mtap.GenericLabel, L]]': """Gets the document's label index with the specified key. Will fetch from the events service if it is not cached locally if the document has an event with a client. Uses the `label_adapter` argument to perform unmarshalling from the proto message if specified. Args: label_index_name (str): The name of the label index to get. Returns: LabelIndex: The requested label index. """ return self.labels[label_index_name]
[docs] def get_labeler(self, label_index_name: str, *, distinct: Optional[bool] = None): """Creates a function that can be used to add labels to a label index. Args: label_index_name (str): A document-unique identifier for the label index to be created. distinct (~typing.Optional[bool]): Optional, if using generic labels, whether to use distinct generic labels or non-distinct generic labels, will default to False. Returns: Labeler: A callable when used in conjunction with the 'with' keyword will automatically handle uploading any added labels to the server. Examples: >>> with document.get_labeler('sentences', distinct=True) as labeler: >>> labeler(0, 25, sentence_type='STANDARD') >>> sentence = labeler(26, 34) >>> sentence.sentence_type = 'FRAGMENT' """ if label_index_name in self._labelers: raise KeyError("Labeler already in use: " + label_index_name) label_adapter = self.get_default_adapter(label_index_name, distinct) labeler = Labeler(self._client, self, label_index_name, label_adapter) self._labelers.append(label_index_name) return labeler
[docs] def add_labels(self, label_index_name: str, labels: Sequence[Union['data.Label', L]], *, distinct: Optional[bool] = None, label_adapter: Optional['data.ProtoLabelAdapter'] = None): """Skips using a labeler and adds the sequence of labels as a new label index. Args: label_index_name (str): The name of the label index. labels (~typing.Sequence[Label]): The labels to add. distinct (~typing.Optional[bool]): Whether the index is distinct or non-distinct. label_adapter (mtap.label_adapters.ProtoLabelAdapter): A label adapter to use. Returns: LabelIndex: The new label index created from the labels. """ if label_index_name in self.labels: raise KeyError("Label index already exists with name: " + label_index_name) if label_adapter is None: if distinct is None: distinct = False label_adapter = self.get_default_adapter(label_index_name, distinct) labels, waiting_on = _labels._staticize(labels, self, label_index_name) if len(self._waiting_indices) > 0: label_ids = {id(label) for label in labels} self._check_waiting_indices(label_ids) if len(waiting_on) > 0: self._waiting_indices.append((label_index_name, labels, label_adapter, waiting_on)) else: self._finalize_labels(label_adapter, label_index_name, labels)
def _check_waiting_indices(self, updated_ids): waiting_indices = [] for label_index_name, labels, label_adapter, waiting_on in self._waiting_indices: still_waiting = waiting_on.difference(updated_ids) if len(still_waiting) == 0: self._finalize_labels(label_adapter, label_index_name, labels) else: waiting_indices.append((label_index_name, labels, label_adapter, still_waiting)) self._waiting_indices = waiting_indices def _finalize_labels(self, label_adapter, label_index_name, labels): label_adapter.store_references(labels) if self._client is not None: self._client.add_labels(event_id=self.event.event_id, document_name=self.document_name, index_name=label_index_name, labels=labels, adapter=label_adapter) self._created_indices.append(label_index_name) index = label_adapter.create_index(labels) self.labels.add_to_cache(label_index_name, index) return index def get_default_adapter(self, label_index_name: str, distinct: Optional[bool] = None) -> Optional['data.ProtoLabelAdapter']: try: return self.default_adapters[label_index_name] except KeyError: pass try: return self.event.default_adapters[label_index_name] except (AttributeError, KeyError): return _label_adapters.DISTINCT_GENERIC_ADAPTER if distinct \ else _label_adapters.GENERIC_ADAPTER def add_created_indices(self, created_indices: Iterable[str]): # Internal, used by the pipeline to add any indices created remotely to the # "created_indices" on a local document. return self._created_indices.extend(created_indices)
[docs]class Labeler(Generic[L], ContextManager['Labeler']): """Object provided by :func:`~'mtap.Document'.get_labeler` which is responsible for adding labels to a label index on a document. """ __slots__ = ('_client', '_document', '_label_index_name', '_label_adapter', 'is_done', '_current_labels', '_lock') def __init__(self, client: 'mtap.EventsClient', document: Document, label_index_name: str, label_adapter: 'data.ProtoLabelAdapter[L]'): self._client = client self._document = document self._label_index_name = label_index_name self._label_adapter = label_adapter self.is_done = False self._current_labels = [] self._lock = threading.Lock()
[docs] def __call__(self, *args, **kwargs) -> L: """Calls the constructor for the label type adding it to the list of labels to be uploaded. Args: args: Arguments passed to the label type's constructor. kwargs: Keyword arguments passed to the label type's constructor. Returns: Label: The object that was created by the label type's constructor. Examples: >>> labeler(0, 25, some_field='some_value', x=3) GenericLabel(start_index=0, end_index=25, some_field='some_value', x=3) """ label = self._label_adapter.create_label(*args, document=self._document, **kwargs) self._current_labels.append(label) return label
def __enter__(self) -> 'data.Labeler': return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None: return False self.done()
[docs] def done(self): """Finalizes the label index, uploads the added labels to the events service. Normally called automatically on exit from a context manager block, but can be manually invoked if the labeler is not used in a context manager block.""" with self._lock: if self.is_done: return self.is_done = True self._document.add_labels(self._label_index_name, self._current_labels, label_adapter=self._label_adapter)
def create_channel(address): config = _config.Config() options = config.get('grpc.events_options', {}) channel = grpc.insecure_channel(address, options=list(options.items())) health = health_pb2_grpc.HealthStub(channel) hcr = health.Check(health_pb2.HealthCheckRequest(service=constants.EVENTS_SERVICE_NAME), metadata=[('service-name', constants.EVENTS_SERVICE_NAME)]) if hcr.status != health_pb2.HealthCheckResponse.SERVING: raise ValueError('Failed to connect to events service. Status:') return channel class _ClientPool: __slots__ = ('_ptr', 'clients', 'instance_id_to_delegate') def __init__(self, channel_factory, addresses): self._ptr = 0 self.clients = [EventsClient(address=addresses, channel_factory=channel_factory, _pool=self, _channel=channel_factory(a)) for a in addresses] self.instance_id_to_delegate = {d.instance_id: d for d in self.clients} def get_instance(self, instance_id): if instance_id is not None: try: return self.instance_id_to_delegate[instance_id] except KeyError: raise KeyError(f"instance_id: {instance_id} not found. " f"Available instances: {self.instance_id_to_delegate.keys()}") i = self.clients[self._ptr] self._ptr = (self._ptr + 1) % len(self.clients) return i
[docs]class EventsClient: """A client object for interacting with the events service. Normally, users shouldn't have to use any of the methods on this object, as they are invoked by the globally distributed object classes of :obj:`Event`, :obj:`Document`, and :obj:`~data.Labeler`. Keyword Args: address (~typing.Optional[str]): The events service target e.g. 'localhost:9090' or omit/None to use service discovery. stub (~typing.Optional[~mtap.api.v1.events_pb2_grpc.EventsStub]): An existing events service client gRPC stub to use. Examples: >>> with EventsClient(address='localhost:50000') as client, \\ >>> Event(event_id='1', client=client) as event: >>> document = event.create_document(document_name='plaintext', >>> text='The quick brown fox jumps over the lazy dog.') """ __slots__ = ('addresses', 'channel_factory', '_pool', '_channel', 'stub', '_instance_id', '_is_open') def __init__(self, address: Optional[Union[str, Iterable[str]]], channel_factory: Callable[[str], grpc.Channel] = None, _pool: Optional[_ClientPool] = None, _channel: Optional[grpc.Channel] = None): if address is None or (isinstance(address, Iterable) and len(address) == 0): discovery = _discovery.Discovery(_config.Config()) address = discovery.discover_events_service('v1') if isinstance(address, str): self.addresses = [address] elif isinstance(address, Iterable): self.addresses = list(address) else: raise ValueError("Unrecognized address type: " + str(type(address))) self.channel_factory = channel_factory if channel_factory is None: self.channel_factory = create_channel self._pool = _pool if self._pool is None: self._pool = _ClientPool(self.channel_factory, self.addresses) self._channel = _channel if self._channel is not None: self.stub = events_pb2_grpc.EventsStub(self._channel) r = self.stub.GetEventsInstanceId(events_pb2.GetEventsInstanceIdRequest()) self._instance_id = r.instance_id self._is_open = True @property def instance_id(self) -> str: return self._instance_id def __enter__(self) -> 'EventsClient': return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def __reduce__(self): return _create_events_client, (self.addresses, self.channel_factory,) def get_local_instance(self, instance_id: Optional[str]) -> 'EventsClient': return self._pool.get_instance(instance_id) def ensure_open(self): if not self._is_open: raise ValueError("Client to events service is not open") def open_event(self, event_id: str, only_create_new: bool): request = events_pb2.OpenEventRequest(event_id=event_id, only_create_new=only_create_new) try: response = self.stub.OpenEvent(request) assert response is not None except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) if e.code() == grpc.StatusCode.ALREADY_EXISTS: raise ValueError("Event already exists") from e raise e def close_event(self, event_id): request = events_pb2.CloseEventRequest(event_id=event_id) try: response = self.stub.CloseEvent(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return response is not None def get_all_metadata(self, event_id): request = events_pb2.GetAllMetadataRequest(event_id=event_id) try: response = self.stub.GetAllMetadata(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return response.metadata def add_metadata(self, event_id, key, value): request = events_pb2.AddMetadataRequest(event_id=event_id, key=key, value=value) try: response = self.stub.AddMetadata(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return response is not None def get_all_binary_data_names(self, event_id: str) -> List[str]: request = events_pb2.GetAllBinaryDataNamesRequest(event_id=event_id) try: response = self.stub.GetAllBinaryDataNames(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return list(response.binary_data_names) def add_binary_data(self, event_id: str, binary_data_name: str, binary_data: bytes): request = events_pb2.AddBinaryDataRequest(event_id=event_id, binary_data_name=binary_data_name, binary_data=binary_data) try: response = self.stub.AddBinaryData(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return response is not None def get_binary_data(self, event_id: str, binary_data_name: str) -> bytes: request = events_pb2.GetBinaryDataRequest(event_id=event_id, binary_data_name=binary_data_name) try: response = self.stub.GetBinaryData(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return response.binary_data def get_all_document_names(self, event_id): request = events_pb2.GetAllDocumentNamesRequest(event_id=event_id) try: response = self.stub.GetAllDocumentNames(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return list(response.document_names) def add_document(self, event_id, document_name, text): request = events_pb2.AddDocumentRequest(event_id=event_id, document_name=document_name, text=text) try: response = self.stub.AddDocument(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return response is not None def get_document_text(self, event_id, document_name): request = events_pb2.GetDocumentTextRequest(event_id=event_id, document_name=document_name) try: response = self.stub.GetDocumentText(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return response.text def get_label_index_info(self, event_id: str, document_name: str) -> List['data.LabelIndexInfo']: request = events_pb2.GetLabelIndicesInfoRequest(event_id=event_id, document_name=document_name) try: response = self.stub.GetLabelIndicesInfo(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e result = [] for index in response.label_index_infos: if index.type == events_pb2.GetLabelIndicesInfoResponse.LabelIndexInfo.GENERIC: index_type = _base.LabelIndexType.GENERIC elif index.type == events_pb2.GetLabelIndicesInfoResponse.LabelIndexInfo.CUSTOM: index_type = _base.LabelIndexType.CUSTOM else: index_type = _base.LabelIndexType.UNKNOWN result.append(_base.LabelIndexInfo(index.index_name, index_type)) return result def add_labels(self, event_id, document_name, index_name, labels, adapter): request = events_pb2.AddLabelsRequest(event_id=event_id, document_name=document_name, index_name=index_name, no_key_validation=True) adapter.add_to_message(labels, request) try: response = self.stub.AddLabels(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return response is not None def get_labels(self, event_id, document_name, index_name, adapter): request = events_pb2.GetLabelsRequest(event_id=event_id, document_name=document_name, index_name=index_name) try: response = self.stub.GetLabels(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise ConnectionError("Failed to connect to events service on addresses: {}".format(self.addresses)) raise e return adapter.create_index_from_response(response) def close(self): self._is_open = False try: self._channel.close() except AttributeError: pass
def _create_events_client(addresses, channel_factory): return EventsClient(address=addresses, channel_factory=channel_factory) class _Documents(MutableMapping[str, Document]): __slots__ = ('event', 'event_id', 'client', 'documents') def __init__(self, event: Event, client: Optional[EventsClient]): self.event = event self.event_id = event.event_id self.client = client self.documents = {} def __contains__(self, document_name: str) -> bool: if not isinstance(document_name, str): return False if document_name in self.documents: return True self._refresh_documents() return document_name in self.documents def __getitem__(self, document_name) -> 'Document': if not isinstance(document_name, str): raise KeyError try: return self.documents[document_name] except KeyError: pass self._refresh_documents() return self.documents[document_name] def __len__(self) -> int: self._refresh_documents() return len(self.documents) def __iter__(self) -> Iterator[str]: self._refresh_documents() return iter(self.documents) def __setitem__(self, k: str, v: Document) -> None: if self.client is not None: if not self.client.add_document(self.event_id, k, v.text): raise ValueError() v._event = self.event v._event_id = self.event_id v._client = self.client self.documents[k] = v def __delitem__(self, v: str) -> None: raise NotImplementedError() def _refresh_documents(self): if self.client is not None: document_names = self.client.get_all_document_names(self.event_id) for name in document_names: if name not in self.documents: document = Document(name, event=self.event) self.documents[name] = document class _Metadata(MutableMapping[str, str]): __slots__ = ('_client', '_event', '_event_id', '_metadata') def __init__(self, event: Event, client: Optional[EventsClient] = None): self._client = client self._event = event self._event_id = event.event_id self._metadata = {} def __contains__(self, key): if key in self._metadata: return True self._refresh_metadata() return key in self._metadata def __setitem__(self, key, value): if key in self: raise KeyError("Metadata already exists with key: " + key) self._metadata[key] = value if self._client is not None: if not self._client.add_metadata(self._event_id, key, value): raise ValueError() def __getitem__(self, key): try: return self._metadata[key] except KeyError: self._refresh_metadata() return self._metadata[key] def __delitem__(self, v) -> None: raise NotImplementedError def __iter__(self) -> Iterator[str]: self._refresh_metadata() return iter(self._metadata) def __len__(self) -> int: self._refresh_metadata() return len(self._metadata) def _refresh_metadata(self): if self._client is not None: response = self._client.get_all_metadata(self._event_id) self._metadata.update(response) class _Binaries(collections.abc.MutableMapping): __slots__ = ('_client', '_event', '_event_id', '_names', '_binaries') def __init__(self, event: Event, client: Optional[EventsClient] = None): self._client = client self._event = event self._event_id = event.event_id self._names = set() self._binaries = {} def __contains__(self, key): if key in self._names: return True self._refresh_binaries() return key in self._names def __setitem__(self, key, value): if key in self: raise KeyError("Binary already exists with name: " + key) self._names.add(key) self._binaries[key] = value if self._client is not None: if not self._client.add_binary_data(self._event_id, key, value): raise ValueError() def __getitem__(self, key): try: return self._binaries[key] except KeyError: pass if self._client is not None: b = self._client.get_binary_data(event_id=self._event_id, binary_data_name=key) self._names.add(b) self._binaries[key] = b return self._binaries[key] def __delitem__(self, v) -> None: raise NotImplementedError def __iter__(self) -> Iterator[str]: self._refresh_binaries() return iter(self._names) def __len__(self) -> int: self._refresh_binaries() return len(self._names) def _refresh_binaries(self): if self._client is not None: response = self._client.get_all_binary_data_names(self._event_id) self._names.update(response) class _LabelIndices(Mapping[str, 'data.LabelIndex']): __slots__ = ('_document', '_document_name', '_event_id', '_client', '_cache', '_names_cache') def __init__(self, document: Document): self._document = document self._document_name = document.document_name try: self._event_id = document.event.event_id self._client = document.event.client except AttributeError: self._event_id = None self._client = None self._cache = {} self._names_cache = set() def __getitem__(self, k: str) -> 'data.LabelIndex': try: return self._cache[k] except KeyError: pass if k not in self: raise KeyError if self._client is not None: label_adapter = self._document.get_default_adapter(k) if label_adapter is None: label_adapter = data.GENERIC_ADAPTER index = self._client.get_labels(self._event_id, self._document_name, k, adapter=label_adapter) for label in index: label.label_index_name = k label.document = self._document self._cache[k] = index self._names_cache.add(k) return index else: raise KeyError('Document does not have label index: ' + k) def __contains__(self, item): self._refresh_names() return item in self._names_cache def __len__(self) -> int: self._refresh_names() return len(self._names_cache) def __iter__(self) -> Iterator[str]: self._refresh_names() return iter(self._names_cache) def _refresh_names(self): if self._client is not None: infos = self._client.get_label_index_info(self._event_id, self._document_name) for info in infos: self._names_cache.add(info.index_name) def add_to_cache(self, label_index_name, index): self._cache[label_index_name] = index self._names_cache.add(label_index_name)