From be87b3c122f55aa23ddd03bb11e0ae7feba4b51c Mon Sep 17 00:00:00 2001 From: lidh966 Date: Fri, 13 Jun 2025 01:30:50 -0400 Subject: [PATCH] Add mpi-enabled spatially chunked loading --- .gitignore | 4 ++ src/I_O/forcing_loader.cpp | 80 +++++++++++++++++++++++-- src/I_O/forcing_loader.hpp | 24 ++++++-- src/main.cpp | 116 +++++++++++++++++++++++++++++++++++-- 4 files changed, 210 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 1563ba5..485a330 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,7 @@ data/pr_hourly_era5land_2019.nc data/t2m_daily_avg_era5land_2019.nc src/dense_204_a.csv src/final_204_a.csv + +############################################################################## +# ---- core dumps ------------------------------------------------------------- +*[0-9]+ diff --git a/src/I_O/forcing_loader.cpp b/src/I_O/forcing_loader.cpp index f4d6f43..8fd435e 100644 --- a/src/I_O/forcing_loader.cpp +++ b/src/I_O/forcing_loader.cpp @@ -194,6 +194,43 @@ std::unique_ptr NetCDFLoader::loadTimeChunk(size_t startTime, size_t nu return data; } + +// Load data by any chunk: time/lat/lon into memory +std::unique_ptr NetCDFLoader::loadChunk(size_t startTime, size_t numTime, + size_t startLat, size_t numLat, + size_t startLon, size_t numLon) { + // Check if chunk is out of bounds + if (numTime == 0 || numLat == 0 || numLon == 0) { + throw std::invalid_argument("Size of chunk dimensions must be greater than zero"); + } + if (startTime >= timeSize || startLat >= latSize || startLon >= lonSize) { + throw std::out_of_range("Start indices out of range"); + } + if (startTime + numTime > timeSize || startLat + numLat > latSize || startLon + numLon > lonSize) { + throw std::out_of_range("Requested chunk exceeds available data"); + } + + // Calculate total elements in the chunk + size_t totalElements = numTime * numLat * numLon; + + // Allocate memory for the chunk + std::unique_ptr data = std::make_unique(totalElements); + + // Define start and count arrays for subsetting + size_t start[3] = {startTime, startLat, startLon}; + size_t count[3] = {numTime, numLat, numLon}; + + // Read data from NetCDF file using C API + int status = nc_get_vara_float(ncid, varid, start, count, data.get()); + checkError(status, "Reading variable data"); + + std::cout << "Loaded chunk for " << varName << ": time steps " << startTime << " to " + << (startTime + numTime - 1) << ", lat " << startLat << " to " + << (startLat + numLat - 1) << ", lon " << startLon << " to " + << (startLon + numLon - 1) << std::endl; + + return data; +} // Get a single value from pre-loaded chunk data float NetCDFLoader::getValueFromChunk(const std::unique_ptr& chunkData, @@ -211,8 +248,43 @@ float NetCDFLoader::getValueFromChunk(const std::unique_ptr& chunkData, size_t index = relativeTimeIndex * (latSize * lonSize) + latIndex * lonSize + lonIndex; return chunkData[index]; } - -// Check if data is loaded correctly -bool NetCDFLoader::isDataLoaded() const { - return ncid >= 0 && timeSize > 0 && latSize > 0 && lonSize > 0; + + +// Calculate spatial chunks based on latitude and longitude sizes +std::vector NetCDFLoader::calculateSpatialChunks(size_t latSize, size_t lonSize, int numChunks) { + if (numChunks <= 0) { + throw std::invalid_argument("Number of chunks must be positive"); + } + if (latSize == 0 || lonSize == 0) { + throw std::invalid_argument("Spatial dimensions must be positive"); + } + + std::vector chunks(numChunks); + + // Divide by the smaller dimension for better load balancing + if (latSize <= lonSize) { + // Divide latitude dimension + size_t latChunkSize = latSize / numChunks; + size_t remainder = latSize % numChunks; + + for (int r = 0; r < numChunks; ++r) { + chunks[r].startLat = r * latChunkSize + std::min(static_cast(r), remainder); + chunks[r].numLat = latChunkSize + (r < remainder ? 1 : 0); + chunks[r].startLon = 0; + chunks[r].numLon = lonSize; // Full longitude range + } + } else { + // Divide longitude dimension + size_t lonChunkSize = lonSize / numChunks; + size_t remainder = lonSize % numChunks; + + for (int r = 0; r < numChunks; ++r) { + chunks[r].startLat = 0; + chunks[r].numLat = latSize; // Full latitude range + chunks[r].startLon = r * lonChunkSize + std::min(static_cast(r), remainder); + chunks[r].numLon = lonChunkSize + (r < remainder ? 1 : 0); + } + } + + return chunks; } diff --git a/src/I_O/forcing_loader.hpp b/src/I_O/forcing_loader.hpp index 2f593c6..1834028 100644 --- a/src/I_O/forcing_loader.hpp +++ b/src/I_O/forcing_loader.hpp @@ -9,6 +9,17 @@ #include // for std::pair #include +// Structure to hold spatial chunk information +// Current logic is simple: divide the lat/lon grid into rectangular chunks based on # of processes (ranks) +struct SpatialChunk { + size_t startLat, numLat; // Start index and number of latitudes in the chunk + size_t startLon, numLon; + + // Constructor for convenience + SpatialChunk(size_t sLat = 0, size_t nLat = 0, size_t sLon = 0, size_t nLon = 0) + : startLat(sLat), numLat(nLat), startLon(sLon), numLon(nLon) {} +}; + class LookupMapper { public: @@ -59,9 +70,11 @@ class NetCDFLoader { // Load data by time chunk into memory std::unique_ptr loadTimeChunk(size_t startTime, size_t numTimeSteps); - - // Check if data is loaded correctly - bool isDataLoaded() const; + + // Load data by any chunk: time/lat/lon into memory + std::unique_ptr loadChunk(size_t startTime, size_t numTime, // start index of time dimension, number of time steps in loading chunk + size_t startLat, size_t numLat, + size_t startLon, size_t numLon); // Getters size_t getTimeSize() const { return timeSize; } @@ -83,7 +96,10 @@ class NetCDFLoader { // Return NetCDF file and variable IDs for advanced operations: maybe needed in the future int getFileId() const { return ncid; } - int getVariableId() const { return varid; } + int getVariableId() const { return varid; } + + // Calculate chunks for this NetCDF file + std::vector calculateSpatialChunks(size_t latSize, size_t lonSize, int numChunks); }; diff --git a/src/main.cpp b/src/main.cpp index c672e4d..fa44802 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -277,11 +277,58 @@ int main(int argc, char** argv) { int baseChunk = totalRows / totalRanks; int remainder = totalRows % totalRanks; + /* for splitting links here, considering the whole forcing domain is split into #GPU rectangles, + we need to add logic that make sure the links in each rank has corresponding forcing data */ + int start = 0; std::cout << "Total rows: " << totalRows << std::endl; std::cout << "Total ranks: " << totalRanks << std::endl; + + /* LOGIC to pass information of spatial chunking + current logic is very simple: divide the lat/lon grid into rectangular chunks based on # of processes (ranks) + divide the smaller dimension + + This code is redundant, but kept for clarity and future reference + will combine the loading for pr and t2m into one function + + totalRanks here is a bit confusing. it's not the total number of ranks, but the number of ranks that will be used other than rank 0. + */ + // initialize netcdf loader for pr + NetCDFLoader prLoader("../data/pr_hourly_era5land_2019.nc", "pr"); + size_t lat_size = prLoader.getLatSize(); + size_t lon_size = prLoader.getLonSize(); + size_t time_size = prLoader.getTimeSize(); + std::cout << "Dimensions for pr: lat=" << lat_size << ", lon=" << lon_size << ", time=" << time_size << std::endl; + std::cout << totalRanks << " ranks will be used for spatial chunking." << std::endl; + // calculate spatial chunks + std::vector spatial_chunks_pr = prLoader.calculateSpatialChunks(lat_size, lon_size, totalRanks); + std::cout << "Spatial chunks for pr:" << std::endl; + for (int i = 0; i < spatial_chunks_pr.size(); ++i) { + std::cout << "Chunk " << i << ": lat[" << spatial_chunks_pr[i].startLat + << ":" << spatial_chunks_pr[i].startLat + spatial_chunks_pr[i].numLat - 1 << "], " + << "lon[" << spatial_chunks_pr[i].startLon + << ":" << spatial_chunks_pr[i].startLon + spatial_chunks_pr[i].numLon - 1 << "], " + << "size: " << spatial_chunks_pr[i].numLat << "x" << spatial_chunks_pr[i].numLon << std::endl; + } + + // similarly, initialize netcdf loader for t2m & calculate spatial chunks + NetCDFLoader t2mLoader("../data/t2m_daily_avg_era5land_2019.nc", "t2m"); + size_t lat_size_t2m = t2mLoader.getLatSize(); + size_t lon_size_t2m = t2mLoader.getLonSize(); + size_t time_size_t2m = t2mLoader.getTimeSize(); + std::cout << "Dimensions for t2m: lat=" << lat_size_t2m << ", lon=" << lon_size_t2m << ", time=" << time_size_t2m << std::endl; + std::vector spatial_chunks_t2m = t2mLoader.calculateSpatialChunks(lat_size_t2m, lon_size_t2m, totalRanks); + std::cout << "Spatial chunks for t2m:" << std::endl; + for (int i = 0; i < spatial_chunks_t2m.size(); ++i) { + std::cout << "Chunk " << i << ": lat[" << spatial_chunks_t2m[i].startLat + << ":" << spatial_chunks_t2m[i].startLat + spatial_chunks_t2m[i].numLat - 1 << "], " + << "lon[" << spatial_chunks_t2m[i].startLon + << ":" << spatial_chunks_t2m[i].startLon + spatial_chunks_t2m[i].numLon - 1 << "], " + << "size: " << spatial_chunks_t2m[i].numLat << "x" << spatial_chunks_t2m[i].numLon << std::endl; + } + for (int r = 1; r <= totalRanks; ++r) { // Calculate actual chunk size for this rank int chunkSize = baseChunk + (r <= remainder ? 1 : 0); @@ -304,6 +351,26 @@ int main(int argc, char** argv) { // Move to next chunk start = end; + + + /* Send spatial chunk information to each worker */ + // Send spatial chunk for pr - spatial boundaries + SpatialChunk chunk_pr = spatial_chunks_pr[r - 1]; // r-1 because rank 0 is master + std::cout << "Sending spatial chunk of pr to rank " << r << std::endl; + MPI_Send(&chunk_pr.startLat, 1, MPI_UNSIGNED_LONG, r, 0, MPI_COMM_WORLD); + MPI_Send(&chunk_pr.numLat, 1, MPI_UNSIGNED_LONG, r, 1, MPI_COMM_WORLD); + MPI_Send(&chunk_pr.startLon, 1, MPI_UNSIGNED_LONG, r, 2, MPI_COMM_WORLD); + MPI_Send(&chunk_pr.numLon, 1, MPI_UNSIGNED_LONG, r, 3, MPI_COMM_WORLD); + + // Send spatial chunk for t2m - spatial boundaries + SpatialChunk chunk_t2m = spatial_chunks_t2m[r - 1]; // r-1 because rank 0 is master + std::cout << "Sending spatial chunk of t2m to rank " << r << std::endl; + MPI_Send(&chunk_t2m.startLat, 1, MPI_UNSIGNED_LONG, r, 4, MPI_COMM_WORLD); + MPI_Send(&chunk_t2m.numLat, 1, MPI_UNSIGNED_LONG, r, 5, MPI_COMM_WORLD); + MPI_Send(&chunk_t2m.startLon, 1, MPI_UNSIGNED_LONG, r, 6, MPI_COMM_WORLD); + MPI_Send(&chunk_t2m.numLon, 1, MPI_UNSIGNED_LONG, r, 7, MPI_COMM_WORLD); + + std::cout << "Rank 0 finished sending chunks" << std::endl; } } @@ -504,14 +571,43 @@ int main(int argc, char** argv) { streamPoint[s] = lat * lon_size + lon; } + // Before adding forcings, we receieve the spatial chunk information + size_t start_lat_pr, num_lat_pr, start_lon_pr, num_lon_pr; + size_t start_lat_t2m, num_lat_t2m, start_lon_t2m, num_lon_t2m; + // Receive spatial chunk for pr + MPI_Recv(&start_lat_pr, 1, MPI_UNSIGNED_LONG, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&num_lat_pr, 1, MPI_UNSIGNED_LONG, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&start_lon_pr, 1, MPI_UNSIGNED_LONG, 0, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&num_lon_pr, 1, MPI_UNSIGNED_LONG, 0, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + // Receive spatial chunk for t2m + MPI_Recv(&start_lat_t2m, 1, MPI_UNSIGNED_LONG, 0, 4, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&num_lat_t2m, 1, MPI_UNSIGNED_LONG, 0, 5, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&start_lon_t2m, 1, MPI_UNSIGNED_LONG, 0, 6, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&num_lon_t2m, 1, MPI_UNSIGNED_LONG, 0, 7, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + // Print the received spatial chunk information + std::cout << "Received spatial chunk for pr: " + << "start_lat=" << start_lat_pr + << ", num_lat=" << num_lat_pr + << ", start_lon=" << start_lon_pr + << ", num_lon=" << num_lon_pr << std::endl; + std::cout << "Received spatial chunk for t2m: " + << "start_lat=" << start_lat_t2m + << ", num_lat=" << num_lat_t2m + << ", start_lon=" << start_lon_t2m + << ", num_lon=" << num_lon_t2m << std::endl; + // ────────── End receiving spatial chunks ────────── + // ─── Adding forcings (first 2 days only) ───────────────────────────────── struct NCForcing { std::string path, var; double dt; // hours per time step + + // add spatial chunking info + size_t start_lat, num_lat, start_lon, num_lon; }; std::vector ncForcings = { - {"../data/pr_hourly_era5land_2019.nc", "pr", 1.0}, - {"../data/t2m_daily_avg_era5land_2019.nc","t2m", 24.0} + {"../data/pr_hourly_era5land_2019.nc", "pr", 1.0, start_lat_pr, num_lat_pr, start_lon_pr, num_lon_pr}, + {"../data/t2m_daily_avg_era5land_2019.nc","t2m", 24.0, start_lat_t2m, num_lat_t2m, start_lon_t2m, num_lon_t2m} }; int nForc = int(ncForcings.size()); @@ -532,12 +628,20 @@ int main(int argc, char** argv) { size_t steps2d = size_t(std::round(daysToLoad * 24.0 / fm.dt)); steps2d = std::min(fullTime, steps2d); - auto raw = loader.loadTimeChunk(0, steps2d); + // auto raw = loader.loadTimeChunk(0, steps2d); + + // instead, load the chunk for the spatial rectangle defined by start_lat, num_lat, start_lon, num_lon + auto raw = loader.loadChunk( + 0, steps2d, + fm.start_lat, fm.num_lat, + fm.start_lon, fm.num_lon + ); h_forc_dt.push_back(fm.dt); h_forc_nT.push_back(steps2d); - size_t gridPts = loader.getLatSize() * loader.getLonSize(); + // size_t gridPts = loader.getLatSize() * loader.getLonSize(); + size_t gridPts = fm.num_lat * fm.num_lon; // HERE should be actual spatial chunk size instead of full domain size float *basePtr = raw.get(); for (size_t t = 0; t < steps2d; ++t) { @@ -793,8 +897,8 @@ int main(int argc, char** argv) { for (int v = 0; v < N_EQ; ++v) state_vals[v] = v; //Netcdf file attributes (will be defined in yaml) - std::string dense_filename = "/scratch/gpfs/am2192/dense_example_rank_"+ std::to_string(rank)+".nc"; - std::string final_filename = "/scratch/gpfs/am2192/final_example_rank_"+ std::to_string(rank)+".nc"; + std::string dense_filename = "/scratch/gpfs/dl0307/dense_example_rank_"+ std::to_string(rank)+".nc"; + std::string final_filename = "/scratch/gpfs/dl0307/final_example_rank_"+ std::to_string(rank)+".nc"; int compression_level = 0; // Write only the final time step (2D output)