|
| 1 | +import re |
| 2 | +import click |
| 3 | +from pyspark.sql import SparkSession, functions as f |
| 4 | + |
| 5 | +@click.command() |
| 6 | +@click.option('--target_db', default='curated') |
| 7 | +@click.option('--target_table', default='client_communication_preferences_journal') |
| 8 | +@click.option('--as_of', required=True) |
| 9 | +def main(target_db, target_table, as_of): |
| 10 | + # Validate as_of parameter |
| 11 | + if not re.match(r'^\d{8}$', as_of): |
| 12 | + raise ValueError("Invalid as_of format. Expected YYYYMMDD.") |
| 13 | + |
| 14 | + # Initialize SparkSession |
| 15 | + spark = SparkSession.builder \ |
| 16 | + .appName("DBWriteApp") \ |
| 17 | + .enableHiveSupport() \ # enable if you are writing to Hive or Spark SQL catalog |
| 18 | + .getOrCreate() |
| 19 | + |
| 20 | + qry = f""" |
| 21 | + WITH blueshift_active_email_client_agg AS ( |
| 22 | + SELECT client_id, |
| 23 | + MAX(last_opened_at) AS last_opened_at, |
| 24 | + MIN(first_opened_at) AS first_opened_at |
| 25 | + FROM blueshift.campaign_activity_kpis |
| 26 | + WHERE DATE(last_opened_at) <= TO_DATE('{as_of}', 'yyyyMMdd') |
| 27 | + OR last_opened_at IS NULL |
| 28 | + OR DATE(first_opened_at) <= TO_DATE('{as_of}', 'yyyyMMdd') |
| 29 | + GROUP BY 1 |
| 30 | + ) |
| 31 | + -- more CTEs / query goes here |
| 32 | + SELECT * FROM blueshift_active_email_client_agg |
| 33 | + """ |
| 34 | + |
| 35 | + # Run the query |
| 36 | + df = spark.sql(qry).withColumn('start_date', f.col('first_opened_at').cast('timestamp')) |
| 37 | + |
| 38 | + # Prepare full table name |
| 39 | + full_table_name = f"{target_db}.{target_table}" |
| 40 | + |
| 41 | + # Save DataFrame to specified table (overwrite, append, etc. as appropriate) |
| 42 | + df.write.mode('append').saveAsTable(full_table_name) |
| 43 | + |
| 44 | + spark.stop() |
| 45 | + |
| 46 | +if __name__ == "__main__": |
| 47 | + main() |
0 commit comments