6 changes: 3 additions & 3 deletions TopoPyScale/fetch_era5.py
@@ -911,7 +911,7 @@ def remap_CDSbeta(file_pattern, file_type='SURF'):
try:
ds = xr.open_dataset(nc_file)
ds = ds.rename({ 'pressure_level': 'level', 'valid_time' : 'time'})
-            ds = ds.isel(level=slice(None, None, -1))  # reverse order of levels
+            ds = ds.sortby('level', ascending=True)  # sort levels ascending (300 to 1000)

try:
ds = ds.drop_vars('number')
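
Reviewer note on the `sortby` change above: `isel(level=slice(None, None, -1))` blindly flips whatever level order the file happens to ship with, whereas `sortby('level')` guarantees ascending pressure levels regardless of input order. A minimal sketch with hypothetical data (not from the repo):

```python
import numpy as np
import xarray as xr

# Hypothetical dataset whose pressure levels arrive in arbitrary order
ds = xr.Dataset(
    {"t": ("level", np.array([250.0, 270.0, 260.0]))},
    coords={"level": [500, 1000, 700]},
)

flipped = ds.isel(level=slice(None, None, -1))  # just reverses: 700, 1000, 500
sorted_ds = ds.sortby("level", ascending=True)  # always ascending: 500, 700, 1000
```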
@@ -984,7 +984,7 @@ def era5_request_surf_snowmapper( today, latN, latS, lonE, lonW, eraDir, output_

bbox = [str(latN), str(lonW), str(latS), str(lonE)]

target = eraDir + "/forecast/SURF_%04d%02d%02d.nc" % (today.year, today.month, today.day)
target = str(eraDir) + "/forecast/SURF_%04d%02d%02d.nc" % (today.year, today.month, today.day)

c = cdsapi.Client()
c.retrieve(
@@ -1036,7 +1036,7 @@ def era5_request_plev_snowmapper(today, latN, latS, lonE, lonW, eraDir, plevels,

bbox = [str(latN), str(lonW), str(latS), str(lonE)]

target = eraDir + "/forecast/PLEV_%04d%02d%02d.nc" % (today.year, today.month, today.day)
target = str(eraDir) + "/forecast/PLEV_%04d%02d%02d.nc" % (today.year, today.month, today.day)


c = cdsapi.Client()
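
Reviewer note on the two `str(eraDir)` casts: they make the target path construction safe when `eraDir` is passed as a `pathlib.Path`, since `Path + str` concatenation raises a `TypeError`. A sketch of the failure and the fix (hypothetical directory and date):

```python
from pathlib import Path

eraDir = Path("inputs/climate")  # hypothetical project layout

# eraDir + "/forecast/..." would raise:
# TypeError: unsupported operand type(s) for +: 'PosixPath' and 'str'

# The patched form accepts both str and Path:
target = str(eraDir) + "/forecast/SURF_%04d%02d%02d.nc" % (2024, 1, 15)

# Equivalent pathlib-native alternative:
target_alt = eraDir / "forecast" / f"SURF_{2024:04d}{1:02d}{15:02d}.nc"
```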
2 changes: 1 addition & 1 deletion TopoPyScale/sim_fsm2oshd.py
@@ -27,7 +27,7 @@ def read_fsm_ds_with_mask(path):
'''
function to read fsm as ds, adding a dummy cluster for masked area
'''
-    ds = xr.open_mfdataset(path, concat_dim='point_ind', combine='nested')
+    ds = xr.open_mfdataset(path, concat_dim='point_ind', combine='nested', parallel=True)
tp = ds.isel(point_ind=0).copy()
tp['point_ind'] = -9999
for var in list(tp.keys()):
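
Reviewer note on `parallel=True`: it makes `open_mfdataset` dispatch the per-file opens through `dask.delayed` so they run concurrently, which can cut load time noticeably when a simulation writes one file per point; it assumes dask is installed. Sketch with a hypothetical file pattern:

```python
import glob
import time
import xarray as xr

flist = sorted(glob.glob("outputs/fsm_sim_*.nc"))  # hypothetical pattern

t0 = time.time()
# parallel=True requires dask; each file open is wrapped in dask.delayed
ds = xr.open_mfdataset(flist, concat_dim="point_ind", combine="nested",
                       parallel=True)
print(f"opened {len(flist)} files in {time.time() - t0:.1f} s")
```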
8 changes: 4 additions & 4 deletions TopoPyScale/topo_param.py
@@ -87,16 +87,16 @@ def extract_pts_param(df_pts, ds_param, method='nearest'):
df_pts[['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = 0

if method == 'nearest':
-        for i, row in df_pts.iterrows():
+        for row in df_pts.itertuples():
d_mini = ds_param.sel(x=row.x, y=row.y, method='nearest')
-            df_pts.loc[i, ['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = np.array((d_mini.elevation.values,
+            df_pts.loc[row.Index, ['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = np.array((d_mini.elevation.values,
d_mini.slope.values,
d_mini.aspect.values,
d_mini.aspect_cos,
d_mini.aspect_sin,
d_mini.svf.values))
elif method == 'idw' or method == 'linear':
-        for i, row in df_pts.iterrows():
+        for row in df_pts.itertuples():
ind_lat = np.abs(ds_param.y-row.y).argmin()
ind_lon = np.abs(ds_param.x-row.x).argmin()
ds_param_pt = ds_param.isel(y=[ind_lat-1, ind_lat, ind_lat+1], x=[ind_lon-1, ind_lon, ind_lon+1])
@@ -119,7 +119,7 @@ def extract_pts_param(df_pts, ds_param, method='nearest'):
)
dw = xr.Dataset.weighted(ds_param_pt, da_idw)
d_mini = dw.sum(['x', 'y'], keep_attrs=True)
-            df_pts.loc[i, ['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = np.array((d_mini.elevation.values,
+            df_pts.loc[row.Index, ['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = np.array((d_mini.elevation.values,
d_mini.slope.values,
d_mini.aspect.values,
d_mini.aspect_cos,
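
Reviewer note on the `iterrows` → `itertuples` swap in both loops: `iterrows` materialises a pandas `Series` per row (slow, and it can upcast dtypes), while `itertuples` yields lightweight namedtuples, keeping `row.x`/`row.y` attribute access and exposing the original index as `row.Index`. Minimal sketch:

```python
import pandas as pd

df_pts = pd.DataFrame({"x": [100.0, 200.0], "y": [50.0, 60.0]})

# Old pattern: each row becomes a Series, with the index returned separately
for i, row in df_pts.iterrows():
    _ = row.x, row.y

# New pattern: namedtuples; the index travels along as row.Index
for row in df_pts.itertuples():
    _ = row.x, row.y, row.Index
```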
87 changes: 46 additions & 41 deletions TopoPyScale/topo_scale.py
@@ -42,6 +42,7 @@
from pyproj import Transformer
import numpy as np
import sys, time
+from types import SimpleNamespace
import datetime as dt
from TopoPyScale import meteo_util as mu
from TopoPyScale import topo_utils as tu
@@ -79,7 +80,10 @@ def pt_downscale_interp(row, ds_plev_pt, ds_surf_pt, meta):

# convert gridcells coordinates from WGS84 to DEM projection
lons, lats = np.meshgrid(ds_plev_pt.longitude.values, ds_plev_pt.latitude.values)
trans = Transformer.from_crs("epsg:4326", "epsg:" + str(meta.get('target_epsg')), always_xy=True)
trans = meta.get('transformer') # Use cached transformer
if trans is None:
# Fallback for backwards compatibility
trans = Transformer.from_crs("epsg:4326", f"epsg:{meta.get('target_epsg')}", always_xy=True)
Xs, Ys = trans.transform(lons.flatten(), lats.flatten())
Xs = Xs.reshape(lons.shape)
Ys = Ys.reshape(lons.shape)
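
Reviewer note on the cached transformer: `Transformer.from_crs` builds a PROJ pipeline on every call, which is wasteful when `pt_downscale_interp` runs once per cluster point; creating the transformer once in the driver and shipping it through `meta` amortises that cost, while the `None` fallback keeps older call sites working. Sketch of the pattern (hypothetical EPSG code and coordinates):

```python
from pyproj import Transformer

# Build once; pipeline construction dominates the cost
transformer = Transformer.from_crs("epsg:4326", "epsg:32632", always_xy=True)

for lon, lat in [(7.5, 46.0), (7.6, 46.1)]:  # hypothetical cluster centres
    x, y = transformer.transform(lon, lat)   # cheap per-point call
```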
@@ -148,7 +152,6 @@ def pt_downscale_interp(row, ds_plev_pt, ds_surf_pt, meta):
except:
raise ValueError(
f'ERROR: Upper pressure level {plev_interp.level.min().values} hPa geopotential is lower than cluster mean elevation {row.elevation} {plev_interp.z}')


top = plev_interp.isel(level=ind_z_top)
bot = plev_interp.isel(level=ind_z_bot)
@@ -177,10 +180,10 @@ def pt_downscale_interp(row, ds_plev_pt, ds_surf_pt, meta):
},
coords={'month': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]}
)
-        down_pt['precip_lapse_rate'] = (1 + monthly_coeffs.coef.sel(month=down_pt.month.values).data * (
-                row.elevation - surf_interp.z) * 1e-3) / \
-                (1 - monthly_coeffs.coef.sel(month=down_pt.month.values).data * (
-                row.elevation - surf_interp.z) * 1e-3)
+        # Cache coefficient selection (avoid duplicate .sel() call)
+        coef = monthly_coeffs.coef.sel(month=down_pt.month.values).data
+        elev_diff = (row.elevation - surf_interp.z) * 1e-3
+        down_pt['precip_lapse_rate'] = (1 + coef * elev_diff) / (1 - coef * elev_diff)
else:
down_pt['precip_lapse_rate'] = down_pt.t * 0 + 1
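
Reviewer note on the refactor above: the monthly coefficient and the elevation difference are now computed once and reused in the lapse-rate expression (1 + c·Δz) / (1 − c·Δz) with Δz in km, instead of calling `.sel()` twice per timestep. A numeric sanity check with hypothetical values:

```python
# Hypothetical: c = 0.27 km^-1, point 500 m above the ERA5 surface
coef = 0.27
elev_diff = 500 * 1e-3                                 # delta z in km
plr = (1 + coef * elev_diff) / (1 - coef * elev_diff)
print(round(plr, 3))                                   # 1.312 -> ~31% enhancement
```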

@@ -396,8 +399,8 @@ def downscale_climate(project_directory,
# =========== Open dataset with Dask =================
tvec = pd.date_range(start_date, pd.to_datetime(end_date) + pd.to_timedelta('1D'), freq=tstep, inclusive='left')

-    flist_PLEV = glob.glob(f'{climate_directory}/PLEV*.nc')
-    flist_SURF = glob.glob(f'{climate_directory}/SURF*.nc')
+    flist_PLEV = sorted(glob.glob(f'{climate_directory}/PLEV*.nc'))
+    flist_SURF = sorted(glob.glob(f'{climate_directory}/SURF*.nc'))

if 'object' in list(df_centroids.dtypes):
pass
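
Reviewer note on the added `sorted()` wrappers: `glob.glob` returns files in arbitrary filesystem order, and with `combine='nested'`/`concat_dim='time'` the concatenation follows list order, so unsorted input can yield a non-monotonic time axis. Sorting suffices here because the filenames encode the date, so lexical order matches chronological order. Sketch:

```python
import glob

flist = glob.glob("inputs/climate/PLEV*.nc")  # order is OS-dependent
flist = sorted(flist)                         # deterministic, chronological
# e.g. PLEV_202301.nc < PLEV_202302.nc < ... lexically and in time
```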
@@ -407,7 +410,7 @@

def _open_dataset_climate(flist):

-        ds_ = xr.open_mfdataset(flist, parallel=False, concat_dim="time", combine='nested', coords='minimal')
+        ds_ = xr.open_mfdataset(flist, parallel=True, concat_dim="time", combine='nested', coords='minimal')

# this block handles the expver dimension that is in downloaded ERA5 data if data is ERA5/ERA5T mix. If only ERA5 or
# only ERA5T it is not present. ERA5T data can be present in the timewindow T-5days to T -3months, where T is today.
@@ -473,11 +476,9 @@ def _subset_climate_dataset(ds_, row, type='plev'):

ds_plev = ds_plev.sel(time=tvec.values)

-    row_list = []
-    ds_list = []
-    for _, row in df_centroids.iterrows():
-        row_list.append(row)
-        ds_list.append(ds_plev)
+    # Convert to list of row tuples (faster than iterrows)
+    row_list = list(df_centroids.itertuples(index=False))
+    ds_list = [ds_plev] * len(row_list)

fun_param = zip(ds_list, row_list, ['plev'] * len(row_list)) # construct here the tuple that goes into the pooling for arguments
tu.multithread_pooling(_subset_climate_dataset, fun_param, n_threads=n_core)
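
Reviewer note on `[ds_plev] * len(row_list)`: list multiplication stores N references to the same dataset object, not N copies, which is exactly what the read-only subsetting threads need. Tiny sketch:

```python
ds_plev = object()                   # stand-in for the opened dataset
row_list = [None] * 4                # stand-in rows
ds_list = [ds_plev] * len(row_list)
assert all(d is ds_list[0] for d in ds_list)  # same object, nothing copied
```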
@@ -509,39 +510,43 @@ def _subset_climate_dataset(ds_, row, type='plev'):
print(f"Requested time range: {requested_times.min()} to {requested_times.max()}")
print(f"Missing timesteps: {missing_str}")

-    ds_list = []
-    for _, _ in df_centroids.iterrows():
-        ds_list.append(ds_surf)
+    # Repeat ds_surf for each centroid (no loop needed)
+    ds_list = [ds_surf] * len(row_list)

fun_param = zip(ds_list, row_list, ['surf'] * len(row_list)) # construct here the tuple that goes into the pooling for arguments
tu.multithread_pooling(_subset_climate_dataset, fun_param, n_threads=n_core)
fun_param = None
ds_surf = None

-    # Preparing list to feed into Pooling
-    surf_pt_list = []
-    plev_pt_list = []
-    ds_solar_list = []
-    horizon_da_list = []
-    row_list = []
-    meta_list = []
-    i = 0
-    for _, row in df_centroids.iterrows():
-        surf_pt_list.append(xr.open_dataset(output_directory / f'tmp/ds_surf_pt_{row.point_name}.nc', engine='h5netcdf'))
-        plev_pt_list.append(xr.open_dataset(output_directory / f'tmp/ds_plev_pt_{row.point_name}.nc', engine='h5netcdf'))
-        ds_solar_list.append(ds_solar.sel(point_name=row.point_name))
-        horizon_da_list.append(horizon_da)
-        row_list.append(row)
-        meta_list.append({'interp_method': interp_method,
-                          'lw_terrain_flag': lw_terrain_flag,
-                          'tstep': tstep_dict.get(tstep),
-                          'n_digits': n_digits,
-                          'file_pattern': file_pattern,
-                          'target_epsg': target_EPSG,
-                          'precip_lapse_rate_flag': precip_lapse_rate_flag,
-                          'output_directory': output_directory,
-                          'lw_terrain_flag': lw_terrain_flag})
-        i += 1
+    # Pre-build meta dict once (same for all points)
+    # Pre-create transformer once (used by all points for CRS conversion)
+    transformer = Transformer.from_crs("epsg:4326", f"epsg:{target_EPSG}", always_xy=True)
+
+    meta_template = {
+        'interp_method': interp_method,
+        'lw_terrain_flag': lw_terrain_flag,
+        'tstep': tstep_dict.get(tstep),
+        'n_digits': n_digits,
+        'file_pattern': file_pattern,
+        'target_epsg': target_EPSG,
+        'precip_lapse_rate_flag': precip_lapse_rate_flag,
+        'output_directory': output_directory,
+        'transformer': transformer,  # Cached CRS transformer
+    }
+
+    # Build lists using itertuples (faster than iterrows)
+    # Convert to SimpleNamespace for pickling compatibility with pandas 2.x + multiprocessing
+    cols = df_centroids.columns.tolist()
+    row_list = [SimpleNamespace(**dict(zip(cols, row)))
+                for row in df_centroids.itertuples(index=False, name=None)]
+    point_names = df_centroids.point_name.values
+
+    surf_pt_list = [xr.open_dataset(output_directory / f'tmp/ds_surf_pt_{pn}.nc', engine='h5netcdf') for pn in point_names]
+    plev_pt_list = [xr.open_dataset(output_directory / f'tmp/ds_plev_pt_{pn}.nc', engine='h5netcdf') for pn in point_names]
+    ds_solar_list = [ds_solar.sel(point_name=pn) for pn in point_names]
+    horizon_da_list = [horizon_da] * len(row_list)
+    meta_list = [meta_template.copy() for _ in row_list]

fun_param = zip(row_list, plev_pt_list, surf_pt_list, meta_list) # construct here the tuple that goes into the pooling for arguments
tu.multicore_pooling(pt_downscale_interp, fun_param, n_core)
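
Reviewer note on the `SimpleNamespace` rows: the row objects are pickled when `multicore_pooling` ships work to child processes, and plain namespaces built from (column, value) pairs pickle cheaply and consistently across pandas versions while preserving `row.x`-style attribute access. Note also that `meta_template.copy()` is a shallow copy, so every point shares the one cached transformer, which is the intent. Sketch of the conversion:

```python
from types import SimpleNamespace

import pandas as pd

df = pd.DataFrame({"point_name": ["p1"], "x": [2600.0], "y": [1200.0]})

cols = df.columns.tolist()
rows = [SimpleNamespace(**dict(zip(cols, r)))
        for r in df.itertuples(index=False, name=None)]  # name=None -> plain tuples

print(rows[0].point_name, rows[0].x)  # attribute access survives pickling
```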
@@ -570,5 +575,5 @@ def read_downscaled(path='outputs/down_pt*.nc'):
dataset: merged dataset readily to use and loaded in chuncks via Dask
"""

-    down_pts = xr.open_mfdataset(path, concat_dim='point_name', combine='nested', parallel=False)
+    down_pts = xr.open_mfdataset(path, concat_dim='point_name', combine='nested', parallel=True)
return down_pts