# Copyright (c) 2025, TU Wien
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of TU Wien, Department of Geodesy and Geoinformation
# nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written
# permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL TU WIEN DEPARTMENT OF GEODESY AND
# GEOINFORMATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Readers for ASCAT Level 1b and Level 2 data in BUFR format.
"""
import os
import numpy as np
import xarray as xr
from cadati.cal_date import cal2dt
try:
import pdbufr
except ImportError:
pass
from ascat.utils import tmp_unzip
from ascat.utils import mask_dtype_nans
from ascat.utils import uint8_nan
from ascat.utils import uint16_nan
from ascat.utils import int32_nan
from ascat.utils import float32_nan
from ascat.read_native import AscatFile
bufr_nan = 1.7e+38
nan_val_dict = {
np.float32: float32_nan,
np.uint8: uint8_nan,
np.uint16: uint16_nan,
np.int32: int32_nan
}
[docs]
class AscatL1bBufrFile(AscatFile):
"""
Read ASCAT Level 1b file in BUFR format.
"""
def __init__(self, filename, **kwargs):
"""
Initialize AscatL1bBufrFile.
Parameters
----------
filename : str
Filename.
"""
super().__init__(filename, **kwargs)
for i, fname in enumerate(self.filenames):
if os.path.splitext(fname)[1] == '.gz':
self.filenames[i] = tmp_unzip(fname)
self.msg_name_lookup = {
4: "Satellite Identifier",
6: "Direction Of Motion Of Moving Observing Platform",
16: "Orbit Number",
17: "Cross-Track Cell Number",
21: "f_Beam Identifier",
22: "f_Radar Incidence Angle",
23: "f_Antenna Beam Azimuth",
24: "f_Backscatter",
25: "f_Radiometric Resolution (Noise Value)",
26: "f_ASCAT KP Estimate Quality",
27: "f_ASCAT Sigma-0 Usability",
34: "f_ASCAT Land Fraction",
35: "m_Beam Identifier",
36: "m_Radar Incidence Angle",
37: "m_Antenna Beam Azimuth",
38: "m_Backscatter",
39: "m_Radiometric Resolution (Noise Value)",
40: "m_ASCAT KP Estimate Quality",
41: "m_ASCAT Sigma-0 Usability",
48: "m_ASCAT Land Fraction",
49: "a_Beam Identifier",
50: "a_Radar Incidence Angle",
51: "a_Antenna Beam Azimuth",
52: "a_Backscatter",
53: "a_Radiometric Resolution (Noise Value)",
54: "a_ASCAT KP Estimate Quality",
55: "a_ASCAT Sigma-0 Usability",
62: "a_ASCAT Land Fraction"
}
def _read(self, filename, generic=False, to_xarray=False):
"""
Read one ASCAT Level 1b BUFR file.
Parameters
----------
generic : bool, optional
'True' reading and converting into generic format or
'False' reading original field names (default: False).
to_xarray : bool, optional
'True' return data as xarray.Dataset
'False' return data as numpy.ndarray (default: False).
Returns
-------
ds : xarray.Dataset, numpy.ndarray
ASCAT Level 1b data.
"""
df = pdbufr.read_bufr(filename, columns="data", flat=True)
col_rename = {}
for i, col in enumerate(df.columns.to_list()):
name = self.msg_name_lookup.get(i + 1, None)
if name is not None:
col_rename[col] = name
data = df.rename(columns=col_rename)[col_rename.values()]
data["lat"] = df["#1#latitude"].values.astype(np.float32)
data["lon"] = df["#1#longitude"].values.astype(np.float32)
year = df["#1#year"].values.astype(int)
month = df["#1#month"].values.astype(int)
day = df["#1#day"].values.astype(int)
hour = df["#1#hour"].values.astype(int)
minute = df["#1#minute"].values.astype(int)
seconds = df["#1#second"].values.astype(int)
milliseconds = np.zeros(seconds.size)
cal_dates = np.vstack(
(year, month, day, hour, minute, seconds, milliseconds)).T
data['time'] = cal2dt(cal_dates)
data = data.to_records(index=False)
data = {name:data[name] for name in data.dtype.names}
metadata = {}
metadata['platform_id'] = data['Satellite Identifier'][0].astype(int)
metadata['orbit_start'] = np.uint32(data['Orbit Number'][0])
metadata['filename'] = os.path.basename(filename)
# add/rename/remove fields according to generic format
if generic:
data = conv_bufrl1b_generic(data, metadata)
# convert dict to xarray.Dataset or numpy.ndarray
if to_xarray:
for k in data.keys():
if len(data[k].shape) == 1:
dim = ['obs']
elif len(data[k].shape) == 2:
dim = ['obs', 'beam']
data[k] = (dim, data[k])
coords = {}
coords_fields = ['lon', 'lat', 'time']
for cf in coords_fields:
coords[cf] = data.pop(cf)
data = xr.Dataset(data, coords=coords, attrs=metadata)
if generic:
data = mask_dtype_nans(data)
else:
# collect dtype info
dtype = []
for var_name in data.keys():
if len(data[var_name].shape) == 1:
dtype.append((var_name, data[var_name].dtype.str))
elif len(data[var_name].shape) > 1:
dtype.append((var_name, data[var_name].dtype.str,
data[var_name].shape[1:]))
ds = np.empty(data['time'].size, dtype=np.dtype(dtype))
for k, v in data.items():
ds[k] = v
data = ds
return data, metadata
def _merge(self, data):
"""
Merge data.
Parameters
----------
data : list
List of array.
Returns
-------
data : numpy.ndarray or xarray.Dataset
Data.
"""
if isinstance(data[0], tuple):
data, metadata = zip(*data)
if isinstance(data[0], xr.Dataset):
data = xr.concat(data,
dim="obs",
combine_attrs="drop_conflicts")
else:
data = np.hstack(data)
data = (data, metadata)
else:
data = np.hstack(data)
return data
[docs]
class AscatL1bBufrFileGeneric(AscatL1bBufrFile):
"""
The same as AscatL1bBufrFile but with generic=True by default.
"""
def _read(self, filename, generic=True, to_xarray=False, **kwargs):
return super()._read(filename, generic=generic, to_xarray=to_xarray, **kwargs)
[docs]
def conv_bufrl1b_generic(data, metadata):
"""
Rename and convert data types of dataset.
Spacecraft_id vs sat_id encoding
BUFR encoding - Spacecraft_id
- 1 ERS 1
- 2 ERS 2
- 3 Metop-1 (Metop-B)
- 4 Metop-2 (Metop-A)
- 5 Metop-3 (Metop-C)
Internal encoding - sat_id
- 1 ERS 1
- 2 ERS 2
- 3 Metop-2 (Metop-A)
- 4 Metop-1 (Metop-B)
- 5 Metop-3 (Metop-C)
Parameters
----------
data: dict of numpy.ndarray
Original dataset.
metadata: dict
Metadata.
Returns
-------
data: dict of numpy.ndarray
Converted dataset.
"""
skip_fields = ['Satellite Identifier']
gen_fields_beam = {
'Radar Incidence Angle': ('inc', np.float32, bufr_nan, 1),
'Backscatter': ('sig', np.float32, bufr_nan, 1),
'Antenna Beam Azimuth': ('azi', np.float32, bufr_nan, 1),
'ASCAT Sigma-0 Usability': ('f_usable', np.uint8, None, 1),
'Beam Identifier': ('beam_num', np.uint8, None, 1),
'Radiometric Resolution (Noise Value)':
('kp', np.float32, bufr_nan, 0.01),
'ASCAT KP Estimate Quality': ('kp_quality', np.uint8, bufr_nan, 1),
'ASCAT Land Fraction': ('f_land', np.float32, None, 1)
}
gen_fields_lut = {
'Orbit Number': ('abs_orbit_nr', np.int32),
'Cross-Track Cell Number': ('node_num', np.uint8),
'Direction Of Motion Of Moving Observing Platform':
('sat_track_azi', np.float32)
}
for var_name in skip_fields:
if var_name in data:
data.pop(var_name)
for var_name, (new_name, new_dtype) in gen_fields_lut.items():
data[new_name] = data.pop(var_name).astype(new_dtype)
for var_name, (new_name, new_dtype, nan_val, s) in gen_fields_beam.items():
f = ['{}_{}'.format(b, var_name) for b in ['f', 'm', 'a']]
data[new_name] = np.vstack((data.pop(f[0]), data.pop(f[1]),
data.pop(f[2]))).T.astype(new_dtype)
if nan_val is not None:
valid = data[new_name] != nan_val
data[new_name][~valid] = nan_val_dict[new_dtype]
data[new_name][valid] *= s
if data['node_num'].max() == 82:
data['swath_indicator'] = 1 * (data['node_num'] > 41)
elif data['node_num'].max() == 42:
data['swath_indicator'] = 1 * (data['node_num'] > 21)
else:
raise ValueError('Cross-track cell number size unknown')
n_lines = data['lat'].shape[0] / data['node_num'].max()
data['line_num'] = np.arange(n_lines).repeat(data['node_num'].max())
sat_id = np.array([0, 0, 0, 4, 3, 5], dtype=np.uint8)
data['sat_id'] = np.zeros(data['time'].size, dtype=np.uint8) + sat_id[int(
metadata['platform_id'])]
# compute ascending/descending direction
data['as_des_pass'] = (data['sat_track_azi'] < 270).astype(np.uint8)
return data
[docs]
class AscatL2BufrFile(AscatFile):
"""
Read ASCAT Level 2 file in BUFR format.
"""
def __init__(self, filename, **kwargs):
"""
Initialize AscatL2BufrFile.
Parameters
----------
filename: str
Filename.
"""
super().__init__(filename, **kwargs)
for i, fname in enumerate(self.filenames):
if os.path.splitext(fname)[1] == '.gz':
self.filenames[i] = tmp_unzip(fname)
self.msg_name_lookup = {
4: "Satellite Identifier",
6: "Direction Of Motion Of Moving Observing Platform",
16: "Orbit Number",
17: "Cross-Track Cell Number",
21: "f_Beam Identifier",
22: "f_Radar Incidence Angle",
23: "f_Antenna Beam Azimuth",
24: "f_Backscatter",
25: "f_Radiometric Resolution (Noise Value)",
26: "f_ASCAT KP Estimate Quality",
27: "f_ASCAT Sigma-0 Usability",
34: "f_ASCAT Land Fraction",
35: "m_Beam Identifier",
36: "m_Radar Incidence Angle",
37: "m_Antenna Beam Azimuth",
38: "m_Backscatter",
39: "m_Radiometric Resolution (Noise Value)",
40: "m_ASCAT KP Estimate Quality",
41: "m_ASCAT Sigma-0 Usability",
48: "m_ASCAT Land Fraction",
49: "a_Beam Identifier",
50: "a_Radar Incidence Angle",
51: "a_Antenna Beam Azimuth",
52: "a_Backscatter",
53: "a_Radiometric Resolution (Noise Value)",
54: "a_ASCAT KP Estimate Quality",
55: "a_ASCAT Sigma-0 Usability",
62: "a_ASCAT Land Fraction",
65: "Surface Soil Moisture (Ms)",
66: "Estimated Error In Surface Soil Moisture",
67: "Backscatter",
68: "Estimated Error In Sigma0 At 40 Deg Incidence Angle",
69: "Slope At 40 Deg Incidence Angle",
70: "Estimated Error In Slope At 40 Deg Incidence Angle",
71: "Soil Moisture Sensitivity",
72: "Dry Backscatter",
73: "Wet Backscatter",
74: "Mean Surface Soil Moisture",
# 75: "Rain Fall Detection",
76: "Soil Moisture Correction Flag",
77: "Soil Moisture Processing Flag",
78: "Soil Moisture Quality",
79: "Snow Cover",
80: "Frozen Land Surface Fraction",
81: "Inundation And Wetland Fraction",
82: "Topographic Complexity",
}
def _read(self, filename, generic=False, to_xarray=False):
"""
Read one ASCAT Level 2 BUFR file.
Parameters
----------
generic : bool, optional
'True' reading and converting into generic format or
'False' reading original field names(default: False).
to_xarray : bool, optional
'True' return data as xarray.Dataset
'False' return data as numpy.ndarray(default: False).
Returns
-------
data : xarray.Dataset or numpy.ndarray
ASCAT data.
metadata : dict
Metadata.
"""
df = pdbufr.read_bufr(filename, columns="data", flat=True)
col_rename = {}
for i, col in enumerate(df.columns.to_list()):
name = self.msg_name_lookup.get(i + 1, None)
if name is not None:
col_rename[col] = name
data = df.rename(columns=col_rename)[col_rename.values()]
data["lat"] = df["#1#latitude"].values.astype(np.float32)
data["lon"] = df["#1#longitude"].values.astype(np.float32)
year = df["#1#year"].values.astype(int)
month = df["#1#month"].values.astype(int)
day = df["#1#day"].values.astype(int)
hour = df["#1#hour"].values.astype(int)
minute = df["#1#minute"].values.astype(int)
seconds = df["#1#second"].values.astype(int)
milliseconds = np.zeros(seconds.size)
cal_dates = np.vstack(
(year, month, day, hour, minute, seconds, milliseconds)).T
data['time'] = cal2dt(cal_dates)
data = data.to_records(index=False)
data = {name:data[name] for name in data.dtype.names}
data["Mean Surface Soil Moisture"] *= 100.
metadata = {}
metadata['platform_id'] = data['Satellite Identifier'][0].astype(int)
metadata['orbit_start'] = np.uint32(data['Orbit Number'][0])
metadata['filename'] = os.path.basename(filename)
# add/rename/remove fields according to generic format
if generic:
data = conv_bufrl2_generic(data, metadata)
# convert dict to xarray.Dataset or numpy.ndarray
if to_xarray:
for k in data.keys():
if len(data[k].shape) == 1:
dim = ['obs']
elif len(data[k].shape) == 2:
dim = ['obs', 'beam']
data[k] = (dim, data[k])
coords = {}
coords_fields = ['lon', 'lat', 'time']
for cf in coords_fields:
coords[cf] = data.pop(cf)
data = xr.Dataset(data, coords=coords, attrs=metadata)
if generic:
data = mask_dtype_nans(data)
else:
# collect dtype info
dtype = []
# fill_value = []
for var_name in data.keys():
if len(data[var_name].shape) == 1:
dtype.append((var_name, data[var_name].dtype.str))
# fill_value.append(data[var_name].fill_value)
elif len(data[var_name].shape) > 1:
dtype.append((var_name, data[var_name].dtype.str,
data[var_name].shape[1:]))
# fill_value.append(data[var_name].shape[1] *
# [data[var_name].fill_value])
ds = np.ma.empty(data['time'].size, dtype=np.dtype(dtype))
# fill_value_arr = np.array((*fill_value, ), dtype=np.dtype(dtype))
for k, v in data.items():
ds[k] = v
# ds.fill_value = fill_value_arr
data = ds
return data, metadata
def _merge(self, data):
"""
Merge data.
Parameters
----------
data : list
List of array.
Returns
-------
data : numpy.ndarray or xarray.Dataset
Data.
"""
if isinstance(data[0], tuple):
data, metadata = zip(*data)
if isinstance(data[0], xr.Dataset):
data = xr.concat(data,
dim="obs",
combine_attrs="drop_conflicts")
else:
data = np.hstack(data)
data = (data, metadata)
else:
data = np.hstack(data)
return data
[docs]
class AscatL2BufrFileGeneric(AscatL2BufrFile):
"""
The same as AscatL1bBufrFile but with generic=True by default.
"""
def _read(self, filename, generic=True, to_xarray=False, **kwargs):
return super()._read(filename, generic=generic, to_xarray=to_xarray, **kwargs)
[docs]
def conv_bufrl2_generic(data, metadata):
"""
Rename and convert data types of dataset.
Spacecraft_id vs sat_id encoding
BUFR encoding - Spacecraft_id
- 1 ERS 1
- 2 ERS 2
- 3 Metop-1 (Metop-B)
- 4 Metop-2 (Metop-A)
- 5 Metop-3 (Metop-C)
Internal encoding - sat_id
- 1 ERS 1
- 2 ERS 2
- 3 Metop-2 (Metop-A)
- 4 Metop-1 (Metop-B)
- 5 Metop-3 (Metop-C)
Parameters
----------
data: dict of numpy.ndarray
Original dataset.
metadata: dict
Metadata.
Returns
-------
data: dict of numpy.ndarray
Converted dataset.
"""
skip_fields = ['Satellite Identifier']
gen_fields_beam = {
'Radar Incidence Angle': ('inc', np.float32),
'Backscatter': ('sig', np.float32),
'Antenna Beam Azimuth': ('azi', np.float32),
'ASCAT Sigma-0 Usability': ('f_usable', np.uint8),
'Beam Identifier': ('beam_num', np.uint8),
'Radiometric Resolution (Noise Value)': ('kp_noise', np.float32),
'ASCAT KP Estimate Quality': ('kp', np.float32),
'ASCAT Land Fraction': ('f_land', np.float32)
}
gen_fields_lut = {
'Orbit Number': ('abs_orbit_nr', np.int32),
'Cross-Track Cell Number': ('node_num', np.uint8),
'Direction Of Motion Of Moving Observing Platform':
('sat_track_azi', np.float32),
'Surface Soil Moisture (Ms)': ('sm', np.float32),
'Estimated Error In Surface Soil Moisture': ('sm_noise', np.float32),
'Backscatter': ('sig40', np.float32),
'Estimated Error In Sigma0 At 40 Deg Incidence Angle':
('sig40_noise', np.float32),
'Slope At 40 Deg Incidence Angle': ('slope40', np.float32),
'Estimated Error In Slope At 40 Deg Incidence Angle':
('slope40_noise', np.float32),
'Soil Moisture Sensitivity': ('sm_sens', np.float32),
'Dry Backscatter': ('dry_sig40', np.float32),
'Wet Backscatter': ('wet_sig40', np.float32),
'Mean Surface Soil Moisture': ('sm_mean', np.float32),
# 'Rain Fall Detection': ('rf', np.float32),
'Soil Moisture Correction Flag': ('corr_flag', np.uint8),
'Soil Moisture Processing Flag': ('proc_flag', np.uint8),
'Soil Moisture Quality': ('agg_flag', np.uint8),
'Snow Cover': ('snow_prob', np.uint8),
'Frozen Land Surface Fraction': ('frozen_prob', np.uint8),
'Inundation And Wetland Fraction': ('wetland', np.uint8),
'Topographic Complexity': ('topo', np.uint8)
}
for var_name in skip_fields:
if var_name in data:
data.pop(var_name)
for var_name, (new_name, new_dtype) in gen_fields_lut.items():
mask = (data[var_name] == bufr_nan) | (np.isnan(data[var_name]))
data[var_name][mask] = nan_val_dict[new_dtype]
data[new_name] = np.ma.array(data.pop(var_name).astype(new_dtype), mask=mask)
data[new_name].fill_value = nan_val_dict[new_dtype]
for var_name, (new_name, new_dtype) in gen_fields_beam.items():
f = ['{}_{}'.format(b, var_name) for b in ['f', 'm', 'a']]
mask = np.vstack((data[f[0]] == bufr_nan, data[f[1]] == bufr_nan,
data[f[2]] == bufr_nan)).T
data[new_name] = np.ma.vstack((data.pop(f[0]), data.pop(f[1]),
data.pop(f[2]))).T.astype(new_dtype)
data[new_name].mask = mask
data[new_name][mask] = nan_val_dict[new_dtype]
data[new_name].fill_value = nan_val_dict[new_dtype]
if data['node_num'].max() == 82:
data['swath_indicator'] = np.ma.array(1 * (data['node_num'] > 41),
dtype=np.uint8,
mask=data['node_num'] > 82)
elif data['node_num'].max() == 42:
data['swath_indicator'] = np.ma.array(1 * (data['node_num'] > 21),
dtype=np.uint8,
mask=data['node_num'] > 42)
else:
raise ValueError('Cross-track cell number size unknown')
n_lines = data['lat'].shape[0] / data['node_num'].max()
line_num = np.arange(n_lines).repeat(data['node_num'].max())
data['line_num'] = np.ma.array(line_num,
dtype=np.uint16,
mask=np.zeros_like(line_num),
fill_value=uint16_nan)
sat_id = np.ma.array([0, 0, 0, 4, 3, 5], dtype=np.uint8)
data['sat_id'] = np.ma.zeros(data['time'].size,
dtype=np.uint8) + sat_id[int(
metadata['platform_id'])]
data['sat_id'].mask = np.zeros(data['time'].size)
data['sat_id'].fill_value = uint8_nan
# compute ascending/descending direction
data['as_des_pass'] = np.ma.array(data['sat_track_azi'] < 270,
dtype=np.uint8,
mask=np.zeros(data['time'].size),
fill_value=uint8_nan)
mask = data['lat'] == bufr_nan
data['lat'] = np.ma.array(data['lat'], mask=mask, fill_value=float32_nan)
mask = data['lon'] == bufr_nan
data['lon'] = np.ma.array(data['lon'], mask=mask, fill_value=float32_nan)
data['time'] = np.ma.array(data['time'], mask=mask, fill_value=0)
return data