|
58 | 58 | import django |
59 | 59 | import pandas as pd |
60 | 60 | from dagster import AssetExecutionContext, MetadataValue, Output, asset |
| 61 | +from django.db.models.functions import Lower |
| 62 | +from upath import UPath |
61 | 63 |
|
62 | 64 | from ..configs import BSSMetadataConfig |
63 | 65 | from ..partitions import bss_instances_partitions_def |
@@ -255,6 +257,49 @@ def get_livelihood_activity_regexes() -> list: |
255 | 257 | return compiled_regexes |
256 | 258 |
|
257 | 259 |
|
| 260 | +@functools.cache |
| 261 | +def get_livelihood_activity_regular_expression_attributes(label: str) -> dict: |
| 262 | + """ |
| 263 | + Return a dict of the attributes for a well-known Livelihood Activity label using regular expression mactches. |
| 264 | + """ |
| 265 | + label = prepare_lookup(label) |
| 266 | + attributes = { |
| 267 | + "activity_label": None, |
| 268 | + "strategy_type": None, |
| 269 | + "is_start": None, |
| 270 | + "product_id": None, |
| 271 | + "unit_of_measure_id": None, |
| 272 | + "season": None, |
| 273 | + "additional_identifier": None, |
| 274 | + "attribute": None, |
| 275 | + "notes": None, |
| 276 | + } |
| 277 | + for pattern, strategy_type, is_start, attribute in get_livelihood_activity_regexes(): |
| 278 | + match = pattern.fullmatch(label) |
| 279 | + if match: |
| 280 | + attributes.update(match.groupdict()) |
| 281 | + attributes["activity_label"] = label |
| 282 | + attributes["strategy_type"] = strategy_type |
| 283 | + attributes["is_start"] = is_start |
| 284 | + if isinstance(attribute, dict): |
| 285 | + # Attribute contains a dict of attributes, e.g. notes, etc. |
| 286 | + attributes.update(attribute) |
| 287 | + else: |
| 288 | + # Attribute is a string containing the attribute name |
| 289 | + attributes["attribute"] = attribute |
| 290 | + # Save the matched pattern to aid trouble-shooting |
| 291 | + attributes["notes"] = ( |
| 292 | + attributes["notes"] + " " + f' r"{pattern.pattern}"' |
| 293 | + if attributes["notes"] |
| 294 | + else f'r"{pattern.pattern}"' |
| 295 | + ) |
| 296 | + # Return the first matching pattern |
| 297 | + return attributes |
| 298 | + |
| 299 | + # Didn't match any patterns, so return empty attributes |
| 300 | + return attributes |
| 301 | + |
| 302 | + |
258 | 303 | @functools.cache |
259 | 304 | def get_livelihood_activity_label_map(activity_type: str) -> dict[str, dict]: |
260 | 305 | """ |
@@ -306,40 +351,9 @@ def get_label_attributes(label: str, activity_type: str) -> pd.Series: |
306 | 351 | try: |
307 | 352 | return pd.Series(get_livelihood_activity_label_map(activity_type)[label]) |
308 | 353 | except KeyError: |
309 | | - # No entry in the ActivityLabel model for this label, so attempt to match the label against the regexes |
310 | | - attributes = { |
311 | | - "activity_label": None, |
312 | | - "strategy_type": None, |
313 | | - "is_start": None, |
314 | | - "product_id": None, |
315 | | - "unit_of_measure_id": None, |
316 | | - "season": None, |
317 | | - "additional_identifier": None, |
318 | | - "attribute": None, |
319 | | - "notes": None, |
320 | | - } |
321 | | - for pattern, strategy_type, is_start, attribute in get_livelihood_activity_regexes(): |
322 | | - match = pattern.fullmatch(label) |
323 | | - if match: |
324 | | - attributes.update(match.groupdict()) |
325 | | - attributes["activity_label"] = label |
326 | | - attributes["strategy_type"] = strategy_type |
327 | | - attributes["is_start"] = is_start |
328 | | - if isinstance(attribute, dict): |
329 | | - # Attribute contains a dict of attributes, e.g. notes, etc. |
330 | | - attributes.update(attribute) |
331 | | - else: |
332 | | - # Attribute is a string containing the attribute name |
333 | | - attributes["attribute"] = attribute |
334 | | - # Save the matched pattern to aid trouble-shooting |
335 | | - attributes["notes"] = ( |
336 | | - attributes["notes"] + " " + f' r"{pattern.pattern}"' |
337 | | - if attributes["notes"] |
338 | | - else f'r"{pattern.pattern}"' |
339 | | - ) |
340 | | - return pd.Series(attributes) |
341 | | - # No pattern matched |
342 | | - return pd.Series(attributes).fillna(pd.NA) |
| 354 | + # No entry in the ActivityLabel model instance for this label, so attempt to match against the regexes |
| 355 | + attributes = get_livelihood_activity_regular_expression_attributes(label) |
| 356 | + return pd.Series(attributes) |
343 | 357 |
|
344 | 358 |
|
345 | 359 | def get_all_label_attributes(labels: pd.Series, activity_type: str, country_code: str | None) -> pd.DataFrame: |
@@ -385,6 +399,88 @@ def get_all_label_attributes(labels: pd.Series, activity_type: str, country_code |
385 | 399 | return all_label_attributes |
386 | 400 |
|
387 | 401 |
|
| 402 | +@asset |
| 403 | +def livelihood_activity_label_recognition_dataframe( |
| 404 | + context: AssetExecutionContext, |
| 405 | + config: BSSMetadataConfig, |
| 406 | + all_livelihood_activity_labels_dataframe: pd.DataFrame, |
| 407 | + all_other_cash_income_labels_dataframe: pd.DataFrame, |
| 408 | + all_wild_foods_labels_dataframe: pd.DataFrame, |
| 409 | + all_livelihood_summary_labels_dataframe: pd.DataFrame, |
| 410 | +): |
| 411 | + """ |
| 412 | + A saved spreadsheet showing how each BSS label is recognized, either from the ActivityLabel model or a regex. |
| 413 | + """ |
| 414 | + # Path to the output spreadsheet |
| 415 | + p = UPath(config.bss_label_recognition_workbook, **config.bss_label_recognition_storage_options) |
| 416 | + |
| 417 | + all_livelihood_activity_labels_dataframe["activity_type"] = ( |
| 418 | + ActivityLabel.LivelihoodActivityType.LIVELIHOOD_ACTIVITY |
| 419 | + ) |
| 420 | + all_other_cash_income_labels_dataframe["activity_type"] = ActivityLabel.LivelihoodActivityType.OTHER_CASH_INCOME |
| 421 | + all_wild_foods_labels_dataframe["activity_type"] = ActivityLabel.LivelihoodActivityType.WILD_FOODS |
| 422 | + all_livelihood_summary_labels_dataframe["activity_type"] = ActivityLabel.LivelihoodActivityType.LIVELIHOOD_SUMMARY |
| 423 | + |
| 424 | + # Build a dataframe of all the Activity Labels from all BSSs |
| 425 | + all_labels_df = pd.concat( |
| 426 | + [ |
| 427 | + all_livelihood_activity_labels_dataframe, |
| 428 | + all_other_cash_income_labels_dataframe, |
| 429 | + all_wild_foods_labels_dataframe, |
| 430 | + all_livelihood_summary_labels_dataframe, |
| 431 | + ], |
| 432 | + ignore_index=True, |
| 433 | + ) |
| 434 | + |
| 435 | + # Add the regular expressions |
| 436 | + regex_attributes_df = pd.DataFrame.from_records( |
| 437 | + all_labels_df["label"].astype(str).map(get_livelihood_activity_regular_expression_attributes) |
| 438 | + ) |
| 439 | + all_labels_df = all_labels_df.join( |
| 440 | + regex_attributes_df, |
| 441 | + how="left", |
| 442 | + ) |
| 443 | + |
| 444 | + # Add the labels from the database |
| 445 | + db_labels_df = pd.DataFrame.from_records( |
| 446 | + ActivityLabel.objects.annotate(label_lower=Lower("activity_label")).values( |
| 447 | + "label_lower", |
| 448 | + "activity_type", |
| 449 | + "status", |
| 450 | + "strategy_type", |
| 451 | + "is_start", |
| 452 | + "product_id", |
| 453 | + "unit_of_measure_id", |
| 454 | + "currency_id", |
| 455 | + "season", |
| 456 | + "additional_identifier", |
| 457 | + "attribute", |
| 458 | + "notes", |
| 459 | + ) |
| 460 | + ) |
| 461 | + all_labels_df = all_labels_df.join( |
| 462 | + db_labels_df.set_index(["label_lower", "activity_type"]), |
| 463 | + on=("label_lower", "activity_type"), |
| 464 | + how="left", |
| 465 | + rsuffix="_db", |
| 466 | + lsuffix="_regex", |
| 467 | + ) |
| 468 | + |
| 469 | + # GDriveFS doesn't support updating existing files, it always create a new file with same name. |
| 470 | + # This leads to multiple files with the same name in the folder, so we delete any existing files first. |
| 471 | + if p.exists(): |
| 472 | + # @TODO This doesn't work with the current version of gdrivefs, possibly because of an error |
| 473 | + # with accessing Shared Drives. For now, we need to manually delete the old files before running |
| 474 | + # the asset again. |
| 475 | + # We need to experiment and possibly create a custom gdrivefs that reuses code from KiLuigi's GoogleDriveTarget |
| 476 | + p.unlink() |
| 477 | + |
| 478 | + # Save the dataframe to an Excel workbook |
| 479 | + with p.fs.open(p.path, mode="wb") as f: |
| 480 | + with pd.ExcelWriter(f, engine="openpyxl") as writer: |
| 481 | + all_labels_df[:50].to_excel(writer, index=False, sheet_name="All Labels") |
| 482 | + |
| 483 | + |
388 | 484 | def get_instances_from_dataframe( |
389 | 485 | context: AssetExecutionContext, |
390 | 486 | config: BSSMetadataConfig, |
@@ -436,10 +532,14 @@ def get_instances_from_dataframe( |
436 | 532 | ) |
437 | 533 |
|
438 | 534 | # Check that we recognize all of the activity labels |
| 535 | + # The unrecognized labels are rows after the header rows where column A is not blank, |
| 536 | + # but the matching row in all_label_attributes dataframe has a blank activity_label. |
| 537 | + # Group the resulting dataframe so that we have a label and a list of the rows where it occurs. |
439 | 538 | allow_unrecognized_labels = True |
440 | 539 | unrecognized_labels = ( |
441 | 540 | df.iloc[num_header_rows:][ |
442 | | - (df["A"].iloc[num_header_rows:] != "") & (all_label_attributes.iloc[num_header_rows:, 0].isna()) |
| 541 | + (df["A"].iloc[num_header_rows:] != "") |
| 542 | + & (all_label_attributes.iloc[num_header_rows:]["activity_label"] == "") |
443 | 543 | ] |
444 | 544 | .groupby("A") |
445 | 545 | .apply(lambda x: ", ".join(x.index.astype(str)), include_groups=False) |
|
0 commit comments