diff --git a/TopoPyScale/fetch_era5.py b/TopoPyScale/fetch_era5.py
index f870bb6..36899f5 100644
--- a/TopoPyScale/fetch_era5.py
+++ b/TopoPyScale/fetch_era5.py
@@ -911,7 +911,7 @@ def remap_CDSbeta(file_pattern, file_type='SURF'):
         try:
             ds = xr.open_dataset(nc_file)
             ds = ds.rename({ 'pressure_level': 'level', 'valid_time' : 'time'})
-            ds = ds.isel(level=slice(None, None, -1))  # reverse order of levels
+            ds = ds.sortby('level', ascending=True)  # sort levels ascending (300 to 1000)

             try:
                 ds = ds.drop_vars('number')
@@ -984,7 +984,7 @@ def era5_request_surf_snowmapper( today, latN, latS, lonE, lonW, eraDir, output_
    bbox = [str(latN), str(lonW), str(latS), str(lonE)]

-    target = eraDir + "/forecast/SURF_%04d%02d%02d.nc" % (today.year, today.month, today.day)
+    target = str(eraDir) + "/forecast/SURF_%04d%02d%02d.nc" % (today.year, today.month, today.day)

    c = cdsapi.Client()
    c.retrieve(
@@ -1036,7 +1036,7 @@ def era5_request_plev_snowmapper(today, latN, latS, lonE, lonW, eraDir, plevels,
    bbox = [str(latN), str(lonW), str(latS), str(lonE)]

-    target = eraDir + "/forecast/PLEV_%04d%02d%02d.nc" % (today.year, today.month, today.day)
+    target = str(eraDir) + "/forecast/PLEV_%04d%02d%02d.nc" % (today.year, today.month, today.day)

    c = cdsapi.Client()
diff --git a/TopoPyScale/sim_fsm2oshd.py b/TopoPyScale/sim_fsm2oshd.py
index 7b56827..6ee7022 100644
--- a/TopoPyScale/sim_fsm2oshd.py
+++ b/TopoPyScale/sim_fsm2oshd.py
@@ -27,7 +27,7 @@ def read_fsm_ds_with_mask(path):
    '''
    function to read fsm as ds, adding a dummy cluster for masked area
    '''
-    ds = xr.open_mfdataset(path, concat_dim='point_ind', combine='nested')
+    ds = xr.open_mfdataset(path, concat_dim='point_ind', combine='nested', parallel=True)
    tp = ds.isel(point_ind=0).copy()
    tp['point_ind'] = -9999
    for var in list(tp.keys()):
diff --git a/TopoPyScale/topo_param.py b/TopoPyScale/topo_param.py
index da23120..6bb866e 100644
--- a/TopoPyScale/topo_param.py
+++ b/TopoPyScale/topo_param.py
@@ -87,16 +87,16 @@ def extract_pts_param(df_pts, ds_param, method='nearest'):
    df_pts[['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = 0

    if method == 'nearest':
-        for i, row in df_pts.iterrows():
+        for row in df_pts.itertuples():
            d_mini = ds_param.sel(x=row.x, y=row.y, method='nearest')
-            df_pts.loc[i, ['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = np.array((d_mini.elevation.values,
+            df_pts.loc[row.Index, ['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = np.array((d_mini.elevation.values,
                                                                            d_mini.slope.values,
                                                                            d_mini.aspect.values,
                                                                            d_mini.aspect_cos,
                                                                            d_mini.aspect_sin,
                                                                            d_mini.svf.values))
    elif method == 'idw' or method == 'linear':
-        for i, row in df_pts.iterrows():
+        for row in df_pts.itertuples():
            ind_lat = np.abs(ds_param.y-row.y).argmin()
            ind_lon = np.abs(ds_param.x-row.x).argmin()
            ds_param_pt = ds_param.isel(y=[ind_lat-1, ind_lat, ind_lat+1], x=[ind_lon-1, ind_lon, ind_lon+1])
@@ -119,7 +119,7 @@ def extract_pts_param(df_pts, ds_param, method='nearest'):
                )
            dw = xr.Dataset.weighted(ds_param_pt, da_idw)
            d_mini = dw.sum(['x', 'y'], keep_attrs=True)
-            df_pts.loc[i, ['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = np.array((d_mini.elevation.values,
+            df_pts.loc[row.Index, ['elevation', 'slope', 'aspect', 'aspect_cos', 'aspect_sin', 'svf']] = np.array((d_mini.elevation.values,
                                                                            d_mini.slope.values,
                                                                            d_mini.aspect.values,
                                                                            d_mini.aspect_cos,
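Note on the topo_param.py hunks above: the loops now use DataFrame.itertuples(), which yields lightweight namedtuples and is typically much faster than iterrows(); the former loop index i is read from row.Index instead. A minimal, self-contained sketch of the pattern (toy data, not TopoPyScale code):

    import pandas as pd

    df = pd.DataFrame({'x': [1.0, 2.0], 'y': [3.0, 4.0], 'elevation': [0.0, 0.0]})

    for row in df.itertuples():
        # row.x and row.y are plain attributes; row.Index replaces the old loop variable i
        df.loc[row.Index, 'elevation'] = row.x + row.y

    print(df)
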
diff --git a/TopoPyScale/topo_scale.py b/TopoPyScale/topo_scale.py
index 807ce90..5c6ad0f 100644
--- a/TopoPyScale/topo_scale.py
+++ b/TopoPyScale/topo_scale.py
@@ -42,6 +42,7 @@
 from pyproj import Transformer
 import numpy as np
 import sys, time
+from types import SimpleNamespace
 import datetime as dt
 from TopoPyScale import meteo_util as mu
 from TopoPyScale import topo_utils as tu
@@ -79,7 +80,10 @@ def pt_downscale_interp(row, ds_plev_pt, ds_surf_pt, meta):

    # convert gridcells coordinates from WGS84 to DEM projection
    lons, lats = np.meshgrid(ds_plev_pt.longitude.values, ds_plev_pt.latitude.values)
-    trans = Transformer.from_crs("epsg:4326", "epsg:" + str(meta.get('target_epsg')), always_xy=True)
+    trans = meta.get('transformer')  # Use cached transformer
+    if trans is None:
+        # Fallback for backwards compatibility
+        trans = Transformer.from_crs("epsg:4326", f"epsg:{meta.get('target_epsg')}", always_xy=True)
    Xs, Ys = trans.transform(lons.flatten(), lats.flatten())
    Xs = Xs.reshape(lons.shape)
    Ys = Ys.reshape(lons.shape)
@@ -148,7 +152,6 @@ def pt_downscale_interp(row, ds_plev_pt, ds_surf_pt, meta):
    except:
        raise ValueError(
            f'ERROR: Upper pressure level {plev_interp.level.min().values} hPa geopotential is lower than cluster mean elevation {row.elevation} {plev_interp.z}')
-
    top = plev_interp.isel(level=ind_z_top)
    bot = plev_interp.isel(level=ind_z_bot)
@@ -177,10 +180,10 @@ def pt_downscale_interp(row, ds_plev_pt, ds_surf_pt, meta):
            },
            coords={'month': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]}
        )
-        down_pt['precip_lapse_rate'] = (1 + monthly_coeffs.coef.sel(month=down_pt.month.values).data * (
-                row.elevation - surf_interp.z) * 1e-3) / \
-                                       (1 - monthly_coeffs.coef.sel(month=down_pt.month.values).data * (
-                                               row.elevation - surf_interp.z) * 1e-3)
+        # Cache coefficient selection (avoid duplicate .sel() call)
+        coef = monthly_coeffs.coef.sel(month=down_pt.month.values).data
+        elev_diff = (row.elevation - surf_interp.z) * 1e-3
+        down_pt['precip_lapse_rate'] = (1 + coef * elev_diff) / (1 - coef * elev_diff)
    else:
        down_pt['precip_lapse_rate'] = down_pt.t * 0 + 1
@@ -396,8 +399,8 @@ def downscale_climate(project_directory,
    # =========== Open dataset with Dask =================
    tvec = pd.date_range(start_date, pd.to_datetime(end_date) + pd.to_timedelta('1D'), freq=tstep, inclusive='left')
-    flist_PLEV = glob.glob(f'{climate_directory}/PLEV*.nc')
-    flist_SURF = glob.glob(f'{climate_directory}/SURF*.nc')
+    flist_PLEV = sorted(glob.glob(f'{climate_directory}/PLEV*.nc'))
+    flist_SURF = sorted(glob.glob(f'{climate_directory}/SURF*.nc'))

    if 'object' in list(df_centroids.dtypes):
        pass
@@ -407,7 +410,7 @@ def downscale_climate(project_directory,

    def _open_dataset_climate(flist):
-        ds_ = xr.open_mfdataset(flist, parallel=False, concat_dim="time", combine='nested', coords='minimal')
+        ds_ = xr.open_mfdataset(flist, parallel=True, concat_dim="time", combine='nested', coords='minimal')
        # this block handles the expver dimension that is in downloaded ERA5 data if data is ERA5/ERA5T mix. If only ERA5 or
        # only ERA5T it is not present. ERA5T data can be present in the timewindow T-5days to T -3months, where T is today.
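Note on the pt_downscale_interp() hunk above (trans = meta.get('transformer')): building a pyproj Transformer is comparatively expensive, so the downscaling now reuses one instance passed in through meta and only falls back to creating its own. A small illustrative sketch of the pattern (the target EPSG code is only an example):

    from pyproj import Transformer

    # Create once, reuse for every point, instead of calling Transformer.from_crs per point
    transformer = Transformer.from_crs("epsg:4326", "epsg:32632", always_xy=True)

    def to_target_crs(lon, lat, trans=transformer):
        # always_xy=True means arguments and results are ordered (x, y) = (lon, lat)
        return trans.transform(lon, lat)

    print(to_target_crs(10.0, 46.0))
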
@@ -473,11 +476,9 @@ def _subset_climate_dataset(ds_, row, type='plev'):
    ds_plev = ds_plev.sel(time=tvec.values)

-    row_list = []
-    ds_list = []
-    for _, row in df_centroids.iterrows():
-        row_list.append(row)
-        ds_list.append(ds_plev)
+    # Convert to list of row tuples (faster than iterrows)
+    row_list = list(df_centroids.itertuples(index=False))
+    ds_list = [ds_plev] * len(row_list)

    fun_param = zip(ds_list, row_list, ['plev'] * len(row_list))  # construct here the tuple that goes into the pooling for arguments
    tu.multithread_pooling(_subset_climate_dataset, fun_param, n_threads=n_core)
@@ -509,9 +510,8 @@ def _subset_climate_dataset(ds_, row, type='plev'):
        print(f"Requested time range: {requested_times.min()} to {requested_times.max()}")
        print(f"Missing timesteps: {missing_str}")

-    ds_list = []
-    for _, _ in df_centroids.iterrows():
-        ds_list.append(ds_surf)
+    # Repeat ds_surf for each centroid (no loop needed)
+    ds_list = [ds_surf] * len(row_list)

    fun_param = zip(ds_list, row_list, ['surf'] * len(row_list))  # construct here the tuple that goes into the pooling for arguments
    tu.multithread_pooling(_subset_climate_dataset, fun_param, n_threads=n_core)
@@ -519,29 +519,34 @@ def _subset_climate_dataset(ds_, row, type='plev'):
    ds_surf = None

    # Preparing list to feed into Pooling
-    surf_pt_list = []
-    plev_pt_list = []
-    ds_solar_list = []
-    horizon_da_list = []
-    row_list = []
-    meta_list = []
-    i = 0
-    for _, row in df_centroids.iterrows():
-        surf_pt_list.append(xr.open_dataset(output_directory / f'tmp/ds_surf_pt_{row.point_name}.nc', engine='h5netcdf'))
-        plev_pt_list.append(xr.open_dataset(output_directory / f'tmp/ds_plev_pt_{row.point_name}.nc', engine='h5netcdf'))
-        ds_solar_list.append(ds_solar.sel(point_name=row.point_name))
-        horizon_da_list.append(horizon_da)
-        row_list.append(row)
-        meta_list.append({'interp_method': interp_method,
-                          'lw_terrain_flag': lw_terrain_flag,
-                          'tstep': tstep_dict.get(tstep),
-                          'n_digits': n_digits,
-                          'file_pattern': file_pattern,
-                          'target_epsg':target_EPSG,
-                          'precip_lapse_rate_flag':precip_lapse_rate_flag,
-                          'output_directory':output_directory,
-                          'lw_terrain_flag':lw_terrain_flag})
-        i+=1
+    # Pre-build meta dict once (same for all points)
+    # Pre-create transformer once (used by all points for CRS conversion)
+    transformer = Transformer.from_crs("epsg:4326", f"epsg:{target_EPSG}", always_xy=True)
+
+    meta_template = {
+        'interp_method': interp_method,
+        'lw_terrain_flag': lw_terrain_flag,
+        'tstep': tstep_dict.get(tstep),
+        'n_digits': n_digits,
+        'file_pattern': file_pattern,
+        'target_epsg': target_EPSG,
+        'precip_lapse_rate_flag': precip_lapse_rate_flag,
+        'output_directory': output_directory,
+        'transformer': transformer,  # Cached CRS transformer
+    }
+
+    # Build lists using itertuples (faster than iterrows)
+    # Convert to SimpleNamespace for pickling compatibility with pandas 2.x + multiprocessing
+    cols = df_centroids.columns.tolist()
+    row_list = [SimpleNamespace(**dict(zip(cols, row)))
+                for row in df_centroids.itertuples(index=False, name=None)]
+    point_names = df_centroids.point_name.values
+
+    surf_pt_list = [xr.open_dataset(output_directory / f'tmp/ds_surf_pt_{pn}.nc', engine='h5netcdf') for pn in point_names]
+    plev_pt_list = [xr.open_dataset(output_directory / f'tmp/ds_plev_pt_{pn}.nc', engine='h5netcdf') for pn in point_names]
+    ds_solar_list = [ds_solar.sel(point_name=pn) for pn in point_names]
+    horizon_da_list = [horizon_da] * len(row_list)
+    meta_list = [meta_template.copy() for _ in row_list]

    fun_param = zip(row_list, plev_pt_list, surf_pt_list, meta_list)  # construct here the tuple that goes into the pooling for arguments
    tu.multicore_pooling(pt_downscale_interp, fun_param, n_core)
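Note on the hunk above: rows are converted to types.SimpleNamespace because the namedtuple class generated by DataFrame.itertuples() is created dynamically and may not pickle when rows are shipped to worker processes. A toy sketch of the conversion (illustrative data only, not TopoPyScale code):

    import pandas as pd
    from types import SimpleNamespace

    df = pd.DataFrame({'point_name': ['pt_0', 'pt_1'], 'elevation': [2400.0, 1850.0]})

    cols = df.columns.tolist()
    rows = [SimpleNamespace(**dict(zip(cols, r))) for r in df.itertuples(index=False, name=None)]

    print(rows[0].point_name, rows[0].elevation)  # attribute access, same as a row tuple
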
@@ -570,5 +575,5 @@ def read_downscaled(path='outputs/down_pt*.nc'):
        dataset: merged dataset readily to use and loaded in chuncks via Dask
    """
-    down_pts = xr.open_mfdataset(path, concat_dim='point_name', combine='nested', parallel=False)
+    down_pts = xr.open_mfdataset(path, concat_dim='point_name', combine='nested', parallel=True)
    return down_pts
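Note on the parallel=True switches in topo_scale.py (and sim_fsm2oshd.py) above: with parallel=True, xarray opens the individual files through dask.delayed, so the optional dask dependency must be installed; the result is otherwise the same. A hedged usage sketch mirroring read_downscaled (the glob pattern is the function's default and the files are assumed to exist):

    import xarray as xr

    down_pts = xr.open_mfdataset(
        'outputs/down_pt*.nc',      # downscaled point files produced earlier in the workflow
        concat_dim='point_name',
        combine='nested',
        parallel=True,              # requires dask; set parallel=False if dask is unavailable
    )
    print(down_pts)
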
diff --git a/TopoPyScale/topo_sub.py b/TopoPyScale/topo_sub.py
index 58976d0..4ebbf7a 100644
--- a/TopoPyScale/topo_sub.py
+++ b/TopoPyScale/topo_sub.py
@@ -21,6 +21,7 @@
 import time
 from typing import Union, Optional
 from pathlib import Path
+from concurrent.futures import ProcessPoolExecutor

 from TopoPyScale import topo_utils as tu
@@ -154,6 +155,65 @@ def minibatch_kmeans_clustering(df_param,

    return df_centers, miniBkmeans, df_param['cluster_labels']

+def _evaluate_n_clusters(args):
+    """
+    Worker function to evaluate a single n_clusters value.
+    Used by search_number_of_clusters for parallel execution.
+    """
+    n_clusters, df_param, method, features, scaler_type, scaler, seed = args
+    feature_list = list(features.keys())
+
+    # Scale data
+    if scaler_type is not None:
+        df_scaled, scaler_l = scale_df(df_param[feature_list], scaler=scaler_type, features=features)
+    else:
+        df_scaled = df_param[feature_list]
+        scaler_l = None
+
+    # Run clustering
+    if method.lower() in ['minibatchkmean', 'minibatchkmeans']:
+        df_centroids, kmeans_obj, cluster_labels = minibatch_kmeans_clustering(df_scaled,
+                                                                               n_clusters,
+                                                                               features=features,
+                                                                               seed=seed)
+    elif method.lower() in ['kmean', 'kmeans']:
+        df_centroids, kmeans_obj, cluster_labels = kmeans_clustering(df_scaled,
+                                                                     n_clusters,
+                                                                     features=features,
+                                                                     seed=seed)
+
+    labels = kmeans_obj.labels_
+
+    # Compute RMSE
+    if scaler_type is not None and scaler_l is not None:
+        cluster_elev = inverse_scale_df(df_centroids[feature_list], scaler_l, features=features).elevation.loc[
+            cluster_labels].values
+        rmse = (((df_param.elevation - cluster_elev) ** 2).mean()) ** 0.5
+    elif scaler is not None:
+        cluster_elev = inverse_scale_df(df_centroids[feature_list], scaler, features=features).elevation.loc[
+            cluster_labels].values
+        rmse = (((df_param.elevation - cluster_elev) ** 2).mean()) ** 0.5
+    else:
+        rmse = 0
+
+    # Compute cluster size stats
+    df_tmp = df_param.copy()
+    df_tmp['cluster_labels'] = cluster_labels
+    pix_count = df_tmp.groupby('cluster_labels').count()['x']
+
+    return {
+        'n_clusters': n_clusters,
+        'wcss_score': kmeans_obj.inertia_,
+        'db_score': davies_bouldin_score(df_param, labels),
+        'ch_score': calinski_harabasz_score(df_param, labels),
+        'rmse_elevation': rmse,
+        'n_pixels_min': pix_count.min(),
+        'n_pixels_max': pix_count.max(),
+        'n_pixels_mean': pix_count.mean(),
+        'n_pixels_median': pix_count.median(),
+    }
+
+
 def search_number_of_clusters(df_param,
                               method='minibatchkmean',
                               cluster_range=np.arange(100, 1000, 200),
@@ -161,7 +221,8 @@ def search_number_of_clusters(df_param,
                               scaler_type=StandardScaler(),
                               scaler=None,
                               seed=2,
-                              plot=True):
+                              plot=True,
+                              n_workers=None):
    '''
    Function to help identify an optimum number of clusters using the elbow method
    Args:
@@ -173,72 +234,35 @@ def search_number_of_clusters(df_param,
        scaler (scikit_learn obj): fitted scaler to dataset. Implies that df_param is already scaled
        seed (int): random seed for kmeans clustering
        plot (bool): plot results or not
+        n_workers (int): number of parallel workers. None = sequential, 0 or negative = all CPUs

    Returns:
        dataframe: wcss score, Davies Boulding score, Calinsky Harabasz score
    '''
-    feature_list = features.keys()
-    wcss = []  # Define a list to hold the Within-Cluster-Sum-of-Squares (WCSS)
-    db_scores = []
-    ch_scores = []
-    rmse_elevation = []
-    n_pixels_median = []
-    n_pixels_min = []
-    n_pixels_max = []
-    n_pixels_mean = []
-
-    for n_clusters in cluster_range:
-        if scaler_type is not None:
-            df_scaled, scaler_l = scale_df(df_param[feature_list], scaler=scaler_type, features=features)
-        else:
-            df_scaled = df_param[feature_list]
-
-        if method.lower() in ['minibatchkmean', 'minibatchkmeans']:
-            df_centroids, kmeans_obj, df_param['cluster_labels'] = minibatch_kmeans_clustering(df_scaled,
-                                                                                               n_clusters,
-                                                                                               features=features,
-                                                                                               seed=seed)
-        elif method.lower() in ['kmean', 'kmeans']:
-            df_centroids, kmeans_obj, df_param['cluster_labels'] = kmeans_clustering(df_scaled,
-                                                                                     n_clusters,
-                                                                                     features=features,
-                                                                                     seed=seed)
-
-        labels = kmeans_obj.labels_
-        if scaler_type is not None:
-            cluster_elev = inverse_scale_df(df_centroids[feature_list], scaler_l, features=features).elevation.loc[
-                df_param.cluster_labels].values
-            rmse = (((df_param.elevation - cluster_elev) ** 2).mean()) ** 0.5
-        elif scaler is not None:
-            cluster_elev = inverse_scale_df(df_centroids[feature_list], scaler, features=features).elevation.loc[
-                df_param.cluster_labels].values
-            rmse = (((df_param.elevation - cluster_elev) ** 2).mean()) ** 0.5
-        else:
-            rmse = 0
-
-        # compute scores
-        wcss.append(kmeans_obj.inertia_)
-        db_scores.append(davies_bouldin_score(df_param, labels))
-        ch_scores.append(calinski_harabasz_score(df_param, labels))
-        rmse_elevation.append(rmse)
-
-        # compute stats on cluster sizes
-        pix_count = df_param.groupby('cluster_labels').count()['x']
-        n_pixels_min.append(pix_count.min())
-        n_pixels_max.append(pix_count.max())
-        n_pixels_mean.append(pix_count.mean())
-        n_pixels_median.append(pix_count.median())
-
-    df = pd.DataFrame({'n_clusters': cluster_range,
-                       'wcss_score': wcss,
-                       'db_score': db_scores,
-                       'ch_score': ch_scores,
-                       'rmse_elevation': rmse_elevation,
-                       'n_pixels_min': n_pixels_min,
-                       'n_pixels_median': n_pixels_median,
-                       'n_pixels_mean': n_pixels_mean,
-                       'n_pixels_max': n_pixels_max})
+    import os
+
+    # Build argument list for parallel execution
+    args_list = [
+        (n_clusters, df_param, method, features, scaler_type, scaler, seed)
+        for n_clusters in cluster_range
+    ]
+
+    # Parallel or sequential execution
+    if n_workers is None or len(cluster_range) < 3:
+        # Sequential execution
+        results = [_evaluate_n_clusters(args) for args in args_list]
+    else:
+        # Parallel execution
+        if n_workers <= 0:
+            n_workers = os.cpu_count()
+        with ProcessPoolExecutor(max_workers=n_workers) as executor:
+            results = list(executor.map(_evaluate_n_clusters, args_list))
+
+    # Sort results by n_clusters (parallel execution may return out of order)
+    results = sorted(results, key=lambda x: x['n_clusters'])
+
+    df = pd.DataFrame(results)

    if plot:
        fig, ax = plt.subplots(4, 1, sharex=True)
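Note on the topo_sub.py changes above: the per-n_clusters evaluation is factored into a module-level worker so it can be pickled and mapped across a ProcessPoolExecutor; n_workers=None keeps the old sequential behaviour. A hypothetical call (df_param comes from the usual TopoSub workflow and is assumed to exist; the remaining arguments keep their defaults):

    import numpy as np
    from TopoPyScale import topo_sub as ts

    df_scores = ts.search_number_of_clusters(
        df_param,
        method='minibatchkmean',
        cluster_range=np.arange(100, 1000, 200),
        n_workers=4,        # 0 or a negative value would use all available CPUs
        plot=False,
    )
    print(df_scores[['n_clusters', 'wcss_score', 'rmse_elevation']])
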
diff --git a/TopoPyScale/topoclass.py b/TopoPyScale/topoclass.py
index f674aed..4125605 100644
--- a/TopoPyScale/topoclass.py
+++ b/TopoPyScale/topoclass.py
@@ -306,9 +306,15 @@ def extract_topo_cluster_param(self):
        # read df param
        df_param = ts.ds_to_indexed_dataframe(self.toposub.ds_param)

+        # Auto-mask nodata pixels (e.g. from reprojected DEMs with non-rectangular valid regions)
+        valid_elev = ~df_param['elevation'].isna()
+        n_nodata = (~valid_elev).sum()
+        if n_nodata > 0:
+            print(f'---> Auto-masking {n_nodata} nodata pixels from DEM')
+
        # add mask and cluster feature
        if mask_file in [None, {}]:
-            mask = [True] * len(df_param)
+            mask = valid_elev
        else:
            if not os.path.isabs(mask_file):
                mask_file = Path(self.config.project.directory, mask_file)
@@ -330,8 +336,8 @@ def extract_topo_cluster_param(self):
                    f'The GeoTIFFS of the DEM and the MASK need to have the same bounds/resolution. \n{str_bounds}\n{str_res}')
            print(f'---> Only consider grid cells inside mask ({Path(mask_file).name})')

-            # get mask
-            mask = ts.ds_to_indexed_dataframe(ds_mask)['mask'] == 1
+            # get mask (combine with nodata auto-mask)
+            mask = (ts.ds_to_indexed_dataframe(ds_mask)['mask'] == 1) & valid_elev

            # add cluster groups. Groups can be landcover classes for instance
            if groups_file in [None, {}]:
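Note on the topoclass.py change above: pixels whose elevation is NaN (typically nodata introduced by reprojection) are now excluded up front, and any user-supplied mask is intersected with that validity mask. A toy illustration of the boolean logic (synthetic data, not TopoPyScale code):

    import numpy as np
    import pandas as pd

    df_param = pd.DataFrame({'elevation': [1200.0, np.nan, 2450.0, np.nan],
                             'mask':      [1, 1, 0, 0]})

    valid_elev = ~df_param['elevation'].isna()          # auto-mask of nodata pixels
    mask = (df_param['mask'] == 1) & valid_elev         # user mask intersected with validity

    print(int((~valid_elev).sum()), 'nodata pixels auto-masked')
    print(df_param[mask])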