diff --git a/src/api/close_server.c b/src/api/close_server.c index 853826980..96f34c58a 100644 --- a/src/api/close_server.c +++ b/src/api/close_server.c @@ -42,11 +42,12 @@ main(int argc, char *argv[]) int rank; double start; MPI_Init(&argc, &argv); - +#endif + pdc = PDCinit("pdc"); +#ifdef ENABLE_MPI MPI_Comm_rank(MPI_COMM_WORLD, &rank); start = MPI_Wtime(); #endif - pdc = PDCinit("pdc"); PDC_Client_close_all_server(); @@ -54,6 +55,7 @@ main(int argc, char *argv[]) LOG_ERROR("Failed to close PDC\n"); #ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); if (!rank) { LOG_INFO("total close time = %lf\n", MPI_Wtime() - start); } diff --git a/src/commons/serde/bulki/bulki.c b/src/commons/serde/bulki/bulki.c index b9d4deaeb..372a784b5 100644 --- a/src/commons/serde/bulki/bulki.c +++ b/src/commons/serde/bulki/bulki.c @@ -624,7 +624,9 @@ BULKI_Entity_free(BULKI_Entity *bulk_entity, int free_struct) for (size_t i = 0; i < bulk_entity->count; i++) { BULKI_free(&bulki_array[i], 0); } +#ifdef BUKLI_DEBUG_LOG LOG_INFO("Freeing bulki_array 1\n"); +#endif bulki_array = NULL; } else if (bulk_entity->pdc_type == PDC_BULKI_ENT && bulk_entity->data != NULL) { @@ -632,7 +634,9 @@ BULKI_Entity_free(BULKI_Entity *bulk_entity, int free_struct) for (size_t i = 0; i < bulk_entity->count; i++) { BULKI_Entity_free(&bulki_entity_array[i], 0); } +#ifdef BUKLI_DEBUG_LOG LOG_INFO("Freeing bulki_array 2\n"); +#endif bulki_entity_array = NULL; } } @@ -640,15 +644,21 @@ BULKI_Entity_free(BULKI_Entity *bulk_entity, int free_struct) if (bulk_entity->pdc_type == PDC_BULKI && bulk_entity->data != NULL) { BULKI_free((BULKI *)bulk_entity->data, 0); bulk_entity->data = NULL; +#ifdef BUKLI_DEBUG_LOG LOG_INFO("Freeing bulki_item 1\n"); +#endif } } +#ifdef BUKLI_DEBUG_LOG LOG_INFO("Freeing bulk_entity\n"); +#endif if (bulk_entity->data != NULL) { +#ifdef BUKLI_DEBUG_LOG LOG_INFO( "bulki_entity->class: %d, bulki_entity->class: %d, bulki_entity->data: %p, bulki_entity: " "%p\n", bulk_entity->pdc_class, bulk_entity->pdc_type, bulk_entity->data, bulk_entity); +#endif bulk_entity->data = (void *)PDC_free(bulk_entity->data); bulk_entity->data = NULL; } diff --git a/src/server/pdc_server.c b/src/server/pdc_server.c index abe35434d..2ec1f06ec 100644 --- a/src/server/pdc_server.c +++ b/src/server/pdc_server.c @@ -74,6 +74,17 @@ rocksdb_t *rocksdb_g; sqlite3 *sqlite3_db_g; #endif +#include "bulki.h" +#include "bulki_serde.h" + +// PDC checkpoint version management +#define PDC_CHECKPOINT_MAGIC_2026 "PDC26.03" // Year 2026, month 03 +#define PDC_CHECKPOINT_MAGIC_2027 "PDC27.01" // Year 2027, month 01 + +#define PDC_CHECKPOINT_MAGIC_LEN 8 // All must be same length +#define PDC_CHECKPOINT_MAGIC_CURRENT PDC_CHECKPOINT_MAGIC_2026 +#define PDC_CHECKPOINT_VERSION_MATCH(m, v) (strncmp((m), (v), 8) == 0) + // Check how long PDC has run every OP_INTERVAL operations #define PDC_CHECKPOINT_CHK_OP_INTERVAL 2000 // Checkpoint every INTERVAL_SEC second and at least OP_INTERVAL operations @@ -946,7 +957,7 @@ PDC_Server_init(int port, hg_class_t **hg_class, hg_context_t **hg_context) } else { // We are starting a brand new server - transfer_request_metadata_query_init(pdc_server_size_g, NULL); + transfer_request_metadata_query_init_bulki(pdc_server_size_g, NULL); if (is_hash_table_init_g != 1) { ret_value = PDC_Server_init_hash_table(); if (ret_value != SUCCEED) @@ -1191,12 +1202,13 @@ PDC_Server_checkpoint() HashTablePair pair; char checkpoint_file[ADDR_MAX], checkpoint_file_local[ADDR_MAX], cmd[4096]; HashTableIterator hash_table_iter; - char * checkpoint; char * env_char; - uint64_t checkpoint_size; bool use_tmpfs = false; FILE * file; + // BULKI-specific variables + BULKI *checkpoint_bulki = NULL; + #ifdef PDC_TIMING // Timing struct timeval pdc_timer_start; @@ -1223,31 +1235,42 @@ PDC_Server_checkpoint() if (pdc_server_rank_g == 0) LOG_INFO("Checkpoint file [%s]\n", checkpoint_file); - if (use_tmpfs) - file = fopen(checkpoint_file_local, "w+"); - else - file = fopen(checkpoint_file, "w+"); + // initialize BULKI structure - estimate initial capacity + checkpoint_bulki = BULKI_init(5); - if (file == NULL) - PGOTO_ERROR(FAIL, "Checkpoint file open error"); + // BULKI version number for validation + BULKI_put(checkpoint_bulki, BULKI_singleton_ENTITY("version_number", PDC_STRING), + BULKI_ENTITY(PDC_CHECKPOINT_MAGIC_CURRENT, 1, PDC_STRING, PDC_CLS_ITEM)); - // Checkpoint containers + // checkpoint containers n_entry = hash_table_num_entries(container_hash_table_g); - fwrite(&n_entry, sizeof(int), 1, file); + + BULKI_Entity *containers_array = empty_BULKI_Array_Entity(); hash_table_iterate(container_hash_table_g, &hash_table_iter); while (n_entry != 0 && hash_table_iter_has_more(&hash_table_iter)) { pair = hash_table_iter_next(&hash_table_iter); cont_head = pair.value; + BULKI *container_entry = BULKI_init(2); + hash_key = PDC_get_hash_by_name(cont_head->cont_name); - fwrite(&hash_key, sizeof(uint32_t), 1, file); - fwrite(cont_head, sizeof(pdc_cont_hash_table_entry_t), 1, file); + BULKI_put(container_entry, BULKI_singleton_ENTITY("hash_key", PDC_STRING), + BULKI_ENTITY(&hash_key, 1, PDC_UINT32, PDC_CLS_ITEM)); + + // store the container structure as binary blob + BULKI_put(container_entry, BULKI_singleton_ENTITY("cont_data", PDC_STRING), + BULKI_ENTITY(cont_head, sizeof(pdc_cont_hash_table_entry_t), PDC_UINT8, PDC_CLS_ARRAY)); + + BULKI_ENTITY_append_BULKI(containers_array, container_entry); } - // DHT + BULKI_put(checkpoint_bulki, BULKI_singleton_ENTITY("containers", PDC_STRING), containers_array); + + // checkpoint metadata hash table n_entry = hash_table_num_entries(metadata_hash_table_g); - fwrite(&n_entry, sizeof(int), 1, file); + + BULKI_Entity *metadata_entries_array = empty_BULKI_Array_Entity(); hash_table_iterate(metadata_hash_table_g, &hash_table_iter); @@ -1255,82 +1278,185 @@ PDC_Server_checkpoint() pair = hash_table_iter_next(&hash_table_iter); head = pair.value; - fwrite(&head->n_obj, sizeof(int), 1, file); + BULKI *hash_entry = BULKI_init(3); + + // Store number of objects + BULKI_put(hash_entry, BULKI_singleton_ENTITY("n_obj", PDC_STRING), + BULKI_ENTITY(&head->n_obj, 1, PDC_INT, PDC_CLS_ITEM)); + hash_key = PDC_get_hash_by_name(head->metadata->obj_name); - fwrite(&hash_key, sizeof(uint32_t), 1, file); + BULKI_put(hash_entry, BULKI_singleton_ENTITY("hash_key", PDC_STRING), + BULKI_ENTITY(&hash_key, 1, PDC_UINT32, PDC_CLS_ITEM)); - // Iterate every metadata structure in current entry + // array of metadata objects + BULKI_Entity *metadata_objs_array = empty_BULKI_Array_Entity(); + + // iterate every metadata structure in current entry DL_FOREACH(head->metadata, elt) { - // Write entire metadata structure - fwrite(elt, sizeof(pdc_metadata_t), 1, file); + BULKI *metadata_obj = BULKI_init(3); + + // store metadata structure + BULKI_put(metadata_obj, BULKI_singleton_ENTITY("metadata", PDC_STRING), + BULKI_ENTITY(elt, sizeof(pdc_metadata_t), PDC_UINT8, PDC_CLS_ARRAY)); - // Write kv tags + // kv tags DL_COUNT(elt->kvtag_list_head, kvlist_elt, n_kvtag); - fwrite(&n_kvtag, sizeof(int), 1, file); + + BULKI_Entity *kvtags_array = empty_BULKI_Array_Entity(); DL_FOREACH(elt->kvtag_list_head, kvlist_elt) { - key_len = strlen(kvlist_elt->kvtag->name) + 1; - fwrite(&key_len, sizeof(int), 1, file); - fwrite(kvlist_elt->kvtag->name, key_len, 1, file); - fwrite(&kvlist_elt->kvtag->size, sizeof(uint32_t), 1, file); - fwrite(&kvlist_elt->kvtag->type, sizeof(int8_t), 1, file); - fwrite(kvlist_elt->kvtag->value, kvlist_elt->kvtag->size, 1, file); + BULKI *kvtag_entry = BULKI_init(4); + + BULKI_put(kvtag_entry, BULKI_singleton_ENTITY("key", PDC_STRING), + BULKI_singleton_ENTITY(kvlist_elt->kvtag->name, PDC_STRING)); + + BULKI_put(kvtag_entry, BULKI_singleton_ENTITY("size", PDC_STRING), + BULKI_ENTITY(&kvlist_elt->kvtag->size, 1, PDC_UINT32, PDC_CLS_ITEM)); + + BULKI_put(kvtag_entry, BULKI_singleton_ENTITY("type", PDC_STRING), + BULKI_ENTITY(&kvlist_elt->kvtag->type, 1, PDC_INT8, PDC_CLS_ITEM)); + + BULKI_put(kvtag_entry, BULKI_singleton_ENTITY("value", PDC_STRING), + BULKI_ENTITY(kvlist_elt->kvtag->value, kvlist_elt->kvtag->size, PDC_UINT8, + PDC_CLS_ARRAY)); + + BULKI_ENTITY_append_BULKI(kvtags_array, kvtag_entry); } - // Write region info + BULKI_put(metadata_obj, BULKI_singleton_ENTITY("kvtags", PDC_STRING), kvtags_array); + + // storage regions n_region = 0; DL_COUNT(elt->storage_region_list_head, region_elt, n_region); - fwrite(&n_region, sizeof(int), 1, file); + + BULKI_Entity *regions_array = empty_BULKI_Array_Entity(); if (n_region > 0) { n_write_region = 0; DL_FOREACH(elt->storage_region_list_head, region_elt) { - fwrite(region_elt, sizeof(region_list_t), 1, file); - n_write_region++; + BULKI *region_entry = BULKI_init(3); + + // store region structure + BULKI_put(region_entry, BULKI_singleton_ENTITY("region", PDC_STRING), + BULKI_ENTITY(region_elt, sizeof(region_list_t), PDC_UINT8, PDC_CLS_ARRAY)); + + // store histogram if present int has_hist = 0; if (region_elt->region_hist != NULL) has_hist = 1; - fwrite(&has_hist, sizeof(int), 1, file); + + BULKI_put(region_entry, BULKI_singleton_ENTITY("has_hist", PDC_STRING), + BULKI_ENTITY(&has_hist, 1, PDC_INT, PDC_CLS_ITEM)); + if (has_hist == 1) { - fwrite(®ion_elt->region_hist->dtype, sizeof(int), 1, file); - fwrite(®ion_elt->region_hist->nbin, sizeof(int), 1, file); - fwrite(region_elt->region_hist->range, sizeof(double), - region_elt->region_hist->nbin * 2, file); - fwrite(region_elt->region_hist->bin, sizeof(uint64_t), region_elt->region_hist->nbin, - file); - fwrite(®ion_elt->region_hist->incr, sizeof(double), 1, file); + BULKI *histogram = BULKI_init(5); + + BULKI_put(histogram, BULKI_singleton_ENTITY("dtype", PDC_STRING), + BULKI_ENTITY(®ion_elt->region_hist->dtype, 1, PDC_INT, PDC_CLS_ITEM)); + + BULKI_put(histogram, BULKI_singleton_ENTITY("nbin", PDC_STRING), + BULKI_ENTITY(®ion_elt->region_hist->nbin, 1, PDC_INT, PDC_CLS_ITEM)); + + BULKI_put(histogram, BULKI_singleton_ENTITY("range", PDC_STRING), + BULKI_ENTITY(region_elt->region_hist->range, + region_elt->region_hist->nbin * 2, PDC_DOUBLE, PDC_CLS_ARRAY)); + + BULKI_put(histogram, BULKI_singleton_ENTITY("bin", PDC_STRING), + BULKI_ENTITY(region_elt->region_hist->bin, region_elt->region_hist->nbin, + PDC_UINT64, PDC_CLS_ARRAY)); + + BULKI_put(histogram, BULKI_singleton_ENTITY("incr", PDC_STRING), + BULKI_ENTITY(®ion_elt->region_hist->incr, 1, PDC_DOUBLE, PDC_CLS_ITEM)); + + BULKI_put(region_entry, BULKI_singleton_ENTITY("histogram", PDC_STRING), + BULKI_ENTITY(histogram, 1, PDC_BULKI, PDC_CLS_ITEM)); } + + BULKI_ENTITY_append_BULKI(regions_array, region_entry); + n_write_region++; } if (n_write_region != n_region) LOG_ERROR("Error with number of regions\n"); } + + BULKI_put(metadata_obj, BULKI_singleton_ENTITY("regions", PDC_STRING), regions_array); + + BULKI_ENTITY_append_BULKI(metadata_objs_array, metadata_obj); + metadata_size++; region_count += n_region; - } // End for metadata entry linked list - } // End for hash table metadata entry + } // end for metadata entry linked list + + BULKI_put(hash_entry, BULKI_singleton_ENTITY("metadata_objects", PDC_STRING), metadata_objs_array); + + BULKI_ENTITY_append_BULKI(metadata_entries_array, hash_entry); + } // end for hash table metadata entry - // Note data server region are managed by data server instead of metadata server + BULKI_put(checkpoint_bulki, BULKI_singleton_ENTITY("metadata_entries", PDC_STRING), + metadata_entries_array); + + // data server regions data_server_region_t *region = NULL; DL_COUNT(dataserver_region_g, region, n_objs); - fwrite(&n_objs, sizeof(int), 1, file); + + BULKI_Entity *dataserver_regions_array = empty_BULKI_Array_Entity(); + DL_FOREACH(dataserver_region_g, region) { - fwrite(®ion->obj_id, sizeof(uint64_t), 1, file); + BULKI *dataserver_obj = BULKI_init(2); + + BULKI_put(dataserver_obj, BULKI_singleton_ENTITY("obj_id", PDC_STRING), + BULKI_ENTITY(®ion->obj_id, 1, PDC_UINT64, PDC_CLS_ITEM)); + DL_COUNT(region->region_storage_head, region_elt, n_region); - fwrite(&n_region, sizeof(int), 1, file); + + BULKI_Entity *ds_regions_array = empty_BULKI_Array_Entity(); DL_FOREACH(region->region_storage_head, region_elt) { - fwrite(region_elt, sizeof(region_list_t), 1, file); + BULKI *ds_region = BULKI_init(1); + BULKI_put(ds_region, BULKI_singleton_ENTITY("region", PDC_STRING), + BULKI_ENTITY(region_elt, sizeof(region_list_t), PDC_UINT8, PDC_CLS_ARRAY)); + + BULKI_ENTITY_append_BULKI(ds_regions_array, ds_region); } + + BULKI_put(dataserver_obj, BULKI_singleton_ENTITY("regions", PDC_STRING), ds_regions_array); + + BULKI_ENTITY_append_BULKI(dataserver_regions_array, dataserver_obj); + } + + BULKI_put(checkpoint_bulki, BULKI_singleton_ENTITY("dataserver_regions", PDC_STRING), + dataserver_regions_array); + + // transfer request metadata query + BULKI *transfer_query_bulki = NULL; + ret_value = transfer_request_metadata_query_checkpoint_bulki(&transfer_query_bulki); + if (ret_value != SUCCEED || transfer_query_bulki == NULL) { + LOG_ERROR("Failed to create transfer query checkpoint\n"); + PGOTO_ERROR(FAIL, "Transfer query checkpoint failed"); } - transfer_request_metadata_query_checkpoint(&checkpoint, &checkpoint_size); - fwrite(&checkpoint_size, sizeof(uint64_t), 1, file); - fwrite(checkpoint, checkpoint_size, 1, file); + BULKI_put(checkpoint_bulki, BULKI_singleton_ENTITY("transfer_query", PDC_STRING), + BULKI_ENTITY(transfer_query_bulki, 1, PDC_BULKI, PDC_CLS_ITEM)); - fclose(file); + // ========== Serialize and Write ========== + if (use_tmpfs) + file = fopen(checkpoint_file_local, "wb"); + else + file = fopen(checkpoint_file, "wb"); + + if (file == NULL) + PGOTO_ERROR(FAIL, "Checkpoint file open error"); + + // note: BULKI_Entity_serialize_to_file() still do in-memory serialization + // note: need to implement a streaming file writing without in-memory serialization + BULKI_serialize_to_file(checkpoint_bulki, file); + file = NULL; // file was closed by BULKI_serialize_to_file() + + // Clean up + BULKI_free(checkpoint_bulki, 1); if (use_tmpfs) { #ifdef PDC_TIMING @@ -1365,7 +1491,7 @@ PDC_Server_checkpoint() checkpoint_time = PDC_get_elapsed_time_double(&pdc_timer_start, &pdc_timer_end); if (pdc_server_rank_g == 0) - LOG_ERROR("Rank[ ALL]: Total checkpoint time = %.6f\n", checkpoint_time); + LOG_INFO("Rank[ ALL]: Total checkpoint time = %.6f\n", checkpoint_time); #endif if (pdc_server_rank_g == 0) { @@ -1402,274 +1528,412 @@ PDC_Server_restart(char *filename) { FUNC_ENTER(NULL); - perr_t ret_value = SUCCEED; - int n_entry, count, i, j, nobj = 0, all_nobj = 0, all_n_region, n_region, n_objs, total_region = 0, - n_kvtag, key_len; - int n_cont, all_cont; + perr_t ret_value = SUCCEED; + int i, j, nobj = 0, all_nobj = 0, all_n_region, total_region = 0; + int all_cont; pdc_metadata_t * metadata, *elt; region_list_t * region_list; pdc_hash_table_entry_head * entry; pdc_cont_hash_table_entry_t *cont_entry; uint32_t * hash_key; unsigned idx; - uint64_t checkpoint_size; - char * checkpoint_buf; -#ifdef PDC_TIMING + + // BULKI-specific variables + BULKI * checkpoint_bulki = NULL; + BULKI_Entity *containers_array = NULL; + BULKI_Entity *metadata_entries_array = NULL; + BULKI_Entity *dataserver_regions_array = NULL; + char magic[PDC_CHECKPOINT_MAGIC_LEN + 1]; + FILE * file = NULL; + +#if defined(PDC_TIMING) || defined(ENABLE_MPI) double start = MPI_Wtime(); #endif // init hash table ret_value = PDC_Server_init_hash_table(); if (ret_value != SUCCEED) - PGOTO_ERROR(FAIL, "Error wtih PDC_Server_init_hash_table"); + PGOTO_ERROR(FAIL, "Error with PDC_Server_init_hash_table"); - FILE *file = fopen(filename, "r"); - if (file == NULL) + // open file + file = fopen(filename, "rb"); + if (file == NULL) { PGOTO_ERROR(FAIL, "Error with fopen, filename: [%s]", filename); - - if (fread(&n_cont, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for n_count\n"); } - all_cont = n_cont; - while (n_cont > 0) { - hash_key = (uint32_t *)PDC_malloc(sizeof(uint32_t)); - if (fread(hash_key, sizeof(uint32_t), 1, file) != 1) { - LOG_ERROR("Read failed for hash_key\n"); - } - total_mem_usage_g += sizeof(uint32_t); - - // Reconstruct hash table - cont_entry = (pdc_cont_hash_table_entry_t *)PDC_malloc(sizeof(pdc_cont_hash_table_entry_t)); - total_mem_usage_g += sizeof(pdc_cont_hash_table_entry_t); - if (fread(cont_entry, sizeof(pdc_cont_hash_table_entry_t), 1, file) != 1) { - LOG_ERROR("Read failed for cont_entry\n"); - } -#ifdef ENABLE_MULTITHREAD - hg_thread_mutex_lock(&pdc_container_hash_table_mutex_g); -#endif - if (hash_table_insert(container_hash_table_g, hash_key, cont_entry) != 1) { - LOG_ERROR("Hash table insert failed\n"); - ret_value = FAIL; - } -#ifdef ENABLE_MULTITHREAD - hg_thread_mutex_unlock(&pdc_container_hash_table_mutex_g); -#endif + // note: BULKI_deserialize_from_file will close the file + checkpoint_bulki = BULKI_deserialize_from_file(file); + file = NULL; // File was closed by BULKI_deserialize_from_file - n_cont--; - } // End while + if (checkpoint_bulki == NULL) { + LOG_ERROR("Failed to deserialize checkpoint file\n"); + PGOTO_ERROR(FAIL, "Deserialization failed"); + } - if (fread(&n_entry, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for n_entry\n"); + // read and validate bulki version + BULKI_Entity *version_entity = + BULKI_get(checkpoint_bulki, BULKI_singleton_ENTITY("version_number", PDC_STRING)); + int equal = BULKI_Entity_equal(version_entity, + BULKI_ENTITY(PDC_CHECKPOINT_MAGIC_CURRENT, 1, PDC_STRING, PDC_CLS_ITEM)); + if (!equal) { + LOG_ERROR("Checkpoint version mismatch: expected '%s', found '%s'\n", PDC_CHECKPOINT_MAGIC_CURRENT, + version_entity ? (char *)version_entity->data : "NULL"); + PGOTO_ERROR(FAIL, "Checkpoint version mismatch"); } - while (n_entry > 0) { - if (fread(&count, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for count\n"); - } - hash_key = (uint32_t *)PDC_malloc(sizeof(uint32_t)); - if (fread(hash_key, sizeof(uint32_t), 1, file) != 1) { - LOG_ERROR("Read failed for hash_key\n"); - } - total_mem_usage_g += sizeof(uint32_t); - - // Reconstruct hash table - entry = (pdc_hash_table_entry_head *)PDC_malloc(sizeof(pdc_hash_table_entry_head)); - entry->n_obj = 0; - entry->bloom = NULL; - entry->metadata = NULL; - // Init hash table metadata (w/ bloom) with first obj - PDC_Server_hash_table_list_init(entry, hash_key); - - metadata = (pdc_metadata_t *)PDC_calloc(sizeof(pdc_metadata_t), count); - for (i = 0; i < count; i++) { - if (fread(metadata + i, sizeof(pdc_metadata_t), 1, file) != 1) { - LOG_ERROR("Read failed for metadata\n"); - } + // restore containers + containers_array = BULKI_get(checkpoint_bulki, BULKI_singleton_ENTITY("containers", PDC_STRING)); - (metadata + i)->storage_region_list_head = NULL; - (metadata + i)->region_lock_head = NULL; - (metadata + i)->region_map_head = NULL; - (metadata + i)->region_buf_map_head = NULL; - (metadata + i)->bloom = NULL; - (metadata + i)->prev = NULL; - (metadata + i)->next = NULL; - (metadata + i)->kvtag_list_head = NULL; - (metadata + i)->all_storage_region_distributed = 0; - - // Read kv tags - if (fread(&n_kvtag, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for n_kvtag\n"); - } - for (j = 0; j < n_kvtag; j++) { - pdc_kvtag_list_t *kvtag_list = (pdc_kvtag_list_t *)PDC_calloc(1, sizeof(pdc_kvtag_list_t)); - kvtag_list->kvtag = (pdc_kvtag_t *)PDC_malloc(sizeof(pdc_kvtag_t)); - if (fread(&key_len, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for key_len\n"); - } - kvtag_list->kvtag->name = PDC_malloc(key_len); - if (fread((void *)(kvtag_list->kvtag->name), key_len, 1, file) != 1) { - LOG_ERROR("Read failed for kvtag_list->kvtag->name\n"); - } - if (fread(&kvtag_list->kvtag->size, sizeof(uint32_t), 1, file) != 1) { - LOG_ERROR("Read failed for kvtag_list->kvtag->size\n"); - } - if (fread(&kvtag_list->kvtag->type, sizeof(int8_t), 1, file) != 1) { - LOG_ERROR("Read failed for kvtag_list->kvtag->type\n"); - } - kvtag_list->kvtag->value = PDC_malloc(kvtag_list->kvtag->size); - if (fread(kvtag_list->kvtag->value, kvtag_list->kvtag->size, 1, file) != 1) { - LOG_ERROR("Read failed for kvtag_list->kvtag->value\n"); - } - DL_APPEND((metadata + i)->kvtag_list_head, kvtag_list); - } + if (containers_array != NULL && containers_array->pdc_type == PDC_BULKI) { + BULKI_Entity_Iterator *cont_iter = Bent_iterator_init(containers_array, NULL, PDC_BULKI); + all_cont = containers_array->count; - if (fread(&n_region, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for n_region\n"); - } - if (n_region < 0) - PGOTO_ERROR(FAIL, "Checkpoint file region was less than 0"); + while (Bent_iterator_has_next_BULKI(cont_iter)) { + BULKI *container_entry = Bent_iterator_next_BULKI(cont_iter); - /* if (n_region == 0) */ - /* continue; */ + // Extract hash key + BULKI_Entity *hash_key_ent = + BULKI_get(container_entry, BULKI_singleton_ENTITY("hash_key", PDC_STRING)); + hash_key = (uint32_t *)PDC_malloc(sizeof(uint32_t)); + memcpy(hash_key, hash_key_ent->data, sizeof(uint32_t)); + total_mem_usage_g += sizeof(uint32_t); - total_region += n_region; + // Extract container data + BULKI_Entity *cont_data_ent = + BULKI_get(container_entry, BULKI_singleton_ENTITY("cont_data", PDC_STRING)); + cont_entry = (pdc_cont_hash_table_entry_t *)PDC_malloc(sizeof(pdc_cont_hash_table_entry_t)); + memcpy(cont_entry, cont_data_ent->data, sizeof(pdc_cont_hash_table_entry_t)); + total_mem_usage_g += sizeof(pdc_cont_hash_table_entry_t); - for (j = 0; j < n_region; j++) { - region_list = (region_list_t *)PDC_malloc(sizeof(region_list_t)); - if (fread(region_list, sizeof(region_list_t), 1, file) != 1) { - LOG_ERROR("Read failed for region_list\n"); - } +#ifdef ENABLE_MULTITHREAD + hg_thread_mutex_lock(&pdc_container_hash_table_mutex_g); +#endif + if (hash_table_insert(container_hash_table_g, hash_key, cont_entry) != 1) { + LOG_ERROR("Hash table insert failed\n"); + ret_value = FAIL; + } +#ifdef ENABLE_MULTITHREAD + hg_thread_mutex_unlock(&pdc_container_hash_table_mutex_g); +#endif + } + } - int has_hist = 0; - if (fread(&has_hist, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for has_list\n"); - } - if (has_hist == 1) { - region_list->region_hist = (pdc_histogram_t *)PDC_malloc(sizeof(pdc_histogram_t)); - if (fread(®ion_list->region_hist->dtype, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for region_list->region_hist->dtype\n"); - } - if (fread(®ion_list->region_hist->nbin, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for region_list->region_hist->nbin\n"); + // restore metadata + metadata_entries_array = + BULKI_get(checkpoint_bulki, BULKI_singleton_ENTITY("metadata_entries", PDC_STRING)); + + if (metadata_entries_array != NULL && metadata_entries_array->pdc_type == PDC_BULKI) { + BULKI_Entity_Iterator *entry_iter = Bent_iterator_init(metadata_entries_array, NULL, PDC_BULKI); + + while (Bent_iterator_has_next_BULKI(entry_iter)) { + BULKI *hash_entry = Bent_iterator_next_BULKI(entry_iter); + + // extract n_obj + BULKI_Entity *n_obj_ent = BULKI_get(hash_entry, BULKI_singleton_ENTITY("n_obj", PDC_STRING)); + int count; + memcpy(&count, n_obj_ent->data, sizeof(int)); + + // extract hash key + BULKI_Entity *hash_key_ent = + BULKI_get(hash_entry, BULKI_singleton_ENTITY("hash_key", PDC_STRING)); + hash_key = (uint32_t *)PDC_malloc(sizeof(uint32_t)); + memcpy(hash_key, hash_key_ent->data, sizeof(uint32_t)); + total_mem_usage_g += sizeof(uint32_t); + + // reconstruct hash table entry + entry = (pdc_hash_table_entry_head *)PDC_malloc(sizeof(pdc_hash_table_entry_head)); + entry->n_obj = 0; + entry->bloom = NULL; + entry->metadata = NULL; + PDC_Server_hash_table_list_init(entry, hash_key); + + metadata = (pdc_metadata_t *)PDC_calloc(sizeof(pdc_metadata_t), count); + + // extract metadata objects array + BULKI_Entity *metadata_objs_array = + BULKI_get(hash_entry, BULKI_singleton_ENTITY("metadata_objects", PDC_STRING)); + + if (metadata_objs_array != NULL && metadata_objs_array->pdc_type == PDC_BULKI) { + BULKI_Entity_Iterator *obj_iter = Bent_iterator_init(metadata_objs_array, NULL, PDC_BULKI); + i = 0; + + while (Bent_iterator_has_next_BULKI(obj_iter) && i < count) { + BULKI *metadata_obj = Bent_iterator_next_BULKI(obj_iter); + + // extract metadata structure + BULKI_Entity *metadata_ent = + BULKI_get(metadata_obj, BULKI_singleton_ENTITY("metadata", PDC_STRING)); + memcpy(metadata + i, metadata_ent->data, sizeof(pdc_metadata_t)); + + // initialize pointers + (metadata + i)->storage_region_list_head = NULL; + (metadata + i)->region_lock_head = NULL; + (metadata + i)->region_map_head = NULL; + (metadata + i)->region_buf_map_head = NULL; + (metadata + i)->bloom = NULL; + (metadata + i)->prev = NULL; + (metadata + i)->next = NULL; + (metadata + i)->kvtag_list_head = NULL; + (metadata + i)->all_storage_region_distributed = 0; + + // restore kv tags + BULKI_Entity *kvtags_array = + BULKI_get(metadata_obj, BULKI_singleton_ENTITY("kvtags", PDC_STRING)); + + if (kvtags_array != NULL && kvtags_array->pdc_type == PDC_BULKI) { + BULKI_Entity_Iterator *kvtag_iter = Bent_iterator_init(kvtags_array, NULL, PDC_BULKI); + + while (Bent_iterator_has_next_BULKI(kvtag_iter)) { + BULKI *kvtag_entry = Bent_iterator_next_BULKI(kvtag_iter); + + pdc_kvtag_list_t *kvtag_list = + (pdc_kvtag_list_t *)PDC_calloc(1, sizeof(pdc_kvtag_list_t)); + kvtag_list->kvtag = (pdc_kvtag_t *)PDC_malloc(sizeof(pdc_kvtag_t)); + + // extract key + BULKI_Entity *key_ent = + BULKI_get(kvtag_entry, BULKI_singleton_ENTITY("key", PDC_STRING)); + int key_len = strlen((char *)key_ent->data) + 1; + kvtag_list->kvtag->name = PDC_malloc(key_len); + memcpy(kvtag_list->kvtag->name, key_ent->data, key_len); + + // extract size + BULKI_Entity *size_ent = + BULKI_get(kvtag_entry, BULKI_singleton_ENTITY("size", PDC_STRING)); + memcpy(&kvtag_list->kvtag->size, size_ent->data, sizeof(uint32_t)); + + // extract type + BULKI_Entity *type_ent = + BULKI_get(kvtag_entry, BULKI_singleton_ENTITY("type", PDC_STRING)); + memcpy(&kvtag_list->kvtag->type, type_ent->data, sizeof(int8_t)); + + // extract value + BULKI_Entity *value_ent = + BULKI_get(kvtag_entry, BULKI_singleton_ENTITY("value", PDC_STRING)); + kvtag_list->kvtag->value = PDC_malloc(kvtag_list->kvtag->size); + memcpy(kvtag_list->kvtag->value, value_ent->data, kvtag_list->kvtag->size); + + DL_APPEND((metadata + i)->kvtag_list_head, kvtag_list); + } } - if (region_list->region_hist->nbin == 0) { - LOG_ERROR("Checkpoint file histogram size is 0\n"); - } - - region_list->region_hist->range = - (double *)PDC_malloc(sizeof(double) * region_list->region_hist->nbin * 2); - region_list->region_hist->bin = - (uint64_t *)PDC_malloc(sizeof(uint64_t) * region_list->region_hist->nbin); - if (fread(region_list->region_hist->range, sizeof(double), - region_list->region_hist->nbin * 2, file) != 1) { - LOG_ERROR("Read failed for region_list->region_hist->range\n"); - } - if (fread(region_list->region_hist->bin, sizeof(uint64_t), region_list->region_hist->nbin, - file) != 1) { - LOG_ERROR("Read failed for region_list->region_hist->bin\n"); - } - if (fread(®ion_list->region_hist->incr, sizeof(double), 1, file) != 1) { - LOG_ERROR("Read failed for region_list->region_hist->incr\n"); + // restore storage regions + BULKI_Entity *regions_array = + BULKI_get(metadata_obj, BULKI_singleton_ENTITY("regions", PDC_STRING)); + + int n_region = 0; + if (regions_array != NULL && regions_array->pdc_type == PDC_BULKI) { + n_region = regions_array->count; + BULKI_Entity_Iterator *region_iter = + Bent_iterator_init(regions_array, NULL, PDC_BULKI); + + while (Bent_iterator_has_next_BULKI(region_iter)) { + BULKI *region_entry = Bent_iterator_next_BULKI(region_iter); + + region_list = (region_list_t *)PDC_malloc(sizeof(region_list_t)); + + // extract region structure + BULKI_Entity *region_ent = + BULKI_get(region_entry, BULKI_singleton_ENTITY("region", PDC_STRING)); + memcpy(region_list, region_ent->data, sizeof(region_list_t)); + + // extract histogram flag + BULKI_Entity *has_hist_ent = + BULKI_get(region_entry, BULKI_singleton_ENTITY("has_hist", PDC_STRING)); + int has_hist; + memcpy(&has_hist, has_hist_ent->data, sizeof(int)); + + if (has_hist == 1) { + BULKI_Entity *histogram_ent = + BULKI_get(region_entry, BULKI_singleton_ENTITY("histogram", PDC_STRING)); + + if (histogram_ent != NULL && histogram_ent->pdc_type == PDC_BULKI) { + BULKI *histogram = (BULKI *)histogram_ent->data; + + region_list->region_hist = + (pdc_histogram_t *)PDC_malloc(sizeof(pdc_histogram_t)); + + BULKI_Entity *dtype_ent = + BULKI_get(histogram, BULKI_singleton_ENTITY("dtype", PDC_STRING)); + memcpy(®ion_list->region_hist->dtype, dtype_ent->data, sizeof(int)); + + BULKI_Entity *nbin_ent = + BULKI_get(histogram, BULKI_singleton_ENTITY("nbin", PDC_STRING)); + memcpy(®ion_list->region_hist->nbin, nbin_ent->data, sizeof(int)); + + if (region_list->region_hist->nbin == 0) { + LOG_ERROR("Checkpoint file histogram size is 0\n"); + } + + BULKI_Entity *range_ent = + BULKI_get(histogram, BULKI_singleton_ENTITY("range", PDC_STRING)); + region_list->region_hist->range = (double *)PDC_malloc( + sizeof(double) * region_list->region_hist->nbin * 2); + memcpy(region_list->region_hist->range, range_ent->data, + sizeof(double) * region_list->region_hist->nbin * 2); + + BULKI_Entity *bin_ent = + BULKI_get(histogram, BULKI_singleton_ENTITY("bin", PDC_STRING)); + region_list->region_hist->bin = (uint64_t *)PDC_malloc( + sizeof(uint64_t) * region_list->region_hist->nbin); + memcpy(region_list->region_hist->bin, bin_ent->data, + sizeof(uint64_t) * region_list->region_hist->nbin); + + BULKI_Entity *incr_ent = + BULKI_get(histogram, BULKI_singleton_ENTITY("incr", PDC_STRING)); + memcpy(®ion_list->region_hist->incr, incr_ent->data, sizeof(double)); + } + } + + // initialize region_list fields + region_list->buf = NULL; + region_list->data_size = 1; + for (idx = 0; idx < region_list->ndim; idx++) + region_list->data_size *= region_list->count[idx]; + region_list->is_data_ready = 0; + region_list->shm_fd = 0; + region_list->meta = (metadata + i); + region_list->prev = NULL; + region_list->next = NULL; + region_list->overlap_storage_regions = NULL; + region_list->n_overlap_storage_region = 0; + hg_atomic_init32(&(region_list->buf_map_refcount), 0); + region_list->reg_dirty_from_buf = 0; + region_list->access_type = PDC_NA; + region_list->bulk_handle = NULL; + region_list->lock_handle = NULL; + region_list->addr = NULL; + region_list->obj_id = (metadata + i)->obj_id; + region_list->reg_id = 0; + region_list->from_obj_id = 0; + region_list->client_id = 0; + region_list->is_io_done = 0; + region_list->is_shm_closed = 0; + region_list->seq_id = 0; + region_list->sent_to_server = 0; + region_list->io_cache_region = NULL; + + memset(region_list->shm_addr, 0, ADDR_MAX); + memset(region_list->client_ids, 0, + PDC_SERVER_MAX_PROC_PER_NODE * sizeof(uint32_t)); + + if (strstr(region_list->storage_location, "/global/cscratch") != NULL) { + region_list->data_loc_type = PDC_LUSTRE; + } + + DL_APPEND((metadata + i)->storage_region_list_head, region_list); + } } - } - - region_list->buf = NULL; - region_list->data_size = 1; - for (idx = 0; idx < region_list->ndim; idx++) - region_list->data_size *= region_list->count[idx]; - region_list->is_data_ready = 0; - region_list->shm_fd = 0; - region_list->meta = (metadata + i); - region_list->prev = NULL; - region_list->next = NULL; - region_list->overlap_storage_regions = NULL; - region_list->n_overlap_storage_region = 0; - hg_atomic_init32(&(region_list->buf_map_refcount), 0); - region_list->reg_dirty_from_buf = 0; - region_list->access_type = PDC_NA; - region_list->bulk_handle = NULL; - region_list->lock_handle = NULL; - region_list->addr = NULL; - region_list->obj_id = (metadata + i)->obj_id; - region_list->reg_id = 0; - region_list->from_obj_id = 0; - region_list->client_id = 0; - region_list->is_io_done = 0; - region_list->is_shm_closed = 0; - region_list->seq_id = 0; - region_list->sent_to_server = 0; - region_list->io_cache_region = NULL; - - memset(region_list->shm_addr, 0, ADDR_MAX); - memset(region_list->client_ids, 0, PDC_SERVER_MAX_PROC_PER_NODE * sizeof(uint32_t)); - - if (strstr(region_list->storage_location, "/global/cscratch") != NULL) { - region_list->data_loc_type = PDC_LUSTRE; - } - DL_APPEND((metadata + i)->storage_region_list_head, region_list); - } // For j - total_region += n_region; + total_region += n_region; + DL_SORT((metadata + i)->storage_region_list_head, region_cmp); - DL_SORT((metadata + i)->storage_region_list_head, region_cmp); - } // For i + i++; + } + } - nobj += count; - total_mem_usage_g += sizeof(pdc_hash_table_entry_head); - total_mem_usage_g += (sizeof(pdc_metadata_t) * count); + nobj += count; + total_mem_usage_g += sizeof(pdc_hash_table_entry_head); + total_mem_usage_g += (sizeof(pdc_metadata_t) * count); - entry->metadata = NULL; + entry->metadata = NULL; - // Insert the previously read metadata to the linked list (hash table entry) - for (i = 0; i < count; i++) { - elt = metadata + i; - // Add to hash list and bloom filter - ret_value = PDC_Server_hash_table_list_insert(entry, elt); - if (ret_value != SUCCEED) - PGOTO_ERROR(FAIL, "Error with hash table recovering from checkpoint file"); + // insert metadata to hash table + for (i = 0; i < count; i++) { + elt = metadata + i; + ret_value = PDC_Server_hash_table_list_insert(entry, elt); + if (ret_value != SUCCEED) + PGOTO_ERROR(FAIL, "Error with hash table recovering from checkpoint file"); + } } - n_entry--; - } - - if (fread(&n_objs, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for n_objs\n"); } - for (i = 0; i < n_objs; ++i) { - data_server_region_t *new_obj_reg = - (data_server_region_t *)PDC_calloc(1, sizeof(struct data_server_region_t)); - new_obj_reg->fd = -1; - new_obj_reg->storage_location = (char *)PDC_malloc(sizeof(char) * ADDR_MAX); - if (fread(&new_obj_reg->obj_id, sizeof(uint64_t), 1, file) != 1) { - LOG_ERROR("Read failed for obj_id\n"); - } - if (fread(&n_region, sizeof(int), 1, file) != 1) { - LOG_ERROR("Read failed for n_region\n"); - } - DL_APPEND(dataserver_region_g, new_obj_reg); - for (j = 0; j < n_region; j++) { - region_list_t *new_region_list = (region_list_t *)PDC_malloc(sizeof(region_list_t)); - if (fread(new_region_list, sizeof(region_list_t), 1, file) != 1) { - LOG_ERROR("Read failed for new_region_list\n"); + // restore data server regions + dataserver_regions_array = + BULKI_get(checkpoint_bulki, BULKI_singleton_ENTITY("dataserver_regions", PDC_STRING)); + + if (dataserver_regions_array != NULL && dataserver_regions_array->pdc_type == PDC_BULKI) { + BULKI_Entity_Iterator *ds_iter = Bent_iterator_init(dataserver_regions_array, NULL, PDC_BULKI); + + while (Bent_iterator_has_next_BULKI(ds_iter)) { + BULKI *dataserver_obj = Bent_iterator_next_BULKI(ds_iter); + + data_server_region_t *new_obj_reg = + (data_server_region_t *)PDC_calloc(1, sizeof(struct data_server_region_t)); + new_obj_reg->fd = -1; + new_obj_reg->storage_location = (char *)PDC_malloc(sizeof(char) * ADDR_MAX); + + // extract obj_id + BULKI_Entity *obj_id_ent = + BULKI_get(dataserver_obj, BULKI_singleton_ENTITY("obj_id", PDC_STRING)); + memcpy(&new_obj_reg->obj_id, obj_id_ent->data, sizeof(uint64_t)); + + // extract regions + BULKI_Entity *ds_regions_array = + BULKI_get(dataserver_obj, BULKI_singleton_ENTITY("regions", PDC_STRING)); + + if (ds_regions_array != NULL && ds_regions_array->pdc_type == PDC_BULKI) { + BULKI_Entity_Iterator *ds_region_iter = Bent_iterator_init(ds_regions_array, NULL, PDC_BULKI); + + while (Bent_iterator_has_next_BULKI(ds_region_iter)) { + BULKI *ds_region = Bent_iterator_next_BULKI(ds_region_iter); + + region_list = (region_list_t *)PDC_malloc(sizeof(region_list_t)); + + BULKI_Entity *region_ent = + BULKI_get(ds_region, BULKI_singleton_ENTITY("region", PDC_STRING)); + memcpy(region_list, region_ent->data, sizeof(region_list_t)); + + // initialize fields (similar to above) + region_list->buf = NULL; + region_list->data_size = 1; + for (idx = 0; idx < region_list->ndim; idx++) + region_list->data_size *= region_list->count[idx]; + region_list->is_data_ready = 0; + region_list->shm_fd = 0; + region_list->meta = NULL; + region_list->prev = NULL; + region_list->next = NULL; + region_list->overlap_storage_regions = NULL; + region_list->n_overlap_storage_region = 0; + hg_atomic_init32(&(region_list->buf_map_refcount), 0); + region_list->reg_dirty_from_buf = 0; + region_list->access_type = PDC_NA; + + DL_APPEND(new_obj_reg->region_storage_head, region_list); + } } - DL_APPEND(new_obj_reg->region_storage_head, new_region_list); + + DL_APPEND(dataserver_region_g, new_obj_reg); } } - if (fread(&checkpoint_size, sizeof(uint64_t), 1, file) != 1) { - LOG_ERROR("Read failed for checkpoint size\n"); + // restore transfer query + BULKI_Entity *transfer_query_ent = + BULKI_get(checkpoint_bulki, BULKI_singleton_ENTITY("transfer_query", PDC_STRING)); + + if (transfer_query_ent != NULL && transfer_query_ent->pdc_type == PDC_BULKI) { + // Extract the nested BULKI containing transfer query data + BULKI *transfer_query_bulki = (BULKI *)transfer_query_ent->data; + + // initialize transfer query system with BULKI checkpoint + ret_value = transfer_request_metadata_query_init_bulki(pdc_server_size_g, transfer_query_bulki); + if (ret_value != SUCCEED) { + LOG_ERROR("Failed to restore transfer query from checkpoint\n"); + PGOTO_ERROR(FAIL, "Transfer query restoration failed"); + } } - checkpoint_buf = (char *)PDC_malloc(checkpoint_size); - if (fread(checkpoint_buf, checkpoint_size, 1, file) != 1) { - LOG_ERROR("Read failed for checkpoint buf\n"); + else { + // note: no transfer query data in checkpoint, initialize fresh + ret_value = transfer_request_metadata_query_init_bulki(pdc_server_size_g, NULL); + if (ret_value != SUCCEED) { + LOG_ERROR("Failed to initialize transfer query system\n"); + PGOTO_ERROR(FAIL, "Transfer query initialization failed"); + } } - transfer_request_metadata_query_init(pdc_server_size_g, checkpoint_buf); - checkpoint_buf = (char *)PDC_free(checkpoint_buf); - fclose(file); - file = NULL; + // clean up + BULKI_free(checkpoint_bulki, 1); #ifdef ENABLE_MPI MPI_Reduce(&nobj, &all_nobj, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); @@ -1690,6 +1954,17 @@ PDC_Server_restart(char *filename) pdc_server_timings->PDCserver_restart += MPI_Wtime() - start; #endif +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + if (!pdc_server_rank_g) { + LOG_INFO("total restart time = %lf\n", MPI_Wtime() - start); + } +#endif + // ensure file is closed if error occurred before BULKI_deserialize_from_file + if (file != NULL) { + fclose(file); + } + FUNC_LEAVE(ret_value); } diff --git a/src/server/pdc_server_region/include/pdc_server_region_transfer_metadata_query.h b/src/server/pdc_server_region/include/pdc_server_region_transfer_metadata_query.h index fc2ec3a7a..5755d5e4e 100644 --- a/src/server/pdc_server_region/include/pdc_server_region_transfer_metadata_query.h +++ b/src/server/pdc_server_region/include/pdc_server_region_transfer_metadata_query.h @@ -1,9 +1,14 @@ #ifndef PDC_SERVER_REGION_TRANSFER_METADATA_QUERY_H #define PDC_SERVER_REGION_TRANSFER_METADATA_QUERY_H +#include "bulki.h" +#include "bulki_serde.h" + perr_t transfer_request_metadata_query_init(int pdc_server_size_input, char *checkpoint); +perr_t transfer_request_metadata_query_init_bulki(int pdc_server_size_input, BULKI *checkpoint_bulki); perr_t transfer_request_metadata_query_finalize(); perr_t transfer_request_metadata_query_checkpoint(char **checkpoint, uint64_t *checkpoint_size); +perr_t transfer_request_metadata_query_checkpoint_bulki(BULKI **checkpoint_bulki); perr_t transfer_request_metadata_query_lookup_query_buf(uint64_t query_id, char **buf_ptr); uint64_t transfer_request_metadata_query_parse(int32_t n_objs, char *buf, uint8_t partition_type, uint64_t *total_buf_size_ptr); diff --git a/src/server/pdc_server_region/pdc_server_region_transfer_metadata_query.c b/src/server/pdc_server_region/pdc_server_region_transfer_metadata_query.c index c66ecd905..440aa763c 100644 --- a/src/server/pdc_server_region/pdc_server_region_transfer_metadata_query.c +++ b/src/server/pdc_server_region/pdc_server_region_transfer_metadata_query.c @@ -53,6 +53,128 @@ static uint64_t transfer_request_metadata_query_append(uint64_t obj_id, int ndim static uint64_t metadata_query_buf_create(pdc_obj_region_metadata *regions, int size, uint64_t *total_buf_size_ptr); +perr_t +transfer_request_metadata_query_init_bulki(int pdc_server_size_input, BULKI *checkpoint_bulki) +{ + FUNC_ENTER(NULL); + + perr_t ret_value = SUCCEED; + + metadata_server_objs = NULL; + metadata_server_objs_end = NULL; + metadata_query_buf_head = NULL; + metadata_query_buf_end = NULL; + pdc_server_size = pdc_server_size_input; + data_server_bytes = (uint64_t *)PDC_calloc(pdc_server_size, sizeof(uint64_t)); + query_id_g = 100000; + + pthread_mutex_init(&metadata_query_mutex, NULL); + + if (checkpoint_bulki != NULL) { + BULKI_Entity *objects_array = + BULKI_get(checkpoint_bulki, BULKI_singleton_ENTITY("objects", PDC_STRING)); + + if (objects_array == NULL || objects_array->pdc_type != PDC_BULKI) { + LOG_ERROR("Invalid transfer query checkpoint: missing or invalid 'objects' field\n"); + PGOTO_ERROR(FAIL, "Invalid checkpoint format"); + } + + BULKI_Entity_Iterator *obj_iter = Bent_iterator_init(objects_array, NULL, PDC_BULKI); + + while (Bent_iterator_has_next_BULKI(obj_iter)) { + BULKI *obj_bulki = Bent_iterator_next_BULKI(obj_iter); + + pdc_obj_metadata_pkg *obj_pkg = (pdc_obj_metadata_pkg *)PDC_malloc(sizeof(pdc_obj_metadata_pkg)); + + BULKI_Entity *obj_id_ent = BULKI_get(obj_bulki, BULKI_singleton_ENTITY("obj_id", PDC_STRING)); + if (obj_id_ent == NULL) { + LOG_ERROR("Missing obj_id in checkpoint object\n"); + PDC_free(obj_pkg); + continue; + } + memcpy(&obj_pkg->obj_id, obj_id_ent->data, sizeof(uint64_t)); + + BULKI_Entity *ndim_ent = BULKI_get(obj_bulki, BULKI_singleton_ENTITY("ndim", PDC_STRING)); + if (ndim_ent == NULL) { + LOG_ERROR("Missing ndim in checkpoint object\n"); + PDC_free(obj_pkg); + continue; + } + memcpy(&obj_pkg->ndim, ndim_ent->data, sizeof(int)); + + obj_pkg->regions = NULL; + obj_pkg->regions_end = NULL; + obj_pkg->next = NULL; + + BULKI_Entity *regions_array = BULKI_get(obj_bulki, BULKI_singleton_ENTITY("regions", PDC_STRING)); + + if (regions_array != NULL && regions_array->pdc_type == PDC_BULKI) { + BULKI_Entity_Iterator *region_iter = Bent_iterator_init(regions_array, NULL, PDC_BULKI); + + while (Bent_iterator_has_next_BULKI(region_iter)) { + BULKI *region_bulki = Bent_iterator_next_BULKI(region_iter); + + pdc_region_metadata_pkg *region_pkg = + (pdc_region_metadata_pkg *)PDC_malloc(sizeof(pdc_region_metadata_pkg)); + + region_pkg->reg_offset = (uint64_t *)PDC_malloc(sizeof(uint64_t) * obj_pkg->ndim * 2); + region_pkg->reg_size = region_pkg->reg_offset + obj_pkg->ndim; + + BULKI_Entity *server_id_ent = + BULKI_get(region_bulki, BULKI_singleton_ENTITY("data_server_id", PDC_STRING)); + if (server_id_ent != NULL) { + memcpy(®ion_pkg->data_server_id, server_id_ent->data, sizeof(uint32_t)); + } + else { + LOG_ERROR("Missing data_server_id in checkpoint region\n"); + PDC_free(region_pkg->reg_offset); + PDC_free(region_pkg); + continue; + } + + BULKI_Entity *offset_size_ent = + BULKI_get(region_bulki, BULKI_singleton_ENTITY("reg_offset_size", PDC_STRING)); + if (offset_size_ent != NULL) { + memcpy(region_pkg->reg_offset, offset_size_ent->data, + sizeof(uint64_t) * obj_pkg->ndim * 2); + } + else { + LOG_ERROR("Missing reg_offset_size in checkpoint region\n"); + PDC_free(region_pkg->reg_offset); + PDC_free(region_pkg); + continue; + } + + region_pkg->next = NULL; + + if (obj_pkg->regions == NULL) { + obj_pkg->regions = region_pkg; + obj_pkg->regions_end = region_pkg; + } + else { + obj_pkg->regions_end->next = region_pkg; + obj_pkg->regions_end = region_pkg; + } + } + } + + if (metadata_server_objs == NULL) { + metadata_server_objs = obj_pkg; + metadata_server_objs_end = obj_pkg; + } + else { + metadata_server_objs_end->next = obj_pkg; + metadata_server_objs_end = obj_pkg; + } + } + + LOG_DEBUG("Transfer query checkpoint restored successfully\n"); + } + +done: + FUNC_LEAVE(ret_value); +} + /** * Entry function for this class. Should be only called once at the beginning of Server init. * If checkpoint is not NULL, then load previously checkpointed metadata to static variables. @@ -167,6 +289,77 @@ transfer_request_metadata_query_finalize() FUNC_LEAVE(ret_value); } +perr_t +transfer_request_metadata_query_checkpoint_bulki(BULKI **checkpoint_bulki) +{ + FUNC_ENTER(NULL); + + perr_t ret_value = SUCCEED; + pdc_obj_metadata_pkg * obj_temp; + pdc_region_metadata_pkg *region_temp; + int obj_count = 0; + BULKI * bulki = NULL; + + // if (checkpoint_bulki == NULL) { + // LOG_ERROR("checkpoint_bulki output parameter is NULL\n"); + // PGOTO_ERROR(FAIL, "Invalid parameter"); + // } + + pthread_mutex_lock(&metadata_query_mutex); + + bulki = BULKI_init(1); + BULKI_Entity *objects_array = empty_BULKI_Array_Entity(); + + obj_temp = metadata_server_objs; + while (obj_temp) { + BULKI *obj_bulki = BULKI_init(3); + + BULKI_put(obj_bulki, BULKI_singleton_ENTITY("obj_id", PDC_STRING), + BULKI_ENTITY(&obj_temp->obj_id, 1, PDC_UINT64, PDC_CLS_ITEM)); + + BULKI_put(obj_bulki, BULKI_singleton_ENTITY("ndim", PDC_STRING), + BULKI_ENTITY(&obj_temp->ndim, 1, PDC_INT, PDC_CLS_ITEM)); + + BULKI_Entity *regions_array = empty_BULKI_Array_Entity(); + + region_temp = obj_temp->regions; + while (region_temp) { + BULKI *region_bulki = BULKI_init(2); + + BULKI_put(region_bulki, BULKI_singleton_ENTITY("data_server_id", PDC_STRING), + BULKI_ENTITY(®ion_temp->data_server_id, 1, PDC_UINT32, PDC_CLS_ITEM)); + + BULKI_put(region_bulki, BULKI_singleton_ENTITY("reg_offset_size", PDC_STRING), + BULKI_ENTITY(region_temp->reg_offset, obj_temp->ndim * 2, PDC_UINT64, PDC_CLS_ARRAY)); + + BULKI_ENTITY_append_BULKI(regions_array, region_bulki); + region_temp = region_temp->next; + } + + BULKI_put(obj_bulki, BULKI_singleton_ENTITY("regions", PDC_STRING), regions_array); + + BULKI_ENTITY_append_BULKI(objects_array, obj_bulki); + + obj_count++; + obj_temp = obj_temp->next; + } + + BULKI_put(bulki, BULKI_singleton_ENTITY("objects", PDC_STRING), objects_array); + + pthread_mutex_unlock(&metadata_query_mutex); + + *checkpoint_bulki = bulki; + + LOG_DEBUG("Transfer query checkpoint created: %d objects\n", obj_count); + +done: + if (ret_value != SUCCEED && bulki != NULL) { + BULKI_free(bulki, 1); + } + + FUNC_LEAVE(ret_value); +} + /** * Checkpoint static variables in this file into a contiguous buffer. * checkpoint_size is the total number of bytes allocated. diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 6f0482fdc..574d84d9f 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -136,6 +136,8 @@ if(BUILD_MPI_TESTING) tags/kvtag_affix_query_scale tags/kvtag_add_get_benchmark tags/kvtag_add_get_scale + tags/kvtag_add_scale + tags/kvtag_get_verify_scale ) if(PDC_ENABLE_IDIOMS) diff --git a/src/tests/tags/kvtag_add_scale.c b/src/tests/tags/kvtag_add_scale.c new file mode 100644 index 000000000..4e00097f6 --- /dev/null +++ b/src/tests/tags/kvtag_add_scale.c @@ -0,0 +1,216 @@ +/* +* Copyright Notice for +* Proactive Data Containers (PDC) Software Library and Utilities +* ----------------------------------------------------------------------------- + +*** Copyright Notice *** + +* Proactive Data Containers (PDC) Copyright (c) 2017, The Regents of the +* University of California, through Lawrence Berkeley National Laboratory, +* UChicago Argonne, LLC, operator of Argonne National Laboratory, and The HDF +* Group (subject to receipt of any required approvals from the U.S. Dept. of +* Energy). All rights reserved. + +* If you have questions about your rights to use or distribute this software, +* please contact Berkeley Lab's Innovation & Partnerships Office at IPO@lbl.gov. + +* NOTICE. This Software was developed under funding from the U.S. Department of +* Energy and the U.S. Government consequently retains certain rights. As such, the +* U.S. Government has been granted for itself and others acting on its behalf a +* paid-up, nonexclusive, irrevocable, worldwide license in the Software to +* reproduce, distribute copies to the public, prepare derivative works, and +* perform publicly and display publicly, and to permit other to do so. +*/ + +#include +#include +#include +#include +#include +#include "pdc.h" +#include "pdc_client_connect.h" + +int +assign_work_to_rank(int rank, int size, int nwork, int *my_count, int *my_start) +{ + if (rank > size || my_count == NULL || my_start == NULL) { + LOG_INFO("assign_work_to_rank(): Error with input\n"); + return -1; + } + if (nwork < size) { + if (rank < nwork) + *my_count = 1; + else + *my_count = 0; + (*my_start) = rank * (*my_count); + } + else { + (*my_count) = nwork / size; + (*my_start) = rank * (*my_count); + + // Last few ranks may have extra work + if (rank >= size - nwork % size) { + (*my_count)++; + (*my_start) += (rank - (size - nwork % size)); + } + } + + return 1; +} + +void +print_usage(char *name) +{ + // required parameters: n_obj and n_add_tag + LOG_JUST_PRINT("%s n_obj n_add_tag\n", name); +} + +int +main(int argc, char *argv[]) +{ + pdcid_t pdc, cont_prop, cont, obj_prop; + pdcid_t * obj_ids; + int n_obj, n_add_tag, my_obj, my_obj_s, my_add_tag, my_add_tag_s; + int obj_1percent = 0, tag_1percent = 0; + int proc_num, my_rank, i, v; + char obj_name[128]; + double stime, total_time, percent_time; + pdc_kvtag_t kvtag; + int ret_value = SUCCEED; + +#ifdef ENABLE_MPI + MPI_Init(&argc, &argv); + MPI_Comm_size(MPI_COMM_WORLD, &proc_num); + MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); +#endif + if (argc < 3) { + if (my_rank == 0) + print_usage(argv[0]); + PGOTO_DONE(FAIL); + } + n_obj = atoi(argv[1]); + n_add_tag = atoi(argv[2]); + + if (n_add_tag > n_obj) { + if (my_rank == 0) + LOG_ERROR("n_add_tag larger than n_obj! Exiting...\n"); + PGOTO_DONE(FAIL); + } + + assign_work_to_rank(my_rank, proc_num, n_add_tag, &my_add_tag, &my_add_tag_s); + assign_work_to_rank(my_rank, proc_num, n_obj, &my_obj, &my_obj_s); + + obj_1percent = my_obj / 100; + tag_1percent = my_add_tag / 100; + + if (my_rank == 0) + LOG_INFO("Create %d obj, %d tags\n", my_obj, my_add_tag); + + // create a pdc + pdc = PDCinit("pdc"); + + // create a container property + cont_prop = PDCprop_create(PDC_CONT_CREATE, pdc); + if (cont_prop <= 0) + PGOTO_ERROR(FAIL, "Failed to create container property"); + + // create a container + cont = PDCcont_create("c1", cont_prop); + if (cont <= 0) + PGOTO_ERROR(FAIL, "Failed to create container"); + + // create an object property + obj_prop = PDCprop_create(PDC_OBJ_CREATE, pdc); + if (obj_prop <= 0) + PGOTO_ERROR(FAIL, "Failed to create object property"); + + // create a number of objects, add at least one tag to that object + obj_ids = (pdcid_t *)calloc(my_obj, sizeof(pdcid_t)); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + stime = MPI_Wtime(); +#endif + + for (i = 0; i < my_obj; i++) { + sprintf(obj_name, "obj%d", my_obj_s + i); + obj_ids[i] = PDCobj_create(cont, obj_name, obj_prop); + if (obj_ids[i] <= 0) + PGOTO_ERROR(FAIL, "Failed to create object"); + + if (i > 0 && i % obj_1percent == 0) { +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + percent_time = MPI_Wtime() - stime; + if (my_rank == 0) { + int current_percentage = i / obj_1percent; + int estimated_current_object_number = n_obj / 100 * current_percentage; + double tps = estimated_current_object_number / percent_time; + LOG_INFO("[OBJ PROGRESS %3d%% ] %11d objects, %7.2f seconds, TPS: %10.2f \n", + current_percentage, estimated_current_object_number, percent_time, tps); + } +#endif + } + } + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + total_time = MPI_Wtime() - stime; +#endif + + if (my_rank == 0) + LOG_INFO("Total time to create %11d objects: %7.2f , throughput %10.2f \n", n_obj, total_time, + n_obj / total_time); + + // Add tags + kvtag.name = "Group"; + kvtag.value = (void *)&v; + kvtag.type = PDC_INT; + kvtag.size = sizeof(int); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + stime = MPI_Wtime(); +#endif + for (i = 0; i < my_add_tag; i++) { + v = i + my_add_tag_s; + if (PDCobj_put_tag(obj_ids[i], kvtag.name, kvtag.value, kvtag.type, kvtag.size) < 0) + PGOTO_ERROR(FAIL, "Failed to add a kvtag to o%d", i + my_obj_s); + + if (i % tag_1percent == 0) { +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + percent_time = MPI_Wtime() - stime; + if (my_rank == 0) { + int current_percentage = i / tag_1percent; + int estimated_current_tag_number = n_obj / 100 * current_percentage; + double tps = estimated_current_tag_number / percent_time; + LOG_INFO("[TAG PROGRESS %3d%% ] %11d tags, %7.2f seconds, TPS: %10.2f \n", current_percentage, + estimated_current_tag_number, percent_time, tps); + } +#endif + } + } + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + total_time = MPI_Wtime() - stime; +#endif + if (my_rank == 0) + LOG_INFO("Total time to add tags to %11d objects: %7.2f , throughput %10.2f \n", n_add_tag, + total_time, n_add_tag / total_time); + + free(obj_ids); + + if (my_rank == 0) { + // confirming tag addition completion + LOG_INFO("Done adding tags\n"); + } + +done: +#ifdef ENABLE_MPI + MPI_Finalize(); +#endif + + return ret_value; +} \ No newline at end of file diff --git a/src/tests/tags/kvtag_get_verify_scale.c b/src/tests/tags/kvtag_get_verify_scale.c new file mode 100644 index 000000000..ab98d093d --- /dev/null +++ b/src/tests/tags/kvtag_get_verify_scale.c @@ -0,0 +1,245 @@ +/* +* Copyright Notice for +* Proactive Data Containers (PDC) Software Library and Utilities +* ----------------------------------------------------------------------------- + +*** Copyright Notice *** + +* Proactive Data Containers (PDC) Copyright (c) 2017, The Regents of the +* University of California, through Lawrence Berkeley National Laboratory, +* UChicago Argonne, LLC, operator of Argonne National Laboratory, and The HDF +* Group (subject to receipt of any required approvals from the U.S. Dept. of +* Energy). All rights reserved. + +* If you have questions about your rights to use or distribute this software, +* please contact Berkeley Lab's Innovation & Partnerships Office at IPO@lbl.gov. + +* NOTICE. This Software was developed under funding from the U.S. Department of +* Energy and the U.S. Government consequently retains certain rights. As such, the +* U.S. Government has been granted for itself and others acting on its behalf a +* paid-up, nonexclusive, irrevocable, worldwide license in the Software to +* reproduce, distribute copies to the public, prepare derivative works, and +* perform publicly and display publicly, and to permit other to do so. +*/ + +#include +#include +#include +#include +#include +#include "pdc.h" +#include "pdc_client_connect.h" + +int +assign_work_to_rank(int rank, int size, int nwork, int *my_count, int *my_start) +{ + if (rank > size || my_count == NULL || my_start == NULL) { + LOG_INFO("assign_work_to_rank(): Error with input\n"); + return -1; + } + if (nwork < size) { + if (rank < nwork) + *my_count = 1; + else + *my_count = 0; + (*my_start) = rank * (*my_count); + } + else { + (*my_count) = nwork / size; + (*my_start) = rank * (*my_count); + + // Last few ranks may have extra work + if (rank >= size - nwork % size) { + (*my_count)++; + (*my_start) += (rank - (size - nwork % size)); + } + } + + return 1; +} + +void +print_usage(char *name) +{ + // required parameters: n_obj and n_query + LOG_JUST_PRINT("%s n_obj n_tag\n", name); +} + +int +main(int argc, char *argv[]) +{ + pdcid_t pdc, cont_prop, cont; + pdcid_t * obj_ids; + int n_obj, n_query, my_obj, my_obj_s, n_tag, n_tag_s; + int obj_1percent = 0; + int proc_num, my_rank, i; + char obj_name[128]; + double stime, total_time, percent_time; + pdc_kvtag_t kvtag; + void ** values; + pdc_var_type_t value_type; + size_t value_size; + int ret_value = SUCCEED; + // counters for verification statistics + int verified_success = 0; + int verified_fail = 0; + int all_verified_success = 0; + int all_verified_fail = 0; + +#ifdef ENABLE_MPI + MPI_Init(&argc, &argv); + MPI_Comm_size(MPI_COMM_WORLD, &proc_num); + MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); +#endif + if (argc < 3) { + if (my_rank == 0) + print_usage(argv[0]); + PGOTO_DONE(FAIL); + } + n_obj = atoi(argv[1]); + n_query = atoi(argv[2]); + + if (n_query > n_obj) { + if (my_rank == 0) + LOG_ERROR("n_query larger than n_obj! Exiting...\n"); + PGOTO_DONE(FAIL); + } + + assign_work_to_rank(my_rank, proc_num, n_query, &n_tag, &n_tag_s); + assign_work_to_rank(my_rank, proc_num, n_obj, &my_obj, &my_obj_s); + + obj_1percent = my_obj / 100; + + if (my_rank == 0) + LOG_INFO("Open %d obj, query %d tags\n", my_obj, n_tag); + + // create a pdc + pdc = PDCinit("pdc"); + + // Open the existing container + cont = PDCcont_open("c1", pdc); + if (cont <= 0) + PGOTO_ERROR(FAIL, "Failed to open container"); + + // Open existing objects + obj_ids = (pdcid_t *)calloc(my_obj, sizeof(pdcid_t)); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + stime = MPI_Wtime(); +#endif + + // open already created PDC objects and query tags + for (i = 0; i < my_obj; i++) { + sprintf(obj_name, "obj%d", my_obj_s + i); + obj_ids[i] = PDCobj_open(obj_name, pdc); + if (obj_ids[i] <= 0) + PGOTO_ERROR(FAIL, "Failed to open object"); + + // progress reporting for object opening + if (i > 0 && obj_1percent > 0 && i % obj_1percent == 0) { +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + percent_time = MPI_Wtime() - stime; + if (my_rank == 0) { + int current_percentage = i / obj_1percent; + int estimated_current_object_number = n_obj / 100 * current_percentage; + double tps = estimated_current_object_number / percent_time; + LOG_INFO("[OBJ PROGRESS %3d%% ] %11d objects, %7.2f seconds, TPS: %10.2f \n", + current_percentage, estimated_current_object_number, percent_time, tps); + } +#endif + } + } + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + total_time = MPI_Wtime() - stime; +#endif + + if (my_rank == 0) + LOG_INFO("Total time to open %11d objects: %7.2f , throughput %10.2f \n", n_obj, total_time, + n_obj / total_time); + + // Setup kvtag for queries + kvtag.name = "Group"; + + values = (void **)calloc(n_tag, sizeof(void *)); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + stime = MPI_Wtime(); +#endif + for (i = 0; i < n_tag; i++) { + if (PDCobj_get_tag(obj_ids[i], kvtag.name, (void *)&values[i], (void *)&value_type, + (void *)&value_size) < 0) + PGOTO_ERROR(FAIL, "Failed to get a kvtag from o%d\n", i + n_tag_s); + + int expected_value = i + n_tag_s; // Assuming tags were added in order starting from 0 + + // count successful and failed verifications instead of immediate error + if (*(int *)(values[i]) == expected_value) { + verified_success++; + } + else { + verified_fail++; + // log first 10 failures for debugging + if (verified_fail <= 10) { + LOG_ERROR("Verification failed for obj%d: expected %d, got %d\n", i + n_tag_s, expected_value, + *(int *)(values[i])); + } + } + free(values[i]); + } + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + total_time = MPI_Wtime() - stime; +#endif + if (my_rank == 0) + LOG_INFO("Total time to retrieve %11d tag from %11d objects: %7.2f , throughput %10.2f \n", n_query, + n_obj, total_time, n_query / total_time); + + free(values); + free(obj_ids); + + // aggregate verification statistics across all ranks +#ifdef ENABLE_MPI + MPI_Reduce(&verified_success, &all_verified_success, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(&verified_fail, &all_verified_fail, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); +#else + all_verified_success = verified_success; + all_verified_fail = verified_fail; +#endif + + if (my_rank == 0) { + // completion message with verification statistics + LOG_INFO("==============================================================\n"); + LOG_INFO("Verification Summary:\n"); + LOG_INFO(" Total queries: %11d\n", n_query); + LOG_INFO(" Successfully verified: %11d (%6.2f%%)\n", all_verified_success, + 100.0 * all_verified_success / n_query); + LOG_INFO(" Failed verification: %11d (%6.2f%%)\n", all_verified_fail, + 100.0 * all_verified_fail / n_query); + LOG_INFO("==============================================================\n"); + + if (all_verified_fail > 0) { + LOG_ERROR("WARNING: %d objects failed verification!\n", all_verified_fail); + } + else { + LOG_INFO("SUCCESS: All objects verified correctly!\n"); + } + } + + // set return value based on verification results + if (verified_fail > 0) { + ret_value = FAIL; + } + +done: +#ifdef ENABLE_MPI + MPI_Finalize(); +#endif + + return ret_value; +} \ No newline at end of file