# Copyright (c) 2025, TU Wien
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of TU Wien, Department of Geodesy and Geoinformation
# nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written
# permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL TU WIEN DEPARTMENT OF GEODESY AND
# GEOINFORMATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import sys
import argparse
from pathlib import Path
import numpy as np
import xarray as xr
from pyresample import kd_tree, SwathDefinition
from ascat.grids.grid_registry import GridRegistry
from ascat.product_info import get_swath_product_id
from ascat.product_info import swath_io_catalog
from ascat.regrid.regrid import retrieve_or_store_grid_lut
[docs]
def parse_args_swath_resample(args):
"""
Parse command line arguments for resampling an ASCAT swath file
to a regular grid.
Parameters
----------
args : list
Command line arguments.
Returns
-------
parser : ArgumentParser
Argument Parser object.
"""
parser = argparse.ArgumentParser(
description="Resample an ASCAT swath file to a regular grid")
parser.add_argument(
"filepath", metavar="FILEPATH", help="Path to file or folder")
parser.add_argument(
"outpath", metavar="OUTPATH", help="Path to the output data")
parser.add_argument(
"resample_deg",
metavar="RESAMPLE_DEG",
type=float,
help="Target grid spacing in degrees")
parser.add_argument(
"--product_id",
metavar="PRODUCT_ID",
help="Product identifier (e.g. H129, H125, H121, etc.). If not provided, an attempt is made to determine it from the file name.")
parser.add_argument(
"--grid_store",
metavar="GRID_STORE",
help="Path for storing/loading lookup tables")
parser.add_argument(
"--suffix",
metavar="SUFFIX",
help="File suffix (default: _RESAMPLE_DEGdeg)")
parser.add_argument(
"--neighbours",
metavar="N",
type=int,
default=6,
help="Number of neighbours to consider for each grid point (default: 6)"
)
parser.add_argument(
"--radius",
metavar="RADIUS",
type=float,
default=10000,
help="Cut off distance in meters (default: 10000)")
return parser.parse_args(args)
[docs]
def swath_resample_main(cli_args):
"""
Read command line arguments and perform an inverse distance resampling
procedure on a single ASCAT swath file or directory of swath files.
Parameters
----------
cli_args : list
Command line arguments.
"""
args = parse_args_swath_resample(cli_args)
product_id = args.product_id
filepath = Path(args.filepath)
trg_grid_size = args.resample_deg
k = args.neighbours
radius = args.radius
outpath = Path(args.outpath)
outpath.parent.mkdir(parents=True, exist_ok=True)
if args.suffix:
suffix = args.suffix
else:
suffix = f"_{args.resample_deg}deg"
if args.grid_store:
grid_store = Path(args.grid_store)
grid_store.parent.mkdir(parents=True, exist_ok=True)
else:
grid_store = None
inverse_distance_resampling(filepath, outpath, trg_grid_size, suffix, k,
radius, grid_store, product_id=product_id)
[docs]
def inverse_distance_resampling(filepath,
outpath,
sampling,
suffix,
k=6,
radius=10000.,
grid_store=None,
product_id=None):
"""
Inverse distance resampling of ASCAT swath data.
Parameters
----------
filepath : pathlib.Path
Path to file or folder.
outpath : pathlib.Path
Path to the output data.
sampling : float
Target grid spacing in degrees.
suffix : str
Filename suffix.
k : int, optional
Number of neighbours to consider for each grid point (default: 6)
radius : float, optional
Cut off distance in meters (default: 10000.)
grid_store : pathlib.Path, optional
Path for storing/loading lookup tables (default: None).
product_id : str, optional
Product identifier (e.g. H129, H125, H121, etc.). If not provided,
an attempt is made to determine it from the file name.
"""
if filepath.is_dir():
files = list(filepath.glob("**/*.nc"))
elif filepath.is_file() and filepath.suffix == ".nc":
files = [filepath]
else:
raise RuntimeError("No files found at the provided filepath")
first_file = files[0]
product_id = product_id or get_swath_product_id(str(first_file.name))
registry = GridRegistry()
product = swath_io_catalog[product_id]
src_grid = registry.get(product.grid_name)
src_grid_size = src_grid.res
src_grid_id = f"fib_grid_{src_grid_size}km"
if (radius < 1000) or (radius > 100000):
raise ValueError(f"Radius outside limits: 1000 < {radius} < 100000")
if (k < 1) or (k > 100):
raise ValueError(
f"Number of neighbours outside limits 1 < {k} < {100}")
trg_grid_id = f"reg_grid_{sampling}deg"
trg_grid, grid_lut = retrieve_or_store_grid_lut(src_grid, src_grid_id,
trg_grid_id, sampling,
grid_store)
var_list = [
("time", "nn"),
("as_des_pass", "nn"),
("swath_indicator", "nn"),
("surface_flag", "nn"),
("surface_flag_source", "nn"),
("surface_soil_moisture", "idw"),
("surface_soil_moisture_noise", "idw"),
("surface_soil_moisture_sensitivity", "idw"),
("backscatter40", "idw"),
("slope40", "idw"),
("curvature40", "idw"),
("snow_cover_probability", "idw"),
("frozen_soil_probability", "idw"),
("topographic_complexity", "idw"),
("wetland_fraction", "idw"),
("subsurface_scattering_prob", "idw"),
# ("processing_flag", "major"),
("correction_flag", "bitwise_or"),
("backscatter40_flag", "bitwise_or"),
]
target_def = SwathDefinition(lons=trg_grid.arrlon, lats=trg_grid.arrlat)
output_shape = trg_grid.shape
dim = ("latitude", "longitude")
for f in files:
outfile = outpath / Path(f.stem + suffix + f.suffix)
with xr.open_dataset(f, decode_cf=False, mask_and_scale=False) as ds:
lons = ds["longitude"] * ds["longitude"].scale_factor
lats = ds["latitude"] * ds["latitude"].scale_factor
coords = {
"latitude": np.int32(trg_grid.lat2d[:, 0] / 1e-6),
"longitude": np.int32(trg_grid.lon2d[0] / 1e-6)
}
resampled_ds = xr.Dataset(coords=coords)
resampled_ds.attrs = ds.attrs
resampled_ds["latitude"].attrs = ds["latitude"].attrs
resampled_ds["longitude"].attrs = ds["longitude"].attrs
swath_def = SwathDefinition(lons=lons, lats=lats)
valid_input_index, valid_output_index, index_array, distance_array = \
kd_tree.get_neighbour_info(swath_def, target_def,
radius, neighbours=k)
invalid_pos = index_array == lons.size
index_array = index_array.astype(np.int32)
index_array[invalid_pos] = -1
for var, method in var_list:
if var not in ds or len(ds[var].dims) == 0:
continue
data = ds[var].data[valid_input_index][index_array]
data[invalid_pos] = ds[var]._FillValue
resam_data = np.zeros(
data.shape[0], dtype=ds[var].dtype) + ds[var]._FillValue
if method == "idw":
p = 2
weights = 1 / (distance_array**p)
invalid = invalid_pos | (data == ds[var]._FillValue)
weights[invalid] = 0
total_weights = weights.sum(axis=1)
idx = total_weights != 0
resam_data[idx] = np.sum(
weights * data, axis=1)[idx] / total_weights[idx]
elif method == "nn":
valid = index_array != -1
first_idx = np.where(
valid.any(axis=1), valid.argmax(axis=1), -1)
valid_mask = first_idx != -1
valid_rows = np.arange(index_array.shape[0])[valid_mask]
resam_data[valid_rows] = data[valid_rows,
first_idx[valid_rows]]
elif method == "bitwise_or":
mask = index_array == -1
data[mask] = 0
resam_data = np.bitwise_or.reduce(data, axis=1)
else:
raise ValueError("Resampling method unknown")
resampled_ds[var] = (dim, resam_data.reshape(output_shape))
resampled_ds[var].attrs = ds[var].attrs
resampled_ds[var].encoding = {"zlib": True, "complevel": 4}
resampled_ds.to_netcdf(outfile)
[docs]
def run_swath_resample():
"""Run command line interface for reample ASCAT swath data."""
swath_resample_main(sys.argv[1:])