Skip to content

Commit e3d094b

Browse files
authored
Merge pull request #161 from FEWS-NET/HEA-372/better_lookups_in_livelihood_activities
Hea 372/better lookups in livelihood activities
2 parents 7ee28bc + a5405d6 commit e3d094b

2 files changed

Lines changed: 106 additions & 94 deletions

File tree

pipelines/assets/livelihood_activity.py

Lines changed: 81 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161

6262
from ..configs import BSSMetadataConfig
6363
from ..partitions import bss_instances_partitions_def
64-
from ..utils import class_from_name, prepare_lookup
64+
from ..utils import class_from_name, get_sample_data, prepare_lookup
6565
from .base import (
6666
get_all_bss_labels_dataframe,
6767
get_bss_dataframe,
@@ -285,6 +285,43 @@ def get_label_attributes(label: str, activity_type: str) -> pd.Series:
285285
return pd.Series(attributes).fillna(pd.NA)
286286

287287

288+
def get_all_label_attributes(labels: pd.Series, activity_type: str, country_code: str | None) -> pd.DataFrame:
    """
    Build a DataFrame holding the attributes of every label in the supplied Series.

    Each label is parsed with get_label_attributes, and then the Product, Unit of Measure
    and Season columns are passed through their Lookup classes so that the returned
    DataFrame contains the looked-up identifiers for those attributes.

    The Season lookup is only performed when a country_code is supplied, because it needs
    a country_id column. Callers that only want to test individual labels can pass None,
    but a country_code should be provided when processing a BSS.

    The returned DataFrame has the same index as the labels Series, so rows can be
    matched by row number.
    """
    # Instantiate each lookup once, so that it can cache the individual results
    product_lookup = ClassifiedProductLookup()
    # require_match=False because it is possible that there won't be any
    # Unit of Measure matches, e.g. for OtherCashIncome
    uom_lookup = UnitOfMeasureLookup(require_match=False)
    # require_match=False because it is possible that there won't be any
    # Season matches, e.g. for OtherCashIncome
    season_lookup = SeasonNameLookup(require_match=False)

    def parse_label(label):
        # Parse a single label into its Series of attributes
        return get_label_attributes(label, activity_type)

    attributes_df = labels.apply(parse_label).fillna("")

    # Resolve Product first and then Unit of Measure, replacing pd.NA with None after each lookup
    for lookup, column in (
        (product_lookup, "product_id"),
        (uom_lookup, "unit_of_measure_id"),
    ):
        attributes_df = lookup.do_lookup(attributes_df, column, column)
        attributes_df[column] = attributes_df[column].replace(pd.NA, None)

    if country_code:
        # The Season lookup requires the country_id column, so add it first
        attributes_df["country_id"] = country_code
        attributes_df = season_lookup.do_lookup(attributes_df, "season", "season")
        attributes_df["season"] = attributes_df["season"].replace(pd.NA, None)

    # Restore the original index so that callers can match by row number
    attributes_df.index = labels.index
    return attributes_df
323+
324+
288325
def get_instances_from_dataframe(
289326
context: AssetExecutionContext,
290327
df: pd.DataFrame,
@@ -320,36 +357,19 @@ def get_instances_from_dataframe(
320357
livelihood_zone_baseline.reference_year_end_date.isoformat(),
321358
]
322359

323-
# Prepare the lookups, so they cache the individual results
324-
seasonnamelookup = SeasonNameLookup()
325-
unitofmeasurelookup = UnitOfMeasureLookup()
360+
# Save the identifier for Season 2 because we need it when creating MilkProduction instances
361+
season2_name = SeasonNameLookup().get("Season 2", country_id=livelihood_zone_baseline.livelihood_zone.country_id)
362+
363+
# Prepare a lookup for ClassifiedProduct, so it caches and reuses the results of .get() lookups
326364
classifiedproductlookup = ClassifiedProductLookup()
327365

328366
# Get a dataframe of the Wealth Groups for each column
329367
wealth_group_df = get_wealth_group_dataframe(df, livelihood_zone_baseline, worksheet_name, partition_key)
330368

331-
# Prepare the label column for matching against the label_map
332-
all_label_attributes = df["A"].apply(lambda x: get_label_attributes(x, activity_type)).fillna("")
333-
all_label_attributes = classifiedproductlookup.do_lookup(all_label_attributes, "product_id", "product_id")
334-
all_label_attributes["product_id"] = all_label_attributes["product_id"].replace(pd.NA, None)
335-
try:
336-
all_label_attributes = unitofmeasurelookup.do_lookup(
337-
all_label_attributes, "unit_of_measure_id", "unit_of_measure_id"
338-
)
339-
all_label_attributes["unit_of_measure_id"] = all_label_attributes["unit_of_measure_id"].replace(pd.NA, None)
340-
except ValueError:
341-
# It is possible that there won't be any Unit of Measure matches, e.g. for OtherCashIncome
342-
pass
343-
# Add the country_id because it is required for the Season lookup
344-
all_label_attributes["country_id"] = livelihood_zone_baseline.livelihood_zone.country_id
345-
try:
346-
all_label_attributes = seasonnamelookup.do_lookup(all_label_attributes, "season", "season")
347-
all_label_attributes["season"] = all_label_attributes["season"].replace(pd.NA, None)
348-
except ValueError:
349-
# It is possible that there won't be any Season matches, e.g. for OtherCashIncome
350-
pass
351-
# Make sure we keep the same index so we can match by row number
352-
all_label_attributes.index = df.index
369+
# Get a dataframe of the attributes for each label in column A
370+
all_label_attributes = get_all_label_attributes(
371+
df["A"], activity_type, livelihood_zone_baseline.livelihood_zone.country_id
372+
)
353373

354374
# Check that we recognize all of the activity labels
355375
allow_unrecognized_labels = True
@@ -458,31 +478,23 @@ def get_instances_from_dataframe(
458478
"Found Livelihood Activities from row %s, but there is no Livelihood Strategy defined." % row
459479
)
460480

461-
# Copy the product_id for MilkProduction and ButterProduction the previous livelihood strategy if
462-
# necessary.
481+
# Copy the product_id for MilkProduction and ButterProduction from the previous livelihood strategy
482+
# if necessary.
463483
if (
464484
livelihood_strategy["strategy_type"] in ["MilkProduction", "ButterProduction"]
465485
and ("product_id" not in livelihood_strategy or not livelihood_strategy["product_id"])
466-
and livelihood_strategy["season"]
467-
== seasonnamelookup.get(
468-
"Season 2",
469-
country_id=livelihood_zone_baseline.livelihood_zone.country_id,
470-
)
486+
and livelihood_strategy["season"] == season2_name
471487
and previous_livelihood_strategy
472488
and previous_livelihood_strategy["product_id"]
473489
):
474490
livelihood_strategy["attribute_rows"]["product_id"] = row
475491
livelihood_strategy["product_id"] = previous_livelihood_strategy["product_id"]
476492

477-
# Copy the milking_animals for camels and cattle from the previous livelihood activities if
478-
# necessary.
493+
# Copy the milking_animals for camels and cattle from the previous livelihood activities
494+
# if necessary.
479495
if (
480496
livelihood_strategy["strategy_type"] == "MilkProduction"
481-
and livelihood_strategy["season"]
482-
== seasonnamelookup.get(
483-
"Season 2",
484-
country_id=livelihood_zone_baseline.livelihood_zone.country_id,
485-
)
497+
and livelihood_strategy["season"] == season2_name
486498
and previous_livelihood_activities_for_strategy
487499
):
488500
# Assertion to prevent linting from complaining about possible None values
@@ -663,20 +675,15 @@ def get_instances_from_dataframe(
663675
"season" not in livelihood_strategy or not livelihood_strategy["season"]
664676
):
665677
strategy_is_valid = False
678+
# Include the header rows so that we can see which Wealth Groups are affected
666679
rows = df.index[:num_header_rows].tolist() + [livelihood_strategy["row"]]
667-
error_message = "Cannot determine season from %s for %s %s on row %s for label '%s':\n%s" % (
680+
error_message = "Cannot determine season from '%s' for %s %s on row %s for label '%s':\n%s" % (
668681
livelihood_strategy["season_original"],
669682
"summary" if non_empty_summary_activities else "non-summary",
670683
livelihood_strategy["strategy_type"],
671684
livelihood_strategy["row"],
672685
livelihood_strategy["activity_label"],
673-
# Use replace/dropna/fillna so that the error message only includes the columns that
674-
# contain unwanted data.
675-
df.loc[rows]
676-
.replace("", pd.NA)
677-
.dropna(axis="columns", subset=livelihood_strategy["row"])
678-
.fillna("")
679-
.to_markdown(),
686+
get_sample_data(df, rows).to_markdown(),
680687
)
681688
if not non_empty_summary_activities:
682689
# No summary activities so we don't need to log an error, a warning is sufficient
@@ -690,20 +697,18 @@ def get_instances_from_dataframe(
690697
"product_id" not in livelihood_strategy or not livelihood_strategy["product_id"]
691698
):
692699
strategy_is_valid = False
700+
# Include the header rows so that we can see which Wealth Groups are affected
693701
rows = df.index[:num_header_rows].tolist() + [livelihood_strategy["row"]]
694-
error_message = "Cannot determine product_id from %s for %s %s on row %s for label '%s':\n%s" % (
695-
livelihood_strategy["product_id_original"],
696-
"summary" if non_empty_summary_activities else "non-summary",
697-
livelihood_strategy["strategy_type"],
698-
livelihood_strategy["row"],
699-
livelihood_strategy["activity_label"],
700-
# Use replace/dropna/fillna so that the error message only includes the columns that
701-
# contain unwanted data.
702-
df.loc[rows]
703-
.replace("", pd.NA)
704-
.dropna(axis="columns", subset=livelihood_strategy["row"])
705-
.fillna("")
706-
.to_markdown(),
702+
error_message = (
703+
"Cannot determine product_id from '%s' for %s %s on row %s for label '%s':\n%s"
704+
% (
705+
livelihood_strategy["product_id_original"],
706+
"summary" if non_empty_summary_activities else "non-summary",
707+
livelihood_strategy["strategy_type"],
708+
livelihood_strategy["row"],
709+
livelihood_strategy["activity_label"],
710+
get_sample_data(df, rows).to_markdown(),
711+
)
707712
)
708713
if not non_empty_summary_activities:
709714
# No summary activities so we don't need to log an error, a warning is sufficient
@@ -841,8 +846,8 @@ def get_instances_from_dataframe(
841846
and not livelihood_strategy["product_id"].startswith(value)
842847
and not value == "L02129AA"
843848
):
844-
raise ValueError(
845-
"Found duplicate value %s from row %s for existing attribute %s with value %s from row %s"
849+
errors.append(
850+
"Found different value '%s' from row %s for existing attribute '%s' with value '%s' from row %s"
846851
% (
847852
value,
848853
row,
@@ -918,6 +923,7 @@ def get_instances_from_dataframe(
918923
and livelihood_strategy["product_id"]
919924
!= product_name_df["product_id"].dropna().iloc[0]
920925
):
926+
# Include the header rows so that we can see which Wealth Groups are affected
921927
rows = df.index[:num_header_rows].tolist() + [row]
922928
errors.append(
923929
"Found different products %s and %s in label and other columns on row %s for label '%s':\n%s"
@@ -926,32 +932,21 @@ def get_instances_from_dataframe(
926932
product_name_df["product_id"].iloc[0],
927933
row,
928934
label,
929-
# Use replace/dropna/fillna so that the error message only includes the columns that
930-
# contain unwanted data.
931-
df.loc[rows]
932-
.replace("", pd.NA)
933-
.dropna(axis="columns", subset=row)
934-
.fillna("")
935-
.to_markdown(),
935+
get_sample_data(df, rows).to_markdown(),
936936
)
937937
)
938938

939939
except ValueError:
940940
if not livelihood_strategy["product_id"]:
941+
# Include the header rows so that we can see which Wealth Groups are affected
941942
rows = df.index[:num_header_rows].tolist() + [row]
942943
errors.append(
943944
"Failed to identify product from %s on row %s for label '%s':\n%s"
944945
% (
945946
", ".join(product_name_df["product__name"].dropna().astype(str).unique()),
946947
row,
947948
label,
948-
# Use replace/dropna/fillna so that the error message only includes the columns that
949-
# contain unwanted data.
950-
df.loc[rows]
951-
.replace("", pd.NA)
952-
.dropna(axis="columns", subset=row)
953-
.fillna("")
954-
.to_markdown(),
949+
get_sample_data(df, rows).to_markdown(),
955950
)
956951
)
957952

@@ -984,43 +979,35 @@ def get_instances_from_dataframe(
984979
# Livelihood Activity without containing any actual data.
985980
values = df.loc[row, "B":].replace("", pd.NA).dropna().astype(str).str.strip().unique()
986981
if values.size > 1 or values[0] not in ["0", "1"]:
987-
# Include the header rows as well as the current row in the error message to aid trouble-shooting
982+
# Include the header rows so that we can see which Wealth Groups are affected
988983
rows = df.index[:num_header_rows].tolist() + [row]
989984
errors.append(
990985
"Found values %s without an identified attribute on row %s for label '%s':\n%s"
991986
% (
992987
", ".join(values),
993988
row,
994989
label,
995-
# Use replace/dropna/fillna so that the error message only includes the columns that
996-
# contain unwanted data.
997-
df.loc[rows]
998-
.replace("", pd.NA)
999-
.dropna(axis="columns", subset=row)
1000-
.fillna("")
1001-
.to_markdown(),
990+
get_sample_data(df, rows).to_markdown(),
1002991
)
1003992
)
1004993

1005994
# If the activity label that marks the start of a Livelihood Strategy is not
1006-
# returned by `ActivityLabel.objects.filter(status=LabelStatus.COMPLETE)`,
1007-
# and hence is not in the `activity_label_map`, then repeated labels like
995+
# recognized by the get_label_attributes lookup, then repeated labels like
1008996
# `kcals (%)` will appear to be duplicate attributes for the previous
1009-
# `1ivelihood_strategy`. Therefore, if we have `allow_unrecognized_labels` we
1010-
# need to ignore the duplicates, but if we don't, we should raise an error.
997+
# LivelihoodStrategy. Therefore, if we have `allow_unrecognized_labels` we
998+
# need to ignore the duplicates, and if we don't we should raise an error.
1011999
elif activity_attribute in livelihood_strategy["attribute_rows"]:
10121000
if allow_unrecognized_labels:
10131001
# Skip to the next row
10141002
continue
10151003
else:
10161004
errors.append(
1017-
"Found duplicate value %s for existing attribute %s with value %s on row %s for label '%s'"
1005+
"Found duplicate attribute '%s' for label '%s' in row %s that was already found in row %s for this LivelihoodStrategy"
10181006
% (
1019-
value,
1020-
attribute,
1021-
livelihood_strategy[attribute],
1022-
row,
1007+
activity_attribute,
10231008
label,
1009+
row,
1010+
livelihood_strategy["attribute_rows"][activity_attribute],
10241011
)
10251012
)
10261013

pipelines/utils.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,31 @@ def get_index(search_text: str | list[str], data: pd.Series, offset: int = 0) ->
5959
return result
6060

6161

62+
def get_sample_data(df: pd.DataFrame, rows: list[int], max_columns: int = 10) -> pd.DataFrame:
    """
    Get a sample of the data to include in an error message.

    The BSS contains many columns, and including all of them in an error
    message makes the message unreadable. The sample therefore keeps only the
    columns that contain data in the last requested row and, if there are
    still more than max_columns of them, returns the leading and trailing
    columns separated by a "..." placeholder column.

    Args:
        df: The DataFrame to take the sample from.
        rows: The index labels of the rows to include. The last label
            identifies the row that decides which columns count as empty.
        max_columns: The maximum number of columns in a truncated sample,
            including the "..." placeholder column.

    Returns:
        A DataFrame containing the requested rows, with missing values shown
        as empty strings.
    """
    if len(rows) == 0:
        # There is no row to decide which columns are empty, so return the empty selection
        return df.loc[rows]

    # Use replace/dropna/fillna so that the sample only includes the columns that have data
    sample_data = df.loc[rows].replace("", pd.NA).dropna(axis="columns", subset=[rows[-1]]).fillna("")

    total_cols = sample_data.shape[1]
    if total_cols <= max_columns:
        return sample_data

    # Don't show more than max_columns in the error message, to make the message more readable.
    # One column is reserved for the "..." placeholder.
    leading_cols = max_columns // 2
    trailing_cols = max(max_columns - leading_cols - 1, 0)
    return pd.concat(
        [
            sample_data.iloc[:, :leading_cols],
            # Add a column of "..." to indicate that there are more columns
            pd.DataFrame(["..."] * sample_data.shape[0], columns=["..."], index=sample_data.index),
            # Slice from an explicit non-negative position, because a negative offset of zero
            # would select every column instead of none
            sample_data.iloc[:, total_cols - trailing_cols :],
        ],
        axis="columns",
    )
85+
86+
6287
def prepare_lookup(data: str | list[str] | pd.Series | pd.DataFrame) -> pd.Series | pd.DataFrame:
6388
"""
6489
Prepare a Series or DataFrame for lookup operations by converting to lowercase strings and stripping whitespace.

0 commit comments

Comments
 (0)