Source code for clouddrift.pairs

"""
Functions to analyze pairs of contiguous data segments.
"""

import itertools
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Union

import numpy as np
import pandas as pd
import xarray as xr

from clouddrift import ragged, sphere

array_like = Union[list[float], np.ndarray[float], pd.Series, xr.DataArray]


[docs] def chance_pair( lon1: array_like, lat1: array_like, lon2: array_like, lat2: array_like, time1: array_like | None = None, time2: array_like | None = None, space_distance: float | None = 0, time_distance: float | None = 0, ): """Given two sets of longitudes, latitudes, and times arrays, return in pairs the indices of collocated data points that are within prescribed distances in space and time. Also known as chance pairs. Parameters ---------- lon1 : array_like First array of longitudes in degrees. lat1 : array_like First array of latitudes in degrees. lon2 : array_like Second array of longitudes in degrees. lat2 : array_like Second array of latitudes in degrees. time1 : array_like, optional First array of times. time2 : array_like, optional Second array of times. space_distance : float, optional Maximum allowable space distance in meters for a pair to qualify as chance pair. If the separation is within this distance, the pair is considered to be a chance pair. Default is 0, or no distance, i.e. the positions must be exactly the same. time_distance : float, optional Maximum allowable time distance for a pair to qualify as chance pair. If a separation is within this distance, and a space distance condition is satisfied, the pair is considered a chance pair. Default is 0, or no distance, i.e. the times must be exactly the same. Returns ------- indices1 : np.ndarray[int] Indices within the first set of arrays that lead to chance pair. indices2 : np.ndarray[int] Indices within the second set of arrays that lead to chance pair. Examples -------- In the following example, we load the GLAD dataset, extract the first two trajectories, and find between these the array indices that satisfy the chance pair criteria of 6 km separation distance and no time separation: >>> from clouddrift.datasets import glad >>> from clouddrift.pairs import chance_pair >>> from clouddrift.ragged import unpack >>> ds = glad() >>> lon1 = unpack(ds["longitude"], ds["rowsize"], rows=0).pop() >>> lat1 = unpack(ds["latitude"], ds["rowsize"], rows=0).pop() >>> time1 = unpack(ds["time"], ds["rowsize"], rows=0).pop() >>> lon2 = unpack(ds["longitude"], ds["rowsize"], rows=1).pop() >>> lat2 = unpack(ds["latitude"], ds["rowsize"], rows=1).pop() >>> time2 = unpack(ds["time"], ds["rowsize"], rows=1).pop() >>> i1, i2 = chance_pair(lon1, lat1, lon2, lat2, time1, time2, 6000, np.timedelta64(0)) >>> i1, i2 (array([177, 180, 183, 186, 189, 192]), array([166, 169, 172, 175, 178, 181])) Check to ensure our collocation in space worked by calculating the distance between the identified pairs: >>> sphere.distance(lon1[i1], lat1[i1], lon2[i2], lat2[i2]) array([5967.4844, 5403.253 , 5116.9136, 5185.715 , 5467.8555, 5958.4917], dtype=float32) Check the collocation in time: >>> time1[i1] - time2[i2] <xarray.DataArray 'time' (obs: 6)> array([0, 0, 0, 0, 0, 0], dtype='timedelta64[ns]') Coordinates: time (obs) datetime64[ns] 2012-07-21T21:30:00.524160 ... 2012-07-22T0... Dimensions without coordinates: obs Raises ------ ValueError If ``time1`` and ``time2`` are not both provided or both omitted. """ if (time1 is None and time2 is not None) or (time1 is not None and time2 is None): raise ValueError( "Both time1 and time2 must be provided or both must be omitted." ) time_present = time1 is not None and time2 is not None if time_present: # If time is provided, subset the trajectories to the overlapping times. overlap1, overlap2 = pair_time_overlap(time1, time2, time_distance) else: # Otherwise, initialize the overlap indices to the full length of the # trajectories. overlap1 = np.arange(lon1.size) overlap2 = np.arange(lon2.size) # Provided space distance is in meters, but here we convert it to degrees # for the bounding box overlap check. space_distance_degrees = np.degrees(space_distance / sphere.EARTH_RADIUS_METERS) # Compute the indices for each trajectory where the two trajectories' # bounding boxes overlap. bbox_overlap1, bbox_overlap2 = pair_bounding_box_overlap( lon1[overlap1], lat1[overlap1], lon2[overlap2], lat2[overlap2], space_distance_degrees, ) # bbox_overlap1 and bbox_overlap2 subset the overlap1 and overlap2 indices. overlap1 = overlap1[bbox_overlap1] overlap2 = overlap2[bbox_overlap2] # If time is present, first search for collocation in time. if time_present: time_separation = pair_time_distance(time1[overlap1], time2[overlap2]) time_match2, time_match1 = np.where(time_separation <= time_distance) overlap1 = overlap1[time_match1] overlap2 = overlap2[time_match2] # Now search for collocation in space. space_separation = pair_space_distance( lon1[overlap1], lat1[overlap1], lon2[overlap2], lat2[overlap2] ) space_overlap = space_separation <= space_distance if time_present: time_separation = pair_time_distance(time1[overlap1], time2[overlap2]) time_overlap = time_separation <= time_distance match2, match1 = np.where(space_overlap & time_overlap) else: match2, match1 = np.where(space_overlap) overlap1 = overlap1[match1] overlap2 = overlap2[match2] return overlap1, overlap2
[docs] def chance_pairs_from_ragged( lon: array_like, lat: array_like, rowsize: array_like, space_distance: float | None = 0, time: array_like | None = None, time_distance: float | None = 0, ) -> list[tuple[tuple[int, int], tuple[np.ndarray, np.ndarray]]]: """Return all chance pairs of contiguous trajectories in a ragged array, and their collocated points in space and (optionally) time, given input ragged arrays of longitude, latitude, and (optionally) time, and chance pair criteria as maximum allowable distances in space and time. If ``time`` and ``time_distance`` are omitted, the search will be done only on the spatial criteria, and the result will not include the time arrays. If ``time`` and ``time_distance`` are provided, the search will be done on both the spatial and temporal criteria, and the result will include the time arrays. Parameters ---------- lon : array_like Array of longitudes in degrees. lat : array_like Array of latitudes in degrees. rowsize : array_like Array of rowsizes. space_distance : float, optional Maximum space distance in meters for the pair to qualify as chance pair. If the separation is within this distance, the pair is considered to be a chance pair. Default is 0, or no distance, i.e. the positions must be exactly the same. time : array_like, optional Array of times. time_distance : float, optional Maximum time distance allowed for the pair to qualify as chance pair. If the separation is within this distance, and the space distance condition is satisfied, the pair is considered a chance pair. Default is 0, or no distance, i.e. the times must be exactly the same. Returns ------- pairs : List[Tuple[Tuple[int, int], Tuple[np.ndarray, np.ndarray]]] List of tuples, each tuple containing a Tuple of integer indices that corresponds to the trajectory rows in the ragged array, indicating the pair of trajectories that satisfy the chance pair criteria, and a Tuple of arrays containing the indices of the collocated points for each trajectory in the chance pair. Examples -------- In the following example, we load GLAD dataset as a ragged array dataset, subset the result to retain the first five trajectories, and finally find all trajectories that satisfy the chance pair criteria of 12 km separation distance and no time separation, as well as the indices of the collocated points for each pair. >>> from clouddrift.datasets import glad >>> from clouddrift.pairs import chance_pairs_from_ragged >>> from clouddrift.ragged import subset >>> ds = subset(glad(), {"id": ["CARTHE_001", "CARTHE_002", "CARTHE_003", "CARTHE_004", "CARTHE_005"]}, id_var_name="id") >>> pairs = chance_pairs_from_ragged( ds["longitude"].values, ds["latitude"].values, ds["rowsize"].values, space_distance=12000, time=ds["time"].values, time_distance=np.timedelta64(0) ) [((0, 1), (array([153, 156, 159, 162, 165, 168, 171, 174, 177, 180, 183, 186, 189, 192, 195, 198, 201, 204, 207, 210, 213, 216]), array([142, 145, 148, 151, 154, 157, 160, 163, 166, 169, 172, 175, 178, 181, 184, 187, 190, 193, 196, 199, 202, 205]))), ((3, 4), (array([141, 144, 147, 150, 153, 156, 159, 162, 165, 168, 171, 174, 177, 180, 183]), array([136, 139, 142, 145, 148, 151, 154, 157, 160, 163, 166, 169, 172, 175, 178])))] The result above shows that 2 chance pairs were found. Raises ------ ValueError If ``rowsize`` has fewer than two elements. """ if len(rowsize) < 2: raise ValueError("rowsize must have at least two elements.") pairs = list(itertools.combinations(np.arange(rowsize.size), 2)) i = ragged.rowsize_to_index(rowsize) results = [] with ThreadPoolExecutor() as executor: if time is None: futures = [ executor.submit( chance_pair, lon[i[j] : i[j + 1]], lat[i[j] : i[j + 1]], lon[i[k] : i[k + 1]], lat[i[k] : i[k + 1]], space_distance=space_distance, ) for j, k in pairs ] else: futures = [ executor.submit( chance_pair, lon[i[j] : i[j + 1]], lat[i[j] : i[j + 1]], lon[i[k] : i[k + 1]], lat[i[k] : i[k + 1]], time[i[j] : i[j + 1]], time[i[k] : i[k + 1]], space_distance, time_distance, ) for j, k in pairs ] for future in as_completed(futures): res = future.result() # chance_pair function returns empty arrays if no chance criteria # are satisfied. We only want to keep pairs that satisfy the # criteria. chance_pair returns a tuple of arrays that are always # the same size, so we only need to check the length of the first # array. if res[0].size > 0: results.append((pairs[futures.index(future)], res)) return results
[docs] def pair_bounding_box_overlap( lon1: array_like, lat1: array_like, lon2: array_like, lat2: array_like, distance: float | None = 0, ) -> tuple[np.ndarray[bool], np.ndarray[bool]]: """Given two arrays of longitudes and latitudes, return boolean masks for their overlapping bounding boxes. Parameters ---------- lon1 : array_like First array of longitudes in degrees. lat1 : array_like First array of latitudes in degrees. lon2 : array_like Second array of longitudes in degrees. lat2 : array_like Second array of latitudes in degrees. distance : float, optional Distance in degrees for the overlap. If the overlap is within this distance, the bounding boxes are considered to overlap. Default is 0. Returns ------- overlap1 : np.ndarray[int] Indices ``lon1`` and ``lat1`` where their bounding box overlaps with that of ``lon2`` and ``lat2``. overlap2 : np.ndarray[int] Indices ``lon2`` and ``lat2`` where their bounding box overlaps with that of ``lon1`` and ``lat1``. Examples -------- >>> lon1 = [0, 0, 1, 1] >>> lat1 = [0, 0, 1, 1] >>> lon2 = [1, 1, 2, 2] >>> lat2 = [1, 1, 2, 2] >>> pair_bounding_box_overlap(lon1, lat1, lon2, lat2, 0.5) (array([2, 3]), array([0, 1])) """ # First get the bounding box of each trajectory. # We unwrap the longitudes before computing min/max because we want to # consider trajectories that cross the dateline. lon1_min, lon1_max = ( np.min(np.unwrap(lon1, period=360)), np.max(np.unwrap(lon1, period=360)), ) lat1_min, lat1_max = np.min(lat1), np.max(lat1) lon2_min, lon2_max = ( np.min(np.unwrap(lon2, period=360)), np.max(np.unwrap(lon2, period=360)), ) lat2_min, lat2_max = np.min(lat2), np.max(lat2) bounding_boxes_overlap = ( (lon1_min <= lon2_max + distance) & (lon1_max >= lon2_min - distance) & (lat1_min <= lat2_max + distance) & (lat1_max >= lat2_min - distance) ) # Now check if the trajectories overlap within the bounding box. if bounding_boxes_overlap: overlap_start = ( max(lon1_min, lon2_min) - distance, # West max(lat1_min, lat2_min) - distance, # South ) overlap_end = ( min(lon1_max, lon2_max) + distance, # East min(lat1_max, lat2_max) + distance, # North ) overlap1 = ( (lon1 >= overlap_start[0]) & (lon1 <= overlap_end[0]) & (lat1 >= overlap_start[1]) & (lat1 <= overlap_end[1]) ) overlap2 = ( (lon2 >= overlap_start[0]) & (lon2 <= overlap_end[0]) & (lat2 >= overlap_start[1]) & (lat2 <= overlap_end[1]) ) return np.where(overlap1)[0], np.where(overlap2)[0] else: return np.array([], dtype=int), np.array([], dtype=int)
[docs] def pair_space_distance( lon1: array_like, lat1: array_like, lon2: array_like, lat2: array_like, ) -> np.ndarray[float]: """Given two arrays of longitudes and latitudes, return the distance on a sphere between all pairs of points. Parameters ---------- lon1 : array_like First array of longitudes in degrees. lat1 : array_like First array of latitudes in degrees. lon2 : array_like Second array of longitudes in degrees. lat2 : array_like Second array of latitudes in degrees. Returns ------- distance : np.ndarray[float] Array of distances between all pairs of points. Examples -------- >>> lon1 = [0, 0, 1, 1] >>> lat1 = [0, 0, 1, 1] >>> lon2 = [1, 1, 2, 2] >>> lat2 = [1, 1, 2, 2] >>> pair_space_distance(lon1, lat1, lon2, lat2) array([[157424.62387233, 157424.62387233, 0. , 0. ], [157424.62387233, 157424.62387233, 0. , 0. ], [314825.26360286, 314825.26360286, 157400.64794884, 157400.64794884], [314825.26360286, 314825.26360286, 157400.64794884, 157400.64794884]]) """ # Create longitude and latitude matrices from arrays to compute distance lon1_2d, lon2_2d = np.meshgrid(lon1, lon2, copy=False) lat1_2d, lat2_2d = np.meshgrid(lat1, lat2, copy=False) # Compute distance between all pairs of points distance = sphere.distance(lon1_2d, lat1_2d, lon2_2d, lat2_2d) return distance
[docs] def pair_time_distance( time1: array_like, time2: array_like, ) -> np.ndarray[float]: """Given two arrays of times (or any other monotonically increasing quantity), return the temporal distance between all pairs of times. Parameters ---------- time1 : array_like First array of times. time2 : array_like Second array of times. Returns ------- distance : np.ndarray[float] Array of distances between all pairs of times. Examples -------- >>> time1 = np.arange(4) >>> time2 = np.arange(2, 6) >>> pair_time_distance(time1, time2) array([[2, 1, 0, 1], [3, 2, 1, 0], [4, 3, 2, 1], [5, 4, 3, 2]]) """ # Create time matrices from arrays to compute distance time1_2d, time2_2d = np.meshgrid(time1, time2, copy=False) # Compute distance between all pairs of times distance = np.abs(time1_2d - time2_2d) return distance
[docs] def pair_time_overlap( time1: array_like, time2: array_like, distance: float | None = 0, ) -> tuple[np.ndarray[int], np.ndarray[int]]: """Given two arrays of times (or any other monotonically increasing quantity), return indices where the times are within a prescribed distance. Although higher-level array containers like xarray and pandas are supported for input arrays, this function is an order of magnitude faster when passing in numpy arrays. Parameters ---------- time1 : array_like First array of times. time2 : array_like Second array of times. distance : float Maximum distance within which the values of ``time1`` and ``time2`` are considered to overlap. Default is 0, or, the values must be exactly the same. Returns ------- overlap1 : np.ndarray[int] Indices of ``time1`` where its time overlaps with ``time2``. overlap2 : np.ndarray[int] Indices of ``time2`` where its time overlaps with ``time1``. Examples -------- >>> time1 = np.arange(4) >>> time2 = np.arange(2, 6) >>> pair_time_overlap(time1, time2) (array([2, 3]), array([0, 1])) >>> pair_time_overlap(time1, time2, 1) (array([1, 2, 3]), array([0, 1, 2])) """ time1_min, time1_max = np.min(time1), np.max(time1) time2_min, time2_max = np.min(time2), np.max(time2) overlap_start = max(time1_min, time2_min) - distance overlap_end = min(time1_max, time2_max) + distance overlap1 = np.where((time1 >= overlap_start) & (time1 <= overlap_end))[0] overlap2 = np.where((time2 >= overlap_start) & (time2 <= overlap_end))[0] return overlap1, overlap2