# Copyright (c) 2025, TU Wien, Department of Geodesy and Geoinformation
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of TU Wien, Department of Geodesy and Geoinformation
# nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written
# permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL TU WIEN DEPARTMENT OF GEODESY AND
# GEOINFORMATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Readers for ASCAT Level 1b and Level 2 data in NetCDF format.
"""
import os
from datetime import datetime
from datetime import timedelta
import netCDF4
import numpy as np
import xarray as xr
from ascat.utils import tmp_unzip
from ascat.utils import daterange
from ascat.utils import mask_dtype_nans
from ascat.utils import uint8_nan
from ascat.utils import float32_nan
from ascat.file_handling import ChronFiles
from ascat.read_native import AscatFile
[docs]
def read_nc(filename, generic, to_xarray, skip_fields, gen_fields_lut):
"""
Read NetCDF file.
Parameters
----------
filename : str
Filename.
generic : bool
'True' reading and converting into generic format or
'False' reading original field names.
to_xarray : bool
'True' return data as xarray.Dataset
'False' return data as numpy.ndarray.
skip_fields : list
Variables to skip.
gen_fields_lut : dict
Conversion look-up table for generic names.
Returns
-------
data : xarray.Dataset or numpy.ndarray
ASCAT data.
metadata : dict
Metadata.
"""
data = {}
metadata = {}
with netCDF4.Dataset(filename) as fid:
if hasattr(fid, 'platform'):
metadata['platform_id'] = fid.platform[2:]
elif hasattr(fid, 'platform_long_name'):
metadata['platform_id'] = fid.platform_long_name[2:]
metadata['orbit_start'] = fid.start_orbit_number
metadata['processor_major_version'] = fid.processor_major_version
metadata['product_minor_version'] = fid.product_minor_version
metadata['format_major_version'] = fid.format_major_version
metadata['format_minor_version'] = fid.format_minor_version
metadata['filename'] = os.path.basename(filename)
num_rows = fid.dimensions['numRows'].size
num_cells = fid.dimensions['numCells'].size
dtype = []
for var_name in fid.variables.keys():
if var_name in ['sigma0']:
continue
if generic and var_name in skip_fields:
continue
if generic and var_name in gen_fields_lut:
new_var_name = gen_fields_lut[var_name][0]
fill_value = gen_fields_lut[var_name][2]
else:
new_var_name = var_name
fill_value = None
var_data = fid.variables[var_name][:].filled(fill_value)
if var_name == 'azi_angle_trip':
var_data[(var_data < 0) & (var_data != fill_value)] += 360
if len(fid.variables[var_name].shape) == 1:
var_data = var_data.repeat(num_cells)
elif len(fid.variables[var_name].shape) == 2:
var_data = var_data.flatten()
elif len(fid.variables[var_name].shape) == 3:
var_data = var_data.reshape(-1, 3)
else:
raise RuntimeError('Unknown dimension')
if var_name == 'utc_line_nodes':
var_data = var_data.astype('timedelta64[s]') + np.datetime64(
'2000-01-01')
data[new_var_name] = var_data
if len(var_data.shape) == 1:
dtype.append((new_var_name, var_data.dtype.str))
elif len(var_data.shape) > 1:
dtype.append(
(new_var_name, var_data.dtype.str, var_data.shape[1:]))
num_records = num_rows * num_cells
coords_fields = ['lon', 'lat', 'time']
if generic:
sat_id = np.array([0, 4, 3, 5], dtype=np.uint8)
data['sat_id'] = np.zeros(num_records, dtype=np.uint8) + sat_id[int(
metadata['platform_id'])]
dtype.append(('sat_id', np.uint8))
n_records = data['lat'].shape[0]
n_lines = n_records // num_cells
data['node_num'] = np.tile((np.arange(num_cells) + 1), n_lines)
dtype.append(('node_num', np.uint8))
data['line_num'] = np.arange(n_lines).repeat(num_cells)
dtype.append(('line_num', np.int32))
if to_xarray:
for k in data.keys():
if len(data[k].shape) == 1:
dim = ['obs']
elif len(data[k].shape) == 2:
dim = ['obs', 'beam']
data[k] = (dim, data[k])
coords = {}
for cf in coords_fields:
coords[cf] = data.pop(cf)
data = xr.Dataset(data, coords=coords, attrs=metadata)
if generic:
data = mask_dtype_nans(data)
else:
ds = np.empty(num_records, dtype=np.dtype(dtype))
for k, v in data.items():
ds[k] = v
data = ds
return data, metadata
[docs]
class AscatL1bNcFile(AscatFile):
"""
Read ASCAT Level 1b file in NetCDF format.
"""
def __init__(self, filename, **kwargs):
"""
Initialize AscatL1bNcFile.
Parameters
----------
filename : str
Filename.
"""
super().__init__(filename, **kwargs)
for i, fname in enumerate(self.filenames):
if os.path.splitext(fname)[1] == '.gz':
self.filenames[i] = tmp_unzip(fname)
def _read(self, filename, generic=False, to_xarray=False):
"""
Read one ASCAT Level 1b NetCDF4 file.
Parameters
----------
generic : bool, optional
'True' reading and converting into generic format or
'False' reading original field names (default: False).
to_xarray : bool, optional
'True' return data as xarray.Dataset
'False' return data as numpy.ndarray (default: False).
Returns
-------
data : xarray.Dataset or numpy.ndarray
ASCAT data.
metadata : dict
Metadata.
"""
gen_fields_lut = {
'longitude': ('lon', np.float32, None),
'latitude': ('lat', np.float32, None),
'utc_line_nodes': ('time', np.float32, None),
'inc_angle_trip': ('inc', np.float32, float32_nan),
'azi_angle_trip': ('azi', np.float32, float32_nan),
'sigma0_trip': ('sig', np.float32, float32_nan),
'kp': ('kp', np.float32, float32_nan),
'f_kp': ('kp_quality', np.float32, uint8_nan),
'num_val_trip': ('num_val', np.float32, None)
}
skip_fields = [
'f_f', 'f_v', 'f_oa', 'f_sa', 'f_tel', 'f_ref', 'abs_line_number'
]
data, metadata = read_nc(filename, generic, to_xarray,
skip_fields, gen_fields_lut)
return data, metadata
def _merge(self, data):
"""
Merge data.
Parameters
----------
data : list
List of array.
Returns
-------
data : numpy.ndarray
Data.
"""
if isinstance(data[0], tuple):
data, metadata = zip(*data)
if isinstance(data[0], xr.Dataset):
data = xr.concat(data,
dim="obs",
combine_attrs="drop_conflicts")
else:
data = np.hstack(data)
data = (data, metadata)
else:
data = np.hstack(data)
return data
[docs]
class AscatL1bNcFileGeneric(AscatL1bNcFile):
"""
The same as AscatL1bNcFile but with generic=True by default.
"""
def _read(self, filename, generic=True, to_xarray=False, **kwargs):
return super()._read(filename, generic=generic, to_xarray=to_xarray, **kwargs)
[docs]
class AscatL2NcFile(AscatFile):
"""
Read ASCAT Level 2 file in NetCDF format.
"""
def __init__(self, filename, **kwargs):
"""
Initialize AscatL2NcFile.
Parameters
----------
filename : str
Filename.
"""
super().__init__(filename, **kwargs)
for i, fname in enumerate(self.filenames):
if os.path.splitext(fname)[1] == '.gz':
self.filenames[i] = tmp_unzip(fname)
def _read(self, filename, generic=False, to_xarray=False):
"""
Read one ASCAT Level 2 NetCDF4 file.
Parameters
----------
generic : bool, optional
'True' reading and converting into generic format or
'False' reading original field names (default: False).
to_xarray : bool, optional
'True' return data as xarray.Dataset
'False' return data as numpy.ndarray (default: False).
Returns
-------
ds : dict, xarray.Dataset
ASCAT Level 2 data.
"""
gen_fields_lut = {
'longitude': ('lon', np.float32, None),
'latitude': ('lat', np.float32, None),
'utc_line_nodes': ('time', np.float32, None),
'inc_angle_trip': ('inc', np.float32, float32_nan),
'azi_angle_trip': ('azi', np.float32, float32_nan),
'sigma0_trip': ('sig', np.float32, float32_nan),
'kp': ('kp', np.float32, float32_nan),
'soil_moisture': ('sm', np.float32, float32_nan),
'soil_moisture_error': ('sm_noise', np.float32, float32_nan),
'sigma40': ('sig40', np.float32, float32_nan),
'sigma40_error': ('sig40_noise', np.float32, float32_nan),
'slope40': ('slope40', np.float32, float32_nan),
'slope40_error': ('slope40_noise', np.float32, float32_nan),
'soil_moisture_sensitivity': ('sm_sens', np.float32, float32_nan),
'dry_backscatter': ('dry_sig40', np.float32, float32_nan),
'wet_backscatter': ('wet_sig40', np.float32, float32_nan),
'mean_soil_moisture': ('sm_mean', np.float32, float32_nan),
'proc_flag1': ('corr_flag', np.uint8, None),
'proc_flag2': ('proc_flag', np.uint8, None),
'aggregated_quality_flag': ('agg_flag', np.uint8, None),
'snow_cover_probability': ('snow_prob', np.uint8, None),
'frozen_soil_probability': ('frozen_prob', np.uint8, None),
'wetland_flag': ('wetland', np.uint8, None),
'topography_flag': ('topo', np.uint8, None)
}
skip_fields = ['abs_line_number']
data, metadata = read_nc(filename, generic, to_xarray,
skip_fields, gen_fields_lut)
return data, metadata
def _merge(self, data):
"""
Merge data.
Parameters
----------
data : list
List of array.
Returns
-------
data : numpy.ndarray
Data.
"""
if isinstance(data[0], tuple):
data, metadata = zip(*data)
if isinstance(data[0], xr.Dataset):
data = xr.concat(data,
dim="obs",
combine_attrs="drop_conflicts")
else:
data = np.hstack(data)
data = (data, metadata)
else:
data = np.hstack(data)
return data
[docs]
class AscatL2NcFileGeneric(AscatL2NcFile):
"""
The same as AscatL1bNcFile but with generic=True by default.
"""
def _read(self, filename, generic=True, to_xarray=False, **kwargs):
return super()._read(filename, generic=generic, to_xarray=to_xarray, **kwargs)
[docs]
class AscatSsmNcSwathFile(AscatFile):
"""
Class reading ASCAT Surface Soil Moisture Netcdf swath file.
"""
def _read(self, filename, mask_and_scale=None, sel_dt=None):
"""
Read/load data from NetCDF file.
Parameters
----------
mask_and_scale : boolean, optional
Mask and scale data using _FillValue.
Returns
-------
data : xarray.Dataset
Data.
"""
with xr.open_dataset(self.filename,
mask_and_scale=mask_and_scale) as ds:
data = ds.load()
if sel_dt is not None:
data = data.where((data.time >= sel_dt[0])
& (data.time <= sel_dt[1]),
drop=True)
return data
[docs]
class AscatSsmNcSwathFileList(ChronFiles):
"""
Class reading ASCAT Surface Soil Moisture Netcdf swath file list.
"""
def __init__(self,
path,
filename_template=None,
subfolder_template=None,
sat="?",
cls_kwargs=None):
"""
Initialize object.
Parameters
----------
path : str
Root path to data.
filename_template : str, optional
Filename template (default: "W_IT-HSAF-ROME,SAT,SSM-ASCAT-METOP{sat}-6.25-H???_C_LIIB_{date}_*_*____.nc")
subfolder_template : str, optional
Subfolder template (default: /path/metop_{sat}/{year}/)
sat : str, optional
Satellite Metop-A: "A", Metop-B: "B", Metop-C: "C" or all: "?"
(default: "?")
cls_kwargs : dict, optional
Keyword arguments passed to file IO class (default: None).
"""
if filename_template is None:
filename_template = "W_IT-HSAF-ROME,SAT,SSM-ASCAT-METOP{sat}-6.25-H???_C_LIIB_{date}_*_*____.nc"
if subfolder_template is None:
subfolder_template = {"satellite": "metop_{sat}", "date": "{year}"}
if cls_kwargs is None:
cls_kwargs = {}
self.sat = sat
super().__init__(path,
AscatSsmNcSwathFile,
filename_template,
sf_templ=subfolder_template,
cls_kwargs=cls_kwargs)
def _fmt(self, timestamp):
"""
Definition of filename and subfolder format.
Parameters
----------
timestamp : datetime
Time stamp.
Returns
-------
fn_fmt : dict
Filename format.
sf_fmt : dict
Subfolder format.
"""
fn_read_fmt = {
"date": timestamp.strftime("%Y%m%d%H%M%S"),
"sat": self.sat.upper()
}
fn_write_fmt = None
sf_read_fmt = {
"satellite": {
"sat": self.sat
},
"date": {
"year": timestamp.strftime("%Y"),
"month": timestamp.strftime("%m"),
"day": timestamp.strftime("%d")
}
}
sf_write_fmt = None
return fn_read_fmt, sf_read_fmt, fn_write_fmt, sf_write_fmt
def _parse_date(self, filename, start=53, end=64):
"""
Parse date from filename.
Parameters
----------
filename : str
Filename.
start : int, optional
Start position of date field (default: 58).
start : int, optional
End position of date field (default: 72).
Returns
-------
date : datetime
Parsed date.
"""
return datetime.strptime(
os.path.basename(filename)[start:end], "%Y%m%d%H%M%S")
[docs]
def search_date(self, timestamp, **kwargs):
"""
Search date.
Parameters
----------
timestamp : datetime
Date.
Returns
-------
filenames : list
Filenames.
"""
return super().search_date(timestamp, date_str="%Y%m%d%H*", **kwargs)
[docs]
def iter_daterange(self, start_date, end_date):
"""
Generator returning filenames between start and end date.
Parameters
----------
start_date : datetime
Start date.
end_date : datetime
End date.
Yields
------
filename : str
Filename.
"""
for single_date in daterange(start_date, end_date):
filenames = self.search_date(single_date)
for filename in filenames:
yield filename
[docs]
def read_date(self, timestamp):
"""
Read data for given timestamp.
Parameters
----------
timestamp : datetime
Date.
Returns
-------
data : xarray.Dataset
Data.
"""
filenames = self.search_date(timestamp)
if len(filenames) > 1:
raise RuntimeError(
f"Multiple files found for timestamp {timestamp}")
elif len(filenames) == 0:
print(f"No file found for timestamp {timestamp}")
data = None
else:
self._open(filenames[0])
data = self.fid.read()
return data
[docs]
def read_period(self,
start_dt,
end_dt,
delta_dt=timedelta(hours=1),
buffer_dt=timedelta(hours=1),
**kwargs):
"""
Read data for given interval.
Parameters
----------
start_dt : datetime
Start datetime.
end_dt : datetime
End datetime.
delta_dt : timedelta, optional
Time delta used to jump through search date.
buffer_dt : timedelta, optional
Search buffer used to find files which could possibly contain
data but would be left out because of dt_start.
Returns
-------
data : dict, numpy.ndarray
Data stored in file.
"""
filenames = self.search_period(start_dt - buffer_dt,
end_dt + buffer_dt, delta_dt)
merged_data = []
sel_dt = (np.datetime64(start_dt - timedelta(minutes=15)),
np.datetime64(end_dt + timedelta(minutes=15)))
for filename in filenames:
self._open(filename)
try:
data = self.fid.read(sel_dt=sel_dt)
if data is not None:
merged_data.append(data)
except:
print(f"Error reading: {self.fid.filename}")
if merged_data:
merged_data = xr.concat(merged_data,
dim="obs",
combine_attrs="drop_conflicts")
else:
merged_data = None
return merged_data