|
| 1 | +import asyncio |
| 2 | +import json |
| 3 | + |
| 4 | +import httpx |
1 | 5 | from fastapi import FastAPI, HTTPException |
2 | 6 | from fastapi.staticfiles import StaticFiles |
3 | 7 | from posit import connect |
|
6 | 10 |
|
7 | 11 | client = connect.Client() |
8 | 12 |
|
| 13 | + |
9 | 14 | @app.get("/api/content") |
10 | 15 | async def search_content(show_all: bool = False): |
11 | 16 | if show_all: |
12 | 17 | return client.content.find() |
13 | 18 | return client.me.content.find() |
14 | 19 |
|
| 20 | + |
15 | 21 | @app.get("/api/packages/{guid}") |
16 | 22 | async def get_packages(guid: str): |
17 | 23 | try: |
18 | 24 | content = client.content.get(guid) |
19 | 25 | packages = list(content.packages) |
20 | 26 | return packages |
21 | 27 | except Exception as e: |
22 | | - raise HTTPException(status_code=404, detail=f"Content not found or error fetching packages: {str(e)}") |
23 | | - |
| 28 | + raise HTTPException( |
| 29 | + status_code=404, |
| 30 | + detail=f"Content not found or error fetching packages: {str(e)}", |
| 31 | + ) |
| 32 | + |
| 33 | + |
24 | 34 | @app.get("/api/vulns") |
25 | 35 | async def get_vulnerabilities(): |
26 | | - import httpx |
27 | | - import asyncio |
28 | | - |
29 | 36 | repositories = ["pypi", "cran"] |
30 | 37 | results = {} |
31 | | - |
| 38 | + |
32 | 39 | async def fetch_repo_vulns(repo): |
33 | | - async with httpx.AsyncClient() as client: |
34 | | - url = f"https://packagemanager.posit.co/__api__/repos/{repo}/vulns" |
35 | | - response = await client.get(url) |
| 40 | + async with httpx.AsyncClient() as http: |
| 41 | + # This fetches vulnerabilities from the public Posit Package Manager, |
| 42 | + # which is always up to date. If customizing this to reference your |
| 43 | + # own PPM instance, note that the has_vulns parameter requires |
| 44 | + # PPM 2026.04 or later. Older versions may need to use the vulns |
| 45 | + # parameter instead, or the legacy endpoint: |
| 46 | + # |
| 47 | + # url = f"https://your-ppm-instance/__api__/repos/{repo}/vulns" |
| 48 | + # response = await http.get(url) |
| 49 | + # return repo, response.json() |
| 50 | + # |
| 51 | + # tasks = [fetch_repo_vulns(repo) for repo in repositories] |
| 52 | + # for repo, data in await asyncio.gather(*tasks): |
| 53 | + # results[repo] = data |
| 54 | + url = "https://packagemanager.posit.co/__api__/filter/packages" |
| 55 | + payload = { |
| 56 | + "repo": repo, |
| 57 | + "has_vulns": True, |
| 58 | + "omit_downloads": True, |
| 59 | + "omit_dependencies": True, |
| 60 | + } |
| 61 | + response = await http.post(url, json=payload) |
36 | 62 | response.raise_for_status() |
37 | | - return repo, response.json() |
38 | | - |
| 63 | + packages = [ |
| 64 | + json.loads(line) for line in response.text.strip().split("\n") if line |
| 65 | + ] |
| 66 | + return repo, packages |
| 67 | + |
39 | 68 | tasks = [fetch_repo_vulns(repo) for repo in repositories] |
40 | | - for repo, data in await asyncio.gather(*tasks): |
41 | | - results[repo] = data |
42 | | - |
| 69 | + for repo, packages in await asyncio.gather(*tasks): |
| 70 | + results[repo] = {pkg["name"]: pkg["vulns"] for pkg in packages} |
| 71 | + |
43 | 72 | return results |
44 | 73 |
|
| 74 | + |
45 | 75 | @app.get("/api/user") |
46 | 76 | async def get_current_user(): |
47 | 77 | return client.me |
48 | 78 |
|
| 79 | + |
49 | 80 | app.mount("/", StaticFiles(directory="dist", html=True), name="static") |
0 commit comments