4040
# Default ``target_n`` of ``_stratified_subsample_index``.
PUF_SUBSAMPLE_TARGET = 20_000
# NOTE(review): presumably the AGI percentile that delimits the top stratum of
# the stratified subsample -- its use is not visible here, confirm.
PUF_TOP_PERCENTILE = 99.5
# Dollar cut-off applied in ``_run_qrf_imputation`` when Forbes metadata is
# present: person records whose AGI, or whose largest absolute imputed
# financial column, reaches this value are dropped from QRF training.
FORBES_SYNTHETIC_FINANCIAL_THRESHOLD = 250_000_000
# Dollar cut-off used for the same exclusion when no Forbes metadata is found.
PUF_METADATA_MISSING_TOP_TAIL_THRESHOLD = 10_000_000
# ``(variable, minimum value)`` pairs. A record counts as carrying the Forbes
# marker when the variable's value is >= the paired minimum.
FORBES_METADATA_MARKER_THRESHOLDS = (
    ("forbes_unit_id", 0),
    ("forbes_replicate_id", 0),
    ("forbes_rank", 1),
)
# Columns scanned for top-tail magnitudes when Forbes metadata is absent;
# only those actually present in a training frame are checked.
PUF_METADATA_MISSING_TOP_TAIL_VARIABLES = (
    "adjusted_gross_income",
    "qualified_dividend_income",
    "non_qualified_dividend_income",
    "taxable_interest_income",
    "tax_exempt_interest_income",
    "long_term_capital_gains",
    "short_term_capital_gains",
    "non_sch_d_capital_gains",
    "long_term_capital_gains_on_collectibles",
    "unrecaptured_section_1250_gain",
    "partnership_s_corp_income",
    "self_employment_income",
    "sstb_self_employment_income",
    "rental_income",
    "farm_income",
    "farm_rent_income",
    "farm_operations_income",
    "estate_income",
    "charitable_cash_donations",
    "charitable_non_cash_donations",
)
4372
4473DEMOGRAPHIC_PREDICTORS = [
4574 "age" ,
@@ -925,6 +954,7 @@ def _run_qrf_imputation(
925954 puf_sim = Microsimulation (dataset = puf_dataset )
926955
927956 puf_agi = puf_sim .calculate ("adjusted_gross_income" , map_to = "person" ).values
957+ puf_data = puf_sim .dataset .load_dataset ()
928958
929959 X_train_full = puf_sim .calculate_dataframe (
930960 DEMOGRAPHIC_PREDICTORS + IMPUTED_VARIABLES
@@ -936,6 +966,63 @@ def _run_qrf_imputation(
936966
937967 del puf_sim
938968
969+ tax_unit_ids = _period_array (puf_data , "tax_unit_id" , time_period )
970+ has_forbes_metadata = _has_forbes_metadata (
971+ puf_data ,
972+ time_period ,
973+ expected_length = 0 if tax_unit_ids is None else len (tax_unit_ids ),
974+ )
975+ forbes_person_mask = _forbes_person_training_mask (
976+ puf_data ,
977+ time_period ,
978+ n_persons = len (puf_agi ),
979+ )
980+ if has_forbes_metadata :
981+ top_tail_threshold = FORBES_SYNTHETIC_FINANCIAL_THRESHOLD
982+ top_tail_label = "Forbes"
983+ low_weight_mask = np .ones_like (forbes_person_mask , dtype = bool )
984+ else :
985+ top_tail_threshold = PUF_METADATA_MISSING_TOP_TAIL_THRESHOLD
986+ top_tail_label = "metadata-missing top-tail"
987+ low_weight_mask = np .ones_like (forbes_person_mask , dtype = bool )
988+
989+ forbes_person_mask |= low_weight_mask & (puf_agi >= top_tail_threshold )
990+ for frame in (X_train_full , X_train_override ):
991+ candidate_columns = (
992+ IMPUTED_VARIABLES + OVERRIDDEN_IMPUTED_VARIABLES
993+ if has_forbes_metadata
994+ else PUF_METADATA_MISSING_TOP_TAIL_VARIABLES
995+ )
996+ financial_columns = [
997+ column for column in candidate_columns if column in frame .columns
998+ ]
999+ if financial_columns :
1000+ forbes_person_mask |= low_weight_mask & (
1001+ frame [financial_columns ].abs ().max (axis = 1 ).to_numpy ()
1002+ >= top_tail_threshold
1003+ )
1004+ if len (forbes_person_mask ) == len (puf_agi ) and forbes_person_mask .any ():
1005+ if len (X_train_full ) != len (forbes_person_mask ) or len (X_train_override ) != len (
1006+ forbes_person_mask
1007+ ):
1008+ logger .warning (
1009+ "Skipping Forbes donor exclusion because QRF training "
1010+ "frames do not match person-level PUF metadata lengths"
1011+ )
1012+ else :
1013+ logger .info (
1014+ "Excluding %d %s person records from PUF QRF training at threshold $%s" ,
1015+ int (forbes_person_mask .sum ()),
1016+ top_tail_label ,
1017+ f"{ top_tail_threshold :,.0f} " ,
1018+ )
1019+ non_forbes_mask = ~ forbes_person_mask
1020+ puf_agi = puf_agi [non_forbes_mask ]
1021+ X_train_full = X_train_full .loc [non_forbes_mask ].reset_index (drop = True )
1022+ X_train_override = X_train_override .loc [non_forbes_mask ].reset_index (
1023+ drop = True
1024+ )
1025+
9391026 sub_idx = _stratified_subsample_index (puf_agi )
9401027 _log_stratified_subsample (
9411028 len (puf_agi ),
@@ -975,6 +1062,79 @@ def _run_qrf_imputation(
9751062 return y_full , y_override
9761063
9771064
1065+ def _period_array (
1066+ data : Dict [str , Dict [int , np .ndarray ]],
1067+ variable : str ,
1068+ time_period : int ,
1069+ ) -> Optional [np .ndarray ]:
1070+ if variable not in data :
1071+ return None
1072+ values = data [variable ]
1073+ if isinstance (values , dict ):
1074+ values = values .get (time_period , values .get (str (time_period )))
1075+ if values is None :
1076+ return None
1077+ return np .asarray (values )
1078+
1079+
def _has_forbes_metadata(
    puf_data: Dict[str, Dict[int, np.ndarray]],
    time_period: int,
    expected_length: int,
) -> bool:
    """Report whether any Forbes marker variable flags at least one record.

    Args:
        puf_data: Loaded dataset mapping passed through to ``_period_array``.
        time_period: Period whose values are inspected.
        expected_length: Required length of a marker array; arrays of any
            other length are ignored.

    Returns:
        ``True`` as soon as one marker variable listed in
        ``FORBES_METADATA_MARKER_THRESHOLDS`` has the expected length and
        contains a value at or above its threshold, otherwise ``False``.
        Always ``False`` when ``expected_length`` is not positive.
    """
    if expected_length <= 0:
        return False

    def _marker_present(name, minimum):
        column = _period_array(puf_data, name, time_period)
        if column is None or len(column) != expected_length:
            return False
        as_float = np.asarray(column, dtype=float)
        return bool(np.any(as_float >= minimum))

    # ``any`` over a generator keeps the original first-hit short-circuit.
    return any(
        _marker_present(name, minimum)
        for name, minimum in FORBES_METADATA_MARKER_THRESHOLDS
    )
1096+
1097+
def _forbes_person_training_mask(
    puf_data: Dict[str, Dict[int, np.ndarray]],
    time_period: int,
    n_persons: int,
) -> np.ndarray:
    """Return person-level mask for synthetic Forbes top-tail PUF records.

    A tax unit is flagged when any marker variable listed in
    ``FORBES_METADATA_MARKER_THRESHOLDS`` is at or above its paired
    threshold. The flag is then copied to every person whose
    ``person_tax_unit_id`` matches that unit's ``tax_unit_id``.

    Args:
        puf_data: Loaded dataset mapping passed through to ``_period_array``.
        time_period: Period whose values are inspected.
        n_persons: Expected number of person records; also the length of the
            returned mask.

    Returns:
        Boolean array of length ``n_persons``. It is all ``False`` when the
        id arrays are missing, when ``person_tax_unit_id`` does not have
        ``n_persons`` entries, or when no tax unit is flagged. Persons whose
        tax unit id is not found among ``tax_unit_id`` are ``False``.
    """
    tax_unit_id = _period_array(puf_data, "tax_unit_id", time_period)
    person_tax_unit_id = _period_array(puf_data, "person_tax_unit_id", time_period)
    if tax_unit_id is None or person_tax_unit_id is None:
        return np.zeros(n_persons, dtype=bool)
    if len(person_tax_unit_id) != n_persons:
        return np.zeros(n_persons, dtype=bool)

    tax_unit_forbes = np.zeros(len(tax_unit_id), dtype=bool)
    for variable, marker_threshold in FORBES_METADATA_MARKER_THRESHOLDS:
        values = _period_array(puf_data, variable, time_period)
        # Marker arrays that are absent or not tax-unit sized are skipped.
        if values is None or len(values) != len(tax_unit_id):
            continue
        # Same comparison as ``_has_forbes_metadata``; a zero threshold needs
        # no special case because ``>= 0`` is exactly ``>= marker_threshold``.
        tax_unit_forbes |= np.asarray(values, dtype=float) >= marker_threshold

    if not tax_unit_forbes.any():
        return np.zeros(n_persons, dtype=bool)

    # Sort the tax units once so each person can be located by binary search.
    order = np.argsort(tax_unit_id)
    sorted_tax_unit_id = tax_unit_id[order]
    sorted_tax_unit_forbes = tax_unit_forbes[order]

    positions = np.searchsorted(sorted_tax_unit_id, person_tax_unit_id)
    # ``searchsorted`` returns an insertion point, so a hit must be both
    # inside the array and equal to the id found there. Out-of-range
    # positions are redirected to index 0 only to keep the lookup legal;
    # ``in_range`` still rejects them.
    in_range = positions < len(sorted_tax_unit_id)
    safe_positions = np.where(in_range, positions, 0)
    matched = in_range & (sorted_tax_unit_id[safe_positions] == person_tax_unit_id)

    person_mask = np.zeros(n_persons, dtype=bool)
    person_mask[matched] = sorted_tax_unit_forbes[safe_positions[matched]]
    return person_mask
1136+
1137+
9781138def _stratified_subsample_index (
9791139 income : np .ndarray ,
9801140 target_n : int = PUF_SUBSAMPLE_TARGET ,
0 commit comments