5959import pandas as pd
6060from dagster import AssetExecutionContext , MetadataValue , Output , asset
6161from django .db .models .functions import Lower
62- from upath import UPath
6362
6463from ..configs import BSSMetadataConfig
6564from ..partitions import bss_instances_partitions_def
@@ -394,12 +393,38 @@ def get_all_label_attributes(labels: pd.Series, activity_type: str, country_code
394393 all_label_attributes ["country_id" ] = country_code
395394 all_label_attributes = seasonnamelookup .do_lookup (all_label_attributes , "season" , "season" )
396395 all_label_attributes ["season" ] = all_label_attributes ["season" ].replace (pd .NA , None )
396+
397397 # Make sure we keep the same index so we can match by row number
398398 all_label_attributes .index = labels .index
399+
400+ # Sometimes, particularly for FoodPurchase, OtherExpenditure and OtherCashIncome, the label in column A
401+ # is the name of a Product, or an alias for one, with no other text. Therefore, for labels where we haven't
402+ # identified any attributes, we attempt to match the label as a Product name. If we find a match, we set the
403+ # `activity_label` and the `additional_identifier` to the text of the label and set the `product_id`. We set the
404+ # `additional_identifier` so that we can differentiate between aggregate Livelihood Strategies under a high-level
405+ # Classified Product, e.g. Skilled Labor, without losing the specific text that was provided in the BSS.
406+ product_labels = labels [all_label_attributes ["activity_label" ] == "" ].to_frame (name = "label" )
407+ product_labels = classifiedproductlookup .do_lookup (product_labels , "label" , "product_id" )
408+ # Set the activity_label so that get_instances_from_dataframe() doesn't treat these as unrecognized labels
409+ product_labels .loc [product_labels ["product_id" ].notna (), "activity_label" ] = product_labels .loc [
410+ product_labels ["product_id" ].notna (), "label"
411+ ]
412+ # Set the additional_identifier so that we can differentiate between different labels that map to the same product
413+ product_labels .loc [product_labels ["product_id" ].notna (), "additional_identifier" ] = product_labels .loc [
414+ product_labels ["product_id" ].notna (), "label"
415+ ]
416+ # Labels that contain just a product name or alias and weren't recognized by an Activity Label or regular
417+ # expression always indicate a new Livelihood Strategy with the values in the row containing income or expenditure,
418+ # depending on the Livelihood Strategy.
419+ product_labels .loc [product_labels ["product_id" ].notna (), "is_start" ] = True
420+ product_labels .loc [product_labels ["product_id" ].notna (), "attribute" ] = "income_or_expenditure"
421+ # Copy the product labels back into the main dataframe
422+ all_label_attributes .update (product_labels .drop (columns = ["label" ]))
423+
399424 return all_label_attributes
400425
401426
402- @asset
427+ @asset ( io_manager_key = "dataframe_excel_io_manager" )
403428def livelihood_activity_label_recognition_dataframe (
404429 context : AssetExecutionContext ,
405430 config : BSSMetadataConfig ,
@@ -411,26 +436,33 @@ def livelihood_activity_label_recognition_dataframe(
411436 """
412437 A saved spreadsheet showing how each BSS label is recognized, either from the ActivityLabel model or a regex.
413438 """
414- # Path to the output spreadsheet
415- p = UPath (config .bss_label_recognition_workbook , ** config .bss_label_recognition_storage_options )
416-
417439 all_livelihood_activity_labels_dataframe ["activity_type" ] = (
418440 ActivityLabel .LivelihoodActivityType .LIVELIHOOD_ACTIVITY
419441 )
420442 all_other_cash_income_labels_dataframe ["activity_type" ] = ActivityLabel .LivelihoodActivityType .OTHER_CASH_INCOME
421443 all_wild_foods_labels_dataframe ["activity_type" ] = ActivityLabel .LivelihoodActivityType .WILD_FOODS
422444 all_livelihood_summary_labels_dataframe ["activity_type" ] = ActivityLabel .LivelihoodActivityType .LIVELIHOOD_SUMMARY
423445
424- # Build a dataframe of all the Activity Labels from all BSSs
425- all_labels_df = pd .concat (
426- [
427- all_livelihood_activity_labels_dataframe ,
428- all_other_cash_income_labels_dataframe ,
429- all_wild_foods_labels_dataframe ,
430- all_livelihood_summary_labels_dataframe ,
431- ],
432- ignore_index = True ,
433- )
446+ # Build a dataframe of all the Activity Labels from all BSSs, including the attributes recognized from the labels,
447+ # including any labels that are matched directly as Products.
448+ all_labels_df = pd .DataFrame ()
449+ for summary_label_df in [
450+ all_livelihood_activity_labels_dataframe ,
451+ all_other_cash_income_labels_dataframe ,
452+ all_wild_foods_labels_dataframe ,
453+ all_livelihood_summary_labels_dataframe ,
454+ ]:
455+ recognized_attributes_df = get_all_label_attributes (
456+ summary_label_df ["label" ],
457+ summary_label_df ["activity_type" ].iloc [0 ],
458+ country_code = None , # We don't need the Season lookup for this asset
459+ )
460+ # Join the recognized attributes to the label dataframe
461+ summary_label_df = summary_label_df .join (
462+ recognized_attributes_df ,
463+ how = "left" ,
464+ )
465+ all_labels_df = pd .concat ([all_labels_df , summary_label_df ], ignore_index = True )
434466
435467 # Add the regular expressions
436468 regex_attributes_df = pd .DataFrame .from_records (
@@ -439,6 +471,8 @@ def livelihood_activity_label_recognition_dataframe(
439471 all_labels_df = all_labels_df .join (
440472 regex_attributes_df ,
441473 how = "left" ,
474+ lsuffix = "" ,
475+ rsuffix = "_regex" ,
442476 )
443477
444478 # Add the labels from the database
@@ -462,23 +496,82 @@ def livelihood_activity_label_recognition_dataframe(
462496 db_labels_df .set_index (["label_lower" , "activity_type" ]),
463497 on = ("label_lower" , "activity_type" ),
464498 how = "left" ,
499+ lsuffix = "" ,
465500 rsuffix = "_db" ,
466- lsuffix = "_regex" ,
467501 )
468502
469- # GDriveFS doesn't support updating existing files, it always create a new file with same name.
470- # This leads to multiple files with the same name in the folder, so we delete any existing files first.
471- if p .exists ():
472- # @TODO This doesn't work with the current version of gdrivefs, possibly because of an error
473- # with accessing Shared Drives. For now, we need to manually delete the old files before running
474- # the asset again.
475- # We need to experiment and possibly create a custom gdrivefs that reuses code from KiLuigi's GoogleDriveTarget
476- p .unlink ()
503+ # Create a deduplicated dataframe of all of the labels
504+ summary_label_df = all_labels_df .sort_values (by = ["label_lower" , "row_number" , "bss" ])
505+ summary_label_df = (
506+ summary_label_df .groupby (
507+ "label_lower" ,
508+ )
509+ .agg (
510+ langs = (
511+ "lang" ,
512+ lambda x : ", " .join (x .sort_values ().unique ()),
513+ ), # Create comma-separated list of unique languages
514+ datapoint_count_sum = ("datapoint_count" , "sum" ),
515+ in_summary_sum = ("in_summary" , "sum" ),
516+ unique_bss_count = ("bss" , pd .Series .nunique ),
517+ min_row_number = ("row_number" , "min" ),
518+ max_row_number = ("row_number" , "max" ),
519+ bss_for_min_row = ("bss" , "first" ), # Assuming df is sorted by row_number within each group
520+ bss_for_max_row = ("bss" , "last" ), # Assuming df is sorted by row_number within each group
521+ )
522+ .reset_index ()
523+ )
524+ summary_label_df = summary_label_df .sort_values (
525+ by = ["min_row_number" , "label_lower" , "bss_for_min_row" , "bss_for_max_row" ]
526+ )
527+ summary_label_df = summary_label_df .rename (
528+ columns = {"label_lower" : "label" , "datapoint_count_sum" : "datapoint_count" , "in_summary_sum" : "summary_count" }
529+ )
530+ summary_label_df = summary_label_df .join (
531+ all_labels_df [
532+ [
533+ "label_lower" ,
534+ "activity_type" ,
535+ "activity_label" ,
536+ "strategy_type" ,
537+ "is_start" ,
538+ "product_id_original" ,
539+ "unit_of_measure_id_original" ,
540+ "season" ,
541+ "additional_identifier" ,
542+ "attribute" ,
543+ "notes" ,
544+ "product_id" ,
545+ "unit_of_measure_id" ,
546+ "activity_label_regex" ,
547+ "strategy_type_regex" ,
548+ "is_start_regex" ,
549+ "product_id_regex" ,
550+ "unit_of_measure_id_regex" ,
551+ "season_regex" ,
552+ "additional_identifier_regex" ,
553+ "attribute_regex" ,
554+ "notes_regex" ,
555+ "status" ,
556+ "strategy_type_db" ,
557+ "is_start_db" ,
558+ "product_id_db" ,
559+ "unit_of_measure_id_db" ,
560+ "currency_id" ,
561+ "season_db" ,
562+ "additional_identifier_db" ,
563+ "attribute_db" ,
564+ "notes_db" ,
565+ ]
566+ ]
567+ .drop_duplicates ()
568+ .set_index ("label_lower" ),
569+ on = "label" ,
570+ how = "inner" ,
571+ )
477572
478- # Save the dataframe to an Excel workbook
479- with p .fs .open (p .path , mode = "wb" ) as f :
480- with pd .ExcelWriter (f , engine = "openpyxl" ) as writer :
481- all_labels_df .to_excel (writer , index = False , sheet_name = "All Labels" )
573+ # Save the dataframes to an Excel workbook
574+ return {"Label Summary" : summary_label_df , "All Labels" : all_labels_df }
482575
483576
484577def get_instances_from_dataframe (
@@ -526,6 +619,11 @@ def get_instances_from_dataframe(
526619 # Get a dataframe of the Wealth Groups for each column
527620 wealth_group_df = get_wealth_group_dataframe (df , livelihood_zone_baseline , worksheet_name , partition_key )
528621
622+ # Summary columns are those with a Wealth Group Category but not a Community
623+ summary_columns = wealth_group_df [
624+ (wealth_group_df ["wealth_group_category" ].notnull ()) & (wealth_group_df ["community" ].isnull ())
625+ ]["bss_column" ].tolist ()
626+
529627 # Get a dataframe of the attributes for each label in column A
530628 all_label_attributes = get_all_label_attributes (
531629 df ["A" ], activity_type , livelihood_zone_baseline .livelihood_zone .country_id
@@ -536,19 +634,38 @@ def get_instances_from_dataframe(
536634 # but the matching row in all_label_attributes dataframe has a blank activity_label.
537635 # Group the resulting dataframe so that we have a label and a list of the rows where it occurs.
538636 allow_unrecognized_labels = True
539- unrecognized_labels = (
540- df .iloc [num_header_rows :][
541- (df ["A" ].iloc [num_header_rows :] != "" )
542- & (all_label_attributes .iloc [num_header_rows :]["activity_label" ] == "" )
543- ]
544- .groupby ("A" )
545- .apply (lambda x : ", " .join (x .index .astype (str )), include_groups = False )
546- )
637+ # Identify rows where Column A is non-empty and the label wasn't recognized
638+ unrecognized_labels = df .iloc [num_header_rows :][
639+ (df ["A" ].iloc [num_header_rows :] != "" ) & (all_label_attributes .iloc [num_header_rows :]["activity_label" ] == "" )
640+ ]
547641 if unrecognized_labels .empty :
548- unrecognized_labels = pd .DataFrame (columns = ["label" , "rows" ])
642+ # Keep the same shape as the non-empty case (label, rows, datapoint_count, in_summary)
643+ unrecognized_labels = pd .DataFrame (columns = ["label" , "rows" , "datapoint_count" , "in_summary" ])
549644 else :
550- unrecognized_labels = unrecognized_labels .reset_index ()
551- unrecognized_labels .columns = ["label" , "rows" ]
645+ # Boolean mask of which cells are numeric (coerce non-numeric to NaN then notna)
646+ numeric_mask = unrecognized_labels .loc [:, "B" :].apply (lambda col : pd .to_numeric (col , errors = "coerce" ).notna ())
647+ # Count numeric datapoints per row
648+ unrecognized_labels ["datapoint_count" ] = numeric_mask .sum (axis = 1 )
649+ # Count numeric datapoints per row that are in the summary columns
650+ summary_numeric_mask = unrecognized_labels .loc [:, summary_columns ].apply (
651+ lambda col : pd .to_numeric (col , errors = "coerce" ).notna ()
652+ )
653+ unrecognized_labels ["summary_datapoint_count" ] = summary_numeric_mask .sum (axis = 1 )
654+ # Aggregate datapoint count by label
655+ unrecognized_labels .loc [:, "label" ] = prepare_lookup (unrecognized_labels ["A" ])
656+ unrecognized_labels = (
657+ unrecognized_labels .groupby ("label" )
658+ .agg (
659+ rows = ("A" , lambda x : "," .join (x .index .astype (str ))),
660+ datapoints = ("datapoint_count" , "sum" ),
661+ summary_datapoints = ("summary_datapoint_count" , "sum" ),
662+ )
663+ .reset_index ()
664+ )
665+ # Sort the rows by the first row number where the label occurs
666+ unrecognized_labels = unrecognized_labels .sort_values (
667+ by = "rows" , key = lambda x : x .str .split ("," ).str .get (0 ).astype (int )
668+ )
552669 message = "Unrecognized activity labels:\n \n " + unrecognized_labels .to_markdown (index = False )
553670 if allow_unrecognized_labels :
554671 context .log .warning (message )
@@ -1006,6 +1123,8 @@ def get_instances_from_dataframe(
10061123
10071124 # We are not starting a new Livelihood Strategy, but there may be
10081125 # additional attributes that need to be added to the current one.
1126+
1127+ # If we have attributes but haven't started a livelihood_strategy, then raise an error.
10091128 if not livelihood_strategy :
10101129 additional_attributes = [label_attributes ["attribute" ]] if label_attributes ["attribute" ] else []
10111130 for attribute in [
@@ -1023,10 +1142,12 @@ def get_instances_from_dataframe(
10231142 % (label_attributes ["activity_label" ], row , ", " .join (additional_attributes ))
10241143 )
10251144
1026- # Only update expected keys, and only if we found a value for that attribute.
1145+ # Only update expected keys, and only if we found a value for that attribute that doesn't conflict with
1146+ # an existing value.
10271147 for attribute , value in label_attributes .items ():
10281148 if attribute in livelihood_strategy and attribute not in ["activity_label" , "pattern" ] and value :
10291149 if not livelihood_strategy [attribute ]:
1150+ # Attribute not yet set for the `livelihood_strategy`, so it is safe to set it.
10301151 livelihood_strategy [attribute ] = value
10311152 livelihood_strategy ["attribute_rows" ][attribute ] = row
10321153
@@ -1091,6 +1212,19 @@ def get_instances_from_dataframe(
10911212 % (strategy_type , activity_attribute , label )
10921213 )
10931214 activity_attribute = None
1215+ elif activity_attribute == "income_or_expenditure" :
1216+ if livelihood_strategy ["strategy_type" ] == LivelihoodStrategyType .FOOD_PURCHASE :
1217+ activity_attribute = "expenditure"
1218+ elif livelihood_strategy ["strategy_type" ] == LivelihoodStrategyType .OTHER_PURCHASE :
1219+ activity_attribute = "expenditure"
1220+ elif livelihood_strategy ["strategy_type" ] == LivelihoodStrategyType .OTHER_CASH_INCOME :
1221+ activity_attribute = "income"
1222+ else :
1223+ errors .append (
1224+ "Invalid strategy_type %s for attribute %s from label '%s'"
1225+ % (strategy_type , activity_attribute , label )
1226+ )
1227+ activity_attribute = None
10941228
10951229 # For Payment In Kind and Other Cash Income the attribute for payment_per_time sometimes uses a label
10961230 # that normally matches the price attribute.
0 commit comments