|
14 | 14 | import re |
15 | 15 | from typing import List, Dict, Optional, Any, Set, Tuple |
16 | 16 |
|
| 17 | +# Import composite key utilities from shared module |
| 18 | +from _composite_keys import ( |
| 19 | + get_composite_key_column_name, |
| 20 | + check_dense_overflow, |
| 21 | + should_use_sparse, |
| 22 | + generate_dense_cpp_expression, |
| 23 | + compute_composite_key_dense, |
| 24 | + compute_composite_key_sparse, |
| 25 | + compute_composite_key_auto, |
| 26 | +) |
| 27 | + |
17 | 28 |
|
18 | 29 | __all__ = [ |
19 | 30 | # Low-level utilities |
|
31 | 42 | 'get_join_columns_for_snapshot', |
32 | 43 | 'cache_to_snapshot', |
33 | 44 |
|
34 | | - # Sparse key support |
| 45 | + # Composite key utilities (re-exported from _composite_keys) |
| 46 | + 'get_composite_key_column_name', |
| 47 | + 'check_dense_overflow', |
35 | 48 | 'should_use_sparse', |
| 49 | + 'generate_dense_cpp_expression', |
36 | 50 | 'compute_composite_key_dense', |
37 | 51 | 'compute_composite_key_sparse', |
38 | 52 | 'compute_composite_key_auto', |
@@ -358,247 +372,6 @@ def extract_dependencies(expr: str, known_names: Set[str] = None) -> List[str]: |
358 | 372 | return sorted(candidates) |
359 | 373 |
|
360 | 374 |
|
361 | | -# ============================================================================= |
362 | | -# Sparse Key Support for Multi-Key Joins |
363 | | -# ============================================================================= |
364 | | - |
365 | | -def should_use_sparse(df, key_columns): |
366 | | - """ |
367 | | - Determine if sparse key mapping should be used instead of compact linearization. |
368 | | - |
369 | | - Use sparse mapping when: |
370 | | - 1. Compact range exceeds int32 (2^31), OR |
371 | | - 2. Compact range is >10x wasteful compared to actual unique combinations |
372 | | - |
373 | | - Parameters |
374 | | - ---------- |
375 | | - df : DataFrame |
376 | | - DataFrame with key columns |
377 | | - key_columns : list of str |
378 | | - Column names forming the composite key |
379 | | - |
380 | | - Returns |
381 | | - ------- |
382 | | - bool |
383 | | - True if sparse mapping should be used |
384 | | - """ |
385 | | - import numpy as np |
386 | | - |
387 | | - max_vals = [int(df[k].max()) + 1 for k in key_columns] |
388 | | - compact_range = np.prod(max_vals, dtype=np.int64) |
389 | | - n_unique = np.prod([df[k].nunique() for k in key_columns]) |
390 | | - |
391 | | - return compact_range > 2**31 or compact_range > 10 * n_unique |
392 | | - |
393 | | - |
394 | | -def get_composite_key_column_name(subframe_name: str) -> str: |
395 | | - """ |
396 | | - Get the standard column name for a composite key. |
397 | | - |
398 | | - Parameters |
399 | | - ---------- |
400 | | - subframe_name : str |
401 | | - Name of the subframe |
402 | | - |
403 | | - Returns |
404 | | - ------- |
405 | | - str |
406 | | - Column name like '__adf_key_DTrack0__' |
407 | | - """ |
408 | | - return f"__adf_key_{subframe_name}__" |
409 | | - |
410 | | - |
411 | | -def check_dense_overflow(max_values: list) -> tuple: |
412 | | - """ |
413 | | - Check if dense linearization would overflow int64. |
414 | | - |
415 | | - Parameters |
416 | | - ---------- |
417 | | - max_values : list of int |
418 | | - Maximum values for each key column (max + 1 for range) |
419 | | - |
420 | | - Returns |
421 | | - ------- |
422 | | - tuple |
423 | | - (is_safe, compact_range) - is_safe is True if no overflow |
424 | | - """ |
425 | | - import numpy as np |
426 | | - |
427 | | - # Calculate product carefully to detect overflow |
428 | | - compact_range = 1 |
429 | | - for mv in max_values: |
430 | | - # Check if multiplication would overflow int64 |
431 | | - if compact_range > 0 and mv > (2**63 - 1) // compact_range: |
432 | | - return False, float('inf') |
433 | | - compact_range *= mv |
434 | | - |
435 | | - return compact_range <= 2**63 - 1, compact_range |
436 | | - |
437 | | - |
438 | | -def generate_dense_cpp_expression(key_columns: list, max_values: list) -> str: |
439 | | - """ |
440 | | - Generate C++ expression for dense composite key computation. |
441 | | - |
442 | | - Used for runtime generation via rdf.Define(). |
443 | | - |
444 | | - Parameters |
445 | | - ---------- |
446 | | - key_columns : list of str |
447 | | - Column names forming the composite key |
448 | | - max_values : list of int |
449 | | - Maximum values for each key column (max + 1 for range) |
450 | | - |
451 | | - Returns |
452 | | - ------- |
453 | | - str |
454 | | - C++ expression like "k0 + k1 * 10 + k2 * 10 * 5" |
455 | | - |
456 | | - Examples |
457 | | - -------- |
458 | | - >>> generate_dense_cpp_expression(['side', 'row'], [2, 152]) |
459 | | - 'side + row * 2' |
460 | | - >>> generate_dense_cpp_expression(['a', 'b', 'c'], [10, 20, 30]) |
461 | | - 'a + b * 10 + c * 10 * 20' |
462 | | - """ |
463 | | - if len(key_columns) == 1: |
464 | | - return key_columns[0] |
465 | | - |
466 | | - # First term: just the first column |
467 | | - parts = [key_columns[0]] |
468 | | - |
469 | | - # Subsequent terms: column * product of previous max values |
470 | | - multiplier_parts = [] |
471 | | - for i in range(1, len(key_columns)): |
472 | | - multiplier_parts.append(str(max_values[i-1])) |
473 | | - multiplier = " * ".join(multiplier_parts) |
474 | | - parts.append(f"{key_columns[i]} * {multiplier}") |
475 | | - |
476 | | - return " + ".join(parts) |
477 | | - |
478 | | - |
479 | | -def compute_composite_key_dense(df, key_columns, max_values=None): |
480 | | - """ |
481 | | - Compute composite key using compact linearization. |
482 | | - |
483 | | - __adf_key__ = k0 + k1*max0 + k2*max0*max1 + ... |
484 | | - |
485 | | - Parameters |
486 | | - ---------- |
487 | | - df : DataFrame |
488 | | - DataFrame with key columns |
489 | | - key_columns : list of str |
490 | | - Column names forming the composite key |
491 | | - max_values : list of int, optional |
492 | | - Maximum values for each key column. If None, computed from data. |
493 | | - |
494 | | - Returns |
495 | | - ------- |
496 | | - np.ndarray |
497 | | - Int64 composite keys |
498 | | - """ |
499 | | - import numpy as np |
500 | | - |
501 | | - if max_values is None: |
502 | | - max_values = [int(df[k].max()) + 1 for k in key_columns] |
503 | | - |
504 | | - key = df[key_columns[0]].values.astype(np.int64) |
505 | | - multiplier = max_values[0] |
506 | | - |
507 | | - for i, col in enumerate(key_columns[1:], 1): |
508 | | - key = key + df[col].values.astype(np.int64) * multiplier |
509 | | - multiplier *= max_values[i] |
510 | | - |
511 | | - return key |
512 | | - |
513 | | - |
514 | | -def compute_composite_key_sparse(main_df, sub_df, key_columns): |
515 | | - """ |
516 | | - Compute composite key using vectorized unique value mapping. |
517 | | - |
518 | | - Works for any key distribution (dense or sparse). |
519 | | - Uses np.unique(axis=0) for efficient vectorized computation. |
520 | | - |
521 | | - Parameters |
522 | | - ---------- |
523 | | - main_df : DataFrame |
524 | | - Main DataFrame with key columns |
525 | | - sub_df : DataFrame |
526 | | - Subframe DataFrame with key columns |
527 | | - key_columns : list of str |
528 | | - Column names forming the composite key |
529 | | - |
530 | | - Returns |
531 | | - ------- |
532 | | - main_keys : np.ndarray |
533 | | - Int64 composite keys for main DataFrame |
534 | | - sub_keys : np.ndarray |
535 | | - Int64 composite keys for subframe DataFrame |
536 | | - |
537 | | - Notes |
538 | | - ----- |
539 | | - Both DataFrames use the same mapping, ensuring keys match for joins. |
540 | | - Complexity: O(n log n) via np.unique, fully vectorized. |
541 | | - """ |
542 | | - import numpy as np |
543 | | - |
544 | | - # Combine main and sub to build shared mapping |
545 | | - main_vals = main_df[key_columns].to_numpy() |
546 | | - sub_vals = sub_df[key_columns].to_numpy() |
547 | | - all_vals = np.vstack([main_vals, sub_vals]) |
548 | | - |
549 | | - # Get unique rows and inverse mapping |
550 | | - _, inverse = np.unique(all_vals, axis=0, return_inverse=True) |
551 | | - |
552 | | - # Split back into main and sub |
553 | | - n_main = len(main_df) |
554 | | - main_keys = inverse[:n_main].astype(np.int64) |
555 | | - sub_keys = inverse[n_main:].astype(np.int64) |
556 | | - |
557 | | - return main_keys, sub_keys |
558 | | - |
559 | | - |
560 | | -def compute_composite_key_auto(main_df, sub_df, key_columns): |
561 | | - """ |
562 | | - Automatically choose dense or sparse key computation. |
563 | | - |
564 | | - Uses dense linearization when key ranges are compact, |
565 | | - sparse mapping when ranges are too large or wasteful. |
566 | | - |
567 | | - Parameters |
568 | | - ---------- |
569 | | - main_df : DataFrame |
570 | | - Main DataFrame with key columns |
571 | | - sub_df : DataFrame |
572 | | - Subframe DataFrame with key columns |
573 | | - key_columns : list of str |
574 | | - Column names forming the composite key |
575 | | - |
576 | | - Returns |
577 | | - ------- |
578 | | - main_keys : np.ndarray |
579 | | - Int64 composite keys for main DataFrame |
580 | | - sub_keys : np.ndarray |
581 | | - Int64 composite keys for subframe DataFrame |
582 | | - method : str |
583 | | - 'dense' or 'sparse' indicating which method was used |
584 | | - """ |
585 | | - import numpy as np |
586 | | - import pandas as pd |
587 | | - |
588 | | - # Check if sparse is needed using combined data |
589 | | - combined = pd.concat([main_df[key_columns], sub_df[key_columns]], ignore_index=True) |
590 | | - |
591 | | - if should_use_sparse(combined, key_columns): |
592 | | - main_keys, sub_keys = compute_composite_key_sparse(main_df, sub_df, key_columns) |
593 | | - return main_keys, sub_keys, 'sparse' |
594 | | - else: |
595 | | - # Compute shared max values from union |
596 | | - max_values = [int(combined[k].max()) + 1 for k in key_columns] |
597 | | - main_keys = compute_composite_key_dense(main_df, key_columns, max_values) |
598 | | - sub_keys = compute_composite_key_dense(sub_df, key_columns, max_values) |
599 | | - return main_keys, sub_keys, 'dense' |
600 | | - |
601 | | - |
602 | 375 | # ============================================================================= |
603 | 376 | # Dependency Resolution |
604 | 377 | # ============================================================================= |
|
0 commit comments