From 59f4dc8d4ebd814294408956f28793aae73321c0 Mon Sep 17 00:00:00 2001 From: Harsh Mishra Date: Mon, 5 May 2025 00:11:58 +0530 Subject: [PATCH 1/8] revamp the application ci & makefile --- .github/workflows/ci.yml | 6 +++--- .gitignore | 3 +++ Makefile | 33 ++++++++++++++++++++++++--------- app.py | 2 +- 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b786a9c..04f1969 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,12 +25,12 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v3 with: - node-version: 20 + node-version: 22 - name: Install Python uses: actions/setup-python@v4 with: - python-version: '3.10' + python-version: '3.11' - name: Install CDK run: | @@ -55,4 +55,4 @@ jobs: - name: Run tests run: | - make run + make test diff --git a/.gitignore b/.gitignore index d93c91b..e7d301a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ .venv/ volume/ +cdk.out +dms_sample +__pycache__/ diff --git a/Makefile b/Makefile index f9465dc..718648f 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ +SHELL := /bin/bash + VENV_BIN ?= python3 -m venv VENV_DIR ?= .venv PIP_CMD ?= pip3 @@ -33,28 +35,41 @@ $(VENV_ACTIVATE): venv: $(VENV_ACTIVATE) ## Create a new (empty) virtual environment -start: - $(LOCAL_ENV) docker compose up --build --detach --wait +check: ## Check if all required prerequisites are available + @command -v docker > /dev/null 2>&1 || { echo "Docker is not installed."; exit 1; } + @command -v localstack > /dev/null 2>&1 || { echo "LocalStack is not installed."; exit 1; } + @command -v python > /dev/null 2>&1 || { echo "Python is not installed."; exit 1; } + @command -v cdk > /dev/null 2>&1 || { echo "AWS CDK is not installed."; exit 1; } + @command -v cdklocal > /dev/null 2>&1 || { echo "CDK Local is not installed."; exit 1; } + @echo "All required prerequisites are available." + +start: ## Start localstack + $(LOCAL_ENV) @LOCALSTACK_AUTH_TOKEN=$(LOCALSTACK_AUTH_TOKEN) docker compose up --build --detach --wait -install: venv +install: venv ## Install dependencies $(VENV_RUN); $(PIP_CMD) install -r requirements.txt -deploy: +deploy: ## Deploy the stack on LocalStack $(VENV_RUN); $(LOCAL_ENV) cdklocal bootstrap --output ./cdk.local.out $(VENV_RUN); $(LOCAL_ENV) cdklocal deploy --require-approval never --output ./cdk.local.out -deploy-aws: +deploy-aws: ## Deploy the stack on AWS $(VENV_RUN); $(CLOUD_ENV) cdk bootstrap $(VENV_RUN); $(CLOUD_ENV) cdk deploy --require-approval never -destroy: +destroy: ## Destroy the stack on LocalStack docker-compose down -destroy-aws: venv +destroy-aws: venv ## Destroy the stack on AWS $(VENV_RUN); $(CLOUD_ENV) cdk destroy --require-approval never -run: +test: ## Test the application on LocalStack $(VENV_RUN); $(LOCAL_ENV) python run.py -run-aws: +test-aws: ## Test the application on AWS $(VENV_RUN); $(CLOUD_ENV) python run.py + +logs: ## Show logs from LocalStack + @docker logs -f localstack_main > logs.txt + +.PHONY: usage install start deploy test logs destroy deploy-aws test-aws destroy-aws diff --git a/app.py b/app.py index 722c8fa..a58236b 100644 --- a/app.py +++ b/app.py @@ -3,7 +3,7 @@ from dms_sample.stack import DmsSampleStack -STACK_NAME = os.getenv("STACK_NAME", "") +STACK_NAME = os.getenv("STACK_NAME", "DMsSampleSetupStack") app = cdk.App() DmsSampleStack(app, STACK_NAME) From d5003c30cfd012a16d9867ee4bae3ead283f52f3 Mon Sep 17 00:00:00 2001 From: Harsh Mishra Date: Mon, 5 May 2025 00:15:07 +0530 Subject: [PATCH 2/8] fix auth token makefile --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 718648f..75eb173 100644 --- a/Makefile +++ b/Makefile @@ -44,7 +44,7 @@ check: ## Check if all required prerequisites are available @echo "All required prerequisites are available." start: ## Start localstack - $(LOCAL_ENV) @LOCALSTACK_AUTH_TOKEN=$(LOCALSTACK_AUTH_TOKEN) docker compose up --build --detach --wait + $(LOCAL_ENV) LOCALSTACK_AUTH_TOKEN=$(LOCALSTACK_AUTH_TOKEN) docker compose up --build --detach --wait install: venv ## Install dependencies $(VENV_RUN); $(PIP_CMD) install -r requirements.txt From bf7eac5abd1af078e9394ae4cb83e9c9d4e2df6e Mon Sep 17 00:00:00 2001 From: Harsh Mishra Date: Mon, 5 May 2025 00:21:16 +0530 Subject: [PATCH 3/8] extra steps to report failure and show logs --- .github/workflows/ci.yml | 31 +++++++++++++++++++++++++++++++ .gitignore | 1 + Makefile | 6 +++--- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 04f1969..7104669 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -56,3 +56,34 @@ jobs: - name: Run tests run: | make test + + - name: Show localstack logs + if: always() + run: | + make logs + cat logs.txt + + - name: Send a Slack notification + if: failure() || github.event_name != 'pull_request' + uses: ravsamhq/notify-slack-action@v2 + with: + status: ${{ job.status }} + token: ${{ secrets.GITHUB_TOKEN }} + notification_title: "{workflow} has {status_message}" + message_format: "{emoji} *{workflow}* {status_message} in <{repo_url}|{repo}>" + footer: "Linked Repo <{repo_url}|{repo}> | <{run_url}|View Workflow run>" + notify_when: "failure" + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} + + - name: Generate a Diagnostic Report + if: failure() + run: | + curl -s localhost:4566/_localstack/diagnose | gzip -cf > diagnose.json.gz + + - name: Upload the Diagnostic Report + if: failure() + uses: actions/upload-artifact@v4 + with: + name: diagnose.json.gz + path: ./diagnose.json.gz diff --git a/.gitignore b/.gitignore index e7d301a..33bbd29 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ volume/ cdk.out dms_sample __pycache__/ +cdk.local.out/ diff --git a/Makefile b/Makefile index 75eb173..23af1a0 100644 --- a/Makefile +++ b/Makefile @@ -57,7 +57,7 @@ deploy-aws: ## Deploy the stack on AWS $(VENV_RUN); $(CLOUD_ENV) cdk bootstrap $(VENV_RUN); $(CLOUD_ENV) cdk deploy --require-approval never -destroy: ## Destroy the stack on LocalStack +stop: ## Stop LocalStack docker-compose down destroy-aws: venv ## Destroy the stack on AWS @@ -70,6 +70,6 @@ test-aws: ## Test the application on AWS $(VENV_RUN); $(CLOUD_ENV) python run.py logs: ## Show logs from LocalStack - @docker logs -f localstack_main > logs.txt + @docker logs localstack-main > logs.txt -.PHONY: usage install start deploy test logs destroy deploy-aws test-aws destroy-aws +.PHONY: usage install start deploy test logs stop deploy-aws test-aws destroy-aws From 3f45ee4e3b50529da42140fd2d06159715ea2b50 Mon Sep 17 00:00:00 2001 From: Harsh Mishra Date: Mon, 5 May 2025 13:18:14 +0530 Subject: [PATCH 4/8] add proper tests using pytest --- Makefile | 7 +- requirements.txt | 3 +- tests/test_infra.py | 694 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 701 insertions(+), 3 deletions(-) create mode 100644 tests/test_infra.py diff --git a/Makefile b/Makefile index 23af1a0..74a1233 100644 --- a/Makefile +++ b/Makefile @@ -63,12 +63,15 @@ stop: ## Stop LocalStack destroy-aws: venv ## Destroy the stack on AWS $(VENV_RUN); $(CLOUD_ENV) cdk destroy --require-approval never -test: ## Test the application on LocalStack +run: ## Run the application on LocalStack $(VENV_RUN); $(LOCAL_ENV) python run.py -test-aws: ## Test the application on AWS +run-aws: ## Run the application on AWS $(VENV_RUN); $(CLOUD_ENV) python run.py +test: ## Test the application on LocalStack + $(VENV_RUN); $(LOCAL_ENV) pytest + logs: ## Show logs from LocalStack @docker logs localstack-main > logs.txt diff --git a/requirements.txt b/requirements.txt index 743055d..1bb1c11 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ aws-cdk-lib==2.138.0 boto3==1.34.96 constructs>=10.0.0,<11.0.0 cryptography==42.0.5 -pymysql==1.1.0 \ No newline at end of file +pymysql==1.1.0 +pytest diff --git a/tests/test_infra.py b/tests/test_infra.py new file mode 100644 index 0000000..2d55d34 --- /dev/null +++ b/tests/test_infra.py @@ -0,0 +1,694 @@ +import json +import os +import time +from pprint import pprint +from time import sleep +from typing import Callable, TypedDict, TypeVar + +import pymysql.cursors +import pytest +from boto3 import client + +STACK_NAME = os.getenv("STACK_NAME", "") +ENDPOINT_URL = os.getenv("ENDPOINT_URL") + +cfn = client("cloudformation", endpoint_url=ENDPOINT_URL) +dms = client("dms", endpoint_url=ENDPOINT_URL) +kinesis = client("kinesis", endpoint_url=ENDPOINT_URL) +secretsmanager = client("secretsmanager", endpoint_url=ENDPOINT_URL) + +retries = 100 if not ENDPOINT_URL else 10 +retry_sleep = 5 if not ENDPOINT_URL else 1 + +# SQL Queries from query.py +SQL_CREATE_ACCOUNTS_TABLE = """CREATE TABLE accounts ( + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + age TINYINT UNSIGNED, + birth_date DATE, + account_balance DECIMAL(10, 2), + is_active BOOLEAN, + signup_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + last_login DATETIME, + bio TEXT, + profile_picture BLOB, + favorite_color ENUM('red', 'green', 'blue'), + height FLOAT, + weight DOUBLE + );""" + +SQL_INSERT_ACCOUNTS_SAMPLE_DATA = """INSERT INTO accounts +(name, age, birth_date, account_balance, is_active, signup_time, last_login, bio, profile_picture, favorite_color, height, weight) +VALUES +('Alice', 30, '1991-05-21', 1500.00, TRUE, '2021-01-08 09:00:00', '2021-03-10 08:00:00', 'Bio of Alice', NULL, 'red', 1.70, 60.5);""" + +SQL_CREATE_AUTHORS_TABLE = """CREATE TABLE authors ( + author_id INT AUTO_INCREMENT PRIMARY KEY, + first_name VARCHAR(100) NOT NULL, + last_name VARCHAR(100) NOT NULL, + date_of_birth DATE, + nationality VARCHAR(50), + biography TEXT, + email VARCHAR(255), + phone_number VARCHAR(20), + active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +);""" + +SQL_INSERT_AUTHORS_SAMPLE_DATA = """INSERT INTO authors (first_name, last_name, date_of_birth, nationality, biography, email, phone_number) +VALUES +('John', 'Doe', '1980-01-01', 'American', 'Biography of John Doe.', 'john.doe@example.com', '123-456-7890');""" + +SQL_CREATE_NOVELS_TABLE = """CREATE TABLE novels ( + novel_id INT AUTO_INCREMENT PRIMARY KEY, + title VARCHAR(255) NOT NULL, + author_id INT, + publish_date DATE, + isbn VARCHAR(20), + genre VARCHAR(100), + page_count INT, + publisher VARCHAR(100), + language VARCHAR(50), + available_copies INT, + total_copies INT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (author_id) REFERENCES authors(author_id) +); +""" + +SQL_INSERT_NOVELS_SAMPLE_DATA = """INSERT INTO novels (title, author_id, publish_date, isbn, genre, page_count, publisher, language, available_copies, total_copies) +VALUES +('The Great Adventure', 1, '2020-06-01', '978-3-16-148410-0', 'Adventure', 300, 'Adventure Press', 'English', 10, 20), +('Journey to the Stars', 1, '2021-04-10', '978-0-11-322456-7', 'Science Fiction', 350, 'SciFi Universe', 'English', 12, 25);""" + +ALTER_TABLES = [ + # control: column-type-change -> authors + "ALTER TABLE authors MODIFY COLUMN email VARCHAR(100)", + # control: drop-column -> accounts + "ALTER TABLE accounts DROP COLUMN profile_picture;", + # control: add-column with default value -> novels + "ALTER TABLE novels ADD COLUMN is_stock BOOLEAN DEFAULT TRUE;", +] + +CREATE_TABLES = [ + SQL_CREATE_AUTHORS_TABLE, + SQL_CREATE_ACCOUNTS_TABLE, + SQL_CREATE_NOVELS_TABLE, +] + +DROP_TABLES = [ + "DROP TABLE IF EXISTS novels;", + "DROP TABLE IF EXISTS accounts;", + "DROP TABLE IF EXISTS authors;", +] + +PRESEED_DATA = [ + SQL_INSERT_AUTHORS_SAMPLE_DATA, + SQL_INSERT_ACCOUNTS_SAMPLE_DATA, + SQL_INSERT_NOVELS_SAMPLE_DATA, +] + +class CfnOutput(TypedDict): + cdcTaskSecret: str + cdcTask1: str + cdcTask2: str + + fullTaskSecret: str + fullTask1: str + fullTask2: str + + kinesisStream: str + + +class Credentials(TypedDict): + host: str + port: int + username: str + password: str + dbname: str + + +def get_cfn_output(): + stacks = cfn.describe_stacks()["Stacks"] + stack = None + for s in stacks: + if s["StackName"] == STACK_NAME: + stack = s + break + if not stack: + raise Exception(f"Stack {STACK_NAME} Not found") + + outputs = stack["Outputs"] + cfn_output = CfnOutput() + for output in outputs: + cfn_output[output["OutputKey"]] = output["OutputValue"] + return cfn_output + + +def get_credentials(secret_arn: str) -> Credentials: + secret_value = secretsmanager.get_secret_value(SecretId=secret_arn) + credentials = Credentials(**json.loads(secret_value["SecretString"])) + if credentials["host"] == "mariadb_server": + credentials["host"] = "localhost" + return credentials + + +T = TypeVar("T") + + +def retry( + function: Callable[..., T], retries=retries, sleep=retry_sleep, **kwargs +) -> T: + raise_error = None + retries = int(retries) + for i in range(0, retries + 1): + try: + return function(**kwargs) + except Exception as error: + raise_error = error + time.sleep(sleep) + raise raise_error + + +def run_queries_on_mysql( + credentials: Credentials, + queries: list[str], +): + cursor = None + cnx = None + try: + cnx = pymysql.connect( + user=credentials["username"], + password=credentials["password"], + host=credentials["host"], + database=credentials["dbname"], + cursorclass=pymysql.cursors.DictCursor, + port=int(credentials["port"]), + ) + cursor = cnx.cursor() + for query in queries: + cursor.execute(query) + cnx.commit() + finally: + if cursor: + cursor.close() + if cnx: + cnx.close() + + +def get_query_result( + credentials: Credentials, + query: str, +): + cursor = None + cnx = None + try: + cnx = pymysql.connect( + user=credentials["username"], + password=credentials["password"], + host=credentials["host"], + database=credentials["dbname"], + cursorclass=pymysql.cursors.DictCursor, + port=int(credentials["port"]), + ) + cursor = cnx.cursor() + cursor.execute(query) + return cursor.fetchall() + finally: + if cursor: + cursor.close() + if cnx: + cnx.close() + + +def start_task(task: str): + response = dms.start_replication_task( + ReplicationTaskArn=task, StartReplicationTaskType="start-replication" + ) + status = response["ReplicationTask"].get("Status") + print(f"Replication Task {task} status: {status}") + + +def stop_task(task: str): + response = dms.stop_replication_task(ReplicationTaskArn=task) + status = response["ReplicationTask"].get("Status") + print(f"\n Replication Task {task} status: {status}") + + +def wait_for_task_status(task: str, expected_status: str): + print(f"Waiting for task status {expected_status}") + + def _wait_for_status(): + status = dms.describe_replication_tasks( + Filters=[{"Name": "replication-task-arn", "Values": [task]}], + WithoutSettings=True, + )["ReplicationTasks"][0].get("Status") + print(f"{task=} {status=}") + assert status == expected_status + + retry(_wait_for_status) + + +def get_table_counts(credentials: Credentials) -> dict: + """Get row counts for all tables""" + tables = ["authors", "accounts", "novels"] + counts = {} + for table in tables: + try: + result = get_query_result(credentials, f"SELECT COUNT(*) as count FROM {table}") + counts[table] = result[0]["count"] if result else 0 + except Exception: + counts[table] = 0 + print("\n=== Table Row Counts ===") + pprint(counts) + return counts + + +def get_table_schemas(credentials: Credentials) -> dict: + """Get schema information for all tables""" + tables = ["authors", "accounts", "novels"] + schemas = {} + for table in tables: + try: + result = get_query_result(credentials, f"DESCRIBE {table}") + schemas[table] = result + except Exception: + schemas[table] = None + print("\n=== Table Schemas ===") + pprint(schemas) + return schemas + + +def get_all_table_data(credentials: Credentials) -> dict: + """Get all data from all tables""" + tables = ["authors", "accounts", "novels"] + data = {} + for table in tables: + try: + result = get_query_result(credentials, f"SELECT * FROM {table}") + data[table] = result + except Exception: + data[table] = [] + print("\n=== Table Contents ===") + pprint(data) + return data + + +def execute_full_load(cfn_output: CfnOutput): + credentials = get_credentials(cfn_output["fullTaskSecret"]) + print("\n=== Starting Full Load Test ===") + print(f"Credentials: {credentials}") + + # Full load Flow + threshold_timestamp = int(time.time()) + task_1 = cfn_output["fullTask1"] + task_2 = cfn_output["fullTask2"] + stream = cfn_output["kinesisStream"] + + print(f"\nTask ARNs:") + print(f"Task 1: {task_1}") + print(f"Task 2: {task_2}") + print(f"Kinesis Stream: {stream}") + + print("*" * 12) + print("STARTING FULL LOAD FLOW") + print("*" * 12) + print(f"db endpoint: {credentials['host']}:{credentials['port']}\n") + + print("\n=== Initial State ===") + initial_counts = get_table_counts(credentials) + initial_schemas = get_table_schemas(credentials) + + print("\tCleaning tables") + run_queries_on_mysql(credentials, DROP_TABLES) + print("\tCreating tables") + run_queries_on_mysql(credentials, CREATE_TABLES) + print("\tInserting data") + run_queries_on_mysql(credentials, PRESEED_DATA) + + print("\n=== After Data Load ===") + post_load_counts = get_table_counts(credentials) + post_load_schemas = get_table_schemas(credentials) + post_load_data = get_all_table_data(credentials) + + print("\n****Full Task 1****\n") + print("\n\tStarting Full load task 1 a%") + start_task(task_1) + wait_for_task_status(task_1, "stopped") + + print("\n=== Task 1 Statistics ===") + task1_stats = describe_table_statistics(task_1) + pprint(task1_stats) + + # 2 drops, 2 create, 1 authors, 1 accounts = 6 + kinesis_records = wait_for_kinesis(stream, 6, threshold_timestamp) + print("\n=== Task 1 Kinesis Records ===") + pprint(kinesis_records) + print("\n****End of Full Task 1****\n") + + sleep(1) + print("\n****Full Task 2****\n") + threshold_timestamp = int(time.time()) + print("\tStarting Full load task 2 novels") + start_task(task_2) + wait_for_task_status(task_2, "stopped") + + print("\n=== Task 2 Statistics ===") + task2_stats = describe_table_statistics(task_2) + pprint(task2_stats) + + # 1 drop, 1 create, 2 novels = 4 + kinesis_records = wait_for_kinesis(stream, 4, threshold_timestamp) + print("\n=== Task 2 Kinesis Records ===") + pprint(kinesis_records) + print("\n****End of Full Task 2****\n") + + print("\n=== Final State ===") + final_counts = get_table_counts(credentials) + final_schemas = get_table_schemas(credentials) + final_data = get_all_table_data(credentials) + + print("\tCleaning tables") + run_queries_on_mysql(credentials, DROP_TABLES) + + return { + "initial_state": { + "counts": initial_counts, + "schemas": initial_schemas + }, + "post_load_state": { + "counts": post_load_counts, + "schemas": post_load_schemas, + "data": post_load_data + }, + "final_state": { + "counts": final_counts, + "schemas": final_schemas, + "data": final_data + }, + "task1_stats": task1_stats, + "task2_stats": task2_stats + } + + +def execute_cdc(cfn_output: CfnOutput): + credentials = get_credentials(cfn_output["cdcTaskSecret"]) + print("\n=== Starting CDC Test ===") + print(f"Credentials: {credentials}") + + task_1 = cfn_output["cdcTask1"] + task_2 = cfn_output["cdcTask2"] + stream = cfn_output["kinesisStream"] + + print(f"\nTask ARNs:") + print(f"Task 1: {task_1}") + print(f"Task 2: {task_2}") + print(f"Kinesis Stream: {stream}") + + print("*" * 12) + print("STARTING CDC FLOW") + print("*" * 12) + print(f"db endpoint: {credentials['host']}:{credentials['port']}\n") + + print("\n=== Initial State ===") + initial_counts = get_table_counts(credentials) + initial_schemas = get_table_schemas(credentials) + + run_queries_on_mysql(credentials, DROP_TABLES) + print("\tCreating tables") + run_queries_on_mysql(credentials, CREATE_TABLES) + + print("\n=== After Table Creation ===") + post_create_counts = get_table_counts(credentials) + post_create_schemas = get_table_schemas(credentials) + + threshold_timestamp = int(time.time()) + print("Starting cdc tasks 1 table a%") + start_task(task_1) + print("Starting cdc tasks 2 table novels") + start_task(task_2) + wait_for_task_status(task_1, "running") + wait_for_task_status(task_2, "running") + + print("\n****Create table events****\n") + # 2 create apply_dms_exception, 3 create + kinesis_records = wait_for_kinesis(stream, 5, threshold_timestamp) + print("\n=== Create Table Events ===") + pprint(kinesis_records) + print("\n****End create table events****\n") + + print("\n****INSERT events****\n") + sleep(1) + threshold_timestamp = int(time.time()) + sleep(1) + run_queries_on_mysql(credentials, PRESEED_DATA) + + print("\n=== After Data Insert ===") + post_insert_counts = get_table_counts(credentials) + post_insert_data = get_all_table_data(credentials) + + # 1 authors, 1 accounts, 2 novels + kinesis_records = wait_for_kinesis(stream, 4, threshold_timestamp) + print("\n=== Insert Events ===") + pprint(kinesis_records) + print("\n****End of INSERT events****\n") + + print("\n****ALTER tables events****\n") + sleep(1) + threshold_timestamp = int(time.time()) + sleep(1) + run_queries_on_mysql(credentials, ALTER_TABLES) + + print("\n=== After Schema Changes ===") + post_alter_schemas = get_table_schemas(credentials) + + kinesis_records = wait_for_kinesis(stream, 3, threshold_timestamp) + print("\n=== Alter Table Events ===") + pprint(kinesis_records) + print("\n****End of ALTER tables events****\n") + + print("\n=== Task Statistics ===") + print("\tTable Statistics tasks 1") + task1_stats = describe_table_statistics(task_1) + pprint(task1_stats) + print("\n\tTable Statistics tasks 2") + task2_stats = describe_table_statistics(task_2) + pprint(task2_stats) + + stop_task(task_1) + stop_task(task_2) + wait_for_task_status(task_1, "stopped") + wait_for_task_status(task_2, "stopped") + + print("\n=== Final State ===") + final_counts = get_table_counts(credentials) + final_schemas = get_table_schemas(credentials) + final_data = get_all_table_data(credentials) + + print("\n\tDrop tables") + run_queries_on_mysql(credentials, DROP_TABLES) + + return { + "initial_state": { + "counts": initial_counts, + "schemas": initial_schemas + }, + "post_create_state": { + "counts": post_create_counts, + "schemas": post_create_schemas + }, + "post_insert_state": { + "counts": post_insert_counts, + "data": post_insert_data + }, + "post_alter_state": { + "schemas": post_alter_schemas + }, + "final_state": { + "counts": final_counts, + "schemas": final_schemas, + "data": final_data + }, + "task1_stats": task1_stats, + "task2_stats": task2_stats + } + + +def wait_for_kinesis(stream: str, expected_count: int, threshold_timestamp: int): + print("\n\tKinesis events\n") + print("fetching Kinesis event") + + shard_id = kinesis.describe_stream(StreamARN=stream)["StreamDescription"]["Shards"][ + 0 + ]["ShardId"] + shard_iterator = kinesis.get_shard_iterator( + StreamARN=stream, + ShardId=shard_id, + ShardIteratorType="TRIM_HORIZON", + ) + shard_iter = shard_iterator["ShardIterator"] + all_records = [] + while shard_iter is not None: + res = kinesis.get_records(ShardIterator=shard_iter, Limit=50) + shard_iter = res["NextShardIterator"] + records = res["Records"] + for r in records: + if r["ApproximateArrivalTimestamp"].timestamp() > threshold_timestamp: + all_records.append(r) + if len(all_records) >= expected_count: + break + print(f"found {len(all_records)}, {expected_count=}") + sleep(retry_sleep) + print(f"Received: {len(all_records)} events") + records_data = [ + {**json.loads(record["Data"]), "partition_key": record["PartitionKey"]} + for record in all_records + ] + pprint(records_data) + return records_data + + +def describe_table_statistics(task_arn: str): + res = dms.describe_table_statistics( + ReplicationTaskArn=task_arn, + ) + res["TableStatistics"] = sorted( + res["TableStatistics"], key=lambda x: (x["SchemaName"], x["TableName"]) + ) + return res + + +@pytest.fixture(scope="module") +def cfn_output(): + return get_cfn_output() + + +def test_full_load(cfn_output): + credentials = get_credentials(cfn_output["fullTaskSecret"]) + threshold_timestamp = int(time.time()) + task_1 = cfn_output["fullTask1"] + task_2 = cfn_output["fullTask2"] + stream = cfn_output["kinesisStream"] + + # Clean and setup tables + run_queries_on_mysql(credentials, DROP_TABLES) + run_queries_on_mysql(credentials, CREATE_TABLES) + run_queries_on_mysql(credentials, PRESEED_DATA) + + # Verify initial data load + table_counts = get_table_counts(credentials) + assert table_counts["authors"] == 1, "Expected 1 author record" + assert table_counts["accounts"] == 1, "Expected 1 account record" + assert table_counts["novels"] == 2, "Expected 2 novel records" + + # Execute and verify Task 1 + start_task(task_1) + wait_for_task_status(task_1, "stopped") + task1_records = wait_for_kinesis(stream, 6, threshold_timestamp) + assert len(task1_records) == 6, "Expected 6 Kinesis records for Task 1" + sleep(5) + + # Verify Task 1 statistics + task1_stats = describe_table_statistics(task_1) + authors_stats = next(stat for stat in task1_stats["TableStatistics"] if stat["TableName"] == "authors") + accounts_stats = next(stat for stat in task1_stats["TableStatistics"] if stat["TableName"] == "accounts") + + # Check full load rows + assert authors_stats["FullLoadRows"] == 1, "Expected 1 full load row in authors table" + assert accounts_stats["FullLoadRows"] == 1, "Expected 1 full load row in accounts table" + + # Check table state + assert authors_stats["TableState"] == "Table completed", "Authors table should be completed" + assert accounts_stats["TableState"] == "Table completed", "Accounts table should be completed" + + # Check error counts + assert authors_stats["FullLoadErrorRows"] == 0, "Should have no errors in authors table load" + assert accounts_stats["FullLoadErrorRows"] == 0, "Should have no errors in accounts table load" + + # Execute and verify Task 2 + sleep(5) + threshold_timestamp = int(time.time()) + start_task(task_2) + wait_for_task_status(task_2, "stopped") + task2_records = wait_for_kinesis(stream, 4, threshold_timestamp) + assert len(task2_records) == 4, "Expected 4 Kinesis records for Task 2" + + # Verify Task 2 statistics + task2_stats = describe_table_statistics(task_2) + novels_stats = next(stat for stat in task2_stats["TableStatistics"] if stat["TableName"] == "novels") + + # Check full load rows and state for novels + assert novels_stats["FullLoadRows"] == 2, "Expected 2 full load rows in novels table" + assert novels_stats["TableState"] == "Table completed", "Novels table should be completed" + assert novels_stats["FullLoadErrorRows"] == 0, "Should have no errors in novels table load" + + # Cleanup + run_queries_on_mysql(credentials, DROP_TABLES) + + +def test_cdc(cfn_output): + credentials = get_credentials(cfn_output["cdcTaskSecret"]) + task_1 = cfn_output["cdcTask1"] + task_2 = cfn_output["cdcTask2"] + stream = cfn_output["kinesisStream"] + + # Setup tables + run_queries_on_mysql(credentials, DROP_TABLES) + run_queries_on_mysql(credentials, CREATE_TABLES) + + # Start CDC tasks + threshold_timestamp = int(time.time()) + start_task(task_1) + start_task(task_2) + wait_for_task_status(task_1, "running") + wait_for_task_status(task_2, "running") + + # Verify table creation events + create_events = wait_for_kinesis(stream, 5, threshold_timestamp) + assert len(create_events) == 5, "Expected 5 table creation events" + + # Test INSERT operations + sleep(1) + threshold_timestamp = int(time.time()) + sleep(1) + run_queries_on_mysql(credentials, PRESEED_DATA) + insert_events = wait_for_kinesis(stream, 4, threshold_timestamp) + assert len(insert_events) == 4, "Expected 4 insert events" + + # Verify data after inserts + table_counts = get_table_counts(credentials) + assert table_counts["authors"] == 1, "Expected 1 author after CDC inserts" + assert table_counts["accounts"] == 1, "Expected 1 account after CDC inserts" + assert table_counts["novels"] == 2, "Expected 2 novels after CDC inserts" + + # Test ALTER operations + sleep(1) + threshold_timestamp = int(time.time()) + sleep(1) + run_queries_on_mysql(credentials, ALTER_TABLES) + alter_events = wait_for_kinesis(stream, 3, threshold_timestamp) + assert len(alter_events) == 3, "Expected 3 alter events" + + # Verify schema changes + schemas = get_table_schemas(credentials) + authors_email_field = next(field for field in schemas["authors"] if field["Field"] == "email") + assert authors_email_field["Type"] == "varchar(100)", "Expected email field type to be varchar(100)" + + # Verify accounts table modification + accounts_fields = [field["Field"] for field in schemas["accounts"]] + assert "profile_picture" not in accounts_fields, "profile_picture should be dropped" + + # Verify novels table modification + novels_fields = [field["Field"] for field in schemas["novels"]] + assert "is_stock" in novels_fields, "is_stock field should be added" + + # Stop tasks and cleanup + stop_task(task_1) + stop_task(task_2) + wait_for_task_status(task_1, "stopped") + wait_for_task_status(task_2, "stopped") + run_queries_on_mysql(credentials, DROP_TABLES) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From 95acf0dc3dc71daa0132199ed25564d2429c26d0 Mon Sep 17 00:00:00 2001 From: Harsh Mishra Date: Mon, 5 May 2025 13:19:31 +0530 Subject: [PATCH 5/8] add a keepalive workflow --- .github/workflows/keepalive.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 .github/workflows/keepalive.yml diff --git a/.github/workflows/keepalive.yml b/.github/workflows/keepalive.yml new file mode 100644 index 0000000..76b4b40 --- /dev/null +++ b/.github/workflows/keepalive.yml @@ -0,0 +1,17 @@ +name: Keep Alive +on: + schedule: + - cron: "0 0 * * *" +jobs: + main-job: + name: Main Job + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + workflow-keepalive: + if: github.event_name == 'schedule' + runs-on: ubuntu-latest + permissions: + actions: write + steps: + - uses: liskin/gh-workflow-keepalive@v1 From 32a577d47c96d22b1d2c78c2a405cd72c79fdf62 Mon Sep 17 00:00:00 2001 From: Harsh Mishra Date: Mon, 5 May 2025 13:22:23 +0530 Subject: [PATCH 6/8] stupid fix --- .github/workflows/ci.yml | 2 +- Makefile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7104669..a10dfde 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,7 +57,7 @@ jobs: run: | make test - - name: Show localstack logs + - name: Show LocalStack logs if: always() run: | make logs diff --git a/Makefile b/Makefile index 74a1233..4959cea 100644 --- a/Makefile +++ b/Makefile @@ -70,7 +70,7 @@ run-aws: ## Run the application on AWS $(VENV_RUN); $(CLOUD_ENV) python run.py test: ## Test the application on LocalStack - $(VENV_RUN); $(LOCAL_ENV) pytest + $(VENV_RUN); $(LOCAL_ENV) pytest tests/test_infra.py logs: ## Show logs from LocalStack @docker logs localstack-main > logs.txt From 07d61649b13d40bc1694c4638463d61e036e449a Mon Sep 17 00:00:00 2001 From: Harsh Mishra Date: Mon, 5 May 2025 13:28:59 +0530 Subject: [PATCH 7/8] reformat --- tests/test_infra.py | 142 ++++++++++++++++++++++++++------------------ 1 file changed, 83 insertions(+), 59 deletions(-) diff --git a/tests/test_infra.py b/tests/test_infra.py index 2d55d34..c837c74 100644 --- a/tests/test_infra.py +++ b/tests/test_infra.py @@ -108,6 +108,7 @@ SQL_INSERT_NOVELS_SAMPLE_DATA, ] + class CfnOutput(TypedDict): cdcTaskSecret: str cdcTask1: str @@ -255,7 +256,9 @@ def get_table_counts(credentials: Credentials) -> dict: counts = {} for table in tables: try: - result = get_query_result(credentials, f"SELECT COUNT(*) as count FROM {table}") + result = get_query_result( + credentials, f"SELECT COUNT(*) as count FROM {table}" + ) counts[table] = result[0]["count"] if result else 0 except Exception: counts[table] = 0 @@ -298,13 +301,13 @@ def execute_full_load(cfn_output: CfnOutput): credentials = get_credentials(cfn_output["fullTaskSecret"]) print("\n=== Starting Full Load Test ===") print(f"Credentials: {credentials}") - + # Full load Flow threshold_timestamp = int(time.time()) task_1 = cfn_output["fullTask1"] task_2 = cfn_output["fullTask2"] stream = cfn_output["kinesisStream"] - + print(f"\nTask ARNs:") print(f"Task 1: {task_1}") print(f"Task 2: {task_2}") @@ -314,11 +317,11 @@ def execute_full_load(cfn_output: CfnOutput): print("STARTING FULL LOAD FLOW") print("*" * 12) print(f"db endpoint: {credentials['host']}:{credentials['port']}\n") - + print("\n=== Initial State ===") initial_counts = get_table_counts(credentials) initial_schemas = get_table_schemas(credentials) - + print("\tCleaning tables") run_queries_on_mysql(credentials, DROP_TABLES) print("\tCreating tables") @@ -335,11 +338,11 @@ def execute_full_load(cfn_output: CfnOutput): print("\n\tStarting Full load task 1 a%") start_task(task_1) wait_for_task_status(task_1, "stopped") - + print("\n=== Task 1 Statistics ===") task1_stats = describe_table_statistics(task_1) pprint(task1_stats) - + # 2 drops, 2 create, 1 authors, 1 accounts = 6 kinesis_records = wait_for_kinesis(stream, 6, threshold_timestamp) print("\n=== Task 1 Kinesis Records ===") @@ -352,11 +355,11 @@ def execute_full_load(cfn_output: CfnOutput): print("\tStarting Full load task 2 novels") start_task(task_2) wait_for_task_status(task_2, "stopped") - + print("\n=== Task 2 Statistics ===") task2_stats = describe_table_statistics(task_2) pprint(task2_stats) - + # 1 drop, 1 create, 2 novels = 4 kinesis_records = wait_for_kinesis(stream, 4, threshold_timestamp) print("\n=== Task 2 Kinesis Records ===") @@ -370,24 +373,21 @@ def execute_full_load(cfn_output: CfnOutput): print("\tCleaning tables") run_queries_on_mysql(credentials, DROP_TABLES) - + return { - "initial_state": { - "counts": initial_counts, - "schemas": initial_schemas - }, + "initial_state": {"counts": initial_counts, "schemas": initial_schemas}, "post_load_state": { "counts": post_load_counts, "schemas": post_load_schemas, - "data": post_load_data + "data": post_load_data, }, "final_state": { "counts": final_counts, "schemas": final_schemas, - "data": final_data + "data": final_data, }, "task1_stats": task1_stats, - "task2_stats": task2_stats + "task2_stats": task2_stats, } @@ -395,16 +395,16 @@ def execute_cdc(cfn_output: CfnOutput): credentials = get_credentials(cfn_output["cdcTaskSecret"]) print("\n=== Starting CDC Test ===") print(f"Credentials: {credentials}") - + task_1 = cfn_output["cdcTask1"] task_2 = cfn_output["cdcTask2"] stream = cfn_output["kinesisStream"] - + print(f"\nTask ARNs:") print(f"Task 1: {task_1}") print(f"Task 2: {task_2}") print(f"Kinesis Stream: {stream}") - + print("*" * 12) print("STARTING CDC FLOW") print("*" * 12) @@ -442,11 +442,11 @@ def execute_cdc(cfn_output: CfnOutput): threshold_timestamp = int(time.time()) sleep(1) run_queries_on_mysql(credentials, PRESEED_DATA) - + print("\n=== After Data Insert ===") post_insert_counts = get_table_counts(credentials) post_insert_data = get_all_table_data(credentials) - + # 1 authors, 1 accounts, 2 novels kinesis_records = wait_for_kinesis(stream, 4, threshold_timestamp) print("\n=== Insert Events ===") @@ -458,10 +458,10 @@ def execute_cdc(cfn_output: CfnOutput): threshold_timestamp = int(time.time()) sleep(1) run_queries_on_mysql(credentials, ALTER_TABLES) - + print("\n=== After Schema Changes ===") post_alter_schemas = get_table_schemas(credentials) - + kinesis_records = wait_for_kinesis(stream, 3, threshold_timestamp) print("\n=== Alter Table Events ===") pprint(kinesis_records) @@ -487,30 +487,22 @@ def execute_cdc(cfn_output: CfnOutput): print("\n\tDrop tables") run_queries_on_mysql(credentials, DROP_TABLES) - + return { - "initial_state": { - "counts": initial_counts, - "schemas": initial_schemas - }, + "initial_state": {"counts": initial_counts, "schemas": initial_schemas}, "post_create_state": { "counts": post_create_counts, - "schemas": post_create_schemas - }, - "post_insert_state": { - "counts": post_insert_counts, - "data": post_insert_data - }, - "post_alter_state": { - "schemas": post_alter_schemas + "schemas": post_create_schemas, }, + "post_insert_state": {"counts": post_insert_counts, "data": post_insert_data}, + "post_alter_state": {"schemas": post_alter_schemas}, "final_state": { "counts": final_counts, "schemas": final_schemas, - "data": final_data + "data": final_data, }, "task1_stats": task1_stats, - "task2_stats": task2_stats + "task2_stats": task2_stats, } @@ -587,23 +579,43 @@ def test_full_load(cfn_output): task1_records = wait_for_kinesis(stream, 6, threshold_timestamp) assert len(task1_records) == 6, "Expected 6 Kinesis records for Task 1" sleep(5) - + # Verify Task 1 statistics task1_stats = describe_table_statistics(task_1) - authors_stats = next(stat for stat in task1_stats["TableStatistics"] if stat["TableName"] == "authors") - accounts_stats = next(stat for stat in task1_stats["TableStatistics"] if stat["TableName"] == "accounts") - + authors_stats = next( + stat + for stat in task1_stats["TableStatistics"] + if stat["TableName"] == "authors" + ) + accounts_stats = next( + stat + for stat in task1_stats["TableStatistics"] + if stat["TableName"] == "accounts" + ) + # Check full load rows - assert authors_stats["FullLoadRows"] == 1, "Expected 1 full load row in authors table" - assert accounts_stats["FullLoadRows"] == 1, "Expected 1 full load row in accounts table" - + assert ( + authors_stats["FullLoadRows"] == 1 + ), "Expected 1 full load row in authors table" + assert ( + accounts_stats["FullLoadRows"] == 1 + ), "Expected 1 full load row in accounts table" + # Check table state - assert authors_stats["TableState"] == "Table completed", "Authors table should be completed" - assert accounts_stats["TableState"] == "Table completed", "Accounts table should be completed" - + assert ( + authors_stats["TableState"] == "Table completed" + ), "Authors table should be completed" + assert ( + accounts_stats["TableState"] == "Table completed" + ), "Accounts table should be completed" + # Check error counts - assert authors_stats["FullLoadErrorRows"] == 0, "Should have no errors in authors table load" - assert accounts_stats["FullLoadErrorRows"] == 0, "Should have no errors in accounts table load" + assert ( + authors_stats["FullLoadErrorRows"] == 0 + ), "Should have no errors in authors table load" + assert ( + accounts_stats["FullLoadErrorRows"] == 0 + ), "Should have no errors in accounts table load" # Execute and verify Task 2 sleep(5) @@ -615,12 +627,20 @@ def test_full_load(cfn_output): # Verify Task 2 statistics task2_stats = describe_table_statistics(task_2) - novels_stats = next(stat for stat in task2_stats["TableStatistics"] if stat["TableName"] == "novels") - + novels_stats = next( + stat for stat in task2_stats["TableStatistics"] if stat["TableName"] == "novels" + ) + # Check full load rows and state for novels - assert novels_stats["FullLoadRows"] == 2, "Expected 2 full load rows in novels table" - assert novels_stats["TableState"] == "Table completed", "Novels table should be completed" - assert novels_stats["FullLoadErrorRows"] == 0, "Should have no errors in novels table load" + assert ( + novels_stats["FullLoadRows"] == 2 + ), "Expected 2 full load rows in novels table" + assert ( + novels_stats["TableState"] == "Table completed" + ), "Novels table should be completed" + assert ( + novels_stats["FullLoadErrorRows"] == 0 + ), "Should have no errors in novels table load" # Cleanup run_queries_on_mysql(credentials, DROP_TABLES) @@ -671,9 +691,13 @@ def test_cdc(cfn_output): # Verify schema changes schemas = get_table_schemas(credentials) - authors_email_field = next(field for field in schemas["authors"] if field["Field"] == "email") - assert authors_email_field["Type"] == "varchar(100)", "Expected email field type to be varchar(100)" - + authors_email_field = next( + field for field in schemas["authors"] if field["Field"] == "email" + ) + assert ( + authors_email_field["Type"] == "varchar(100)" + ), "Expected email field type to be varchar(100)" + # Verify accounts table modification accounts_fields = [field["Field"] for field in schemas["accounts"]] assert "profile_picture" not in accounts_fields, "profile_picture should be dropped" From c8c1e539265655eac6e2950a56f242406ccfda64 Mon Sep 17 00:00:00 2001 From: Harsh Mishra Date: Mon, 5 May 2025 13:30:41 +0530 Subject: [PATCH 8/8] rename the workflow --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a10dfde..a9c8ef3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,4 +1,4 @@ -name: Deploy on LocalStack +name: Setup infrastructure using CDK on: push: @@ -16,7 +16,7 @@ on: jobs: cdk: - name: Setup infrastructure using CDK + name: Run Integration Tests runs-on: ubuntu-latest steps: - name: Checkout