22# End-to-end test: Service (config resolution) → Engine (sync execution)
33#
44# Tests the same flow that Temporal activities execute:
5- # 1. Create sync via service API
5+ # 1. Create syncs via service API (Postgres + optionally Google Sheets)
66# 2. GET /syncs/{id}?include_credentials=true → resolved config
77# 3. POST /setup, /sync, /teardown on engine with X-Sync-Params
8- # 4. Verify data in Postgres , verify teardown
8+ # 4. Verify data landed , verify teardown
99#
10- # Requires:
11- # - STRIPE_API_KEY (or .env)
12- # - Postgres on localhost:5432 (or POSTGRES_URL)
13- # - pnpm build must have been run
10+ # Env vars:
11+ # STRIPE_API_KEY (required)
12+ # POSTGRES_URL (default: postgresql://postgres:postgres@localhost:5432/postgres)
13+ # GOOGLE_CLIENT_ID (optional — enables Sheets sync)
14+ # GOOGLE_CLIENT_SECRET (optional — enables Sheets sync)
15+ # GOOGLE_REFRESH_TOKEN (optional — enables Sheets sync)
16+ # GOOGLE_SPREADSHEET_ID (optional — enables Sheets sync)
17+ # SKIP_DELETE=1 skip teardown + cleanup (leave data for inspection)
1418set -euo pipefail
1519
1620ROOT=" $( cd " $( dirname " $0 " ) /.." && pwd) "
@@ -22,6 +26,7 @@ cd "$ROOT"
2226: " ${STRIPE_API_KEY:? Set STRIPE_API_KEY} "
2327POSTGRES_URL=" ${POSTGRES_URL:- postgresql:// postgres: postgres@ localhost: 5432/ postgres} "
2428SCHEMA=" temporal_sh_$( date +%Y%m%d%H%M%S) _$$ "
29+ SKIP_DELETE=" ${SKIP_DELETE:- } "
2530
2631SERVICE_PORT=0
2732ENGINE_PORT=0
@@ -33,8 +38,10 @@ cleanup() {
3338 echo " --- Cleanup ---"
3439 [ -n " $SERVICE_PID " ] && kill " $SERVICE_PID " 2> /dev/null && echo " Stopped service ($SERVICE_PID )"
3540 [ -n " $ENGINE_PID " ] && kill " $ENGINE_PID " 2> /dev/null && echo " Stopped engine ($ENGINE_PID )"
36- if [ " ${KEEP_TEST_DATA :- } " != " 1 " ]; then
41+ if [ -z " $SKIP_DELETE " ]; then
3742 psql " $POSTGRES_URL " -c " DROP SCHEMA IF EXISTS \" $SCHEMA \" CASCADE" 2> /dev/null && echo " Dropped schema $SCHEMA "
43+ else
44+ echo " SKIP_DELETE: keeping schema $SCHEMA "
3845 fi
3946 [ -n " ${DATA_DIR:- } " ] && rm -rf " $DATA_DIR " && echo " Removed $DATA_DIR "
4047}
@@ -57,17 +64,90 @@ wait_for_port() {
5764 exit 1
5865}
5966
60- # --- Start engine (from its package dir so connector bins are findable) ---
67+ # Resolve a sync's config and build X-Sync-Params header value
68+ resolve_params () {
69+ local sync_id=$1
70+ curl -sf " $SERVICE_URL /syncs/$sync_id ?include_credentials=true" | python3 -c "
71+ import sys, json
72+ c = json.load(sys.stdin)
73+ src = {k:v for k,v in c['source'].items() if k != 'type'}
74+ dst = {k:v for k,v in c['destination'].items() if k != 'type'}
75+ print(json.dumps({
76+ 'source_name': c['source']['type'],
77+ 'source_config': src,
78+ 'destination_name': c['destination']['type'],
79+ 'destination_config': dst,
80+ 'streams': c.get('streams', [])
81+ }))
82+ "
83+ }
84+
85+ # Run the full setup → sync → verify → teardown cycle for a sync
86+ run_sync_cycle () {
87+ local label=$1 sync_id=$2 verify_fn=$3
88+
89+ echo " "
90+ echo " === $label ==="
91+
92+ # Resolve
93+ local params
94+ params=$( resolve_params " $sync_id " )
95+ echo " Resolved config ($( echo " $params " | wc -c | tr -d ' ' ) bytes)"
96+
97+ # Setup
98+ local status
99+ status=$( curl -sf -o /dev/null -w " %{http_code}" -X POST " $ENGINE_URL /setup" \
100+ -H " X-Sync-Params: $params " )
101+ echo " Setup: HTTP $status "
102+ [ " $status " = " 204" ] || { echo " FAIL: expected 204, got $status " ; exit 1; }
103+
104+ # Sync
105+ local output
106+ output=$( curl -sf -X POST " $ENGINE_URL /sync" -H " X-Sync-Params: $params " )
107+ local lines
108+ lines=$( echo " $output " | wc -l | tr -d ' ' )
109+ echo " Sync: $lines NDJSON lines"
110+
111+ local errors
112+ errors=$( echo " $output " | python3 -c "
113+ import sys, json
114+ n = 0
115+ for line in sys.stdin:
116+ line = line.strip()
117+ if not line: continue
118+ msg = json.loads(line)
119+ if msg.get('type') == 'error':
120+ n += 1
121+ print(f' ERROR: {msg.get(\" message\" , \" unknown\" )}', file=sys.stderr)
122+ print(n)
123+ " )
124+ [ " $errors " = " 0" ] || echo " ⚠ $errors error(s)"
125+
126+ # Verify (caller-provided function)
127+ $verify_fn
128+
129+ # Teardown
130+ if [ -z " $SKIP_DELETE " ]; then
131+ status=$( curl -sf -o /dev/null -w " %{http_code}" -X POST " $ENGINE_URL /teardown" \
132+ -H " X-Sync-Params: $params " )
133+ echo " Teardown: HTTP $status "
134+ [ " $status " = " 204" ] || { echo " FAIL: expected 204, got $status " ; exit 1; }
135+ else
136+ echo " Teardown: skipped (SKIP_DELETE)"
137+ fi
138+ }
139+
140+ # ── Start servers ──────────────────────────────────────────────────
141+
61142ENGINE_PORT=$( find_free_port)
62143echo " Starting engine on port $ENGINE_PORT ..."
63144(cd " $ROOT /apps/engine" && PORT=$ENGINE_PORT node dist/api/index.js) & > /dev/null &
64145ENGINE_PID=$!
65146wait_for_port " $ENGINE_PORT " " Engine"
66147
67- # --- Start service (no Temporal — just config CRUD) ---
68148SERVICE_PORT=$( find_free_port)
69149DATA_DIR=$( mktemp -d)
70- echo " Starting service on port $SERVICE_PORT (data: $DATA_DIR ) ..."
150+ echo " Starting service on port $SERVICE_PORT ..."
71151node " $ROOT /apps/service/dist/bin/cli.js" serve \
72152 --port " $SERVICE_PORT " \
73153 --data-dir " $DATA_DIR " & > /dev/null &
@@ -78,103 +158,119 @@ SERVICE_URL="http://localhost:$SERVICE_PORT"
78158ENGINE_URL=" http://localhost:$ENGINE_PORT "
79159
80160echo " "
81- echo " === Service → Engine E2E Test ==="
82161echo " Service: $SERVICE_URL "
83162echo " Engine: $ENGINE_URL "
84163echo " Postgres: $POSTGRES_URL "
85- echo " Schema: $SCHEMA "
86- echo " "
164+ [ -n " $SKIP_DELETE " ] && echo " Mode: SKIP_DELETE (data preserved)"
165+
166+ # ── Sync 1: Stripe → Postgres ─────────────────────────────────────
87167
88- # --- Create sync ---
89- echo " --- 1. Create sync ---"
90- SYNC_RESP =$( curl -sf -X POST " $SERVICE_URL /syncs" \
168+ echo " "
169+ echo " --- Creating Postgres sync ---"
170+ PG_SYNC_RESP =$( curl -sf -X POST " $SERVICE_URL /syncs" \
91171 -H ' Content-Type: application/json' \
92172 -d " {
93173 \" source\" : { \" type\" : \" stripe\" , \" api_key\" : \" $STRIPE_API_KEY \" , \" backfill_limit\" : 5 },
94174 \" destination\" : { \" type\" : \" postgres\" , \" connection_string\" : \" $POSTGRES_URL \" , \" schema\" : \" $SCHEMA \" },
95175 \" streams\" : [{ \" name\" : \" products\" }]
96176 }" )
97- SYNC_ID =$( echo " $SYNC_RESP " | python3 -c " import sys,json; print(json.load(sys.stdin)['id'])" )
98- echo " Sync: $SYNC_ID "
177+ PG_SYNC_ID =$( echo " $PG_SYNC_RESP " | python3 -c " import sys,json; print(json.load(sys.stdin)['id'])" )
178+ echo " Sync: $PG_SYNC_ID (schema: $SCHEMA ) "
99179
100- # --- Resolve config ---
101- echo " "
102- echo " --- 2. Resolve config (include_credentials=true) ---"
103- RESOLVED=$( curl -sf " $SERVICE_URL /syncs/$SYNC_ID ?include_credentials=true" )
104- SRC_TYPE=$( echo " $RESOLVED " | python3 -c " import sys,json; print(json.load(sys.stdin)['source']['type'])" )
105- HAS_KEY=$( echo " $RESOLVED " | python3 -c " import sys,json; print('api_key' in json.load(sys.stdin)['source'])" )
106- echo " source.type: $SRC_TYPE "
107- echo " has api_key: $HAS_KEY "
108- [ " $HAS_KEY " = " True" ] || { echo " FAIL: expected api_key in resolved config" ; exit 1; }
109-
110- # Build X-Sync-Params (same as what activities do)
111- PARAMS=$( echo " $RESOLVED " | python3 -c "
112- import sys, json
113- c = json.load(sys.stdin)
114- src = {k:v for k,v in c['source'].items() if k != 'type'}
115- dst = {k:v for k,v in c['destination'].items() if k != 'type'}
116- print(json.dumps({
117- 'source_name': c['source']['type'],
118- 'source_config': src,
119- 'destination_name': c['destination']['type'],
120- 'destination_config': dst,
121- 'streams': c.get('streams', [])
122- }))
123- " )
124- echo " X-Sync-Params built ($( echo " $PARAMS " | wc -c | tr -d ' ' ) bytes)"
180+ verify_postgres () {
181+ local count
182+ count=$( psql " $POSTGRES_URL " -t -c " SELECT count(*) FROM \" $SCHEMA \" .\" products\" " | tr -d ' ' )
183+ echo " Verify: $count rows in $SCHEMA .products"
184+ [ " $count " -gt 0 ] || { echo " FAIL: expected > 0 rows" ; exit 1; }
125185
126- # --- Setup ---
127- echo " "
128- echo " --- 3. Engine: setup ---"
129- SETUP_STATUS=$( curl -sf -o /dev/null -w " %{http_code}" -X POST " $ENGINE_URL /setup" \
130- -H " X-Sync-Params: $PARAMS " )
131- echo " HTTP $SETUP_STATUS "
132- [ " $SETUP_STATUS " = " 204" ] || { echo " FAIL: expected 204, got $SETUP_STATUS " ; exit 1; }
186+ local sample
187+ sample=$( psql " $POSTGRES_URL " -t -c " SELECT id FROM \" $SCHEMA \" .\" products\" LIMIT 1" | tr -d ' ' )
188+ echo " Sample: $sample "
189+ [[ " $sample " == prod_* ]] || { echo " FAIL: expected prod_ prefix" ; exit 1; }
133190
134- # --- Sync ---
135- echo " "
136- echo " --- 4. Engine: sync ---"
137- SYNC_OUTPUT=$( curl -sf -X POST " $ENGINE_URL /sync" -H " X-Sync-Params: $PARAMS " )
138- LINE_COUNT=$( echo " $SYNC_OUTPUT " | wc -l | tr -d ' ' )
139- echo " NDJSON lines: $LINE_COUNT "
191+ if [ -z " $SKIP_DELETE " ]; then
192+ # Will be verified after teardown
193+ :
194+ else
195+ echo " Data preserved: psql $POSTGRES_URL -c 'SELECT * FROM \" $SCHEMA \" .\" products\" LIMIT 5'"
196+ fi
197+ }
140198
141- ERROR_COUNT=$( echo " $SYNC_OUTPUT " | python3 -c "
142- import sys, json
143- errors = 0
144- for line in sys.stdin:
145- line = line.strip()
146- if not line: continue
147- msg = json.loads(line)
148- if msg.get('type') == 'error':
149- errors += 1
150- print(f' ERROR: {msg.get(\" message\" , \" unknown\" )}', file=sys.stderr)
151- print(errors)
152- " )
153- echo " Errors: $ERROR_COUNT "
199+ run_sync_cycle " Stripe → Postgres" " $PG_SYNC_ID " verify_postgres
154200
155- # --- Verify Postgres ---
156- echo " "
157- echo " --- 5. Verify Postgres ---"
158- ROW_COUNT=$( psql " $POSTGRES_URL " -t -c " SELECT count(*) FROM \" $SCHEMA \" .\" products\" " | tr -d ' ' )
159- echo " products: $ROW_COUNT rows"
160- [ " $ROW_COUNT " -gt 0 ] || { echo " FAIL: expected > 0 rows" ; exit 1; }
201+ # Verify teardown actually dropped the schema
202+ if [ -z " $SKIP_DELETE " ]; then
203+ TABLE_COUNT=$( psql " $POSTGRES_URL " -t -c \
204+ " SELECT count(*) FROM information_schema.tables WHERE table_schema = '$SCHEMA '" | tr -d ' ' )
205+ echo " Post-teardown: $TABLE_COUNT tables remaining"
206+ [ " $TABLE_COUNT " -eq 0 ] || { echo " FAIL: expected 0 tables after teardown" ; exit 1; }
207+ fi
161208
162- SAMPLE=$( psql " $POSTGRES_URL " -t -c " SELECT id FROM \" $SCHEMA \" .\" products\" LIMIT 1" | tr -d ' ' )
163- echo " sample: $SAMPLE "
164- [[ " $SAMPLE " == prod_* ]] || { echo " FAIL: expected prod_ prefix, got $SAMPLE " ; exit 1; }
209+ # ── Sync 2: Stripe → Google Sheets (optional) ─────────────────────
165210
166- # --- Teardown ---
167- echo " "
168- echo " --- 6. Engine: teardown ---"
169- TEARDOWN_STATUS=$( curl -sf -o /dev/null -w " %{http_code}" -X POST " $ENGINE_URL /teardown" \
170- -H " X-Sync-Params: $PARAMS " )
171- echo " HTTP $TEARDOWN_STATUS "
172- [ " $TEARDOWN_STATUS " = " 204" ] || { echo " FAIL: expected 204, got $TEARDOWN_STATUS " ; exit 1; }
173-
174- TABLE_COUNT=$( psql " $POSTGRES_URL " -t -c \
175- " SELECT count(*) FROM information_schema.tables WHERE table_schema = '$SCHEMA '" | tr -d ' ' )
176- echo " Tables remaining: $TABLE_COUNT "
177- [ " $TABLE_COUNT " -eq 0 ] || { echo " FAIL: expected 0 tables after teardown" ; exit 1; }
211+ # Google Sheets connector doesn't support subprocess mode (setup/teardown commands).
212+ # The engine uses subprocess mode when running connectors as binaries.
213+ # Sheets e2e is tested in vitest (temporal.test.ts) where connectors run in-process.
214+ SHEETS_ENABLED=" ${SHEETS_ENABLED:- } "
215+ if [ -n " $SHEETS_ENABLED " ] && [ -n " ${GOOGLE_CLIENT_ID:- } " ] && [ -n " ${GOOGLE_CLIENT_SECRET:- } " ] && \
216+ [ -n " ${GOOGLE_REFRESH_TOKEN:- } " ] && [ -n " ${GOOGLE_SPREADSHEET_ID:- } " ]; then
217+
218+ echo " "
219+ echo " --- Creating Google Sheets sync ---"
220+ SHEETS_SYNC_RESP=$( curl -sf -X POST " $SERVICE_URL /syncs" \
221+ -H ' Content-Type: application/json' \
222+ -d " {
223+ \" source\" : { \" type\" : \" stripe\" , \" api_key\" : \" $STRIPE_API_KEY \" , \" backfill_limit\" : 3 },
224+ \" destination\" : {
225+ \" type\" : \" google-sheets\" ,
226+ \" client_id\" : \" $GOOGLE_CLIENT_ID \" ,
227+ \" client_secret\" : \" $GOOGLE_CLIENT_SECRET \" ,
228+ \" refresh_token\" : \" $GOOGLE_REFRESH_TOKEN \" ,
229+ \" access_token\" : \" placeholder\" ,
230+ \" spreadsheet_id\" : \" $GOOGLE_SPREADSHEET_ID \"
231+ },
232+ \" streams\" : [{ \" name\" : \" products\" }]
233+ }" )
234+ SHEETS_SYNC_ID=$( echo " $SHEETS_SYNC_RESP " | python3 -c " import sys,json; print(json.load(sys.stdin)['id'])" )
235+ echo " Sync: $SHEETS_SYNC_ID (spreadsheet: $GOOGLE_SPREADSHEET_ID )"
236+
237+ verify_sheets () {
238+ # Read back via Sheets API using python + google-auth
239+ local row_count
240+ row_count=$( python3 -c "
241+ import json, urllib.request, urllib.parse
242+
243+ # Get access token via refresh
244+ data = urllib.parse.urlencode({
245+ 'client_id': '$GOOGLE_CLIENT_ID ',
246+ 'client_secret': '$GOOGLE_CLIENT_SECRET ',
247+ 'refresh_token': '$GOOGLE_REFRESH_TOKEN ',
248+ 'grant_type': 'refresh_token',
249+ }).encode()
250+ req = urllib.request.Request('https://oauth2.googleapis.com/token', data)
251+ token = json.loads(urllib.request.urlopen(req).read())['access_token']
252+
253+ # Read sheet
254+ url = f'https://sheets.googleapis.com/v4/spreadsheets/$GOOGLE_SPREADSHEET_ID /values/products'
255+ req = urllib.request.Request(url, headers={'Authorization': f'Bearer {token}'})
256+ resp = json.loads(urllib.request.urlopen(req).read())
257+ rows = resp.get('values', [])
258+ print(len(rows) - 1 if len(rows) > 1 else 0) # minus header
259+ " )
260+ echo " Verify: $row_count data rows in 'products' tab"
261+ [ " $row_count " -gt 0 ] || { echo " FAIL: expected > 0 rows in sheet" ; exit 1; }
262+
263+ if [ -n " $SKIP_DELETE " ]; then
264+ echo " Data preserved: https://docs.google.com/spreadsheets/d/$GOOGLE_SPREADSHEET_ID "
265+ fi
266+ }
267+
268+ run_sync_cycle " Stripe → Google Sheets" " $SHEETS_SYNC_ID " verify_sheets
269+ else
270+ echo " "
271+ echo " --- Skipping Google Sheets sync (set SHEETS_ENABLED=1 + Google env vars) ---"
272+ echo " Note: Sheets connector requires in-process mode; use vitest (temporal.test.ts) for full Sheets e2e"
273+ fi
178274
179275echo " "
180276echo " === All checks passed ==="
0 commit comments