xopr.radar_util
1import numpy as np 2import scipy.constants 3import xarray as xr 4 5from xopr.geometry import project_dataset 6 7 8def add_along_track(ds: xr.Dataset, projection: str = None) -> xr.Dataset: 9 """ 10 Add an along-track distance coordinate to the dataset based on the latitude and longitude coordinates. 11 12 Parameters: 13 ds (xr.Dataset): Input xarray Dataset with 'Latitude' and 'Longitude' coordinates. 14 15 Returns: 16 xr.Dataset: Dataset with an added 'along_track' coordinate. 17 """ 18 19 if 'Latitude' not in ds or 'Longitude' not in ds: 20 if 'lat' in ds and 'lon' in ds: 21 ds = ds.rename({'lat': 'Latitude', 'lon': 'Longitude'}) 22 else: 23 raise ValueError("Dataset must contain 'Latitude' and 'Longitude' or 'lat' and 'lon' coordinates.") 24 25 # Project the dataset to the specified projection 26 if projection is None: 27 if ds['Latitude'].mean() < 0: 28 projection = "EPSG:3031" # Antarctic Polar Stereographic 29 else: 30 projection = "EPSG:3413" # Arctic Polar Stereographic 31 projected_ds = project_dataset(ds, target_crs=projection) 32 33 # Calculate differences between consecutive points 34 dx = projected_ds['x'].diff(dim='slow_time', label='upper').to_numpy() 35 dy = projected_ds['y'].diff(dim='slow_time', label='upper').to_numpy() 36 37 # Calculate incremental distances 38 distances = (dx**2 + dy**2)**0.5 39 # Add a zero at the start to align with slow_time 40 distances = np.insert(distances, 0, 0) 41 42 # Calculate cumulative distance along track 43 along_track = np.cumsum(distances) 44 45 # Add the along-track coordinate to the original dataset 46 ds = ds.assign_coords(along_track=('slow_time', along_track)) 47 ds['along_track'].attrs['units'] = 'meters' 48 ds['along_track'].attrs['description'] = 'Cumulative distance along the radar track' 49 50 return ds 51 52def estimate_vertical_distances(ds: xr.Dataset, epsilon_ice: float = 3.15) -> xr.Dataset: 53 """ 54 Estimate vertical distances from two-way travel time (TWTT) using the speed of light in ice. 55 56 Parameters: 57 ds (xr.Dataset): Input xarray Dataset with TWTT layers as variables. 58 epsilon_ice (float): Relative permittivity of ice. Default is 3.15. 59 60 Returns: 61 xr.Dataset: Dataset with added vertical distance variables for each TWTT layer. 62 """ 63 64 v_ice = scipy.constants.c / np.sqrt(epsilon_ice) # Speed of light in ice (m/s) 65 66 # Initialize local_speed with dimensions (slow_time, twtt) to always be scipy.constants.c 67 local_speed = xr.full_like(ds['Data'], scipy.constants.c) 68 69 # Where twtt (a 1D dimension) > ds['Surface'] (data variable with dimension slow_time), set local_speed to v_ice 70 # Broadcast comparison: expand Surface to match Data dimensions 71 surface_broadcast = ds['Surface'].broadcast_like(ds['Data']) 72 twtt_broadcast = ds['twtt'].broadcast_like(ds['Data']) 73 local_speed = xr.where(twtt_broadcast > surface_broadcast, v_ice, scipy.constants.c) 74 75 # Multiply against the differences in the twtt dimension to get the distance intervals 76 twtt_intervals = np.diff(ds['twtt']) 77 twtt_intervals = np.insert(twtt_intervals, 0, ds['twtt'].isel(twtt=0)) # Add the first interval 78 twtt_intervals = xr.DataArray(twtt_intervals, dims=['twtt'], coords={'twtt': ds['twtt']}) 79 80 # Calculate distance for each interval (one-way distance = speed * time / 2) 81 distance_intervals = local_speed * twtt_intervals / 2 82 83 # Cumulatively sum the distance intervals to get the vertical distance 84 vertical_distance = distance_intervals.cumsum(dim='twtt') 85 vertical_distance.name = 'vertical_distance' 86 vertical_distance.attrs['units'] = 'meters' 87 vertical_distance.attrs['description'] = 'Vertical distance from aircraft calculated from TWTT' 88 89 return vertical_distance 90 91 92def interpolate_to_vertical_grid(ds: xr.Dataset, 93 vertical_coordinate: str = 'range', 94 vert_min: float = None, 95 vert_max: float = None, 96 vert_spacing: float = 10.0, 97 epsilon_ice: float = 3.15) -> xr.Dataset: 98 """ 99 Interpolate radar data from TWTT coordinates to regular vertical distance coordinates. 100 101 Parameters: 102 ----------- 103 ds : xr.Dataset 104 Input dataset with 'Data' variable, 'along_track' coordinate, and 'Surface' variable 105 vertical_coordinate : str 106 The vertical coordinate to use for interpolation. 'range' will interpolate to the 107 vertical range from the instrument. 'wgs84' will interpolate to WGS84 elevation 108 using the 'Elevation' variable in the dataset. Default is 'range'. 109 vert_min : float 110 Minimum vertical distance in meters, if None, uses minimum from data 111 vert_max : float 112 Maximum vertical distance in meters, if None, uses maximum from data 113 vert_spacing : float 114 Vertical spacing in meters 115 epsilon_ice : float 116 Relative permittivity of ice (default 3.15) 117 118 Returns: 119 -------- 120 xr.Dataset 121 Dataset with data interpolated to regular vertical distance grid 122 """ 123 124 # Calculate vertical distances 125 vert_dist = estimate_vertical_distances(ds, epsilon_ice) 126 127 vert_coord_name = 'range' 128 129 if vertical_coordinate == 'wgs84': 130 if 'Elevation' not in ds: 131 raise ValueError("Dataset must contain 'Elevation' variable to use elevation as vertical coordinate.") 132 vert_dist = ds['Elevation'].broadcast_like(vert_dist) - vert_dist 133 vert_coord_name = 'wgs84' 134 elif vertical_coordinate != 'range': 135 raise ValueError("vertical_coordinate must be either 'range' or 'wgs84'") 136 137 if vert_min is None: 138 vert_min = float(vert_dist.min().values) 139 if vert_max is None: 140 vert_max = float(vert_dist.max().values) 141 142 # Create regular vertical distance grid 143 regular_vert = np.arange(vert_min, vert_max, vert_spacing) 144 145 # Use 1D interpolation along each trace (much faster than 2D griddata) 146 147 n_traces = len(ds['slow_time']) 148 n_vert = len(regular_vert) 149 data_regular = np.full((n_traces, n_vert), np.nan, dtype=np.float32) 150 151 # Interpolate each trace individually 152 for i in range(n_traces): 153 trace_data = ds['Data'].isel(slow_time=i).values 154 trace_vert = vert_dist.isel(slow_time=i).values 155 156 if vertical_coordinate == 'wgs84': 157 trace_data = trace_data[::-1] 158 trace_vert = trace_vert[::-1] 159 160 # Remove NaN values for this trace 161 valid_idx = ~(np.isnan(trace_data) | np.isnan(trace_vert)) 162 163 if not np.all(np.diff(trace_vert[valid_idx]) > 0): 164 raise ValueError("Vertical distances must be strictly increasing for interpolation.") 165 166 if np.sum(valid_idx) > 1: # Need at least 2 points for interpolation 167 data_regular[i, :] = np.interp(regular_vert, trace_vert[valid_idx], 168 trace_data[valid_idx], 169 left=-1, right=-2) 170 171 # Create new dataset 172 ds_regular = xr.Dataset( 173 { 174 'Data': (('slow_time', vert_coord_name), data_regular), 175 }, 176 coords={ 177 'slow_time': ds['slow_time'], 178 vert_coord_name: regular_vert, 179 } 180 ) 181 182 if 'along_track' in ds: 183 along_track = ds['along_track'].values 184 ds_regular = ds_regular.assign_coords(along_track=('slow_time', along_track)) 185 186 for data_var in ds.data_vars: 187 if data_var not in ['Data']: 188 ds_regular[data_var] = ds[data_var] 189 190 # Copy relevant attributes 191 ds_regular.attrs = ds.attrs.copy() 192 ds_regular[vert_coord_name].attrs['units'] = 'meters' 193 if vertical_coordinate == 'range': 194 ds_regular[vert_coord_name].attrs['description'] = 'Vertical distance from aircraft (positive down)' 195 else: 196 ds_regular[vert_coord_name].attrs['description'] = 'WGS84 Elevation (meters)' 197 198 return ds_regular 199 200def layer_twtt_to_range(layer_ds, surface_layer_ds, vertical_coordinate='range', subsurface_dielectric_permittivity=3.15): 201 """ 202 Convert layer two-way travel time (TWTT) to range or elevation coordinates. 203 204 Parameters: 205 ----------- 206 layer_ds : xr.Dataset 207 Dataset containing layer TWTT values 208 surface_layer_ds : xr.Dataset 209 Dataset containing surface layer TWTT values (typically layer 1) 210 vertical_coordinate : str 211 'range' for distance from aircraft or 'elevation'/'wgs84' for WGS84 elevation 212 subsurface_dielectric_permittivity : float 213 Dielectric permittivity for subsurface propagation (default 3.15 for ice) 214 215 Returns: 216 -------- 217 xr.Dataset 218 Copy of layer_ds with added 'range' or 'wgs84' field containing layer positions 219 """ 220 # Create a copy of the layer dataset 221 result_ds = layer_ds.copy() 222 223 # Calculate speed of light in the subsurface medium 224 speed_in_medium = scipy.constants.c / np.sqrt(subsurface_dielectric_permittivity) 225 226 # Get TWTT values 227 layer_twtt = layer_ds['twtt'] 228 surface_twtt = surface_layer_ds['twtt'] 229 230 # Calculate surface range (distance from aircraft to surface) 231 surface_range = surface_twtt * (scipy.constants.c / 2) 232 233 # Calculate TWTT difference from surface to layer 234 twtt_from_surface = layer_twtt - surface_twtt 235 236 # Calculate range from aircraft to layer 237 layer_range = surface_range + (twtt_from_surface * (speed_in_medium / 2)) 238 239 if vertical_coordinate == 'range': 240 result_ds['range'] = layer_range 241 result_ds['range'].attrs['units'] = 'meters' 242 result_ds['range'].attrs['description'] = 'Range from aircraft to layer' 243 elif vertical_coordinate == 'elevation' or vertical_coordinate == 'wgs84': 244 # Calculate WGS84 elevation 245 # Surface elevation = aircraft elevation - surface range 246 if 'elev' in surface_layer_ds: 247 surface_elev = surface_layer_ds['elev'] 248 else: 249 raise ValueError("Surface elevation data ('elev') required for elevation coordinate conversion") 250 251 surface_wgs84 = surface_elev - surface_range 252 253 # Layer elevation = surface elevation - distance from surface to layer 254 layer_wgs84 = surface_wgs84 - (twtt_from_surface * (speed_in_medium / 2)) 255 256 result_ds['wgs84'] = layer_wgs84 257 result_ds['wgs84'].attrs['units'] = 'meters' 258 result_ds['wgs84'].attrs['description'] = 'WGS84 elevation of layer' 259 else: 260 raise ValueError(f"Unknown vertical coordinate: {vertical_coordinate}. Use 'range', 'elevation', or 'wgs84'.") 261 262 return result_ds
9def add_along_track(ds: xr.Dataset, projection: str = None) -> xr.Dataset: 10 """ 11 Add an along-track distance coordinate to the dataset based on the latitude and longitude coordinates. 12 13 Parameters: 14 ds (xr.Dataset): Input xarray Dataset with 'Latitude' and 'Longitude' coordinates. 15 16 Returns: 17 xr.Dataset: Dataset with an added 'along_track' coordinate. 18 """ 19 20 if 'Latitude' not in ds or 'Longitude' not in ds: 21 if 'lat' in ds and 'lon' in ds: 22 ds = ds.rename({'lat': 'Latitude', 'lon': 'Longitude'}) 23 else: 24 raise ValueError("Dataset must contain 'Latitude' and 'Longitude' or 'lat' and 'lon' coordinates.") 25 26 # Project the dataset to the specified projection 27 if projection is None: 28 if ds['Latitude'].mean() < 0: 29 projection = "EPSG:3031" # Antarctic Polar Stereographic 30 else: 31 projection = "EPSG:3413" # Arctic Polar Stereographic 32 projected_ds = project_dataset(ds, target_crs=projection) 33 34 # Calculate differences between consecutive points 35 dx = projected_ds['x'].diff(dim='slow_time', label='upper').to_numpy() 36 dy = projected_ds['y'].diff(dim='slow_time', label='upper').to_numpy() 37 38 # Calculate incremental distances 39 distances = (dx**2 + dy**2)**0.5 40 # Add a zero at the start to align with slow_time 41 distances = np.insert(distances, 0, 0) 42 43 # Calculate cumulative distance along track 44 along_track = np.cumsum(distances) 45 46 # Add the along-track coordinate to the original dataset 47 ds = ds.assign_coords(along_track=('slow_time', along_track)) 48 ds['along_track'].attrs['units'] = 'meters' 49 ds['along_track'].attrs['description'] = 'Cumulative distance along the radar track' 50 51 return ds
Add an along-track distance coordinate to the dataset based on the latitude and longitude coordinates.
Parameters: ds (xr.Dataset): Input xarray Dataset with 'Latitude' and 'Longitude' coordinates.
Returns: xr.Dataset: Dataset with an added 'along_track' coordinate.
53def estimate_vertical_distances(ds: xr.Dataset, epsilon_ice: float = 3.15) -> xr.Dataset: 54 """ 55 Estimate vertical distances from two-way travel time (TWTT) using the speed of light in ice. 56 57 Parameters: 58 ds (xr.Dataset): Input xarray Dataset with TWTT layers as variables. 59 epsilon_ice (float): Relative permittivity of ice. Default is 3.15. 60 61 Returns: 62 xr.Dataset: Dataset with added vertical distance variables for each TWTT layer. 63 """ 64 65 v_ice = scipy.constants.c / np.sqrt(epsilon_ice) # Speed of light in ice (m/s) 66 67 # Initialize local_speed with dimensions (slow_time, twtt) to always be scipy.constants.c 68 local_speed = xr.full_like(ds['Data'], scipy.constants.c) 69 70 # Where twtt (a 1D dimension) > ds['Surface'] (data variable with dimension slow_time), set local_speed to v_ice 71 # Broadcast comparison: expand Surface to match Data dimensions 72 surface_broadcast = ds['Surface'].broadcast_like(ds['Data']) 73 twtt_broadcast = ds['twtt'].broadcast_like(ds['Data']) 74 local_speed = xr.where(twtt_broadcast > surface_broadcast, v_ice, scipy.constants.c) 75 76 # Multiply against the differences in the twtt dimension to get the distance intervals 77 twtt_intervals = np.diff(ds['twtt']) 78 twtt_intervals = np.insert(twtt_intervals, 0, ds['twtt'].isel(twtt=0)) # Add the first interval 79 twtt_intervals = xr.DataArray(twtt_intervals, dims=['twtt'], coords={'twtt': ds['twtt']}) 80 81 # Calculate distance for each interval (one-way distance = speed * time / 2) 82 distance_intervals = local_speed * twtt_intervals / 2 83 84 # Cumulatively sum the distance intervals to get the vertical distance 85 vertical_distance = distance_intervals.cumsum(dim='twtt') 86 vertical_distance.name = 'vertical_distance' 87 vertical_distance.attrs['units'] = 'meters' 88 vertical_distance.attrs['description'] = 'Vertical distance from aircraft calculated from TWTT' 89 90 return vertical_distance
Estimate vertical distances from two-way travel time (TWTT) using the speed of light in ice.
Parameters: ds (xr.Dataset): Input xarray Dataset with TWTT layers as variables. epsilon_ice (float): Relative permittivity of ice. Default is 3.15.
Returns: xr.Dataset: Dataset with added vertical distance variables for each TWTT layer.
93def interpolate_to_vertical_grid(ds: xr.Dataset, 94 vertical_coordinate: str = 'range', 95 vert_min: float = None, 96 vert_max: float = None, 97 vert_spacing: float = 10.0, 98 epsilon_ice: float = 3.15) -> xr.Dataset: 99 """ 100 Interpolate radar data from TWTT coordinates to regular vertical distance coordinates. 101 102 Parameters: 103 ----------- 104 ds : xr.Dataset 105 Input dataset with 'Data' variable, 'along_track' coordinate, and 'Surface' variable 106 vertical_coordinate : str 107 The vertical coordinate to use for interpolation. 'range' will interpolate to the 108 vertical range from the instrument. 'wgs84' will interpolate to WGS84 elevation 109 using the 'Elevation' variable in the dataset. Default is 'range'. 110 vert_min : float 111 Minimum vertical distance in meters, if None, uses minimum from data 112 vert_max : float 113 Maximum vertical distance in meters, if None, uses maximum from data 114 vert_spacing : float 115 Vertical spacing in meters 116 epsilon_ice : float 117 Relative permittivity of ice (default 3.15) 118 119 Returns: 120 -------- 121 xr.Dataset 122 Dataset with data interpolated to regular vertical distance grid 123 """ 124 125 # Calculate vertical distances 126 vert_dist = estimate_vertical_distances(ds, epsilon_ice) 127 128 vert_coord_name = 'range' 129 130 if vertical_coordinate == 'wgs84': 131 if 'Elevation' not in ds: 132 raise ValueError("Dataset must contain 'Elevation' variable to use elevation as vertical coordinate.") 133 vert_dist = ds['Elevation'].broadcast_like(vert_dist) - vert_dist 134 vert_coord_name = 'wgs84' 135 elif vertical_coordinate != 'range': 136 raise ValueError("vertical_coordinate must be either 'range' or 'wgs84'") 137 138 if vert_min is None: 139 vert_min = float(vert_dist.min().values) 140 if vert_max is None: 141 vert_max = float(vert_dist.max().values) 142 143 # Create regular vertical distance grid 144 regular_vert = np.arange(vert_min, vert_max, vert_spacing) 145 146 # Use 1D interpolation along each trace (much faster than 2D griddata) 147 148 n_traces = len(ds['slow_time']) 149 n_vert = len(regular_vert) 150 data_regular = np.full((n_traces, n_vert), np.nan, dtype=np.float32) 151 152 # Interpolate each trace individually 153 for i in range(n_traces): 154 trace_data = ds['Data'].isel(slow_time=i).values 155 trace_vert = vert_dist.isel(slow_time=i).values 156 157 if vertical_coordinate == 'wgs84': 158 trace_data = trace_data[::-1] 159 trace_vert = trace_vert[::-1] 160 161 # Remove NaN values for this trace 162 valid_idx = ~(np.isnan(trace_data) | np.isnan(trace_vert)) 163 164 if not np.all(np.diff(trace_vert[valid_idx]) > 0): 165 raise ValueError("Vertical distances must be strictly increasing for interpolation.") 166 167 if np.sum(valid_idx) > 1: # Need at least 2 points for interpolation 168 data_regular[i, :] = np.interp(regular_vert, trace_vert[valid_idx], 169 trace_data[valid_idx], 170 left=-1, right=-2) 171 172 # Create new dataset 173 ds_regular = xr.Dataset( 174 { 175 'Data': (('slow_time', vert_coord_name), data_regular), 176 }, 177 coords={ 178 'slow_time': ds['slow_time'], 179 vert_coord_name: regular_vert, 180 } 181 ) 182 183 if 'along_track' in ds: 184 along_track = ds['along_track'].values 185 ds_regular = ds_regular.assign_coords(along_track=('slow_time', along_track)) 186 187 for data_var in ds.data_vars: 188 if data_var not in ['Data']: 189 ds_regular[data_var] = ds[data_var] 190 191 # Copy relevant attributes 192 ds_regular.attrs = ds.attrs.copy() 193 ds_regular[vert_coord_name].attrs['units'] = 'meters' 194 if vertical_coordinate == 'range': 195 ds_regular[vert_coord_name].attrs['description'] = 'Vertical distance from aircraft (positive down)' 196 else: 197 ds_regular[vert_coord_name].attrs['description'] = 'WGS84 Elevation (meters)' 198 199 return ds_regular
Interpolate radar data from TWTT coordinates to regular vertical distance coordinates.
Parameters:
ds : xr.Dataset Input dataset with 'Data' variable, 'along_track' coordinate, and 'Surface' variable vertical_coordinate : str The vertical coordinate to use for interpolation. 'range' will interpolate to the vertical range from the instrument. 'wgs84' will interpolate to WGS84 elevation using the 'Elevation' variable in the dataset. Default is 'range'. vert_min : float Minimum vertical distance in meters, if None, uses minimum from data vert_max : float Maximum vertical distance in meters, if None, uses maximum from data vert_spacing : float Vertical spacing in meters epsilon_ice : float Relative permittivity of ice (default 3.15)
Returns:
xr.Dataset Dataset with data interpolated to regular vertical distance grid
201def layer_twtt_to_range(layer_ds, surface_layer_ds, vertical_coordinate='range', subsurface_dielectric_permittivity=3.15): 202 """ 203 Convert layer two-way travel time (TWTT) to range or elevation coordinates. 204 205 Parameters: 206 ----------- 207 layer_ds : xr.Dataset 208 Dataset containing layer TWTT values 209 surface_layer_ds : xr.Dataset 210 Dataset containing surface layer TWTT values (typically layer 1) 211 vertical_coordinate : str 212 'range' for distance from aircraft or 'elevation'/'wgs84' for WGS84 elevation 213 subsurface_dielectric_permittivity : float 214 Dielectric permittivity for subsurface propagation (default 3.15 for ice) 215 216 Returns: 217 -------- 218 xr.Dataset 219 Copy of layer_ds with added 'range' or 'wgs84' field containing layer positions 220 """ 221 # Create a copy of the layer dataset 222 result_ds = layer_ds.copy() 223 224 # Calculate speed of light in the subsurface medium 225 speed_in_medium = scipy.constants.c / np.sqrt(subsurface_dielectric_permittivity) 226 227 # Get TWTT values 228 layer_twtt = layer_ds['twtt'] 229 surface_twtt = surface_layer_ds['twtt'] 230 231 # Calculate surface range (distance from aircraft to surface) 232 surface_range = surface_twtt * (scipy.constants.c / 2) 233 234 # Calculate TWTT difference from surface to layer 235 twtt_from_surface = layer_twtt - surface_twtt 236 237 # Calculate range from aircraft to layer 238 layer_range = surface_range + (twtt_from_surface * (speed_in_medium / 2)) 239 240 if vertical_coordinate == 'range': 241 result_ds['range'] = layer_range 242 result_ds['range'].attrs['units'] = 'meters' 243 result_ds['range'].attrs['description'] = 'Range from aircraft to layer' 244 elif vertical_coordinate == 'elevation' or vertical_coordinate == 'wgs84': 245 # Calculate WGS84 elevation 246 # Surface elevation = aircraft elevation - surface range 247 if 'elev' in surface_layer_ds: 248 surface_elev = surface_layer_ds['elev'] 249 else: 250 raise ValueError("Surface elevation data ('elev') required for elevation coordinate conversion") 251 252 surface_wgs84 = surface_elev - surface_range 253 254 # Layer elevation = surface elevation - distance from surface to layer 255 layer_wgs84 = surface_wgs84 - (twtt_from_surface * (speed_in_medium / 2)) 256 257 result_ds['wgs84'] = layer_wgs84 258 result_ds['wgs84'].attrs['units'] = 'meters' 259 result_ds['wgs84'].attrs['description'] = 'WGS84 elevation of layer' 260 else: 261 raise ValueError(f"Unknown vertical coordinate: {vertical_coordinate}. Use 'range', 'elevation', or 'wgs84'.") 262 263 return result_ds
Convert layer two-way travel time (TWTT) to range or elevation coordinates.
Parameters:
layer_ds : xr.Dataset Dataset containing layer TWTT values surface_layer_ds : xr.Dataset Dataset containing surface layer TWTT values (typically layer 1) vertical_coordinate : str 'range' for distance from aircraft or 'elevation'/'wgs84' for WGS84 elevation subsurface_dielectric_permittivity : float Dielectric permittivity for subsurface propagation (default 3.15 for ice)
Returns:
xr.Dataset Copy of layer_ds with added 'range' or 'wgs84' field containing layer positions