Source code for fiberoptics.common.misc._interval

import functools
import re
from typing import Callable, List, Union

import pandas as pd


def find_continuous_intervals(intervals: pd.IntervalIndex, threshold=0) -> List[pd.IntervalIndex]:
    """Splits a list of intervals into multiple lists of continuous intervals.

    Two intervals belong to the same group when the gap between them is at most
    `threshold`. Overlapping and nested intervals are always grouped together,
    also when a long interval spans several shorter, mutually disjoint ones.

    Parameters
    ----------
    intervals : IntervalIndex
        A list of (possibly) non-continuous intervals.
    threshold : interval delta, default 0
        The threshold used to determine continuity. The threshold type should
        reflect the type of the intervals, e.g. int for int intervals and
        timedelta for datetime intervals.

    Returns
    -------
    List, of type IntervalIndex
        A list of continuous intervals, sorted by their left end point. An empty
        input gives an empty list.
    """
    ordered = pd.IntervalIndex(intervals).sort_values()
    if ordered.empty:
        return []
    if "datetime" in str(ordered.dtype):
        threshold = pd.Timedelta(threshold)

    # Furthest right end point reached by any interval up to each position.
    # Comparing against the running maximum (and not only the previous interval)
    # keeps intervals that are nested inside a longer one in the same group.
    reach = pd.Index(pd.Series(ordered.right).cummax())
    gaps = ordered.left[1:] - reach[:-1]
    is_continuous = gaps <= threshold

    # Positions where a new group starts, framed by the start and the end
    boundaries = [
        0,
        *(position for position, joined in enumerate(is_continuous, start=1) if not joined),
        len(ordered),
    ]
    return [ordered[start:stop] for start, stop in zip(boundaries, boundaries[1:])]
def combine_continuous_intervals(intervals: pd.IntervalIndex, threshold=0):
    """Merges every run of continuous (or overlapping) intervals into one interval.

    Parameters
    ----------
    intervals : IntervalIndex
        A list of (possibly) non-continuous intervals.
    threshold : interval delta, default 0
        The largest gap that still counts as continuous. Its type should match
        the interval type, e.g. int for int intervals and timedelta for
        datetime intervals.

    Returns
    -------
    IntervalIndex
        One interval per continuous group, with the `closed` side and dtype of
        the input. The returned index is never longer than the input index.
    """
    # Each group is collapsed to the span from its smallest left end point to
    # its largest right end point.
    spans = []
    for group in find_continuous_intervals(intervals, threshold):
        spans.append((min(group.left), max(group.right)))

    # Rebuild with the original closed side and dtype so the type is preserved
    return pd.IntervalIndex.from_tuples(spans, closed=intervals.closed, dtype=intervals.dtype)
def add_interval(index: pd.IntervalIndex, other: pd.Interval):
    """Add an interval to a continuous interval index.

    In a continuous interval index none of the intervals overlap. The interval
    is appended and the result is merged again, which gives this behavior:

    Adding an interval which does not overlap with the index
        The interval is simply added.
    Adding an interval which overlaps partially with an interval in the index
        The index' interval is extended.
    Adding an interval which overlaps completely with an interval in the index
        The interval is ignored.

    Parameters
    ----------
    index : IntervalIndex
        The continuous interval index.
    other : Interval
        The interval to add. Must have the same interval type.

    Returns
    -------
    IntervalIndex
        The new index after adding the interval.
    """
    # Wrap the single interval in an index of the same dtype so it can be appended
    singleton = pd.IntervalIndex([other], dtype=index.dtype)
    extended = index.append(singleton)
    return combine_continuous_intervals(extended)
def subtract_interval(index: pd.IntervalIndex, other: pd.Interval):
    """Subtract an interval from the existing intervals.

    In a continuous interval index none of the intervals overlap. Subtracting
    an interval from such an index has the following behavior:

    Subtracting an interval which does not overlap with the index
        The interval is ignored.
    Subtracting an interval which overlaps partially with an interval in the index
        The index' interval is shortened.
    Subtracting an interval which lies strictly inside an interval in the index
        The index' interval is split in two.
    Subtracting an interval which covers an interval in the index
        The index' interval is removed.

    Parameters
    ----------
    index : IntervalIndex
        The continuous interval index.
    other : Interval
        The interval to subtract. Must have the same interval type.

    Returns
    -------
    IntervalIndex
        The new index after subtracting the interval. It is closed on the same
        side as `index`; an empty result keeps the dtype of `index`.
    """
    closed = index.closed

    def remaining_pieces():
        for interval in index:
            if not interval.overlaps(other):
                yield interval
                continue
            # Keep whatever sticks out on either side of `other`. The pieces
            # must share the closed side of the index, otherwise they cannot
            # be combined with the untouched intervals.
            if interval.left < other.left:
                yield pd.Interval(interval.left, other.left, closed=closed)
            if other.right < interval.right:
                yield pd.Interval(other.right, interval.right, closed=closed)

    pieces = list(remaining_pieces())
    if not pieces:
        # Nothing to infer the dtype from, so carry it over from the input
        return pd.IntervalIndex([], closed=closed, dtype=index.dtype)
    return pd.IntervalIndex(pieces, closed=closed)
def with_interval_cache(get_data_function: Callable):
    """Wraps a `get_data_function` with cache functionality.

    Should only be used when you expect to request the same or overlapping
    intervals. The decorator makes sure you only request the missing data. E.g.
    when requesting data for the period [3, 4) and then [2, 5), the last request
    is transformed into two requests, namely [2, 3) and [4, 5).

    The cache is kept per id for as long as the returned wrapper exists and is
    never evicted. Keyword arguments are forwarded to `get_data_function` but
    are not part of the cache key, so the wrapper should be called with the
    same keyword arguments every time.

    Parameters
    ----------
    get_data_function : Callable
        Called as ``get_data_function(id, start_time, end_time, **kwargs)`` for
        a single id. Its result is concatenated with `pd.concat` and sliced by
        time, so it is expected to be a time-indexed pandas object.

    Returns
    -------
    Callable
        The wrapped function. It accepts either a single id or a list of ids.
        For a list of ids the per-id results are concatenated column-wise with
        the ids as keys.
    """
    cached_intervals = {}
    cached_data = {}

    @functools.wraps(get_data_function)
    def wrapped_function(
        id_or_ids: Union[str, List[str]],
        start_time: pd.Timestamp,
        end_time: pd.Timestamp,
        **kwargs,
    ):
        ids = [id_or_ids] if isinstance(id_or_ids, str) else id_or_ids
        start_time = pd.Timestamp(start_time)
        end_time = pd.Timestamp(end_time)

        requested = pd.IntervalIndex.from_tuples([(start_time, end_time)])
        dtype = requested.dtype  # Use the same time zone

        for item_id in ids:
            if item_id not in cached_intervals:
                cached_intervals[item_id] = pd.IntervalIndex([], dtype=dtype)
                cached_data[item_id] = pd.DataFrame()

            # Start from the full request for every id; what is cached for one
            # id says nothing about what is cached for another.
            missing_intervals = requested
            for interval in cached_intervals[item_id]:
                missing_intervals = subtract_interval(missing_intervals, interval)

            for interval in missing_intervals:
                df = get_data_function(item_id, interval.left, interval.right, **kwargs)
                cached_intervals[item_id] = add_interval(cached_intervals[item_id], interval)
                cached_data[item_id] = pd.concat([cached_data[item_id], df]).sort_index()

        # Multi-level column-index is only returned if a list of ids is given
        if isinstance(id_or_ids, str):
            return cached_data[id_or_ids][start_time:end_time]
        return pd.concat(
            [cached_data[item_id][start_time:end_time] for item_id in ids],
            axis=1,
            keys=ids,
        )

    return wrapped_function
def serialize_interval_index(intervals: pd.IntervalIndex):
    """Serialize an interval index.

    An interval index can be represented in two different ways:

    range
        If the intervals has a fixed frequency, the only necessary information
        is the `start`, `end` and `freq` parameters. This is the most compact
        representation.
    arrays
        Otherwise, all intervals are stored in `left` and `right` arrays.

    The range representation is tried first. A representation is only used if
    deserializing it gives back an index equal to the input.

    Parameters
    ----------
    intervals : IntervalIndex
        The intervals to serialize.

    Returns
    -------
    dict
        The serialized interval index. Datetime end points are stored as
        integers. An empty index is always stored in the arrays representation.

    Raises
    ------
    ValueError
        If the deserialization of the serialization is not identical to the
        input for any of the representations.
    """
    dtype = str(intervals.dtype)
    if intervals.empty:
        return dict(left=[], right=[], dtype=dtype)

    def serialize_range():
        start = intervals[0].left
        end = intervals[-1].right
        # The width of the first interval is assumed to be the common frequency;
        # the round-trip check below rejects the result if that does not hold.
        freq = intervals[0].right - intervals[0].left
        if "datetime" in dtype:
            start = start.value
            end = end.value
            freq = freq.value
        return dict(start=start, end=end, freq=freq, dtype=dtype)

    def serialize_arrays():
        left = intervals.left
        right = intervals.right
        if "datetime" in dtype:
            left = left.view("int64")
            right = right.view("int64")
        return dict(left=list(left), right=list(right), dtype=dtype)

    last_error = None
    for serialization_method in (serialize_range, serialize_arrays):
        serialized = serialization_method()
        try:
            # Rebuilding the index can itself fail (e.g. a zero-width first
            # interval gives a zero frequency). That only means that this
            # representation is unusable, so it must not abort the fallback.
            deserialized = deserialize_interval_index(serialized)
            pd.testing.assert_index_equal(deserialized, intervals)
        except (AssertionError, ArithmeticError, TypeError, ValueError) as error:
            last_error = error
            continue
        return serialized
    raise ValueError("Serialization failed") from last_error
def deserialize_interval_index(serialized: dict):
    """Deserialize a serialized interval index.

    An interval index can be represented in two different ways:

    range
        If the intervals has a fixed frequency, the only necessary information
        is the `start`, `end` and `freq` parameters. This is the most compact
        representation.
    arrays
        Otherwise, all intervals are stored in `left` and `right` arrays.

    The representation is detected from the keys that are present; the range
    representation is tried first.

    Parameters
    ----------
    serialized : dict
        The serialized interval index. Must contain a `dtype` entry of the form
        ``interval[<subtype>, <closed>]``. Datetime end points are given as
        integers and are interpreted as UTC when the dtype carries a time zone.

    Returns
    -------
    IntervalIndex
        The deserialized interval index.

    Raises
    ------
    ValueError
        If the deserialization fails, i.e. the dtype cannot be parsed or the
        keys of neither representation are present.
    """
    # Extract information from the interval index dtype
    dtype_match = re.match(r"interval\[(.+), (.+)\]", serialized["dtype"])
    if dtype_match is None:
        raise ValueError(f"Deserialization failed: unsupported dtype {serialized['dtype']!r}")
    dtype = dtype_match.group(1)
    closed = dtype_match.group(2)

    # Try to extract timezone information
    tz_match = re.match(r"datetime64\[ns, (.+)\]", dtype)
    tz = tz_match.group(1) if tz_match is not None else None

    def deserialize_range():
        start = serialized["start"]
        end = serialized["end"]
        freq = serialized["freq"]
        if dtype.startswith("datetime"):
            start = pd.Timestamp(start)
            end = pd.Timestamp(end)
            freq = pd.Timedelta(freq)
            if tz is not None:
                start = start.tz_localize("UTC").tz_convert(tz)
                end = end.tz_localize("UTC").tz_convert(tz)
        return pd.interval_range(start, end, freq=freq, closed=closed)

    def deserialize_arrays():
        left = serialized["left"]
        right = serialized["right"]
        if dtype.startswith("datetime"):
            left = pd.DatetimeIndex(left)
            right = pd.DatetimeIndex(right)
            if tz is not None:
                left = left.tz_localize("UTC").tz_convert(tz)
                right = right.tz_localize("UTC").tz_convert(tz)
        return pd.IntervalIndex.from_arrays(left, right, closed=closed)

    # A missing key means that the dict uses the other representation
    for deserialization_method in (deserialize_range, deserialize_arrays):
        try:
            return deserialization_method()
        except KeyError:
            pass
    raise ValueError("Deserialization failed")