Source code for cnt.rulebase.rules.interval_based_operations.interval_based_replacer

"""
Replace the Unicode code points specified by intervals with arbitrary strings.
"""
from typing import Callable, Generator, List, Tuple, Dict, Type, cast, Optional

from cnt.rulebase import workflow
from cnt.rulebase.rules.interval_based_operations.basic_operation import (
        IntervalBasedOperationOutputGenerator,
        IntervalsCollectionBasedOperation,
)

# Bookkeeping attached to every emitted segment:
# (interval in the input text, interval in the processed text,
#  whether a replacer function was applied to the segment).
IntervalsWithLabelType = Tuple[workflow.IntervalType, workflow.IntervalType, bool]
# One output item: (segment text, bookkeeping tuple described above).
ReplacerSegmentType = Tuple[str, IntervalsWithLabelType]

# The same sequence of segments, either produced on demand (generator) or materialized (list).
ResultLazyType = Generator[ReplacerSegmentType, None, None]
ResultType = List[ReplacerSegmentType]

# A replacer function receives the text of a marked segment and returns its replacement.
ReplacerFunctionType = Callable[[str], str]


class IntervalsCollectionBasedReplacerConfig(workflow.BasicConfig):
    """
    Configuration holding the mapping from labeler class to replacer function.
    """

    def __init__(
            self,
            labeler2repl: Dict[Type[workflow.IntervalLabeler], ReplacerFunctionType],
    ) -> None:
        """
        Store ``labeler2repl`` so that the output generator can look up the
        replacer function by the labeler class that marked a segment.
        """
        self.labeler2repl = labeler2repl
class IntervalsCollectionBasedReplacerLabelProcessor(workflow.BasicLabelProcessor):
    """
    Collapse the per-index labeler marks into at most one labeler class.
    """

    def result(self) -> Generator[Tuple[int, Optional[Type[workflow.IntervalLabeler]]], None, None]:
        """
        Yield ``(index, labeler_cls)`` for every item of ``self.index_labels_generator``.

        ``labeler_cls`` is the single labeler class whose mark is set at
        ``index``, or ``None`` when no labeler marks that index.

        Raises ``RuntimeError`` if more than one labeler marks the same index.
        """
        for index, labels in self.index_labels_generator:
            marked_labelers = [labeler for labeler, is_marked in labels.items() if is_marked]

            # Two replacers claiming the same position cannot be reconciled.
            if len(marked_labelers) > 1:
                raise RuntimeError('Labeler conflict!')

            if marked_labelers:
                yield index, cast(Type[workflow.IntervalLabeler], marked_labelers[0])
            else:
                yield index, None
#pylint: disable=W0223
class _IntervalsCollectionBasedReplacerOutputGenerator(IntervalBasedOperationOutputGenerator):
    """
    Shared implementation behind the lazy and the list-returning replacer
    output generators defined in this module.
    """

    def _result(self) -> ResultLazyType:
        """
        Lazily yield one item per continuous interval of the input.

        ``self.config`` must be an ``IntervalsCollectionBasedReplacerConfig``
        whose ``labeler2repl`` maps every labeler class that can mark an
        interval to a ``Callable[[str], str]``.

        Each yielded item is ``(segment, (interval, processed_interval, replaced))``:

        * ``segment``: the replacement text if the interval is marked,
          otherwise the untouched slice of ``self.input_sequence``.
        * ``interval``: the ``(start, end)`` pair in the input.
        * ``processed_interval``: the ``(start, end)`` pair of ``segment`` in
          the concatenated output.
        * ``replaced``: ``True`` iff a replacer function was applied.
        """
        config = cast(IntervalsCollectionBasedReplacerConfig, self.config)

        # Net length change caused by all replacements emitted so far; used to
        # shift input offsets into output offsets.
        diff_acc = 0
        for interval, aggregated_mark in self.continuous_intervals():
            start, end = interval

            processed_start = start + diff_acc
            processed_end = end + diff_acc

            segment = self.input_sequence[start:end]

            if aggregated_mark is not None:
                replacer = config.labeler2repl[cast(Type[workflow.IntervalLabeler],
                                                    aggregated_mark)]
                processed_segment = replacer(segment)

                # An empty replacement (segment removed) yields a zero-width
                # interval, since len('') == 0; no special case is needed.
                processed_end = processed_start + len(processed_segment)

                diff_acc += len(processed_segment) - len(segment)
                segment = processed_segment

            yield segment, (interval, (processed_start, processed_end), aggregated_mark is not None)
class IntervalsCollectionBasedReplacerOutputGeneratorLazy(
        _IntervalsCollectionBasedReplacerOutputGenerator):
    """
    Output generator that exposes the replacement segments as a generator.
    """

    def result(self) -> ResultLazyType:
        """
        Return the generator created by ``_result`` without consuming it.
        """
        lazy_segments = self._result()
        return lazy_segments
class IntervalsCollectionBasedReplacerOutputGenerator(
        _IntervalsCollectionBasedReplacerOutputGenerator):
    """
    Output generator that materializes the replacement segments into a list.
    """

    def result(self) -> ResultType:
        """
        Consume the generator created by ``_result`` and return its items as a list.
        """
        return [*self._result()]
#pylint: disable=W0223
class IntervalsCollectionBasedReplacerOperation(IntervalsCollectionBasedOperation):
    """
    Base operation that binds replacer functions to interval collections.
    """

    def __init__(self, replacer_intervals: Dict[ReplacerFunctionType,
                                                workflow.IntervalListType]) -> None:
        """
        Split ``replacer_intervals`` into parallel lists, hand the interval
        collections to the base class, then build the replacer config.
        """
        # keys() and values() iterate in the same order, so position i of one
        # list corresponds to position i of the other.
        replacer_functions = list(replacer_intervals.keys())
        interval_collections = list(replacer_intervals.values())

        super().__init__(interval_collections)

        # NOTE(review): presumably the base __init__ fills
        # ``sequential_labeler_classes`` in the order of ``interval_collections``
        # -- confirm against IntervalsCollectionBasedOperation.
        labeler2repl = dict(zip(self.sequential_labeler_classes, replacer_functions))
        self.config = IntervalsCollectionBasedReplacerConfig(labeler2repl=labeler2repl)

    def initialize_label_processor_class(self) -> None:
        """
        Select the label processor that resolves marks to a single labeler class.
        """
        self._label_processor_class = IntervalsCollectionBasedReplacerLabelProcessor
class IntervalsCollectionBasedReplacerLazy(IntervalsCollectionBasedReplacerOperation):
    """
    Replacer whose ``result`` returns the segments as a generator.
    """

    def initialize_output_generator_class(self) -> None:
        """
        Select the lazy output generator.
        """
        self._output_generator_class = IntervalsCollectionBasedReplacerOutputGeneratorLazy

    def result(self, text: str) -> ResultLazyType:
        """
        Run the interval based workflow on ``text`` with this operation's config.
        """
        workflow_output = self.interval_based_workflow.result(text, self.config)
        return cast(ResultLazyType, workflow_output)
class IntervalsCollectionBasedReplacer(IntervalsCollectionBasedReplacerOperation):
    """
    Replacer whose ``result`` returns the segments as a list.
    """

    def initialize_output_generator_class(self) -> None:
        """
        Select the list-returning output generator.
        """
        self._output_generator_class = IntervalsCollectionBasedReplacerOutputGenerator

    def result(self, text: str) -> ResultType:
        """
        Run the interval based workflow on ``text`` with this operation's config.
        """
        workflow_output = self.interval_based_workflow.result(text, self.config)
        return cast(ResultType, workflow_output)
class IntervalsCollectionBasedReplacerToString(IntervalsCollectionBasedReplacerOperation):
    """
    Replacer whose ``result`` returns the processed text as a single string.
    """

    def initialize_output_generator_class(self) -> None:
        """
        Select the lazy output generator; the segments are joined in ``result``.
        """
        self._output_generator_class = IntervalsCollectionBasedReplacerOutputGeneratorLazy

    def result(self, text: str) -> str:
        """
        Run the interval based workflow on ``text`` and concatenate the
        segment texts, discarding the interval bookkeeping of each item.
        """
        workflow_output = self.interval_based_workflow.result(text, self.config)
        pieces = [segment_text for segment_text, _info in workflow_output]
        return ''.join(pieces)