Skip to content

Sync Repos to D1 Cache #103

Sync Repos to D1 Cache

Sync Repos to D1 Cache #103

Workflow file for this run

name: Sync Repos to D1 Cache

on:
  schedule:
    # Cron schedules are evaluated in UTC: runs daily at 02:00 UTC.
    - cron: "0 2 * * *"
  # Manual trigger from the Actions tab (no inputs).
  workflow_dispatch: {}

jobs:
  sync-to-d1:
    name: Analyze & Upsert Repos
    runs-on: ubuntu-latest
    permissions:
      contents: read
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install Dependencies
        run: pip install openai pydantic requests pygithub

      - name: Run Sync Agent
        env:
          # GitHub & Worker Auth
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          # Cloudflare AI Gateway (AI Agent)
          CF_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
          CF_GATEWAY_ID: ${{ secrets.CLOUDFLARE_GATEWAY_ID }}
          CF_API_TOKEN: ${{ secrets.CLOUDFLARE_AI_GATEWAY_TOKEN }}
          # Worker API used by the script to list and upsert cached repos
          WORKER_API_URL: ${{ secrets.WORKER_API_URL_BASE }}
          WORKER_API_KEY: ${{ secrets.WORKER_API_KEY }}
          # Target User
          TARGET_USER: ${{ github.repository_owner }}
        run: |
          # The heredoc delimiter is quoted so the shell writes the Python
          # source verbatim (no $, backtick or backslash expansion).
          cat <<'EOF' > sync_repos.py
import os
import json
import asyncio
import requests
from typing import List, Set
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
from github import Github
# --- CONFIGURATION ---
def _require_env(name: str) -> str:
    """Return environment variable ``name`` with surrounding whitespace removed.

    Prints a GitHub Actions error annotation and exits with status 1 when the
    variable is unset or blank, so a missing value is reported at startup
    instead of surfacing later as an obscure request failure.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        # Values may arrive as empty strings (e.g. a secret that was never
        # configured), which a plain os.environ[name] lookup would accept.
        print(f"::error::Required environment variable {name} is missing or empty")
        raise SystemExit(1)
    return value


# Base URL of the Worker API. A trailing slash is dropped because request
# paths are appended as "/api/...".
API_BASE = _require_env("WORKER_API_URL").rstrip("/")
# Sent as the X-API-Key header on Worker requests.
WORKER_KEY = _require_env("WORKER_API_KEY")
# GitHub account whose repositories are listed.
TARGET_USER = _require_env("TARGET_USER")

# Cloudflare AI Gateway
client = AsyncOpenAI(
    api_key=_require_env("CF_API_TOKEN"),
    base_url=(
        "https://gateway.ai.cloudflare.com/v1/"
        f"{_require_env('CF_ACCOUNT_ID')}/{_require_env('CF_GATEWAY_ID')}/compat"
    ),
)
# NOTE(review): the gateway's OpenAI-compatible endpoint may expect a
# provider-prefixed model name (e.g. "workers-ai/@cf/...") - confirm that this
# unprefixed ID is accepted.
MODEL_ID = "@cf/openai/gpt-oss-120b"
# --- DATA MODELS ---
class RepoAnalysis(BaseModel):
    # Shape of the JSON reply expected from the model. Both fields are
    # required because no default value is supplied.

    # Short technical description of the repository.
    summary: str = Field(
        ...,
        description="A concise, technical summary of what the repo does (max 2 sentences).",
    )

    # Technology labels for the repository.
    tags: List[str] = Field(
        ...,
        description="List of tech stack tags e.g. ['cloudflare-workers', 'astro', 'python', 'd1', 'shadcn']",
    )
# --- STEPS ---
def get_existing_repos() -> Set[str]:
    """Step 1: Ask D1 what we already have to avoid AI costs/dupes.

    Sends a GET to ``{API_BASE}/api/repos/list`` with the Worker API key.

    Returns:
        The set of entries from the JSON list in the response (expected form:
        "owner/name"). This is best-effort: on a non-200 status, a payload
        that is not a list, or any request/parsing error, a warning is printed
        and an empty set is returned, so the caller treats every repository
        as new.
    """
    try:
        url = f"{API_BASE}/api/repos/list"
        print(f"Checking existing D1 records: {url}")
        # requests applies no timeout by default; bound the call so a stalled
        # endpoint cannot hang the job.
        resp = requests.get(url, headers={"X-API-Key": WORKER_KEY}, timeout=30)
        if resp.status_code != 200:
            print(f"::warning::Failed to fetch existing list: {resp.status_code}")
            return set()

        # Expecting ["jmbish04/repo-a", "jmbish04/repo-b"]
        data = resp.json()
        if not isinstance(data, list):
            # set() over a dict would silently yield its keys and defeat the
            # de-duplication, so reject anything that is not a list.
            print(f"::warning::Unexpected payload type from D1 list: {type(data).__name__}")
            return set()

        print(f"Found {len(data)} existing repos in D1.")
        return set(data)
    except Exception as e:
        print(f"::warning::D1 check failed: {e}")
        return set()
async def analyze_repo(repo) -> dict | None:
    """Step 2: AI Agent analyzes code to generate metadata.

    Args:
        repo: Repository object as yielded by PyGithub's ``get_repos()``.

    Returns:
        A dict with the repository's metadata plus ``ai_summary`` and ``tags``
        taken from the model's reply, or ``None`` when the model call or the
        validation of its reply fails (the failure is printed).
    """
    print(f"Analyzing {repo.full_name}...")

    # Gather Context (Readme + File List)
    try:
        # Only the first 6000 characters of the README are sent to the model.
        readme = repo.get_readme().decoded_content.decode("utf-8")[:6000]
    except Exception:
        readme = "No README."

    # Get file structure to detect frameworks (e.g. verify if it's Next.js vs Astro)
    try:
        file_str = ", ".join(entry.name for entry in repo.get_contents(""))
    except Exception:
        file_str = "Unknown"

    # The schema is embedded in the prompt so the model knows the exact field
    # names that RepoAnalysis validates against.
    schema = json.dumps(RepoAnalysis.model_json_schema())
    prompt = "\n".join([
        "",
        "Analyze this GitHub repository.",
        f"Repo: {repo.full_name}",
        f"Files: {file_str}",
        "Readme Snippet:",
        readme,
        "Task:",
        "1. Summarize the project.",
        "2. Extract technology tags (e.g. 'cloudflare', 'hono', 'drizzle', 'react', 'python').",
        "3. Return only a JSON object matching this JSON Schema:",
        schema,
        "",
    ])

    try:
        response = await client.chat.completions.create(
            model=MODEL_ID,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
            max_tokens=500,
        )
        # The reply content can be None; an empty string then fails validation
        # and is reported through the except branch below.
        raw_reply = response.choices[0].message.content or ""
        analysis = RepoAnalysis.model_validate_json(raw_reply)
        return {
            "owner": repo.owner.login,
            "name": repo.name,
            "full_name": repo.full_name,
            "description": repo.description or "",
            "url": repo.html_url,
            "language": repo.language,
            "stars": repo.stargazers_count,
            "ai_summary": analysis.summary,
            "tags": analysis.tags,
            "last_updated": repo.updated_at.isoformat(),
        }
    except Exception as e:
        print(f"AI Analysis failed for {repo.name}: {e}")
        return None
async def main():
    """Analyze repositories of TARGET_USER that D1 does not list yet and upsert them.

    Steps:
      1. Fetch the names already stored (``get_existing_repos``).
      2. Walk the user's repositories, most recently updated first, and
         collect up to ``batch_limit`` that are not in that set.
      3. Run ``analyze_repo`` on each and POST the result to
         ``{API_BASE}/api/repos/upsert``.

    Repositories whose analysis returns nothing are skipped. A failed upload
    is printed as an ``::error::`` line and the loop continues.
    NOTE(review): the process still exits 0 after upload failures - confirm
    that this is intended.
    """
    batch_limit = 5  # Batch limit to save run time/tokens per run
    request_timeout = 30  # Seconds; requests applies no timeout by default

    # 1. Get D1 Cache
    existing_keys = get_existing_repos()

    # 2. Get GitHub Repos
    g = Github(os.environ["GITHUB_TOKEN"])
    user = g.get_user(TARGET_USER)
    repos_to_process = []
    print(f"Fetching repos for {TARGET_USER}...")
    for repo in user.get_repos(sort="updated", direction="desc"):
        # Skip if private (optional, remove check if you want private synced)
        # if repo.private: continue

        # THE DEDUPLICATION LOGIC
        key = f"{repo.owner.login}/{repo.name}"
        if key in existing_keys:
            print(f"Skipping {key} (Already in D1)")
            continue
        repos_to_process.append(repo)
        if len(repos_to_process) >= batch_limit:
            break
    print(f"Processing {len(repos_to_process)} new/stale repositories...")

    # 3. Analyze & Upsert
    for repo in repos_to_process:
        data = await analyze_repo(repo)
        if not data:
            continue
        # POST to Worker
        print(f"Upserting {data['full_name']} to D1...")
        try:
            r = requests.post(
                f"{API_BASE}/api/repos/upsert",
                json=data,
                headers={"X-API-Key": WORKER_KEY},
                timeout=request_timeout,
            )
            r.raise_for_status()
            print("Success.")
        except Exception as e:
            print(f"::error::Failed to upload {data['name']}: {e}")


if __name__ == "__main__":
    asyncio.run(main())
EOF
# Run the script that the heredoc above wrote to sync_repos.py.
python sync_repos.py