diff --git a/.github/workflows/ci-dev.yml b/.github/workflows/ci-dev.yml index 74ccaaf..dcf43d3 100644 --- a/.github/workflows/ci-dev.yml +++ b/.github/workflows/ci-dev.yml @@ -48,7 +48,8 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip wheel - pip install -r dev_requirements.txt + Copy-Item dev.toml pyproject.toml -Force + pip install -e . pip install pytest pytest-cov - name: Run pytest with coverage run: python -m pytest tests/ -v --tb=short --cov=automation_file --cov-report=term-missing --cov-report=xml diff --git a/.github/workflows/ci-stable.yml b/.github/workflows/ci-stable.yml index 97f180a..d6911e7 100644 --- a/.github/workflows/ci-stable.yml +++ b/.github/workflows/ci-stable.yml @@ -48,7 +48,8 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip wheel - pip install -r requirements.txt + Copy-Item stable.toml pyproject.toml -Force + pip install -e . pip install pytest pytest-cov - name: Run pytest with coverage run: python -m pytest tests/ -v --tb=short --cov=automation_file --cov-report=term-missing --cov-report=xml diff --git a/.idea/FileAutomation.iml b/.idea/FileAutomation.iml index 673a12b..91f2d42 100644 --- a/.idea/FileAutomation.iml +++ b/.idea/FileAutomation.iml @@ -5,7 +5,7 @@ - + \ No newline at end of file diff --git a/README.md b/README.md index e781ad8..edc387b 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # FileAutomation +**English** | [繁體中文](README.zh-TW.md) | [简体中文](README.zh-CN.md) + A modular automation framework for local file / directory / ZIP operations, SSRF-validated HTTP downloads, remote storage (Google Drive, S3, Azure Blob, Dropbox, SFTP), and JSON-driven action execution over embedded TCP / HTTP @@ -14,7 +16,33 @@ facade. 
- JSON action lists executed by a shared `ActionExecutor` — validate, dry-run, parallel - Loopback-first TCP **and** HTTP servers that accept JSON command batches with optional shared-secret auth - Reliability primitives: `retry_on_transient` decorator, `Quota` size / time budgets -- PySide6 GUI (`python -m automation_file ui`) with a tab per backend plus a JSON-action runner +- **File-watcher triggers** — run an action list whenever a path changes (`FA_watch_*`) +- **Cron scheduler** — recurring action lists on a stdlib-only 5-field parser (`FA_schedule_*`) +- **Transfer progress + cancellation** — opt-in `progress_name` hook on HTTP and S3 transfers (`FA_progress_*`) +- **Fast file search** — OS index fast path (`mdfind` / `locate` / `es.exe`) with a streaming `scandir` fallback (`FA_fast_find`) +- **Checksums + integrity verification** — streaming `file_checksum` / `verify_checksum` with any `hashlib` algorithm; `download_file(expected_sha256=...)` verifies after transfer (`FA_file_checksum`, `FA_verify_checksum`) +- **Resumable HTTP downloads** — `download_file(resume=True)` writes to `.part` and sends `Range: bytes=-` so interrupted transfers continue +- **Duplicate-file finder** — three-stage size → partial-hash → full-hash pipeline; unique-size files are never hashed (`FA_find_duplicates`) +- **DAG action executor** — topological scheduling with parallel fan-out and per-branch skip-on-failure (`FA_execute_action_dag`) +- **Entry-point plugins** — third-party packages register their own `FA_*` actions via `[project.entry-points."automation_file.actions"]`; `build_default_registry()` picks them up automatically +- **Incremental directory sync** — rsync-style mirror with size+mtime or checksum change detection, optional delete of extras, dry-run (`FA_sync_dir`) +- **Directory manifests** — JSON snapshot of every file's checksum under a root, with separate missing/modified/extra reporting on verify (`FA_write_manifest`, `FA_verify_manifest`) +- **Notification 
sinks** — webhook / Slack / SMTP / Telegram / Discord / Teams / PagerDuty with a fanout manager that does per-sink error isolation and sliding-window dedup; auto-notify on trigger + scheduler failures (`FA_notify_send`, `FA_notify_list`) +- **Config file + secret providers** — declare notification sinks / defaults in `automation_file.toml`; `${env:…}` and `${file:…}` references resolve through an Env/File/Chained provider abstraction so secrets stay out of the file itself +- **Config hot reload** — `ConfigWatcher` polls `automation_file.toml` and re-applies sinks / defaults on change without restart +- **Shell / grep / JSON edit / tar / backup rotation** — `FA_run_shell` (argument-list subprocess with timeout), `FA_grep` (streaming text search), `FA_json_get` / `FA_json_set` / `FA_json_delete` (in-place JSON editing), `FA_create_tar` / `FA_extract_tar`, `FA_rotate_backups` +- **FTP / FTPS backend** — plain FTP or explicit FTPS via `FTP_TLS.auth()`; auto-registered as `FA_ftp_*` +- **Cross-backend copy** — `FA_copy_between` moves data between any two backends via `local://`, `s3://`, `drive://`, `azure://`, `dropbox://`, `sftp://`, `ftp://` URIs +- **Scheduler overlap guard** — running jobs are skipped on the next fire unless `allow_overlap=True` +- **Server action ACL** — `allowed_actions=(...)` restricts which commands TCP / HTTP servers will dispatch +- **Variable substitution** — opt-in `${env:VAR}` / `${date:%Y-%m-%d}` / `${uuid}` / `${cwd}` expansion in action arguments via `execute_action(..., substitute=True)` +- **Conditional execution** — `FA_if_exists` / `FA_if_newer` / `FA_if_size_gt` run a nested action list only when a guard passes +- **SQLite audit log** — `AuditLog(db_path)` records every action execution with actor / status / duration; query via `recent` / `count` / `purge` +- **File integrity monitor** — `IntegrityMonitor` polls a tree against a manifest and fires a callback + notification on drift +- **HTTPActionClient SDK** — typed Python client 
for the HTTP action server with shared-secret auth, loopback guard, and OPTIONS-based ping +- **AES-256-GCM file encryption** — `encrypt_file` / `decrypt_file` with `generate_key()` / `key_from_password()` (PBKDF2-HMAC-SHA256); JSON actions `FA_encrypt_file` / `FA_decrypt_file` +- **Prometheus metrics exporter** — `start_metrics_server()` exposes `automation_file_actions_total{action,status}` counters and `automation_file_action_duration_seconds{action}` histograms +- PySide6 GUI (`python -m automation_file ui`) with a tab per backend, the JSON-action runner, and dedicated tabs for Triggers, Scheduler, and live Progress - Rich CLI with one-shot subcommands plus legacy JSON-batch flags - Project scaffolding (`ProjectBuilder`) for executor-based automations @@ -36,6 +64,12 @@ flowchart LR Json[json_store] Retry[retry] QuotaMod[quota] + Progress[progress
Token + Reporter] + end + + subgraph Events["event-driven"] + TriggerMod["trigger
watchdog file watcher"] + SchedulerMod["scheduler
cron background thread"] end subgraph Local["local"] @@ -101,6 +135,14 @@ flowchart LR Registry --> Dropbox Registry --> SFTP Registry --> Builder + Registry --> TriggerMod + Registry --> SchedulerMod + Registry --> Progress + + TriggerMod --> Executor + SchedulerMod --> Executor + Http --> Progress + S3 --> Progress Http --> UrlVal Http --> Retry @@ -130,7 +172,7 @@ Requirements: - Python 3.10+ - Bundled dependencies: `google-api-python-client`, `google-auth-oauthlib`, `requests`, `tqdm`, `boto3`, `azure-storage-blob`, `dropbox`, `paramiko`, - `PySide6` + `PySide6`, `watchdog` ## Usage @@ -240,6 +282,401 @@ All backends (`s3`, `azure_blob`, `dropbox_api`, `sftp`) expose the same five operations: `upload_file`, `upload_dir`, `download_file`, `delete_*`, `list_*`. SFTP uses `paramiko.RejectPolicy` — unknown hosts are rejected, not auto-added. +### File-watcher triggers +Run an action list whenever a filesystem event fires on a watched path: + +```python +from automation_file import watch_start, watch_stop + +watch_start( + name="inbox-sweeper", + path="/data/inbox", + action_list=[["FA_copy_all_file_to_dir", {"source_dir": "/data/inbox", + "target_dir": "/data/processed"}]], + events=["created", "modified"], + recursive=False, +) +# later: +watch_stop("inbox-sweeper") +``` + +`FA_watch_start` / `FA_watch_stop` / `FA_watch_stop_all` / `FA_watch_list` +surface the same lifecycle to JSON action lists. + +### Cron scheduler +Recurring action lists on a stdlib-only 5-field cron parser: + +```python +from automation_file import schedule_add + +schedule_add( + name="nightly-snapshot", + cron_expression="0 2 * * *", # every day at 02:00 local time + action_list=[["FA_zip_dir", {"dir_we_want_to_zip": "/data", + "zip_name": "/backup/data_nightly"}]], +) +``` + +Supports `*`, exact values, `a-b` ranges, comma lists, and `*/n` step +syntax with `jan..dec` / `sun..sat` aliases. JSON actions: +`FA_schedule_add`, `FA_schedule_remove`, `FA_schedule_remove_all`, +`FA_schedule_list`. 
+ +### Transfer progress + cancellation +HTTP and S3 transfers accept an opt-in `progress_name` kwarg: + +```python +from automation_file import download_file, progress_cancel + +download_file("https://example.com/big.bin", "big.bin", + progress_name="big-download") + +# From another thread or the GUI: +progress_cancel("big-download") +``` + +The shared `progress_registry` exposes live snapshots via `progress_list()` +and the `FA_progress_list` / `FA_progress_cancel` / `FA_progress_clear` JSON +actions. The GUI's **Progress** tab polls the registry every half second. + +### Fast file search +Query an OS index when available (`mdfind` on macOS, `locate` / `plocate` on +Linux, Everything's `es.exe` on Windows) and fall back to a streaming +`os.scandir` walk otherwise. No extra dependencies. + +```python +from automation_file import fast_find, scandir_find, has_os_index + +# Uses the OS indexer when available, scandir fallback otherwise. +results = fast_find("/var/log", "*.log", limit=100) + +# Force the portable path (skip the OS indexer). +results = fast_find("/data", "report_*.csv", use_index=False) + +# Streaming — stop early without scanning the whole tree. +for path in scandir_find("/data", "*.csv"): + if "2026" in path: + break +``` + +`FA_fast_find` exposes the same function to JSON action lists: + +```json +[["FA_fast_find", {"root": "/var/log", "pattern": "*.log", "limit": 50}]] +``` + +### Checksums + integrity verification +Stream any `hashlib` algorithm; `verify_checksum` compares with +`hmac.compare_digest` (constant-time): + +```python +from automation_file import file_checksum, verify_checksum + +digest = file_checksum("bundle.tar.gz") # sha256 by default +verify_checksum("bundle.tar.gz", digest) # -> True +verify_checksum("bundle.tar.gz", "deadbeef...", algorithm="blake2b") +``` + +Also available as `FA_file_checksum` / `FA_verify_checksum` JSON actions. 
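The behaviour described above amounts to chunked hashing plus a constant-time compare. A minimal stand-alone sketch under those assumptions — the `sketch_*` names are illustrative, not the library's internals:

```python
import hashlib
import hmac

def sketch_file_checksum(path: str, algorithm: str = "sha256",
                         chunk_size: int = 1 << 20) -> str:
    """Stream the file in 1 MiB chunks so large files never load into memory."""
    h = hashlib.new(algorithm)
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()

def sketch_verify_checksum(path: str, expected: str,
                           algorithm: str = "sha256") -> bool:
    # compare_digest keeps the comparison constant-time regardless of where
    # the first mismatching character falls.
    return hmac.compare_digest(sketch_file_checksum(path, algorithm),
                               expected.lower())
```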
+ +### Resumable HTTP downloads +`download_file(resume=True)` writes to `.part` and sends +`Range: bytes=-` on the next attempt. Pair with `expected_sha256=` for +integrity verification once the transfer completes: + +```python +from automation_file import download_file + +download_file( + "https://example.com/big.bin", + "big.bin", + resume=True, + expected_sha256="3b0c44298fc1...", +) +``` + +### Duplicate-file finder +Three-stage pipeline: size bucket → 64 KiB partial hash → full hash. +Unique-size files are never hashed: + +```python +from automation_file import find_duplicates + +groups = find_duplicates("/data", min_size=1024) +# list[list[str]] — each inner list is a set of identical files, sorted +# by size descending. +``` + +`FA_find_duplicates` runs the same search from JSON. + +### Incremental directory sync +`sync_dir` mirrors `src` into `dst` by copying only files that are new or +changed. Change detection is `(size, mtime)` by default; pass +`compare="checksum"` when mtime is unreliable. Extras under `dst` are left +alone by default — pass `delete=True` to prune them (and `dry_run=True` to +preview): + +```python +from automation_file import sync_dir + +summary = sync_dir("/data/src", "/data/dst", delete=True) +# summary: {"copied": [...], "skipped": [...], "deleted": [...], +# "errors": [...], "dry_run": False} +``` + +Symlinks are re-created as symlinks rather than followed, so a link +pointing outside the tree can't blow up the mirror. JSON action: +`FA_sync_dir`. 
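The default `(size, mtime)` change detection can be sketched as a stand-alone predicate. `sketch_needs_copy` and the two-second slack are illustrative assumptions, not `sync_dir` internals:

```python
import os

def sketch_needs_copy(src: str, dst: str, mtime_slack: float = 2.0) -> bool:
    """Copy when dst is missing, differs in size, or is older than src.
    The slack absorbs filesystems with coarse mtime resolution (e.g. FAT)."""
    if not os.path.exists(dst):
        return True
    s, d = os.stat(src), os.stat(dst)
    if s.st_size != d.st_size:
        return True
    return s.st_mtime - d.st_mtime > mtime_slack
```

A checksum comparison replaces the last two checks with a content hash of both sides, trading speed for accuracy when mtimes can't be trusted.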
+ +### Directory manifests +Write a JSON manifest of every file's checksum under a tree and verify the +tree hasn't changed later: + +```python +from automation_file import write_manifest, verify_manifest + +write_manifest("/release/payload", "/release/MANIFEST.json") + +# Later… +result = verify_manifest("/release/payload", "/release/MANIFEST.json") +if not result["ok"]: + raise SystemExit(f"manifest mismatch: {result}") +``` + +`result` reports `matched`, `missing`, `modified`, and `extra` lists +separately. Extras don't fail verification (mirrors `sync_dir`'s +non-deleting default); `missing` or `modified` do. JSON actions: +`FA_write_manifest`, `FA_verify_manifest`. + +### Notifications +Push one-off messages or auto-notify on trigger/scheduler failures via +webhook, Slack, or SMTP: + +```python +from automation_file import ( + SlackSink, WebhookSink, EmailSink, + notification_manager, notify_send, +) + +notification_manager.register(SlackSink("https://hooks.slack.com/services/T/B/X")) +notify_send("deploy complete", body="rev abc123", level="info") +``` + +Every sink implements the same `send(subject, body, level)` contract. The +fanout `NotificationManager` does per-sink error isolation (one broken +sink doesn't starve the others), sliding-window dedup so a stuck trigger +can't flood a channel, and SSRF validation on every webhook/Slack URL. +Scheduler and trigger dispatchers auto-notify on failure at +`level="error"` — registering a sink is all that's needed. JSON actions: +`FA_notify_send`, `FA_notify_list`. + +### Config file and secret providers +Declare sinks and defaults once in `automation_file.toml`. 
Secret +references resolve at load time from environment variables or a file root +(Docker / K8s style): + +```toml +# automation_file.toml + +[secrets] +file_root = "/run/secrets" + +[defaults] +dedup_seconds = 120 + +[[notify.sinks]] +type = "slack" +name = "team-alerts" +webhook_url = "${env:SLACK_WEBHOOK}" + +[[notify.sinks]] +type = "email" +name = "ops-email" +host = "smtp.example.com" +port = 587 +sender = "alerts@example.com" +recipients = ["ops@example.com"] +username = "${env:SMTP_USER}" +password = "${file:smtp_password}" +``` + +```python +from automation_file import AutomationConfig, notification_manager + +config = AutomationConfig.load("automation_file.toml") +config.apply_to(notification_manager) +``` + +Unresolved `${…}` references raise `SecretNotFoundException` rather than +silently becoming empty strings. Custom provider chains can be built from +`ChainedSecretProvider` / `EnvSecretProvider` / `FileSecretProvider` and +passed as `AutomationConfig.load(path, provider=…)`. + +### Variable substitution in action lists +Opt in with `substitute=True` and `${…}` references expand at dispatch time: + +```python +from automation_file import execute_action + +execute_action( + [["FA_create_file", {"file_path": "reports/${date:%Y-%m-%d}/${uuid}.txt"}]], + substitute=True, +) +``` + +Supports `${env:VAR}`, `${date:FMT}` (strftime), `${uuid}`, `${cwd}`. Unknown +names raise `SubstitutionException` — no silent empty strings. 
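The expansion rules above fit in a few lines of regex-driven code. A minimal sketch, assuming nothing about the library's internals — `sketch_substitute` is a hypothetical name, and it raises `ValueError` where the real implementation raises `SubstitutionException`:

```python
import datetime
import os
import re
import uuid

_PATTERN = re.compile(r"\$\{([^}]+)\}")

def sketch_substitute(text: str) -> str:
    """Expand ${env:VAR}, ${date:FMT}, ${uuid}, ${cwd}; unknown names raise."""
    def repl(match: re.Match) -> str:
        ref = match.group(1)
        kind, _, arg = ref.partition(":")
        if kind == "env":
            return os.environ[arg]  # KeyError surfaces, never ""
        if kind == "date":
            return datetime.datetime.now().strftime(arg)
        if kind == "uuid":
            return str(uuid.uuid4())
        if kind == "cwd":
            return os.getcwd()
        raise ValueError(f"unknown substitution: {ref!r}")
    return _PATTERN.sub(repl, text)
```

Because expansion happens at dispatch time, `${date:…}` and `${uuid}` are re-evaluated on every run of a scheduled or triggered action list.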
+ +### Conditional execution +Run a nested action list only when a path-based guard passes: + +```json +[ + ["FA_if_exists", {"path": "/data/in/job.json", + "then": [["FA_copy_file", {"source": "/data/in/job.json", + "target": "/data/processed/job.json"}]]}], + ["FA_if_newer", {"source": "/src", "target": "/dst", + "then": [["FA_sync_dir", {"src": "/src", "dst": "/dst"}]]}], + ["FA_if_size_gt", {"path": "/logs/app.log", "size": 10485760, + "then": [["FA_run_shell", {"command": ["logrotate", "/logs/app.log"]}]]}] +] +``` + +### SQLite audit log +`AuditLog` writes one row per action with short-lived connections and a +module-level lock: + +```python +from automation_file import AuditLog + +audit = AuditLog("audit.sqlite3") +audit.record(action="FA_copy_file", actor="ops", + status="ok", duration_ms=12, detail={"src": "a", "dst": "b"}) + +for row in audit.recent(limit=50): + print(row["timestamp"], row["action"], row["status"]) +``` + +### File integrity monitor +Poll a tree against a manifest and fire a callback + notification on drift: + +```python +from automation_file import IntegrityMonitor, notification_manager, write_manifest + +write_manifest("/srv/site", "/srv/MANIFEST.json") + +mon = IntegrityMonitor( + root="/srv/site", + manifest_path="/srv/MANIFEST.json", + interval=60.0, + manager=notification_manager, + on_drift=lambda summary: print("drift:", summary), +) +mon.start() +``` + +Manifest-load errors are surfaced as drift so tamper and config issues +aren't silently different code paths. + +### AES-256-GCM file encryption +Authenticated encryption with a self-describing envelope. 
Derive a key from +a password or generate one directly: + +```python +from automation_file import encrypt_file, decrypt_file, key_from_password + +key = key_from_password("correct horse battery staple", salt=b"app-salt-v1") +encrypt_file("secret.pdf", "secret.pdf.enc", key, associated_data=b"v1") +decrypt_file("secret.pdf.enc", "secret.pdf", key, associated_data=b"v1") +``` + +Tamper is detected via GCM's authentication tag and reported as +`CryptoException("authentication failed")`. JSON actions: +`FA_encrypt_file`, `FA_decrypt_file`. + +### HTTPActionClient Python SDK +Typed client for the HTTP action server; enforces loopback by default and +carries the shared secret for you: + +```python +from automation_file import HTTPActionClient + +with HTTPActionClient("http://127.0.0.1:9944", shared_secret="s3cr3t") as client: + client.ping() # OPTIONS /actions + result = client.execute([["FA_create_dir", {"dir_path": "x"}]]) +``` + +Auth failures map to `HTTPActionClientException` with `kind="unauthorized"`; +404 responses report the server exists but does not expose `/actions`. + +### Prometheus metrics exporter +`ActionExecutor` records one counter row and one histogram sample per +action. Serve them on a loopback `/metrics` endpoint: + +```python +from automation_file import start_metrics_server + +server = start_metrics_server(host="127.0.0.1", port=9945) +# curl http://127.0.0.1:9945/metrics +``` + +Exports `automation_file_actions_total{action,status}` and +`automation_file_action_duration_seconds{action}`. Non-loopback binds +require `allow_non_loopback=True` explicitly. + +### DAG action executor +Run actions in dependency order; independent branches fan out across a +thread pool. 
Each node is `{"id": ..., "action": [...], "depends_on": +[...]}`: + +```python +from automation_file import execute_action_dag + +execute_action_dag([ + {"id": "fetch", "action": ["FA_download_file", + ["https://example.com/src.tar.gz", "src.tar.gz"]]}, + {"id": "verify", "action": ["FA_verify_checksum", + ["src.tar.gz", "3b0c44298fc1..."]], + "depends_on": ["fetch"]}, + {"id": "unpack", "action": ["FA_unzip_file", ["src.tar.gz", "src"]], + "depends_on": ["verify"]}, +]) +``` + +If `verify` raises, `unpack` is marked `skipped` by default. Pass +`fail_fast=False` to run dependents regardless. JSON action: +`FA_execute_action_dag`. + +### Entry-point plugins +Third-party packages advertise actions via `pyproject.toml`: + +```toml +[project.entry-points."automation_file.actions"] +my_plugin = "my_plugin:register" +``` + +where `register` is a zero-argument callable returning a +`dict[str, Callable]`. Once installed in the same environment, the +commands show up in every freshly-built registry: + +```python +# my_plugin/__init__.py +def greet(name: str) -> str: + return f"hello {name}" + +def register() -> dict: + return {"FA_greet": greet} +``` + +```python +# after `pip install my_plugin` +from automation_file import execute_action +execute_action([["FA_greet", {"name": "world"}]]) +``` + +Plugin failures are logged and swallowed — one broken plugin cannot +break the library. + ### GUI ```bash python -m automation_file ui # or: python main_ui.py @@ -250,7 +687,7 @@ from automation_file import launch_ui launch_ui() ``` -Tabs: Local, HTTP, Google Drive, S3, Azure Blob, Dropbox, SFTP, JSON actions, +Tabs: Home, Local, Transfer, Progress, JSON actions, Triggers, Scheduler, Servers. A persistent log panel at the bottom streams every result and error. 
### Scaffold an executor-based project diff --git a/README.zh-CN.md b/README.zh-CN.md new file mode 100644 index 0000000..c0ae731 --- /dev/null +++ b/README.zh-CN.md @@ -0,0 +1,726 @@ +# FileAutomation + +[English](README.md) | [繁體中文](README.zh-TW.md) | **简体中文** + +一套模块化的自动化框架,涵盖本地文件 / 目录 / ZIP 操作、经 SSRF 验证的 HTTP +下载、远程存储(Google Drive、S3、Azure Blob、Dropbox、SFTP),以及通过内嵌 +TCP / HTTP 服务器执行的 JSON 驱动动作。内附 PySide6 GUI,每个功能都有对应 +页签。所有公开 API 均由顶层 `automation_file` facade 统一导出。 + +- 本地文件 / 目录 / ZIP 操作,内置路径穿越防护(`safe_join`) +- 经 SSRF 验证的 HTTP 下载,支持重试与大小 / 时间上限 +- Google Drive CRUD(上传、下载、搜索、删除、分享、文件夹) +- 一等公民的 S3、Azure Blob、Dropbox、SFTP 后端 — 默认安装 +- JSON 动作清单由共享的 `ActionExecutor` 执行 — 支持验证、干跑、并行 +- Loopback 优先的 TCP **与** HTTP 服务器,接受 JSON 指令批量并可选 shared-secret 验证 +- 可靠性原语:`retry_on_transient` 装饰器、`Quota` 大小 / 时间预算 +- **文件监听触发** — 当路径变动时执行动作清单(`FA_watch_*`) +- **Cron 调度器** — 仅用标准库的 5 字段解析器执行周期性动作清单(`FA_schedule_*`) +- **传输进度 + 取消** — HTTP 与 S3 传输可选的 `progress_name` 钩子(`FA_progress_*`) +- **快速文件搜索** — OS 索引快速路径(`mdfind` / `locate` / `es.exe`)搭配流式 `scandir` 回退(`FA_fast_find`) +- **校验和 + 完整性验证** — 流式 `file_checksum` / `verify_checksum`,支持任何 `hashlib` 算法;`download_file(expected_sha256=...)` 在下载完成后立即验证(`FA_file_checksum`、`FA_verify_checksum`) +- **可续传 HTTP 下载** — `download_file(resume=True)` 写入 `.part` 并发送 `Range: bytes=-`,让中断的传输继续而非从头开始 +- **重复文件查找器** — 三阶段 size → 部分哈希 → 完整哈希管线;大小唯一的文件完全不会被哈希(`FA_find_duplicates`) +- **DAG 动作执行器** — 按依赖顺序拓扑调度,独立分支并行展开,失败时其后代默认标记为跳过(`FA_execute_action_dag`) +- **Entry-point 插件** — 第三方包通过 `[project.entry-points."automation_file.actions"]` 注册自定义 `FA_*` 动作;`build_default_registry()` 会自动加载 +- **增量目录同步** — rsync 风格镜像,支持 size+mtime 或 checksum 变更检测,可选删除多余文件,支持干跑(`FA_sync_dir`) +- **目录 manifest** — 以 JSON 快照记录树下每个文件的校验和,验证时分开报告 missing / modified / extra(`FA_write_manifest`、`FA_verify_manifest`) +- **通知 sink** — webhook / Slack / SMTP / Telegram / Discord / Teams / PagerDuty,fanout 管理器做单 sink 错误隔离与滑动窗口去重;trigger + scheduler 
失败时自动通知(`FA_notify_send`、`FA_notify_list`) +- **配置文件 + 密钥提供者** — 在 `automation_file.toml` 声明通知 sink / 默认值;`${env:…}` 与 `${file:…}` 引用通过 Env / File / Chained 提供者抽象解析,让密钥不留在配置文件中 +- **配置热加载** — `ConfigWatcher` 轮询 `automation_file.toml`,变更时即时应用 sink / 默认值,无需重启 +- **Shell / grep / JSON 编辑 / tar / 备份轮转** — `FA_run_shell`(参数列表式 subprocess,含超时)、`FA_grep`(流式文本搜索)、`FA_json_get` / `FA_json_set` / `FA_json_delete`(原地 JSON 编辑)、`FA_create_tar` / `FA_extract_tar`、`FA_rotate_backups` +- **FTP / FTPS 后端** — 纯 FTP 或通过 `FTP_TLS.auth()` 的显式 FTPS;自动注册为 `FA_ftp_*` +- **跨后端复制** — `FA_copy_between` 通过 `local://`、`s3://`、`drive://`、`azure://`、`dropbox://`、`sftp://`、`ftp://` URI 在任意两个后端之间搬运数据 +- **调度器重叠防护** — 正在执行的作业在下次触发时会被跳过,除非显式传入 `allow_overlap=True` +- **服务器动作 ACL** — `allowed_actions=(...)` 限制 TCP / HTTP 服务器可派发的命令 +- **变量替换** — 动作参数中可选使用 `${env:VAR}` / `${date:%Y-%m-%d}` / `${uuid}` / `${cwd}`,通过 `execute_action(..., substitute=True)` 展开 +- **条件执行** — `FA_if_exists` / `FA_if_newer` / `FA_if_size_gt` 仅在路径守卫通过时执行嵌套动作清单 +- **SQLite 审计日志** — `AuditLog(db_path)` 为每个动作记录 actor / status / duration;通过 `recent` / `count` / `purge` 查询 +- **文件完整性监控** — `IntegrityMonitor` 按 manifest 轮询整棵树,检测到 drift 时触发 callback + 通知 +- **HTTPActionClient SDK** — HTTP 动作服务器的类型化 Python 客户端,具 shared-secret 认证、loopback 守护与 OPTIONS ping +- **AES-256-GCM 文件加密** — `encrypt_file` / `decrypt_file` 搭配 `generate_key()` / `key_from_password()`(PBKDF2-HMAC-SHA256);JSON 动作 `FA_encrypt_file` / `FA_decrypt_file` +- **Prometheus metrics 导出器** — `start_metrics_server()` 提供 `automation_file_actions_total{action,status}` 计数器与 `automation_file_action_duration_seconds{action}` 直方图 +- PySide6 GUI(`python -m automation_file ui`)每个后端一个页签,含 JSON 动作执行器,另有 Triggers、Scheduler、实时 Progress 专属页签 +- 功能丰富的 CLI,包含一次性子命令与旧式 JSON 批量标志 +- 项目脚手架(`ProjectBuilder`)协助构建以 executor 为核心的自动化项目 + +## 架构 + +```mermaid +flowchart LR + User[User / CLI / JSON batch] + + subgraph Facade["automation_file (facade)"] + Public["Public API
execute_action, execute_action_parallel,
validate_action, driver_instance,
start_autocontrol_socket_server,
start_http_action_server, Quota,
retry_on_transient, safe_join, ..."] + end + + subgraph Core["core"] + Registry[(ActionRegistry
FA_* commands)] + Executor[ActionExecutor] + Callback[CallbackExecutor] + Loader[PackageLoader] + Json[json_store] + Retry[retry] + QuotaMod[quota] + Progress[progress
Token + Reporter] + end + + subgraph Events["event-driven"] + TriggerMod["trigger
watchdog file watcher"] + SchedulerMod["scheduler
cron background thread"] + end + + subgraph Local["local"] + FileOps[file_ops] + DirOps[dir_ops] + ZipOps[zip_ops] + Safe[safe_paths] + end + + subgraph Remote["remote"] + UrlVal[url_validator] + Http[http_download] + Drive["google_drive
client + *_ops"] + S3["s3"] + Azure["azure_blob"] + Dropbox["dropbox_api"] + SFTP["sftp"] + end + + subgraph Server["server"] + TCP[TCPActionServer] + HTTP[HTTPActionServer] + end + + subgraph UI["ui (PySide6)"] + Launcher[launch_ui] + MainWindow["MainWindow
9-tab control surface"] + end + + subgraph Project["project / utils"] + Builder[ProjectBuilder] + Templates[templates] + Discovery[file_discovery] + end + + User --> Public + User --> Launcher + Launcher --> MainWindow + MainWindow --> Public + Public --> Executor + Public --> Callback + Public --> Loader + Public --> TCP + Public --> HTTP + + Executor --> Registry + Executor --> Retry + Executor --> QuotaMod + Callback --> Registry + Loader --> Registry + TCP --> Executor + HTTP --> Executor + Executor --> Json + + Registry --> FileOps + Registry --> DirOps + Registry --> ZipOps + Registry --> Safe + Registry --> Http + Registry --> Drive + Registry --> S3 + Registry --> Azure + Registry --> Dropbox + Registry --> SFTP + Registry --> Builder + Registry --> TriggerMod + Registry --> SchedulerMod + Registry --> Progress + + TriggerMod --> Executor + SchedulerMod --> Executor + Http --> Progress + S3 --> Progress + + Http --> UrlVal + Http --> Retry + Builder --> Templates + Builder --> Discovery +``` + +`build_default_registry()` 构建的 `ActionRegistry` 是所有 `FA_*` 命令的唯一 +权威来源。`ActionExecutor`、`CallbackExecutor`、`PackageLoader`、 +`TCPActionServer`、`HTTPActionServer` 都通过同一份共享 registry(以 +`executor.registry` 对外公开)解析命令。 + +## 安装 + +```bash +pip install automation_file +``` + +单次安装即涵盖所有后端(Google Drive、S3、Azure Blob、Dropbox、SFTP)以及 +PySide6 GUI — 日常使用不需要任何 extras。 + +```bash +pip install "automation_file[dev]" # ruff, mypy, pre-commit, pytest-cov, build, twine +``` + +要求: +- Python 3.10+ +- 内置依赖:`google-api-python-client`、`google-auth-oauthlib`、 + `requests`、`tqdm`、`boto3`、`azure-storage-blob`、`dropbox`、`paramiko`、 + `PySide6`、`watchdog` + +## 使用方式 + +### 执行 JSON 动作清单 +```python +from automation_file import execute_action + +execute_action([ + ["FA_create_file", {"file_path": "test.txt"}], + ["FA_copy_file", {"source": "test.txt", "target": "copy.txt"}], +]) +``` + +### 验证、干跑、并行 +```python +from automation_file import execute_action, execute_action_parallel, validate_action 
+ +# Fail-fast:只要有任何命令名称未知就在执行前中止。 +execute_action(actions, validate_first=True) + +# Dry-run:只记录会被调用的内容,不真的执行。 +execute_action(actions, dry_run=True) + +# Parallel:通过 thread pool 并行执行独立动作。 +execute_action_parallel(actions, max_workers=4) + +# 手动验证 — 返回解析后的名称列表。 +names = validate_action(actions) +``` + +### 初始化 Google Drive 并上传 +```python +from automation_file import driver_instance, drive_upload_to_drive + +driver_instance.later_init("token.json", "credentials.json") +drive_upload_to_drive("example.txt") +``` + +### 经验证的 HTTP 下载(含重试) +```python +from automation_file import download_file + +download_file("https://example.com/file.zip", "file.zip") +``` + +### 启动 loopback TCP 服务器(可选 shared-secret 验证) +```python +from automation_file import start_autocontrol_socket_server + +server = start_autocontrol_socket_server( + host="127.0.0.1", port=9943, shared_secret="optional-secret", +) +``` + +设定 `shared_secret` 时,客户端每个包都必须以 `AUTH \n` 为前缀。 +若要绑定非 loopback 地址必须明确传入 `allow_non_loopback=True`。 + +### 启动 HTTP 动作服务器 +```python +from automation_file import start_http_action_server + +server = start_http_action_server( + host="127.0.0.1", port=9944, shared_secret="optional-secret", +) + +# curl -H 'Authorization: Bearer optional-secret' \ +# -d '[["FA_create_dir",{"dir_path":"x"}]]' \ +# http://127.0.0.1:9944/actions +``` + +### Retry 与 quota 原语 +```python +from automation_file import retry_on_transient, Quota + +@retry_on_transient(max_attempts=5, backoff_base=0.5) +def flaky_network_call(): ... 
+ +quota = Quota(max_bytes=50 * 1024 * 1024, max_seconds=30.0) +with quota.time_budget("bulk-upload"): + bulk_upload_work() +``` + +### 路径穿越防护 +```python +from automation_file import safe_join + +target = safe_join("/data/jobs", user_supplied_path) +# 若解析后的路径逃出 /data/jobs 会抛出 PathTraversalException。 +``` + +### Cloud / SFTP 后端 +每个后端都会由 `build_default_registry()` 自动注册,因此 `FA_s3_*`、 +`FA_azure_blob_*`、`FA_dropbox_*`、`FA_sftp_*` 动作开箱即用 — 不需要另外调用 +`register_*_ops`。 + +```python +from automation_file import execute_action, s3_instance + +s3_instance.later_init(region_name="us-east-1") + +execute_action([ + ["FA_s3_upload_file", {"local_path": "report.csv", "bucket": "reports", "key": "report.csv"}], +]) +``` + +所有后端(`s3`、`azure_blob`、`dropbox_api`、`sftp`)都提供相同的五组操作: +`upload_file`、`upload_dir`、`download_file`、`delete_*`、`list_*`。 +SFTP 使用 `paramiko.RejectPolicy` — 未知主机会被拒绝,不会自动加入。 + +### 文件监听触发 +每当被监听路径发生文件系统事件,就执行动作清单: + +```python +from automation_file import watch_start, watch_stop + +watch_start( + name="inbox-sweeper", + path="/data/inbox", + action_list=[["FA_copy_all_file_to_dir", {"source_dir": "/data/inbox", + "target_dir": "/data/processed"}]], + events=["created", "modified"], + recursive=False, +) +# 稍后: +watch_stop("inbox-sweeper") +``` + +`FA_watch_start` / `FA_watch_stop` / `FA_watch_stop_all` / `FA_watch_list` +让 JSON 动作清单能使用相同的生命周期。 + +### Cron 调度器 +以纯标准库的 5 字段 cron 解析器执行周期性动作清单: + +```python +from automation_file import schedule_add + +schedule_add( + name="nightly-snapshot", + cron_expression="0 2 * * *", # 每天本地时间 02:00 + action_list=[["FA_zip_dir", {"dir_we_want_to_zip": "/data", + "zip_name": "/backup/data_nightly"}]], +) +``` + +支持 `*`、确切值、`a-b` 范围、逗号列表、`*/n` 步进语法,以及 `jan..dec` / +`sun..sat` 别名。JSON 动作:`FA_schedule_add`、`FA_schedule_remove`、 +`FA_schedule_remove_all`、`FA_schedule_list`。 + +### 传输进度 + 取消 +HTTP 与 S3 传输支持可选的 `progress_name` 关键字参数: + +```python +from automation_file import download_file, progress_cancel + 
+download_file("https://example.com/big.bin", "big.bin", + progress_name="big-download") + +# 从另一个线程或 GUI: +progress_cancel("big-download") +``` + +共享的 `progress_registry` 通过 `progress_list()` 以及 `FA_progress_list` / +`FA_progress_cancel` / `FA_progress_clear` JSON 动作提供实时快照。GUI 的 +**Progress** 页签每半秒轮询一次 registry。 + +### 快速文件搜索 +若 OS 索引器可用就直接查询(macOS 的 `mdfind`、Linux 的 `locate` / +`plocate`、Windows 的 Everything `es.exe`),否则回退到以流式 `os.scandir` +遍历。不需要额外依赖。 + +```python +from automation_file import fast_find, scandir_find, has_os_index + +# 可用时使用 OS 索引器,否则回退到 scandir。 +results = fast_find("/var/log", "*.log", limit=100) + +# 强制使用可移植路径(跳过 OS 索引器)。 +results = fast_find("/data", "report_*.csv", use_index=False) + +# 流式 — 不需要遍历整棵树就能提前停止。 +for path in scandir_find("/data", "*.csv"): + if "2026" in path: + break +``` + +`FA_fast_find` 将同一个函数提供给 JSON 动作清单: + +```json +[["FA_fast_find", {"root": "/var/log", "pattern": "*.log", "limit": 50}]] +``` + +### 校验和 + 完整性验证 +流式处理任何 `hashlib` 算法;`verify_checksum` 使用 `hmac.compare_digest` +(常数时间)比较摘要: + +```python +from automation_file import file_checksum, verify_checksum + +digest = file_checksum("bundle.tar.gz") # 默认为 sha256 +verify_checksum("bundle.tar.gz", digest) # -> True +verify_checksum("bundle.tar.gz", "deadbeef...", algorithm="blake2b") +``` + +同时以 `FA_file_checksum` / `FA_verify_checksum` 提供 JSON 动作。 + +### 可续传 HTTP 下载 +`download_file(resume=True)` 会写入 `.part` 并在下次尝试时发送 +`Range: bytes=-`。配合 `expected_sha256=` 可在下载完成后立即验证完整性: + +```python +from automation_file import download_file + +download_file( + "https://example.com/big.bin", + "big.bin", + resume=True, + expected_sha256="3b0c44298fc1...", +) +``` + +### 重复文件查找器 +三阶段管线:按大小分桶 → 64 KiB 部分哈希 → 完整哈希。大小唯一的文件完全不会 +被哈希: + +```python +from automation_file import find_duplicates + +groups = find_duplicates("/data", min_size=1024) +# list[list[str]] — 每个内层 list 是一组相同内容的文件,按大小降序排序。 +``` + +`FA_find_duplicates` 以相同调用提供给 JSON。 + +### 增量目录同步 +`sync_dir` 以只复制新增或变更文件的方式将 `src` 镜像到 
`dst`。变更检测默认 +为 `(size, mtime)`;当 mtime 不可靠时可传入 `compare="checksum"`。`dst` +下多余的文件默认保留 — 传入 `delete=True` 才会清理(`dry_run=True` 可先 +预览): + +```python +from automation_file import sync_dir + +summary = sync_dir("/data/src", "/data/dst", delete=True) +# summary: {"copied": [...], "skipped": [...], "deleted": [...], +# "errors": [...], "dry_run": False} +``` + +Symlink 会以 symlink 形式重建而非被跟随,因此指向树外的链接不会拖垮镜像。 +JSON 动作:`FA_sync_dir`。 + +### 目录 manifest +将树下每个文件的校验和写入 JSON manifest,之后再验证树是否变动: + +```python +from automation_file import write_manifest, verify_manifest + +write_manifest("/release/payload", "/release/MANIFEST.json") + +# 稍后… +result = verify_manifest("/release/payload", "/release/MANIFEST.json") +if not result["ok"]: + raise SystemExit(f"manifest mismatch: {result}") +``` + +`result` 以 `matched`、`missing`、`modified`、`extra` 分别报告列表。 +多余文件(`extra`)不会让验证失败(对齐 `sync_dir` 默认不删除的行为), +`missing` 与 `modified` 则会。JSON 动作:`FA_write_manifest`、 +`FA_verify_manifest`。 + +### 通知 +通过 webhook、Slack 或 SMTP 推送一次性消息,或在 trigger / scheduler +失败时自动通知: + +```python +from automation_file import ( + SlackSink, WebhookSink, EmailSink, + notification_manager, notify_send, +) + +notification_manager.register(SlackSink("https://hooks.slack.com/services/T/B/X")) +notify_send("deploy complete", body="rev abc123", level="info") +``` + +每个 sink 都遵循相同的 `send(subject, body, level)` 合约。Fanout 的 +`NotificationManager` 会做单 sink 错误隔离(一个坏掉的 sink 不会影响 +其他 sink)、滑动窗口去重(避免卡住的 trigger 刷屏),并对每个 +webhook / Slack URL 进行 SSRF 验证。Scheduler 与 trigger 派发器在失败时 +会以 `level="error"` 自动通知 — 只要注册 sink 就能拿到生产环境告警。 +JSON 动作:`FA_notify_send`、`FA_notify_list`。 + +### 配置文件与密钥提供者 +在 `automation_file.toml` 一次声明 sink 与默认值。密钥引用在加载时由 +环境变量或文件根目录(Docker / K8s 风格)解析: + +```toml +# automation_file.toml + +[secrets] +file_root = "/run/secrets" + +[defaults] +dedup_seconds = 120 + +[[notify.sinks]] +type = "slack" +name = "team-alerts" +webhook_url = "${env:SLACK_WEBHOOK}" + +[[notify.sinks]] +type = "email" +name = "ops-email" 
+host = "smtp.example.com" +port = 587 +sender = "alerts@example.com" +recipients = ["ops@example.com"] +username = "${env:SMTP_USER}" +password = "${file:smtp_password}" +``` + +```python +from automation_file import AutomationConfig, notification_manager + +config = AutomationConfig.load("automation_file.toml") +config.apply_to(notification_manager) +``` + +未解析的 `${…}` 引用会抛出 `SecretNotFoundException`,而不是默默变成空 +字符串。可组合 `ChainedSecretProvider` / `EnvSecretProvider` / +`FileSecretProvider` 构建自定义提供者链,并以 +`AutomationConfig.load(path, provider=…)` 传入。 + +### 动作清单变量替换 +以 `substitute=True` 启用后,`${…}` 引用会在派发时展开: + +```python +from automation_file import execute_action + +execute_action( + [["FA_create_file", {"file_path": "reports/${date:%Y-%m-%d}/${uuid}.txt"}]], + substitute=True, +) +``` + +支持 `${env:VAR}`、`${date:FMT}`(strftime)、`${uuid}`、`${cwd}`。未知名称 +会抛出 `SubstitutionException`,不会默默变成空字符串。 + +### 条件执行 +只在路径守卫通过时执行嵌套动作清单: + +```json +[ + ["FA_if_exists", {"path": "/data/in/job.json", + "then": [["FA_copy_file", {"source": "/data/in/job.json", + "target": "/data/processed/job.json"}]]}], + ["FA_if_newer", {"source": "/src", "target": "/dst", + "then": [["FA_sync_dir", {"src": "/src", "dst": "/dst"}]]}], + ["FA_if_size_gt", {"path": "/logs/app.log", "size": 10485760, + "then": [["FA_run_shell", {"command": ["logrotate", "/logs/app.log"]}]]}] +] +``` + +### SQLite 审计日志 +`AuditLog` 以短连接 + 模块级锁为每个动作写入一条记录: + +```python +from automation_file import AuditLog + +audit = AuditLog("audit.sqlite3") +audit.record(action="FA_copy_file", actor="ops", + status="ok", duration_ms=12, detail={"src": "a", "dst": "b"}) + +for row in audit.recent(limit=50): + print(row["timestamp"], row["action"], row["status"]) +``` + +### 文件完整性监控 +按 manifest 轮询整棵树,检测到 drift 时触发 callback + 通知: + +```python +from automation_file import IntegrityMonitor, notification_manager, write_manifest + +write_manifest("/srv/site", "/srv/MANIFEST.json") + +mon = IntegrityMonitor( + root="/srv/site", + 
manifest_path="/srv/MANIFEST.json", + interval=60.0, + manager=notification_manager, + on_drift=lambda summary: print("drift:", summary), +) +mon.start() +``` + +加载 manifest 时的错误也会被视为 drift,让篡改与配置问题走同一条处理 +路径。 + +### AES-256-GCM 文件加密 +带认证的加密与自描述封包格式。可由密码派生密钥或直接生成密钥: + +```python +from automation_file import encrypt_file, decrypt_file, key_from_password + +key = key_from_password("correct horse battery staple", salt=b"app-salt-v1") +encrypt_file("secret.pdf", "secret.pdf.enc", key, associated_data=b"v1") +decrypt_file("secret.pdf.enc", "secret.pdf", key, associated_data=b"v1") +``` + +篡改由 GCM 认证 tag 检测,以 `CryptoException("authentication failed")` +报告。JSON 动作:`FA_encrypt_file`、`FA_decrypt_file`。 + +### HTTPActionClient Python SDK +HTTP 动作服务器的类型化客户端;默认强制 loopback,并自动携带 shared +secret: + +```python +from automation_file import HTTPActionClient + +with HTTPActionClient("http://127.0.0.1:9944", shared_secret="s3cr3t") as client: + client.ping() # OPTIONS /actions + result = client.execute([["FA_create_dir", {"dir_path": "x"}]]) +``` + +认证失败会转换为 `HTTPActionClientException(kind="unauthorized")`; +404 则表示服务器存在但未对外提供 `/actions`。 + +### Prometheus metrics 导出器 +`ActionExecutor` 为每个动作记录一条计数器与一条直方图样本。在 loopback +`/metrics` 端点提供: + +```python +from automation_file import start_metrics_server + +server = start_metrics_server(host="127.0.0.1", port=9945) +# curl http://127.0.0.1:9945/metrics +``` + +导出 `automation_file_actions_total{action,status}` 以及 +`automation_file_action_duration_seconds{action}`。若要绑定非 loopback +地址必须显式传入 `allow_non_loopback=True`。 + +### DAG 动作执行器 +按依赖顺序执行动作;独立分支通过线程池并行展开。每个节点的形式为 +`{"id": ..., "action": [...], "depends_on": [...]}`: + +```python +from automation_file import execute_action_dag + +execute_action_dag([ + {"id": "fetch", "action": ["FA_download_file", + ["https://example.com/src.tar.gz", "src.tar.gz"]]}, + {"id": "verify", "action": ["FA_verify_checksum", + ["src.tar.gz", "3b0c44298fc1..."]], + "depends_on": ["fetch"]}, + {"id": "unpack", 
"action": ["FA_unzip_file", ["src.tar.gz", "src"]], + "depends_on": ["verify"]}, +]) +``` + +若 `verify` 抛出异常,默认情况下 `unpack` 会被标记为 `skipped`。传入 +`fail_fast=False` 可让后代节点仍然执行。JSON 动作:`FA_execute_action_dag`。 + +### Entry-point 插件 +第三方包通过 `pyproject.toml` 声明动作: + +```toml +[project.entry-points."automation_file.actions"] +my_plugin = "my_plugin:register" +``` + +其中 `register` 是一个零参数的可调用对象,返回 `dict[str, Callable]`。只要 +包被安装在同一个虚拟环境,这些命令就会出现在每次新建的 registry 中: + +```python +# my_plugin/__init__.py +def greet(name: str) -> str: + return f"hello {name}" + +def register() -> dict: + return {"FA_greet": greet} +``` + +```python +# 执行 `pip install my_plugin` 之后 +from automation_file import execute_action +execute_action([["FA_greet", {"name": "world"}]]) +``` + +插件失败(导入错误、factory 异常、返回类型不正确、registry 拒绝)都会被记录 +并吞掉 — 一个坏掉的插件不会影响整个库。 + +### GUI +```bash +python -m automation_file ui # 或:python main_ui.py +``` + +```python +from automation_file import launch_ui +launch_ui() +``` + +页签:Home、Local、Transfer、Progress、JSON actions、Triggers、Scheduler、 +Servers。底部常驻的 log 面板实时流式输出每一笔结果与错误。 + +### 以 executor 为核心构建项目脚手架 +```python +from automation_file import create_project_dir + +create_project_dir("my_workflow") +``` + +## CLI + +```bash +# 子命令(一次性操作) +python -m automation_file ui +python -m automation_file zip ./src out.zip --dir +python -m automation_file unzip out.zip ./restored +python -m automation_file download https://example.com/file.bin file.bin +python -m automation_file create-file hello.txt --content "hi" +python -m automation_file server --host 127.0.0.1 --port 9943 +python -m automation_file http-server --host 127.0.0.1 --port 9944 +python -m automation_file drive-upload my.txt --token token.json --credentials creds.json + +# 旧式标志(JSON 动作清单) +python -m automation_file --execute_file actions.json +python -m automation_file --execute_dir ./actions/ +python -m automation_file --execute_str '[["FA_create_dir",{"dir_path":"x"}]]' +python -m automation_file --create_project 
./my_project
+```
+
+## JSON 动作格式
+
+每一项动作可以是单纯的命令名称、`[name, kwargs]` 组合,或 `[name, args]`
+列表:
+
+```json
+[
+    ["FA_create_file", {"file_path": "test.txt"}],
+    ["FA_drive_upload_to_drive", {"file_path": "test.txt"}],
+    ["FA_drive_search_all_file"]
+]
+```
+
+## 文档
+
+完整 API 文档位于 `docs/`,可用 Sphinx 生成:
+
+```bash
+pip install -r docs/requirements.txt
+sphinx-build -b html docs/source docs/_build/html
+```
+
+架构笔记、代码规范与安全考量请参见 [`CLAUDE.md`](CLAUDE.md)。
diff --git a/README.zh-TW.md b/README.zh-TW.md
new file mode 100644
index 0000000..ee46f63
--- /dev/null
+++ b/README.zh-TW.md
@@ -0,0 +1,726 @@
+# FileAutomation
+
+[English](README.md) | **繁體中文** | [简体中文](README.zh-CN.md)
+
+一套模組化的自動化框架,涵蓋本機檔案 / 目錄 / ZIP 操作、經 SSRF 驗證的 HTTP
+下載、遠端儲存(Google Drive、S3、Azure Blob、Dropbox、SFTP),以及透過內建
+TCP / HTTP 伺服器執行的 JSON 驅動動作。內附 PySide6 GUI,每個功能都有對應
+分頁。所有公開 API 皆由頂層 `automation_file` facade 統一匯出。
+
+- 本機檔案 / 目錄 / ZIP 操作,內建路徑穿越防護(`safe_join`)
+- 經 SSRF 驗證的 HTTP 下載,支援重試與大小 / 時間上限
+- Google Drive CRUD(上傳、下載、搜尋、刪除、分享、資料夾)
+- 一等公民的 S3、Azure Blob、Dropbox、SFTP 後端 — 預設安裝
+- JSON 動作清單由共用的 `ActionExecutor` 執行 — 支援驗證、乾跑、平行
+- Loopback 優先的 TCP **與** HTTP 伺服器,接受 JSON 指令批次並可選 shared-secret 驗證
+- 可靠性原語:`retry_on_transient` 裝飾器、`Quota` 大小 / 時間預算
+- **檔案監看觸發** — 當路徑變動時執行動作清單(`FA_watch_*`)
+- **Cron 排程器** — 僅用標準函式庫的 5 欄位解析器執行週期性動作清單(`FA_schedule_*`)
+- **傳輸進度 + 取消** — HTTP 與 S3 傳輸可選的 `progress_name` 掛鉤(`FA_progress_*`)
+- **快速檔案搜尋** — OS 索引快速路徑(`mdfind` / `locate` / `es.exe`)搭配串流式 `scandir` 備援(`FA_fast_find`)
+- **檢查碼 + 完整性驗證** — 串流式 `file_checksum` / `verify_checksum`,支援任何 `hashlib` 演算法;`download_file(expected_sha256=...)` 於下載完成後立即驗證(`FA_file_checksum`、`FA_verify_checksum`)
+- **可續傳 HTTP 下載** — `download_file(resume=True)` 寫入 `.part` 並傳送 `Range: bytes=<offset>-`,讓中斷的傳輸繼續而非從頭開始
+- **重複檔案尋找器** — 三階段 size → 部分雜湊 → 完整雜湊管線;大小唯一的檔案完全不會被雜湊(`FA_find_duplicates`)
+- **DAG 動作執行器** — 依相依順序拓撲排程,獨立分支平行展開,失敗時其後代預設標記跳過(`FA_execute_action_dag`)
+- **Entry-point 外掛** — 第三方套件透過 `[project.entry-points."automation_file.actions"]` 
註冊自訂 `FA_*` 動作;`build_default_registry()` 會自動載入 +- **增量目錄同步** — rsync 風格鏡像,支援 size+mtime 或 checksum 變更偵測,選擇性刪除多餘檔案,支援乾跑(`FA_sync_dir`) +- **目錄 manifest** — 以 JSON 快照記錄樹下每個檔案的檢查碼,驗證時分開回報 missing / modified / extra(`FA_write_manifest`、`FA_verify_manifest`) +- **通知 sink** — webhook / Slack / SMTP / Telegram / Discord / Teams / PagerDuty,fanout 管理器做個別 sink 錯誤隔離與滑動視窗去重;trigger + scheduler 失敗時自動通知(`FA_notify_send`、`FA_notify_list`) +- **設定檔 + 秘密提供者** — 在 `automation_file.toml` 宣告通知 sink / 預設值;`${env:…}` 與 `${file:…}` 參考透過 Env / File / Chained 提供者抽象解析,讓秘密不留在檔案裡 +- **設定熱重載** — `ConfigWatcher` 輪詢 `automation_file.toml`,變更時即時套用 sink / 預設值,無需重啟 +- **Shell / grep / JSON 編輯 / tar / 備份輪替** — `FA_run_shell`(參數列表式 subprocess,含逾時)、`FA_grep`(串流文字搜尋)、`FA_json_get` / `FA_json_set` / `FA_json_delete`(原地 JSON 編輯)、`FA_create_tar` / `FA_extract_tar`、`FA_rotate_backups` +- **FTP / FTPS 後端** — 純 FTP 或透過 `FTP_TLS.auth()` 的顯式 FTPS;自動註冊為 `FA_ftp_*` +- **跨後端複製** — `FA_copy_between` 透過 `local://`、`s3://`、`drive://`、`azure://`、`dropbox://`、`sftp://`、`ftp://` URI 在任意兩個後端之間搬運資料 +- **排程器重疊防護** — 正在執行的工作在下次觸發時會被跳過,除非明確傳入 `allow_overlap=True` +- **伺服器動作 ACL** — `allowed_actions=(...)` 限制 TCP / HTTP 伺服器可派送的指令 +- **變數替換** — 動作參數中可選使用 `${env:VAR}` / `${date:%Y-%m-%d}` / `${uuid}` / `${cwd}`,透過 `execute_action(..., substitute=True)` 展開 +- **條件式執行** — `FA_if_exists` / `FA_if_newer` / `FA_if_size_gt` 僅在路徑守護通過時執行巢狀動作清單 +- **SQLite 稽核日誌** — `AuditLog(db_path)` 為每個動作記錄 actor / status / duration;以 `recent` / `count` / `purge` 查詢 +- **檔案完整性監控** — `IntegrityMonitor` 依 manifest 輪詢整棵樹,偵測到 drift 時觸發 callback + 通知 +- **HTTPActionClient SDK** — HTTP 動作伺服器的型別化 Python 客戶端,具 shared-secret 驗證、loopback 防護與 OPTIONS ping +- **AES-256-GCM 檔案加密** — `encrypt_file` / `decrypt_file` 搭配 `generate_key()` / `key_from_password()`(PBKDF2-HMAC-SHA256);JSON 動作 `FA_encrypt_file` / `FA_decrypt_file` +- **Prometheus metrics 匯出器** — `start_metrics_server()` 提供 `automation_file_actions_total{action,status}` 計數器與 
`automation_file_action_duration_seconds{action}` 直方圖 +- PySide6 GUI(`python -m automation_file ui`)每個後端一個分頁,含 JSON 動作執行器,另有 Triggers、Scheduler、即時 Progress 專屬分頁 +- 功能豐富的 CLI,包含一次性子指令與舊式 JSON 批次旗標 +- 專案鷹架(`ProjectBuilder`)協助建立以 executor 為核心的自動化專案 + +## 架構 + +```mermaid +flowchart LR + User[User / CLI / JSON batch] + + subgraph Facade["automation_file (facade)"] + Public["Public API
execute_action, execute_action_parallel,
validate_action, driver_instance,
start_autocontrol_socket_server,
start_http_action_server, Quota,
retry_on_transient, safe_join, ..."] + end + + subgraph Core["core"] + Registry[(ActionRegistry
FA_* commands)] + Executor[ActionExecutor] + Callback[CallbackExecutor] + Loader[PackageLoader] + Json[json_store] + Retry[retry] + QuotaMod[quota] + Progress[progress
Token + Reporter] + end + + subgraph Events["event-driven"] + TriggerMod["trigger
watchdog file watcher"] + SchedulerMod["scheduler
cron background thread"] + end + + subgraph Local["local"] + FileOps[file_ops] + DirOps[dir_ops] + ZipOps[zip_ops] + Safe[safe_paths] + end + + subgraph Remote["remote"] + UrlVal[url_validator] + Http[http_download] + Drive["google_drive
client + *_ops"] + S3["s3"] + Azure["azure_blob"] + Dropbox["dropbox_api"] + SFTP["sftp"] + end + + subgraph Server["server"] + TCP[TCPActionServer] + HTTP[HTTPActionServer] + end + + subgraph UI["ui (PySide6)"] + Launcher[launch_ui] + MainWindow["MainWindow
9-tab control surface"] + end + + subgraph Project["project / utils"] + Builder[ProjectBuilder] + Templates[templates] + Discovery[file_discovery] + end + + User --> Public + User --> Launcher + Launcher --> MainWindow + MainWindow --> Public + Public --> Executor + Public --> Callback + Public --> Loader + Public --> TCP + Public --> HTTP + + Executor --> Registry + Executor --> Retry + Executor --> QuotaMod + Callback --> Registry + Loader --> Registry + TCP --> Executor + HTTP --> Executor + Executor --> Json + + Registry --> FileOps + Registry --> DirOps + Registry --> ZipOps + Registry --> Safe + Registry --> Http + Registry --> Drive + Registry --> S3 + Registry --> Azure + Registry --> Dropbox + Registry --> SFTP + Registry --> Builder + Registry --> TriggerMod + Registry --> SchedulerMod + Registry --> Progress + + TriggerMod --> Executor + SchedulerMod --> Executor + Http --> Progress + S3 --> Progress + + Http --> UrlVal + Http --> Retry + Builder --> Templates + Builder --> Discovery +``` + +`build_default_registry()` 建立的 `ActionRegistry` 是所有 `FA_*` 指令的唯一 +權威來源。`ActionExecutor`、`CallbackExecutor`、`PackageLoader`、 +`TCPActionServer`、`HTTPActionServer` 都透過同一份共用 registry(以 +`executor.registry` 對外公開)解析指令。 + +## 安裝 + +```bash +pip install automation_file +``` + +單一安裝即涵蓋所有後端(Google Drive、S3、Azure Blob、Dropbox、SFTP)以及 +PySide6 GUI — 日常使用不需要任何 extras。 + +```bash +pip install "automation_file[dev]" # ruff, mypy, pre-commit, pytest-cov, build, twine +``` + +需求: +- Python 3.10+ +- 內建相依套件:`google-api-python-client`、`google-auth-oauthlib`、 + `requests`、`tqdm`、`boto3`、`azure-storage-blob`、`dropbox`、`paramiko`、 + `PySide6`、`watchdog` + +## 使用方式 + +### 執行 JSON 動作清單 +```python +from automation_file import execute_action + +execute_action([ + ["FA_create_file", {"file_path": "test.txt"}], + ["FA_copy_file", {"source": "test.txt", "target": "copy.txt"}], +]) +``` + +### 驗證、乾跑、平行 +```python +from automation_file import execute_action, execute_action_parallel, 
validate_action
+
+# Fail-fast:只要有任何指令名稱未知就在執行前中止。
+execute_action(actions, validate_first=True)
+
+# Dry-run:只記錄會被呼叫的內容,不真的執行。
+execute_action(actions, dry_run=True)
+
+# Parallel:透過 thread pool 平行執行獨立動作。
+execute_action_parallel(actions, max_workers=4)
+
+# 手動驗證 — 回傳解析後的名稱清單。
+names = validate_action(actions)
+```
+
+### 初始化 Google Drive 並上傳
+```python
+from automation_file import driver_instance, drive_upload_to_drive
+
+driver_instance.later_init("token.json", "credentials.json")
+drive_upload_to_drive("example.txt")
+```
+
+### 經驗證的 HTTP 下載(含重試)
+```python
+from automation_file import download_file

+download_file("https://example.com/file.zip", "file.zip")
+```
+
+### 啟動 loopback TCP 伺服器(可選 shared-secret 驗證)
+```python
+from automation_file import start_autocontrol_socket_server
+
+server = start_autocontrol_socket_server(
+    host="127.0.0.1", port=9943, shared_secret="optional-secret",
+)
+```
+
+設定 `shared_secret` 時,客戶端每個封包都必須以 `AUTH <secret>\n` 為前綴。
+若要綁定非 loopback 位址必須明確傳入 `allow_non_loopback=True`。
+
+### 啟動 HTTP 動作伺服器
+```python
+from automation_file import start_http_action_server
+
+server = start_http_action_server(
+    host="127.0.0.1", port=9944, shared_secret="optional-secret",
+)
+
+# curl -H 'Authorization: Bearer optional-secret' \
+#      -d '[["FA_create_dir",{"dir_path":"x"}]]' \
+#      http://127.0.0.1:9944/actions
+```
+
+### Retry 與 quota 原語
+```python
+from automation_file import retry_on_transient, Quota
+
+@retry_on_transient(max_attempts=5, backoff_base=0.5)
+def flaky_network_call(): ...
+ +quota = Quota(max_bytes=50 * 1024 * 1024, max_seconds=30.0) +with quota.time_budget("bulk-upload"): + bulk_upload_work() +``` + +### 路徑穿越防護 +```python +from automation_file import safe_join + +target = safe_join("/data/jobs", user_supplied_path) +# 若解析後的路徑跳脫 /data/jobs 會拋出 PathTraversalException。 +``` + +### Cloud / SFTP 後端 +每個後端都會由 `build_default_registry()` 自動註冊,因此 `FA_s3_*`、 +`FA_azure_blob_*`、`FA_dropbox_*`、`FA_sftp_*` 動作開箱即用 — 不需要另外呼叫 +`register_*_ops`。 + +```python +from automation_file import execute_action, s3_instance + +s3_instance.later_init(region_name="us-east-1") + +execute_action([ + ["FA_s3_upload_file", {"local_path": "report.csv", "bucket": "reports", "key": "report.csv"}], +]) +``` + +所有後端(`s3`、`azure_blob`、`dropbox_api`、`sftp`)都對外提供相同的五組 +操作:`upload_file`、`upload_dir`、`download_file`、`delete_*`、`list_*`。 +SFTP 使用 `paramiko.RejectPolicy` — 未知主機會被拒絕,不會自動加入。 + +### 檔案監看觸發 +每當被監看路徑發生檔案系統事件,就執行動作清單: + +```python +from automation_file import watch_start, watch_stop + +watch_start( + name="inbox-sweeper", + path="/data/inbox", + action_list=[["FA_copy_all_file_to_dir", {"source_dir": "/data/inbox", + "target_dir": "/data/processed"}]], + events=["created", "modified"], + recursive=False, +) +# 稍後: +watch_stop("inbox-sweeper") +``` + +`FA_watch_start` / `FA_watch_stop` / `FA_watch_stop_all` / `FA_watch_list` +讓 JSON 動作清單能使用相同的生命週期。 + +### Cron 排程器 +以純標準函式庫的 5 欄位 cron 解析器執行週期性動作清單: + +```python +from automation_file import schedule_add + +schedule_add( + name="nightly-snapshot", + cron_expression="0 2 * * *", # 每天本地時間 02:00 + action_list=[["FA_zip_dir", {"dir_we_want_to_zip": "/data", + "zip_name": "/backup/data_nightly"}]], +) +``` + +支援 `*`、確切值、`a-b` 範圍、逗號清單、`*/n` 步進語法,以及 `jan..dec` / +`sun..sat` 別名。JSON 動作:`FA_schedule_add`、`FA_schedule_remove`、 +`FA_schedule_remove_all`、`FA_schedule_list`。 + +### 傳輸進度 + 取消 +HTTP 與 S3 傳輸支援可選的 `progress_name` 關鍵字參數: + +```python +from automation_file import download_file, progress_cancel + 
+download_file("https://example.com/big.bin", "big.bin",
+              progress_name="big-download")
+
+# 從另一個執行緒或 GUI:
+progress_cancel("big-download")
+```
+
+共用的 `progress_registry` 透過 `progress_list()` 以及 `FA_progress_list` /
+`FA_progress_cancel` / `FA_progress_clear` JSON 動作提供即時快照。GUI 的
+**Progress** 分頁每半秒輪詢一次 registry。
+
+### 快速檔案搜尋
+若 OS 索引器可用就直接查詢(macOS 的 `mdfind`、Linux 的 `locate` /
+`plocate`、Windows 的 Everything `es.exe`),否則退回以串流 `os.scandir`
+走訪。不需要額外相依套件。
+
+```python
+from automation_file import fast_find, scandir_find, has_os_index
+
+# 可用時使用 OS 索引器,否則退回 scandir。
+results = fast_find("/var/log", "*.log", limit=100)
+
+# 強制使用可攜路徑(跳過 OS 索引器)。
+results = fast_find("/data", "report_*.csv", use_index=False)
+
+# 串流 — 不需走訪整棵樹就能提早停止。
+for path in scandir_find("/data", "*.csv"):
+    if "2026" in path:
+        break
+```
+
+`FA_fast_find` 將同一個函式提供給 JSON 動作清單:
+
+```json
+[["FA_fast_find", {"root": "/var/log", "pattern": "*.log", "limit": 50}]]
+```
+
+### 檢查碼 + 完整性驗證
+串流式處理任何 `hashlib` 演算法;`verify_checksum` 以 `hmac.compare_digest`
+(常數時間)比對摘要:
+
+```python
+from automation_file import file_checksum, verify_checksum
+
+digest = file_checksum("bundle.tar.gz")  # 預設為 sha256
+verify_checksum("bundle.tar.gz", digest)  # -> True
+verify_checksum("bundle.tar.gz", "deadbeef...", algorithm="blake2b")
+```
+
+同時以 `FA_file_checksum` / `FA_verify_checksum` 提供 JSON 動作。
+
+### 可續傳 HTTP 下載
+`download_file(resume=True)` 會寫入 `.part` 並於下次嘗試傳送
+`Range: bytes=<offset>-`。搭配 `expected_sha256=` 可在下載完成後立刻驗證完整性:
+
+```python
+from automation_file import download_file
+
+download_file(
+    "https://example.com/big.bin",
+    "big.bin",
+    resume=True,
+    expected_sha256="3b0c44298fc1...",
+)
+```
+
+### 重複檔案尋找器
+三階段管線:依大小分桶 → 64 KiB 部分雜湊 → 完整雜湊。大小唯一的檔案完全不會
+被雜湊:
+
+```python
+from automation_file import find_duplicates
+
+groups = find_duplicates("/data", min_size=1024)
+# list[list[str]] — 每個內層 list 是一組相同內容的檔案,以大小遞減排序。
+```
+
+`FA_find_duplicates` 以相同呼叫提供給 JSON。
+
+### 增量目錄同步
+`sync_dir` 以只複製新增或變更檔案的方式將 `src` 鏡像至 
`dst`。變更偵測預設 +為 `(size, mtime)`;當 mtime 不可信時可傳入 `compare="checksum"`。`dst` +下多餘的檔案預設保留 — 傳入 `delete=True` 才會清除(`dry_run=True` 可先 +預覽): + +```python +from automation_file import sync_dir + +summary = sync_dir("/data/src", "/data/dst", delete=True) +# summary: {"copied": [...], "skipped": [...], "deleted": [...], +# "errors": [...], "dry_run": False} +``` + +Symlink 會以 symlink 形式重建而非被跟隨,因此指向樹外的連結不會拖垮鏡像。 +JSON 動作:`FA_sync_dir`。 + +### 目錄 manifest +將樹下每個檔案的檢查碼寫入 JSON manifest,之後再驗證樹是否變動: + +```python +from automation_file import write_manifest, verify_manifest + +write_manifest("/release/payload", "/release/MANIFEST.json") + +# 稍後… +result = verify_manifest("/release/payload", "/release/MANIFEST.json") +if not result["ok"]: + raise SystemExit(f"manifest mismatch: {result}") +``` + +`result` 以 `matched`、`missing`、`modified`、`extra` 分別回報清單。 +多餘檔案(`extra`)不會讓驗證失敗(對齊 `sync_dir` 預設不刪除的行為), +`missing` 與 `modified` 則會。JSON 動作:`FA_write_manifest`、 +`FA_verify_manifest`。 + +### 通知 +透過 webhook、Slack 或 SMTP 推送一次性訊息,或在 trigger / scheduler +失敗時自動通知: + +```python +from automation_file import ( + SlackSink, WebhookSink, EmailSink, + notification_manager, notify_send, +) + +notification_manager.register(SlackSink("https://hooks.slack.com/services/T/B/X")) +notify_send("deploy complete", body="rev abc123", level="info") +``` + +每個 sink 都遵循相同的 `send(subject, body, level)` 合約。Fanout 的 +`NotificationManager` 會進行個別 sink 錯誤隔離(一個壞掉的 sink 不會影響 +其他 sink)、滑動視窗去重(避免卡住的 trigger 洗版),以及對每個 +webhook / Slack URL 做 SSRF 驗證。Scheduler 與 trigger 派送器在失敗時 +會以 `level="error"` 自動通知 — 只要註冊 sink 就能取得生產環境告警。 +JSON 動作:`FA_notify_send`、`FA_notify_list`。 + +### 設定檔與秘密提供者 +在 `automation_file.toml` 一次宣告 sink 與預設值。秘密參考在載入時由 +環境變數或檔案根目錄(Docker / K8s 風格)解析: + +```toml +# automation_file.toml + +[secrets] +file_root = "/run/secrets" + +[defaults] +dedup_seconds = 120 + +[[notify.sinks]] +type = "slack" +name = "team-alerts" +webhook_url = "${env:SLACK_WEBHOOK}" + +[[notify.sinks]] +type = "email" +name = "ops-email" 
+host = "smtp.example.com" +port = 587 +sender = "alerts@example.com" +recipients = ["ops@example.com"] +username = "${env:SMTP_USER}" +password = "${file:smtp_password}" +``` + +```python +from automation_file import AutomationConfig, notification_manager + +config = AutomationConfig.load("automation_file.toml") +config.apply_to(notification_manager) +``` + +未解析的 `${…}` 參考會拋出 `SecretNotFoundException`,而非默默變成空字串。 +可組合 `ChainedSecretProvider` / `EnvSecretProvider` / +`FileSecretProvider` 建立自訂提供者鏈,並以 +`AutomationConfig.load(path, provider=…)` 傳入。 + +### 動作清單變數替換 +以 `substitute=True` 啟用後,`${…}` 參考會在派送時展開: + +```python +from automation_file import execute_action + +execute_action( + [["FA_create_file", {"file_path": "reports/${date:%Y-%m-%d}/${uuid}.txt"}]], + substitute=True, +) +``` + +支援 `${env:VAR}`、`${date:FMT}`(strftime)、`${uuid}`、`${cwd}`。未知名稱 +會拋出 `SubstitutionException`,不會默默變成空字串。 + +### 條件式執行 +只在路徑守護通過時執行巢狀動作清單: + +```json +[ + ["FA_if_exists", {"path": "/data/in/job.json", + "then": [["FA_copy_file", {"source": "/data/in/job.json", + "target": "/data/processed/job.json"}]]}], + ["FA_if_newer", {"source": "/src", "target": "/dst", + "then": [["FA_sync_dir", {"src": "/src", "dst": "/dst"}]]}], + ["FA_if_size_gt", {"path": "/logs/app.log", "size": 10485760, + "then": [["FA_run_shell", {"command": ["logrotate", "/logs/app.log"]}]]}] +] +``` + +### SQLite 稽核日誌 +`AuditLog` 以短連線 + 模組層級 lock 為每個動作寫入一筆紀錄: + +```python +from automation_file import AuditLog + +audit = AuditLog("audit.sqlite3") +audit.record(action="FA_copy_file", actor="ops", + status="ok", duration_ms=12, detail={"src": "a", "dst": "b"}) + +for row in audit.recent(limit=50): + print(row["timestamp"], row["action"], row["status"]) +``` + +### 檔案完整性監控 +依 manifest 輪詢整棵樹,偵測到 drift 時觸發 callback + 通知: + +```python +from automation_file import IntegrityMonitor, notification_manager, write_manifest + +write_manifest("/srv/site", "/srv/MANIFEST.json") + +mon = IntegrityMonitor( + root="/srv/site", + 
manifest_path="/srv/MANIFEST.json", + interval=60.0, + manager=notification_manager, + on_drift=lambda summary: print("drift:", summary), +) +mon.start() +``` + +載入 manifest 時的錯誤也會被視為 drift,讓竄改與設定問題走同一條處理 +路徑。 + +### AES-256-GCM 檔案加密 +具驗證的加密與自述式封包格式。可由密碼衍生金鑰或直接產生金鑰: + +```python +from automation_file import encrypt_file, decrypt_file, key_from_password + +key = key_from_password("correct horse battery staple", salt=b"app-salt-v1") +encrypt_file("secret.pdf", "secret.pdf.enc", key, associated_data=b"v1") +decrypt_file("secret.pdf.enc", "secret.pdf", key, associated_data=b"v1") +``` + +竄改由 GCM 驗證 tag 偵測,以 `CryptoException("authentication failed")` +回報。JSON 動作:`FA_encrypt_file`、`FA_decrypt_file`。 + +### HTTPActionClient Python SDK +HTTP 動作伺服器的型別化客戶端;預設強制 loopback,並自動附帶 shared +secret: + +```python +from automation_file import HTTPActionClient + +with HTTPActionClient("http://127.0.0.1:9944", shared_secret="s3cr3t") as client: + client.ping() # OPTIONS /actions + result = client.execute([["FA_create_dir", {"dir_path": "x"}]]) +``` + +驗證失敗會轉成 `HTTPActionClientException(kind="unauthorized")`; +404 則表示伺服器存在但未對外提供 `/actions`。 + +### Prometheus metrics 匯出器 +`ActionExecutor` 為每個動作記錄一筆計數器與一筆直方圖樣本。在 loopback +`/metrics` 端點提供: + +```python +from automation_file import start_metrics_server + +server = start_metrics_server(host="127.0.0.1", port=9945) +# curl http://127.0.0.1:9945/metrics +``` + +匯出 `automation_file_actions_total{action,status}` 以及 +`automation_file_action_duration_seconds{action}`。若要綁定非 loopback +位址必須明確傳入 `allow_non_loopback=True`。 + +### DAG 動作執行器 +依相依關係執行動作;獨立分支會透過執行緒池平行展開。每個節點形式為 +`{"id": ..., "action": [...], "depends_on": [...]}`: + +```python +from automation_file import execute_action_dag + +execute_action_dag([ + {"id": "fetch", "action": ["FA_download_file", + ["https://example.com/src.tar.gz", "src.tar.gz"]]}, + {"id": "verify", "action": ["FA_verify_checksum", + ["src.tar.gz", "3b0c44298fc1..."]], + "depends_on": ["fetch"]}, + {"id": "unpack", 
"action": ["FA_unzip_file", ["src.tar.gz", "src"]], + "depends_on": ["verify"]}, +]) +``` + +若 `verify` 拋出例外,預設情況下 `unpack` 會被標記為 `skipped`。傳入 +`fail_fast=False` 可讓後代節點仍然執行。JSON 動作:`FA_execute_action_dag`。 + +### Entry-point 外掛 +第三方套件以 `pyproject.toml` 宣告動作: + +```toml +[project.entry-points."automation_file.actions"] +my_plugin = "my_plugin:register" +``` + +其中 `register` 是一個零參數的可呼叫物件,回傳 `dict[str, Callable]`。只要 +套件被安裝於同一個虛擬環境,這些指令就會出現在每次新建的 registry: + +```python +# my_plugin/__init__.py +def greet(name: str) -> str: + return f"hello {name}" + +def register() -> dict: + return {"FA_greet": greet} +``` + +```python +# 執行 `pip install my_plugin` 之後 +from automation_file import execute_action +execute_action([["FA_greet", {"name": "world"}]]) +``` + +外掛失敗(匯入錯誤、factory 例外、回傳型別不正確、registry 拒絕)都會被記錄 +並吞掉 — 一個壞掉的外掛不會影響整個函式庫。 + +### GUI +```bash +python -m automation_file ui # 或:python main_ui.py +``` + +```python +from automation_file import launch_ui +launch_ui() +``` + +分頁:Home、Local、Transfer、Progress、JSON actions、Triggers、Scheduler、 +Servers。底部常駐的 log 面板即時串流每一筆結果與錯誤。 + +### 以 executor 為核心建立專案鷹架 +```python +from automation_file import create_project_dir + +create_project_dir("my_workflow") +``` + +## CLI + +```bash +# 子指令(一次性操作) +python -m automation_file ui +python -m automation_file zip ./src out.zip --dir +python -m automation_file unzip out.zip ./restored +python -m automation_file download https://example.com/file.bin file.bin +python -m automation_file create-file hello.txt --content "hi" +python -m automation_file server --host 127.0.0.1 --port 9943 +python -m automation_file http-server --host 127.0.0.1 --port 9944 +python -m automation_file drive-upload my.txt --token token.json --credentials creds.json + +# 舊式旗標(JSON 動作清單) +python -m automation_file --execute_file actions.json +python -m automation_file --execute_dir ./actions/ +python -m automation_file --execute_str '[["FA_create_dir",{"dir_path":"x"}]]' +python -m automation_file --create_project 
./my_project +``` + +## JSON 動作格式 + +每一項動作可以是單純的指令名稱、`[name, kwargs]` 組合,或 `[name, args]` +清單: + +```json +[ + ["FA_create_file", {"file_path": "test.txt"}], + ["FA_drive_upload_to_drive", {"file_path": "test.txt"}], + ["FA_drive_search_all_file"] +] +``` + +## 文件 + +完整 API 文件位於 `docs/`,可用 Sphinx 產生: + +```bash +pip install -r docs/requirements.txt +sphinx-build -b html docs/source docs/_build/html +``` + +架構筆記、程式碼慣例與安全考量請參見 [`CLAUDE.md`](CLAUDE.md)。 diff --git a/automation_file/__init__.py b/automation_file/__init__.py index 7419549..3a620d1 100644 --- a/automation_file/__init__.py +++ b/automation_file/__init__.py @@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, Any +from automation_file.client import HTTPActionClient, HTTPActionClientException from automation_file.core.action_executor import ( ActionExecutor, add_command_to_executor, @@ -19,11 +20,54 @@ validate_action, ) from automation_file.core.action_registry import ActionRegistry, build_default_registry +from automation_file.core.audit import AuditException, AuditLog from automation_file.core.callback_executor import CallbackExecutor +from automation_file.core.checksum import ( + ChecksumMismatchException, + file_checksum, + verify_checksum, +) +from automation_file.core.config import AutomationConfig, ConfigException +from automation_file.core.config_watcher import ConfigWatcher +from automation_file.core.crypto import ( + CryptoException, + decrypt_file, + encrypt_file, + generate_key, + key_from_password, +) +from automation_file.core.dag_executor import execute_action_dag +from automation_file.core.fim import IntegrityMonitor from automation_file.core.json_store import read_action_json, write_action_json +from automation_file.core.manifest import ManifestException, verify_manifest, write_manifest +from automation_file.core.metrics import ACTION_COUNT, ACTION_DURATION, record_action +from automation_file.core.metrics import render as render_metrics from automation_file.core.package_loader import 
PackageLoader +from automation_file.core.progress import ( + CancellationToken, + CancelledException, + ProgressRegistry, + ProgressReporter, + progress_cancel, + progress_clear, + progress_list, + progress_registry, + register_progress_ops, +) from automation_file.core.quota import Quota from automation_file.core.retry import retry_on_transient +from automation_file.core.secrets import ( + ChainedSecretProvider, + EnvSecretProvider, + FileSecretProvider, + SecretException, + SecretNotFoundException, + SecretProvider, + default_provider, + resolve_secret_refs, +) +from automation_file.core.substitution import SubstitutionException, substitute +from automation_file.local.conditional import if_exists, if_newer, if_size_gt from automation_file.local.dir_ops import copy_dir, create_dir, remove_dir_tree, rename_dir from automation_file.local.file_ops import ( copy_all_file_to_dir, @@ -33,7 +77,16 @@ remove_file, rename_file, ) +from automation_file.local.json_edit import ( + JsonEditException, + json_delete, + json_get, + json_set, +) from automation_file.local.safe_paths import is_within, safe_join +from automation_file.local.shell_ops import ShellException, run_shell +from automation_file.local.sync_ops import SyncException, sync_dir +from automation_file.local.tar_ops import TarException, create_tar, extract_tar from automation_file.local.zip_ops import ( read_zip_file, set_zip_password, @@ -44,17 +97,40 @@ zip_file_info, zip_info, ) +from automation_file.notify import ( + DiscordSink, + EmailSink, + NotificationException, + NotificationManager, + NotificationSink, + PagerDutySink, + SlackSink, + TeamsSink, + TelegramSink, + WebhookSink, + notification_manager, + notify_send, + register_notify_ops, +) from automation_file.project.project_builder import ProjectBuilder, create_project_dir from automation_file.remote.azure_blob import ( AzureBlobClient, azure_blob_instance, register_azure_blob_ops, ) +from automation_file.remote.cross_backend import 
CrossBackendException, copy_between from automation_file.remote.dropbox_api import ( DropboxClient, dropbox_instance, register_dropbox_ops, ) +from automation_file.remote.ftp import ( + FTPClient, + FTPConnectOptions, + FTPException, + ftp_instance, + register_ftp_ops, +) from automation_file.remote.google_drive.client import GoogleDriveClient, driver_instance from automation_file.remote.google_drive.delete_ops import drive_delete_file from automation_file.remote.google_drive.download_ops import ( @@ -82,12 +158,39 @@ from automation_file.remote.s3 import S3Client, register_s3_ops, s3_instance from automation_file.remote.sftp import SFTPClient, register_sftp_ops, sftp_instance from automation_file.remote.url_validator import validate_http_url +from automation_file.scheduler import ( + CronExpression, + ScheduledJob, + Scheduler, + register_scheduler_ops, + schedule_add, + schedule_list, + schedule_remove, + schedule_remove_all, + scheduler, +) +from automation_file.server.action_acl import ActionACL, ActionNotPermittedException from automation_file.server.http_server import HTTPActionServer, start_http_action_server +from automation_file.server.metrics_server import MetricsServer, start_metrics_server from automation_file.server.tcp_server import ( TCPActionServer, start_autocontrol_socket_server, ) +from automation_file.trigger import ( + FileWatcher, + TriggerManager, + register_trigger_ops, + trigger_manager, + watch_list, + watch_start, + watch_stop, + watch_stop_all, +) +from automation_file.utils.deduplicate import find_duplicates +from automation_file.utils.fast_find import fast_find, has_os_index, scandir_find from automation_file.utils.file_discovery import get_dir_files_as_list +from automation_file.utils.grep import GrepException, grep_files, iter_grep +from automation_file.utils.rotate import RotateException, rotate_backups if TYPE_CHECKING: from automation_file.ui.launcher import ( @@ -117,9 +220,12 @@ def __getattr__(name: str) -> Any: 
"build_default_registry", "execute_action", "execute_action_parallel", + "execute_action_dag", "execute_files", "validate_action", "retry_on_transient", + "substitute", + "SubstitutionException", "add_command_to_executor", "read_action_json", "write_action_json", @@ -137,6 +243,17 @@ def __getattr__(name: str) -> Any: "create_dir", "remove_dir_tree", "rename_dir", + "sync_dir", + "SyncException", + "create_tar", + "extract_tar", + "TarException", + "run_shell", + "ShellException", + "json_get", + "json_set", + "json_delete", + "JsonEditException", "zip_dir", "zip_file", "zip_info", @@ -147,6 +264,9 @@ def __getattr__(name: str) -> Any: "unzip_all", "safe_join", "is_within", + "if_exists", + "if_newer", + "if_size_gt", # Remote "download_file", "validate_http_url", @@ -178,14 +298,109 @@ def __getattr__(name: str) -> Any: "SFTPClient", "sftp_instance", "register_sftp_ops", + "FTPClient", + "FTPConnectOptions", + "FTPException", + "ftp_instance", + "register_ftp_ops", + "CrossBackendException", + "copy_between", # Server / Project / Utils "TCPActionServer", "start_autocontrol_socket_server", "HTTPActionServer", "start_http_action_server", + "HTTPActionClient", + "HTTPActionClientException", + "ActionACL", + "ActionNotPermittedException", "ProjectBuilder", "create_project_dir", "get_dir_files_as_list", + "fast_find", + "scandir_find", + "has_os_index", + "file_checksum", + "verify_checksum", + "ChecksumMismatchException", + "find_duplicates", + "grep_files", + "iter_grep", + "GrepException", + "rotate_backups", + "RotateException", + "ManifestException", + "write_manifest", + "verify_manifest", + "AuditException", + "AuditLog", + "IntegrityMonitor", + "CryptoException", + "encrypt_file", + "decrypt_file", + "generate_key", + "key_from_password", + "ACTION_COUNT", + "ACTION_DURATION", + "record_action", + "render_metrics", + "MetricsServer", + "start_metrics_server", + # Triggers + "FileWatcher", + "TriggerManager", + "register_trigger_ops", + "trigger_manager", + 
"watch_start", + "watch_stop", + "watch_stop_all", + "watch_list", + # Scheduler + "CronExpression", + "ScheduledJob", + "Scheduler", + "register_scheduler_ops", + "schedule_add", + "schedule_list", + "schedule_remove", + "schedule_remove_all", + "scheduler", + # Progress / cancellation + "CancellationToken", + "CancelledException", + "ProgressRegistry", + "ProgressReporter", + "progress_cancel", + "progress_clear", + "progress_list", + "progress_registry", + "register_progress_ops", + # Notifications + "DiscordSink", + "EmailSink", + "NotificationException", + "NotificationManager", + "NotificationSink", + "PagerDutySink", + "SlackSink", + "TeamsSink", + "TelegramSink", + "WebhookSink", + "notification_manager", + "notify_send", + "register_notify_ops", + # Config / secrets + "AutomationConfig", + "ConfigException", + "ConfigWatcher", + "ChainedSecretProvider", + "EnvSecretProvider", + "FileSecretProvider", + "SecretException", + "SecretNotFoundException", + "SecretProvider", + "default_provider", + "resolve_secret_refs", # UI (lazy-loaded) "launch_ui", ] diff --git a/automation_file/client/__init__.py b/automation_file/client/__init__.py new file mode 100644 index 0000000..3678359 --- /dev/null +++ b/automation_file/client/__init__.py @@ -0,0 +1,7 @@ +"""Client SDK for talking to a running ``HTTPActionServer``.""" + +from __future__ import annotations + +from automation_file.client.http_client import HTTPActionClient, HTTPActionClientException + +__all__ = ["HTTPActionClient", "HTTPActionClientException"] diff --git a/automation_file/client/http_client.py b/automation_file/client/http_client.py new file mode 100644 index 0000000..d5a56b2 --- /dev/null +++ b/automation_file/client/http_client.py @@ -0,0 +1,132 @@ +"""Python SDK for :class:`~automation_file.server.http_server.HTTPActionServer`. 
+ +``HTTPActionClient(base_url, *, shared_secret=None)`` wraps a single +``requests.Session`` and exposes ``execute(actions)`` which POSTs the JSON +action list to ``/actions``. The client is intentionally thin: +it handles auth header assembly, response-code checking, and error +translation, but makes no attempt to mirror ``ActionExecutor``'s API +surface — callers pass the same action-list shape they would pass to +``execute_action``. +""" + +from __future__ import annotations + +from types import TracebackType +from typing import Any + +import requests + +from automation_file.exceptions import FileAutomationException +from automation_file.logging_config import file_automation_logger +from automation_file.remote.url_validator import validate_http_url + +_DEFAULT_TIMEOUT = 30.0 +_ACTIONS_PATH = "/actions" + + +class HTTPActionClientException(FileAutomationException): + """Raised when the server rejects a request or the response is malformed.""" + + +class HTTPActionClient: + """Synchronous SDK for a running :class:`HTTPActionServer`.""" + + def __init__( + self, + base_url: str, + *, + shared_secret: str | None = None, + timeout: float = _DEFAULT_TIMEOUT, + verify_loopback: bool = False, + ) -> None: + stripped = base_url.rstrip("/") + if not stripped: + raise HTTPActionClientException("base_url must be non-empty") + if verify_loopback: + validate_http_url(stripped) + self._base_url = stripped + self._shared_secret = shared_secret + self._timeout = float(timeout) + self._session = requests.Session() + + @property + def base_url(self) -> str: + return self._base_url + + def execute(self, actions: list | dict) -> Any: + """POST ``actions`` to ``/actions`` and return the decoded JSON body.""" + if not isinstance(actions, (list, dict)): + raise HTTPActionClientException( + f"actions must be list or dict, got {type(actions).__name__}" + ) + url = f"{self._base_url}{_ACTIONS_PATH}" + headers = {"Content-Type": "application/json"} + if self._shared_secret: + 
headers["Authorization"] = f"Bearer {self._shared_secret}" + try: + response = self._session.post( + url, + json=actions, + headers=headers, + timeout=self._timeout, + allow_redirects=False, + ) + except requests.RequestException as err: + raise HTTPActionClientException(f"request to {url} failed: {err}") from err + return _decode_response(response) + + def ping(self) -> bool: + """Best-effort reachability probe — returns True if the server responds.""" + url = f"{self._base_url}{_ACTIONS_PATH}" + try: + response = self._session.request( + "OPTIONS", url, timeout=min(self._timeout, 5.0), allow_redirects=False + ) + except requests.RequestException: + return False + # The server only handles POST /actions; OPTIONS yields 501 which + # still proves it's reachable. 401/403 also prove reachability. + return response.status_code < 500 or response.status_code == 501 + + def close(self) -> None: + self._session.close() + + def __enter__(self) -> HTTPActionClient: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + self.close() + + +def _decode_response(response: requests.Response) -> Any: + status = response.status_code + if status == 401: + raise HTTPActionClientException("unauthorized: missing or invalid shared secret") + if status == 403: + body = _safe_body(response) + raise HTTPActionClientException(f"forbidden: {body}") + if status == 404: + raise HTTPActionClientException("server does not expose /actions") + if status >= 400: + body = _safe_body(response) + raise HTTPActionClientException(f"server returned HTTP {status}: {body}") + try: + return response.json() + except ValueError as err: + file_automation_logger.error("http_client: bad JSON response: %r", err) + raise HTTPActionClientException(f"server returned invalid JSON: {err}") from err + + +def _safe_body(response: requests.Response) -> str: + try: + data = response.json() + except ValueError: + return 
response.text[:200] + if isinstance(data, dict) and "error" in data: + return str(data["error"]) + return str(data)[:200] diff --git a/automation_file/core/action_executor.py b/automation_file/core/action_executor.py index b7c7972..f9aadad 100644 --- a/automation_file/core/action_executor.py +++ b/automation_file/core/action_executor.py @@ -15,12 +15,15 @@ from __future__ import annotations +import time from collections.abc import Mapping from concurrent.futures import ThreadPoolExecutor from typing import Any from automation_file.core.action_registry import ActionRegistry, build_default_registry from automation_file.core.json_store import read_action_json +from automation_file.core.metrics import record_action +from automation_file.core.substitution import substitute as substitute_payload from automation_file.exceptions import ExecuteActionException, ValidationException from automation_file.logging_config import file_automation_logger @@ -95,14 +98,19 @@ def execute_action( action_list: list | Mapping[str, Any], dry_run: bool = False, validate_first: bool = False, + substitute: bool = False, ) -> dict[str, Any]: """Execute every action; return ``{"execute: ": result|repr(error)}``. ``dry_run=True`` logs and records the resolved name without invoking the command. ``validate_first=True`` runs :meth:`validate` before touching - any action so a typo aborts the whole batch up-front. + any action so a typo aborts the whole batch up-front. ``substitute=True`` + expands ``${env:...}`` / ``${date:...}`` / ``${uuid}`` / ``${cwd}`` + placeholders inside every string in the payload. 
""" actions = self._coerce(action_list) + if substitute: + actions = substitute_payload(actions) # type: ignore[assignment] if validate_first: self.validate(actions) results: dict[str, Any] = {} @@ -144,20 +152,15 @@ def add_command_to_executor(self, command_dict: Mapping[str, Any]) -> None: # Internals --------------------------------------------------------- def _run_one(self, action: list, dry_run: bool) -> Any: + name = _safe_action_name(action) + if dry_run: + return self._run_dry(action) + started = time.monotonic() + ok = False try: - if dry_run: - name, kind, payload = self._parse_action(action) - if self.registry.resolve(name) is None: - raise ExecuteActionException(f"unknown action: {name!r}") - file_automation_logger.info( - "dry_run: %s kind=%s payload=%r", - name, - kind, - payload, - ) - return f"dry_run:{name}" value = self._execute_event(action) file_automation_logger.info("execute_action: %s", action) + ok = True return value except ExecuteActionException as error: file_automation_logger.error("execute_action malformed: %r", error) @@ -165,6 +168,24 @@ def _run_one(self, action: list, dry_run: bool) -> Any: except Exception as error: # pylint: disable=broad-except file_automation_logger.error("execute_action runtime error: %r", error) return repr(error) + finally: + record_action(name, time.monotonic() - started, ok) + + def _run_dry(self, action: list) -> Any: + try: + name, kind, payload = self._parse_action(action) + if self.registry.resolve(name) is None: + raise ExecuteActionException(f"unknown action: {name!r}") + except ExecuteActionException as error: + file_automation_logger.error("execute_action malformed: %r", error) + return repr(error) + file_automation_logger.info( + "dry_run: %s kind=%s payload=%r", + name, + kind, + payload, + ) + return f"dry_run:{name}" @staticmethod def _coerce(action_list: list | Mapping[str, Any]) -> list: @@ -182,6 +203,12 @@ def _coerce(action_list: list | Mapping[str, Any]) -> list: return action_list +def 
_safe_action_name(action: Any) -> str: + if isinstance(action, list) and action and isinstance(action[0], str): + return action[0] + return "unknown" + + # Default shared executor — built once, mutated in place by plugins. executor: ActionExecutor = ActionExecutor() @@ -190,12 +217,14 @@ def execute_action( action_list: list | Mapping[str, Any], dry_run: bool = False, validate_first: bool = False, + substitute: bool = False, ) -> dict[str, Any]: """Module-level shim that delegates to the shared executor.""" return executor.execute_action( action_list, dry_run=dry_run, validate_first=validate_first, + substitute=substitute, ) diff --git a/automation_file/core/action_registry.py b/automation_file/core/action_registry.py index 26d882a..bc11948 100644 --- a/automation_file/core/action_registry.py +++ b/automation_file/core/action_registry.py @@ -66,7 +66,16 @@ def event_dict(self) -> dict[str, Command]: def _local_commands() -> dict[str, Command]: - from automation_file.local import dir_ops, file_ops, zip_ops + from automation_file.local import ( + conditional, + dir_ops, + file_ops, + json_edit, + shell_ops, + sync_ops, + tar_ops, + zip_ops, + ) return { # Files @@ -81,6 +90,16 @@ def _local_commands() -> dict[str, Command]: "FA_create_dir": dir_ops.create_dir, "FA_remove_dir_tree": dir_ops.remove_dir_tree, "FA_rename_dir": dir_ops.rename_dir, + "FA_sync_dir": sync_ops.sync_dir, + # Shell + "FA_run_shell": shell_ops.run_shell, + # JSON edit + "FA_json_get": json_edit.json_get, + "FA_json_set": json_edit.json_set, + "FA_json_delete": json_edit.json_delete, + # Tar + "FA_create_tar": tar_ops.create_tar, + "FA_extract_tar": tar_ops.extract_tar, # Zip "FA_zip_dir": zip_ops.zip_dir, "FA_zip_file": zip_ops.zip_file, @@ -90,6 +109,10 @@ def _local_commands() -> dict[str, Command]: "FA_unzip_file": zip_ops.unzip_file, "FA_read_zip_file": zip_ops.read_zip_file, "FA_unzip_all": zip_ops.unzip_all, + # Conditional dispatch + "FA_if_exists": conditional.if_exists, + "FA_if_newer": 
conditional.if_newer, + "FA_if_size_gt": conditional.if_size_gt, } @@ -129,9 +152,42 @@ def _http_commands() -> dict[str, Command]: return {"FA_download_file": http_download.download_file} +def _utils_commands() -> dict[str, Command]: + from automation_file.core import checksum, crypto, manifest + from automation_file.remote import cross_backend + from automation_file.utils import deduplicate, fast_find, grep, rotate + + return { + "FA_fast_find": fast_find.fast_find, + "FA_file_checksum": checksum.file_checksum, + "FA_verify_checksum": checksum.verify_checksum, + "FA_find_duplicates": deduplicate.find_duplicates, + "FA_execute_action_dag": _lazy_execute_action_dag, + "FA_write_manifest": manifest.write_manifest, + "FA_verify_manifest": manifest.verify_manifest, + "FA_grep": grep.grep_files, + "FA_rotate_backups": rotate.rotate_backups, + "FA_copy_between": cross_backend.copy_between, + "FA_encrypt_file": crypto.encrypt_file, + "FA_decrypt_file": crypto.decrypt_file, + } + + +def _lazy_execute_action_dag( + nodes: list, + max_workers: int = 4, + fail_fast: bool = True, +) -> dict[str, Any]: + """Deferred import shim so the registry module doesn't depend on the DAG executor.""" + from automation_file.core.dag_executor import execute_action_dag + + return execute_action_dag(nodes, max_workers=max_workers, fail_fast=fail_fast) + + def _register_cloud_backends(registry: ActionRegistry) -> None: from automation_file.remote.azure_blob import register_azure_blob_ops from automation_file.remote.dropbox_api import register_dropbox_ops + from automation_file.remote.ftp import register_ftp_ops from automation_file.remote.s3 import register_s3_ops from automation_file.remote.sftp import register_sftp_ops @@ -139,16 +195,58 @@ def _register_cloud_backends(registry: ActionRegistry) -> None: register_azure_blob_ops(registry) register_dropbox_ops(registry) register_sftp_ops(registry) + register_ftp_ops(registry) + + +def _register_trigger_ops(registry: ActionRegistry) -> None: + 
from automation_file.trigger import register_trigger_ops + + register_trigger_ops(registry) + + +def _register_scheduler_ops(registry: ActionRegistry) -> None: + from automation_file.scheduler import register_scheduler_ops + + register_scheduler_ops(registry) + + +def _register_progress_ops(registry: ActionRegistry) -> None: + from automation_file.core.progress import register_progress_ops + + register_progress_ops(registry) + + +def _register_notify_ops(registry: ActionRegistry) -> None: + from automation_file.notify import register_notify_ops + + register_notify_ops(registry) def build_default_registry() -> ActionRegistry: - """Return a registry pre-populated with every built-in ``FA_*`` action.""" + """Return a registry pre-populated with every built-in ``FA_*`` action. + + After the built-ins are registered, any third-party package advertising + an ``automation_file.actions`` entry point is loaded so its commands + land in the same registry. Plugins may override built-in names. + """ registry = ActionRegistry() registry.register_many(_local_commands()) registry.register_many(_http_commands()) + registry.register_many(_utils_commands()) registry.register_many(_drive_commands()) _register_cloud_backends(registry) + _register_trigger_ops(registry) + _register_scheduler_ops(registry) + _register_progress_ops(registry) + _register_notify_ops(registry) + _load_plugins(registry) file_automation_logger.info( "action_registry: built default registry with %d commands", len(registry) ) return registry + + +def _load_plugins(registry: ActionRegistry) -> None: + from automation_file.core.plugins import load_entry_point_plugins + + load_entry_point_plugins(registry.register_many) diff --git a/automation_file/core/audit.py b/automation_file/core/audit.py new file mode 100644 index 0000000..ce573e1 --- /dev/null +++ b/automation_file/core/audit.py @@ -0,0 +1,140 @@ +"""SQLite-backed audit log for executed actions. 
+ +``AuditLog(db_path)`` opens (or creates) a single-table SQLite database and +appends one row per action execution. Rows carry the timestamp, action name, +a JSON-encoded snapshot of the payload, the result / error repr, and the +duration in milliseconds. + +Writes use a short-lived connection per call (``check_same_thread=False`` +semantics) so the log is safe to share between background worker threads +and the scheduler. Readers call :meth:`AuditLog.recent` to pull the most +recent N rows. + +The module deliberately avoids buffering / background queues: every row is +persisted synchronously with an ``INSERT`` inside a ``with connect(..)`` so +a crash at most loses the currently-executing action. +""" + +from __future__ import annotations + +import json +import sqlite3 +import threading +import time +from contextlib import closing +from pathlib import Path +from typing import Any + +from automation_file.exceptions import FileAutomationException +from automation_file.logging_config import file_automation_logger + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS audit ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts REAL NOT NULL, + action TEXT NOT NULL, + payload TEXT NOT NULL, + result TEXT, + error TEXT, + duration_ms REAL NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit (ts DESC); +""" + + +class AuditException(FileAutomationException): + """Raised when the audit log cannot be opened or written.""" + + +class AuditLog: + """Synchronous SQLite audit log.""" + + def __init__(self, db_path: str | Path) -> None: + self._db_path = Path(db_path) + self._lock = threading.Lock() + try: + self._db_path.parent.mkdir(parents=True, exist_ok=True) + with closing(self._connect()) as conn: + conn.executescript(_SCHEMA) + conn.commit() + except (OSError, sqlite3.DatabaseError) as err: + raise AuditException(f"cannot open audit log {self._db_path}: {err}") from err + + def record( + self, + action: str, + payload: Any, + *, + result: Any = None, + error: BaseException | 
None = None, + duration_ms: float = 0.0, + ) -> None: + """Append a single audit row. Never raises — failures are logged only.""" + row = ( + time.time(), + action, + _safe_json(payload), + _safe_json(result) if result is not None else None, + repr(error) if error is not None else None, + float(duration_ms), + ) + try: + with self._lock, closing(self._connect()) as conn: + conn.execute( + "INSERT INTO audit (ts, action, payload, result, error, duration_ms)" + " VALUES (?, ?, ?, ?, ?, ?)", + row, + ) + conn.commit() + except sqlite3.DatabaseError as err: + file_automation_logger.error("audit.record failed: %r", err) + + def recent(self, limit: int = 100) -> list[dict[str, Any]]: + """Return the newest ``limit`` rows, newest first.""" + if limit <= 0: + return [] + with closing(self._connect()) as conn: + cursor = conn.execute( + "SELECT id, ts, action, payload, result, error, duration_ms" + " FROM audit ORDER BY ts DESC LIMIT ?", + (limit,), + ) + rows = cursor.fetchall() + return [ + { + "id": row[0], + "ts": row[1], + "action": row[2], + "payload": json.loads(row[3]) if row[3] else None, + "result": json.loads(row[4]) if row[4] else None, + "error": row[5], + "duration_ms": row[6], + } + for row in rows + ] + + def count(self) -> int: + with closing(self._connect()) as conn: + cursor = conn.execute("SELECT COUNT(*) FROM audit") + (total,) = cursor.fetchone() + return int(total) + + def purge(self, older_than_seconds: float) -> int: + """Delete rows older than ``older_than_seconds`` and return the row count.""" + if older_than_seconds <= 0: + raise AuditException("older_than_seconds must be positive") + cutoff = time.time() - older_than_seconds + with self._lock, closing(self._connect()) as conn: + cursor = conn.execute("DELETE FROM audit WHERE ts < ?", (cutoff,)) + conn.commit() + return int(cursor.rowcount) + + def _connect(self) -> sqlite3.Connection: + return sqlite3.connect(self._db_path, timeout=5.0) + + +def _safe_json(value: Any) -> str: + try: + return 
json.dumps(value, default=repr, ensure_ascii=False) + except (TypeError, ValueError): + return json.dumps(repr(value), ensure_ascii=False) diff --git a/automation_file/core/checksum.py b/automation_file/core/checksum.py new file mode 100644 index 0000000..69b5080 --- /dev/null +++ b/automation_file/core/checksum.py @@ -0,0 +1,72 @@ +"""File checksum + integrity verification helpers. + +Streaming hashes so multi-GB files don't blow up memory. The hash algorithm +is whatever :func:`hashlib.new` understands (``sha256``, ``sha1``, ``md5``, +``blake2b``, ...). :func:`file_checksum` returns the hex digest; +:func:`verify_checksum` does a constant-time compare against the expected +value so callers can't leak timing on the check. +""" + +from __future__ import annotations + +import hashlib +import hmac +import os +from pathlib import Path + +from automation_file.exceptions import FileAutomationException, FileNotExistsException +from automation_file.logging_config import file_automation_logger + +__all__ = [ + "ChecksumMismatchException", + "file_checksum", + "verify_checksum", +] + +_DEFAULT_ALGO = "sha256" +_DEFAULT_CHUNK = 1024 * 1024 # 1 MiB + + +class ChecksumMismatchException(FileAutomationException): + """Raised when a computed digest does not match the expected value.""" + + +def file_checksum( + path: str | os.PathLike[str], + algorithm: str = _DEFAULT_ALGO, + chunk_size: int = _DEFAULT_CHUNK, +) -> str: + """Return the hex digest of ``path`` under ``algorithm``. + + Reads the file in ``chunk_size`` blocks so the memory cost is bounded + regardless of file size. Raises :class:`FileNotExistsException` when the + path is missing and :class:`ValueError` for unknown algorithms. 
+ """ + target = Path(path) + if not target.is_file(): + raise FileNotExistsException(f"checksum target missing: {target}") + try: + digest = hashlib.new(algorithm) + except ValueError as err: + raise ValueError(f"unknown hash algorithm: {algorithm!r}") from err + with target.open("rb") as handle: + for block in iter(lambda: handle.read(chunk_size), b""): + digest.update(block) + return digest.hexdigest() + + +def verify_checksum( + path: str | os.PathLike[str], + expected: str, + algorithm: str = _DEFAULT_ALGO, + chunk_size: int = _DEFAULT_CHUNK, +) -> bool: + """Return True iff ``path``'s digest matches ``expected`` (case-insensitive). + + Uses :func:`hmac.compare_digest` so the match check is constant-time. + """ + actual = file_checksum(path, algorithm=algorithm, chunk_size=chunk_size) + matched = hmac.compare_digest(actual.lower(), expected.lower()) + if not matched: + file_automation_logger.warning("verify_checksum mismatch: %s algo=%s", path, algorithm) + return matched diff --git a/automation_file/core/config.py b/automation_file/core/config.py new file mode 100644 index 0000000..6b2b1f6 --- /dev/null +++ b/automation_file/core/config.py @@ -0,0 +1,202 @@ +"""TOML-based configuration for automation_file. + +Callers describe notification sinks, secret-provider roots, and scheduler +defaults in a single ``automation_file.toml`` file. :class:`AutomationConfig` +loads it, resolves ``${env:…}`` / ``${file:…}`` references via the secret +provider chain, and exposes helpers to materialise runtime objects (sinks, +etc.) without the caller poking at the raw dict. 
+ +Minimal example:: + + [secrets] + file_root = "/run/secrets" + + [[notify.sinks]] + type = "slack" + name = "team-alerts" + webhook_url = "${env:SLACK_WEBHOOK}" + + [[notify.sinks]] + type = "email" + name = "ops-email" + host = "smtp.example.com" + port = 587 + sender = "alerts@example.com" + recipients = ["ops@example.com"] + username = "${env:SMTP_USER}" + password = "${file:smtp_password}" + + [defaults] + dedup_seconds = 120 + +Only the sections the caller uses need to appear; everything is optional. +""" + +from __future__ import annotations + +import sys +from pathlib import Path +from typing import Any + +if sys.version_info >= (3, 11): + import tomllib +else: # pragma: no cover - exercised only on Python 3.10 runners + import tomli as tomllib + +from automation_file.core.secrets import ( + ChainedSecretProvider, + default_provider, + resolve_secret_refs, +) +from automation_file.exceptions import FileAutomationException +from automation_file.logging_config import file_automation_logger +from automation_file.notify.manager import NotificationManager +from automation_file.notify.sinks import ( + EmailSink, + NotificationException, + NotificationSink, + SlackSink, + WebhookSink, +) + + +class ConfigException(FileAutomationException): + """Raised when the config file is missing, unparseable, or malformed.""" + + +class AutomationConfig: + """Parsed, secret-resolved view of an ``automation_file.toml`` document.""" + + def __init__(self, data: dict[str, Any], *, source: Path | None = None) -> None: + self._data = data + self._source = source + + @classmethod + def load( + cls, + path: str | Path, + *, + provider: ChainedSecretProvider | None = None, + ) -> AutomationConfig: + """Parse ``path`` as TOML, resolve secret refs, and return the config.""" + config_path = Path(path) + if not config_path.is_file(): + raise ConfigException(f"config file not found: {config_path}") + try: + raw = tomllib.loads(config_path.read_text(encoding="utf-8")) + except (OSError, 
tomllib.TOMLDecodeError) as err: + raise ConfigException(f"cannot parse {config_path}: {err}") from err + secrets_section = raw.get("secrets") or {} + file_root = secrets_section.get("file_root") + effective_provider = provider or default_provider(file_root) + resolved = resolve_secret_refs(raw, effective_provider) + file_automation_logger.info("config loaded from %s", config_path) + return cls(resolved, source=config_path) + + @property + def source(self) -> Path | None: + return self._source + + @property + def raw(self) -> dict[str, Any]: + """Return a shallow copy of the resolved document.""" + return dict(self._data) + + def section(self, name: str) -> dict[str, Any]: + """Return one top-level section as a dict (empty if absent).""" + value = self._data.get(name) + if value is None: + return {} + if not isinstance(value, dict): + raise ConfigException(f"section {name!r} must be a table, got {type(value).__name__}") + return value + + def notification_sinks(self) -> list[NotificationSink]: + """Instantiate every sink declared under ``[[notify.sinks]]``.""" + notify_section = self.section("notify") + entries = notify_section.get("sinks") or [] + if not isinstance(entries, list): + raise ConfigException("'notify.sinks' must be an array of tables") + sinks: list[NotificationSink] = [] + for entry in entries: + if not isinstance(entry, dict): + raise ConfigException("each 'notify.sinks' entry must be a table") + sinks.append(_build_sink(entry)) + return sinks + + def apply_to(self, manager: NotificationManager) -> int: + """Register every configured sink into ``manager``. Returns the count. + + Existing registrations are preserved; duplicates by name are replaced + (see :meth:`NotificationManager.register`). 
+ """ + count = 0 + for sink in self.notification_sinks(): + manager.register(sink) + count += 1 + defaults = self.section("defaults") + if "dedup_seconds" in defaults: + try: + manager.dedup_seconds = float(defaults["dedup_seconds"]) + except (TypeError, ValueError) as err: + raise ConfigException( + f"defaults.dedup_seconds must be a number, got {defaults['dedup_seconds']!r}" + ) from err + return count + + +def _build_sink(entry: dict[str, Any]) -> NotificationSink: + sink_type = entry.get("type") + if not isinstance(sink_type, str): + raise ConfigException("each sink entry needs a 'type' string") + builder = _SINK_BUILDERS.get(sink_type) + if builder is None: + raise ConfigException( + f"unknown sink type {sink_type!r} (expected one of {sorted(_SINK_BUILDERS)})" + ) + try: + return builder(entry) + except NotificationException: + raise + except (TypeError, ValueError) as err: + raise ConfigException( + f"invalid config for sink {entry.get('name') or sink_type!r}: {err}" + ) from err + + +def _build_webhook(entry: dict[str, Any]) -> WebhookSink: + return WebhookSink( + url=entry["url"], + name=entry.get("name", "webhook"), + timeout=float(entry.get("timeout", 10.0)), + extra_headers=entry.get("extra_headers"), + ) + + +def _build_slack(entry: dict[str, Any]) -> SlackSink: + return SlackSink( + webhook_url=entry["webhook_url"], + name=entry.get("name", "slack"), + timeout=float(entry.get("timeout", 10.0)), + ) + + +def _build_email(entry: dict[str, Any]) -> EmailSink: + return EmailSink( + host=entry["host"], + port=int(entry["port"]), + sender=entry["sender"], + recipients=list(entry["recipients"]), + username=entry.get("username"), + password=entry.get("password"), + use_tls=bool(entry.get("use_tls", True)), + name=entry.get("name", "email"), + timeout=float(entry.get("timeout", 10.0)), + ) + + +_SINK_BUILDERS = { + "webhook": _build_webhook, + "slack": _build_slack, + "email": _build_email, +} diff --git a/automation_file/core/config_watcher.py 
b/automation_file/core/config_watcher.py new file mode 100644 index 0000000..80c7679 --- /dev/null +++ b/automation_file/core/config_watcher.py @@ -0,0 +1,96 @@ +"""Polling hot-reload for ``automation_file.toml``. + +``ConfigWatcher`` runs a small background thread that checks the config file's +mtime + size every ``interval`` seconds. When either changes it reloads via +:meth:`AutomationConfig.load` and fires the user-supplied ``on_change`` +callback with the fresh config. Errors during reload are logged but do not +terminate the watcher — the next successful reload picks up where we left +off. + +We use polling rather than watchdog here: it's one file, cross-platform, and +the reload cadence is inherently human-scale (seconds, not milliseconds). +""" + +from __future__ import annotations + +import threading +from collections.abc import Callable +from pathlib import Path + +from automation_file.core.config import AutomationConfig, ConfigException +from automation_file.core.secrets import ChainedSecretProvider +from automation_file.exceptions import FileAutomationException +from automation_file.logging_config import file_automation_logger + +OnChange = Callable[[AutomationConfig], None] + + +class ConfigWatcher: + """Polls ``path`` and invokes ``on_change`` whenever the file changes.""" + + def __init__( + self, + path: str | Path, + on_change: OnChange, + *, + interval: float = 2.0, + provider: ChainedSecretProvider | None = None, + ) -> None: + if interval <= 0: + raise ConfigException("interval must be positive") + self._path = Path(path) + self._on_change = on_change + self._interval = interval + self._provider = provider + self._stop = threading.Event() + self._thread: threading.Thread | None = None + self._last_fingerprint: tuple[float, int] | None = None + + def start(self) -> AutomationConfig: + """Load the config once, arm the watcher, and return the initial load.""" + config = AutomationConfig.load(self._path, provider=self._provider) + 
self._last_fingerprint = self._fingerprint() + self._stop.clear() + thread = threading.Thread(target=self._run, name="fa-config-watcher", daemon=True) + thread.start() + self._thread = thread + file_automation_logger.info( + "config_watcher: watching %s (interval=%.2fs)", self._path, self._interval + ) + return config + + def stop(self, timeout: float = 5.0) -> None: + self._stop.set() + thread = self._thread + self._thread = None + if thread is not None and thread.is_alive(): + thread.join(timeout=timeout) + + def check_once(self) -> bool: + """Reload if the file changed since the last call. Returns True on reload.""" + fingerprint = self._fingerprint() + if fingerprint == self._last_fingerprint: + return False + self._last_fingerprint = fingerprint + try: + config = AutomationConfig.load(self._path, provider=self._provider) + except FileAutomationException as err: + file_automation_logger.error("config_watcher: reload failed: %r", err) + return False + try: + self._on_change(config) + except FileAutomationException as err: + file_automation_logger.error("config_watcher: on_change raised: %r", err) + return True + + def _fingerprint(self) -> tuple[float, int]: + try: + stat = self._path.stat() + except OSError: + return (0.0, 0) + return (stat.st_mtime, stat.st_size) + + def _run(self) -> None: + while not self._stop.is_set(): + self.check_once() + self._stop.wait(self._interval) diff --git a/automation_file/core/crypto.py b/automation_file/core/crypto.py new file mode 100644 index 0000000..1a9573e --- /dev/null +++ b/automation_file/core/crypto.py @@ -0,0 +1,174 @@ +"""AES-256-GCM file encryption helpers. 
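The mtime+size fingerprint that drives the watcher can be exercised standalone (the helper name here is illustrative, not the class above):

```python
import os
import tempfile

def fingerprint(path: str) -> tuple[float, int]:
    # A missing file collapses to a sentinel, matching the watcher's behaviour.
    try:
        stat = os.stat(path)
    except OSError:
        return (0.0, 0)
    return (stat.st_mtime, stat.st_size)

with tempfile.NamedTemporaryFile("w", suffix=".toml", delete=False) as handle:
    handle.write("a = 1\n")
    path = handle.name

before = fingerprint(path)
with open(path, "a", encoding="utf-8") as handle:
    handle.write("b = 2\n")     # size grows, so the fingerprint changes
after = fingerprint(path)
changed = after != before
os.unlink(path)
```

Note that a same-size in-place rewrite is only caught via mtime, which is why the watcher compares both fields.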
+ +``encrypt_file(source, target, key)`` writes a self-describing envelope:: + + magic = b"FA-AESG" 7 bytes + version = 0x01 1 byte + flags = 0x00 1 byte (reserved) + aad_len = uint32 BE 4 bytes + nonce = 12 bytes + aad = + ciphertext + tag (rest — GCM tag is the trailing 16 bytes) + +``decrypt_file`` reads the same format, verifies the tag, and writes the +plaintext to ``target``. Tampering (bit flips anywhere in the envelope +except ``aad_len``) surfaces as :class:`CryptoException`. + +GCM has a hard plaintext limit of roughly 64 GiB per ``(key, nonce)`` +pair; since each encrypt generates a fresh nonce, the practical cap is +per-file and is much larger than typical automation payloads. For files +approaching that size, split before calling ``encrypt_file``. +""" + +from __future__ import annotations + +import os +from pathlib import Path + +from cryptography.exceptions import InvalidTag +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +from automation_file.exceptions import FileAutomationException +from automation_file.logging_config import file_automation_logger + +_MAGIC = b"FA-AESG" +_VERSION = 0x01 +_NONCE_SIZE = 12 +_HEADER_FIXED_SIZE = len(_MAGIC) + 2 + 4 # magic + version + flags + aad_len +_VALID_KEY_SIZES = frozenset({16, 24, 32}) +_DEFAULT_PBKDF2_ITERATIONS = 200_000 + + +class CryptoException(FileAutomationException): + """Raised when encryption / decryption fails (including on tamper).""" + + +def generate_key(*, bits: int = 256) -> bytes: + """Return cryptographically random bytes suitable for AES-GCM.""" + if bits not in (128, 192, 256): + raise CryptoException(f"bits must be 128 / 192 / 256, got {bits}") + return os.urandom(bits // 8) + + +def key_from_password( + password: str, + salt: bytes, + *, + iterations: int = _DEFAULT_PBKDF2_ITERATIONS, + bits: int = 256, +) -> bytes: + """Derive a symmetric key from 
``password`` via PBKDF2-HMAC-SHA256.""" + if not password: + raise CryptoException("password must be non-empty") + if len(salt) < 16: + raise CryptoException("salt must be at least 16 bytes") + if bits not in (128, 192, 256): + raise CryptoException(f"bits must be 128 / 192 / 256, got {bits}") + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=bits // 8, + salt=salt, + iterations=iterations, + ) + return kdf.derive(password.encode("utf-8")) + + +def encrypt_file( + source: str | os.PathLike[str], + target: str | os.PathLike[str], + key: bytes, + *, + associated_data: bytes = b"", +) -> dict[str, int]: + """Encrypt ``source`` to ``target`` under AES-GCM. Returns a size summary.""" + _validate_key(key) + if not isinstance(associated_data, (bytes, bytearray)): + raise CryptoException("associated_data must be bytes") + src = Path(source) + if not src.is_file(): + raise CryptoException(f"source file not found: {src}") + + plaintext = src.read_bytes() + nonce = os.urandom(_NONCE_SIZE) + aesgcm = AESGCM(bytes(key)) + ciphertext = aesgcm.encrypt(nonce, plaintext, bytes(associated_data) or None) + + envelope = _build_header(associated_data, nonce) + ciphertext + dst = Path(target) + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_bytes(envelope) + file_automation_logger.info( + "encrypt_file: %s -> %s (%d -> %d bytes)", + src, + dst, + len(plaintext), + len(envelope), + ) + return {"plaintext_bytes": len(plaintext), "ciphertext_bytes": len(envelope)} + + +def decrypt_file( + source: str | os.PathLike[str], + target: str | os.PathLike[str], + key: bytes, +) -> dict[str, int]: + """Decrypt ``source`` to ``target``. 
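The same derivation is available in the stdlib; a minimal sketch equivalent to `key_from_password` under the defaults above (SHA-256, 200 000 iterations, 32-byte key), without the `cryptography` dependency:

```python
import hashlib
import os

salt = os.urandom(16)  # at least 16 bytes, as enforced above

# Deterministic for fixed (password, salt, iterations); slow by design.
key = hashlib.pbkdf2_hmac("sha256", b"correct horse", salt, 200_000, dklen=32)
again = hashlib.pbkdf2_hmac("sha256", b"correct horse", salt, 200_000, dklen=32)
other_salt = hashlib.pbkdf2_hmac("sha256", b"correct horse", os.urandom(16), 200_000, dklen=32)
```

The salt must be stored alongside the ciphertext (e.g. in the AAD) or the key cannot be re-derived for decryption.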
Raises on invalid tag / header.""" + _validate_key(key) + src = Path(source) + if not src.is_file(): + raise CryptoException(f"source file not found: {src}") + envelope = src.read_bytes() + nonce, aad, ciphertext = _parse_envelope(envelope) + aesgcm = AESGCM(bytes(key)) + try: + plaintext = aesgcm.decrypt(nonce, ciphertext, aad or None) + except InvalidTag as err: + raise CryptoException("authentication failed: wrong key or tampered data") from err + + dst = Path(target) + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_bytes(plaintext) + file_automation_logger.info( + "decrypt_file: %s -> %s (%d -> %d bytes)", + src, + dst, + len(envelope), + len(plaintext), + ) + return {"ciphertext_bytes": len(envelope), "plaintext_bytes": len(plaintext)} + + +def _validate_key(key: bytes) -> None: + if not isinstance(key, (bytes, bytearray)): + raise CryptoException("key must be bytes") + if len(key) not in _VALID_KEY_SIZES: + raise CryptoException( + f"key length must be 16 / 24 / 32 bytes, got {len(key)}", + ) + + +def _build_header(aad: bytes, nonce: bytes) -> bytes: + aad_len = len(aad).to_bytes(4, "big") + return _MAGIC + bytes([_VERSION, 0x00]) + aad_len + nonce + bytes(aad) + + +def _parse_envelope(envelope: bytes) -> tuple[bytes, bytes, bytes]: + if len(envelope) < _HEADER_FIXED_SIZE + _NONCE_SIZE + 16: + raise CryptoException("ciphertext envelope is shorter than the fixed header") + if not envelope.startswith(_MAGIC): + raise CryptoException("not an AES-GCM envelope (bad magic)") + version = envelope[len(_MAGIC)] + if version != _VERSION: + raise CryptoException(f"unsupported envelope version {version}") + aad_len = int.from_bytes(envelope[_HEADER_FIXED_SIZE - 4 : _HEADER_FIXED_SIZE], "big") + nonce_start = _HEADER_FIXED_SIZE + nonce_end = nonce_start + _NONCE_SIZE + aad_end = nonce_end + aad_len + if aad_end > len(envelope): + raise CryptoException("envelope truncated before aad end") + nonce = envelope[nonce_start:nonce_end] + aad = 
envelope[nonce_end:aad_end] + ciphertext = envelope[aad_end:] + return nonce, aad, ciphertext diff --git a/automation_file/core/dag_executor.py b/automation_file/core/dag_executor.py new file mode 100644 index 0000000..7bea7d3 --- /dev/null +++ b/automation_file/core/dag_executor.py @@ -0,0 +1,175 @@ +"""DAG executor — run actions in dependency order, parallelising independent branches. + +A DAG node is a dict: + +.. code-block:: python + + {"id": "build", "action": ["FA_create_dir", {"dir_path": "build"}]} + {"id": "zip", "action": ["FA_zip_dir", ...], "depends_on": ["build"]} + +The executor performs Kahn-style topological scheduling: every node whose +dependencies are satisfied becomes runnable and is dispatched to a shared +thread pool immediately — so diamonds and wide fan-outs run in parallel +without the caller hand-tuning ``max_workers`` around dependency edges. + +If a node raises, its transitive dependents are marked ``skipped`` by +default (fail-fast semantics). Pass ``fail_fast=False`` to run dependents +regardless (useful for cleanup steps). +""" + +from __future__ import annotations + +import threading +from collections import defaultdict, deque +from collections.abc import Mapping +from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait +from typing import Any + +from automation_file.core.action_executor import executor as default_executor +from automation_file.exceptions import DagException + +__all__ = ["execute_action_dag"] + + +def execute_action_dag( + nodes: list[Mapping[str, Any]], + max_workers: int = 4, + fail_fast: bool = True, +) -> dict[str, Any]: + """Run ``nodes`` in topological order, parallelising independent branches. + + Each node is ``{"id": str, "action": [...], "depends_on": [id, ...]}``. + ``depends_on`` is optional (default ``[]``). Returns a dict mapping each + node id to either the action's return value, the repr of its exception, + or ``"skipped: "`` when ``fail_fast`` blocks a branch. 
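The envelope layout needs no cryptography to exercise; a standalone round-trip of the header described in the module docstring (the ciphertext here is dummy bytes, not real AES-GCM output):

```python
MAGIC = b"FA-AESG"
VERSION = 0x01
NONCE_SIZE = 12
HEADER_FIXED = len(MAGIC) + 2 + 4           # magic + version + flags + aad_len

def build(aad: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
    # magic | version | flags | aad_len (uint32 BE) | nonce | aad | ciphertext+tag
    return MAGIC + bytes([VERSION, 0x00]) + len(aad).to_bytes(4, "big") + nonce + aad + ciphertext

def parse(envelope: bytes) -> tuple[bytes, bytes, bytes]:
    assert envelope.startswith(MAGIC) and envelope[len(MAGIC)] == VERSION
    aad_len = int.from_bytes(envelope[HEADER_FIXED - 4 : HEADER_FIXED], "big")
    nonce_end = HEADER_FIXED + NONCE_SIZE
    aad_end = nonce_end + aad_len
    return envelope[HEADER_FIXED:nonce_end], envelope[nonce_end:aad_end], envelope[aad_end:]

nonce, aad, ct = b"\x00" * 12, b"filename=a.txt", b"dummy-ciphertext+16-byte-tag"
round_trip = parse(build(aad, nonce, ct))
```

Because `aad_len` is outside the authenticated data only in the framing sense — the AAD bytes themselves are fed to GCM — flipping `aad_len` merely shifts the nonce/aad split, which then fails tag verification.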
+ + Raises :class:`DagException` for static errors detected before any action + runs: duplicate ids, unknown dependencies, or cycles. + """ + graph, indegree = _build_graph(nodes) + node_map = {_require_id(node): node for node in nodes} + results: dict[str, Any] = {} + lock = threading.Lock() + + ready: deque[str] = deque(node_id for node_id, count in indegree.items() if count == 0) + + with ThreadPoolExecutor(max_workers=max_workers) as pool: + in_flight: dict[Future[Any], str] = {} + + def submit(node_id: str) -> None: + action = node_map[node_id].get("action") + if not isinstance(action, list): + err = DagException(f"node {node_id!r} missing action list") + with lock: + results[node_id] = repr(err) + # Dependents must always have their indegree decremented, + # otherwise they stall forever when fail_fast is False. + for dependent in graph.get(node_id, ()): + indegree[dependent] -= 1 + if fail_fast: + _mark_skipped(dependent, node_id, graph, indegree, results, lock) + elif indegree[dependent] == 0 and dependent not in results: + ready.append(dependent) + return + future = pool.submit(_run_action, action) + in_flight[future] = node_id + + while ready or in_flight: + while ready: + submit(ready.popleft()) + if not in_flight: + break + done, _ = wait(list(in_flight), return_when=FIRST_COMPLETED) + for future in done: + node_id = in_flight.pop(future) + failed = False + try: + value: Any = future.result() + except Exception as err: # pylint: disable=broad-except + value = repr(err) + failed = True + with lock: + results[node_id] = value + for dependent in graph.get(node_id, ()): + indegree[dependent] -= 1 + if failed and fail_fast: + _mark_skipped(dependent, node_id, graph, indegree, results, lock) + elif indegree[dependent] == 0 and dependent not in results: + ready.append(dependent) + + return results + + +def _run_action(action: list) -> Any: + # Use the single-action path so exceptions surface as real exceptions + # rather than being swallowed by execute_action's per-action try/except.
+ return default_executor._execute_event(action) + + +def _build_graph( + nodes: list[Mapping[str, Any]], +) -> tuple[dict[str, list[str]], dict[str, int]]: + graph: dict[str, list[str]] = defaultdict(list) + indegree: dict[str, int] = {} + ids: set[str] = set() + + for node in nodes: + node_id = _require_id(node) + if node_id in ids: + raise DagException(f"duplicate node id: {node_id!r}") + ids.add(node_id) + indegree[node_id] = 0 + + for node in nodes: + node_id = _require_id(node) + deps = node.get("depends_on", []) or [] + if not isinstance(deps, list): + raise DagException(f"node {node_id!r} depends_on must be list") + for dep in deps: + if dep not in ids: + raise DagException(f"node {node_id!r} depends on unknown id {dep!r}") + if dep == node_id: + raise DagException(f"node {node_id!r} depends on itself") + graph[dep].append(node_id) + indegree[node_id] += 1 + + _detect_cycle(ids, graph, dict(indegree)) + return dict(graph), indegree + + +def _require_id(node: Mapping[str, Any]) -> str: + node_id = node.get("id") + if not isinstance(node_id, str) or not node_id: + raise DagException(f"node missing non-empty 'id': {node!r}") + return node_id + + +def _detect_cycle( + ids: set[str], + graph: dict[str, list[str]], + indegree: dict[str, int], +) -> None: + queue: deque[str] = deque(node_id for node_id, count in indegree.items() if count == 0) + visited = 0 + while queue: + current = queue.popleft() + visited += 1 + for dependent in graph.get(current, ()): + indegree[dependent] -= 1 + if indegree[dependent] == 0: + queue.append(dependent) + if visited != len(ids): + raise DagException("cycle detected in DAG") + + +def _mark_skipped( + dependent: str, + reason_id: str, + graph: dict[str, list[str]], + indegree: dict[str, int], + results: dict[str, Any], + lock: threading.Lock, +) -> None: + with lock: + if dependent in results: + return + results[dependent] = f"skipped: dep {reason_id!r} failed" + for grandchild in graph.get(dependent, ()): + indegree[grandchild] 
-= 1 + _mark_skipped(grandchild, dependent, graph, indegree, results, lock) diff --git a/automation_file/core/fim.py b/automation_file/core/fim.py new file mode 100644 index 0000000..12051ef --- /dev/null +++ b/automation_file/core/fim.py @@ -0,0 +1,151 @@ +"""File integrity monitoring (FIM) — periodic manifest verification. + +``IntegrityMonitor(root, manifest_path)`` runs a background thread that +re-verifies the tree against a previously-written manifest at a fixed +interval. When drift is detected (missing or modified files) the monitor + +1. invokes the optional ``on_drift`` callback with the verification summary, +2. emits an ``error``-level notification through the supplied + :class:`~automation_file.notify.NotificationManager` (defaults to the + process-wide singleton), and +3. logs a single warning describing the counts. + +This is the "watchdog" side of manifests: once a baseline has been written +with :func:`write_manifest`, a monitor keeps checking that the tree still +matches and alerts when it does not. Extras (new files not in the manifest) +do not count as drift by default, mirroring the posture of ``verify_manifest``.
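Stripped of the thread pool, the Kahn-style scheduling above reduces to a few lines; a sequential sketch over an illustrative diamond DAG (`build` fans out to `test`/`lint`, which rejoin at `package`):

```python
from collections import deque

# Adjacency list: node -> its dependents. test and lint are independent.
graph = {"build": ["test", "lint"], "test": ["package"], "lint": ["package"], "package": []}
indegree = {node: 0 for node in graph}
for dependents in graph.values():
    for dependent in dependents:
        indegree[dependent] += 1

order: list[str] = []
ready = deque(sorted(n for n, c in indegree.items() if c == 0))
while ready:
    current = ready.popleft()
    order.append(current)                   # "run" the node
    for dependent in graph[current]:
        indegree[dependent] -= 1
        if indegree[dependent] == 0:        # all dependencies satisfied
            ready.append(dependent)
```

In the executor, everything in `ready` at the same moment is dispatched to the pool concurrently; here `test` and `lint` simply appear back to back.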
+""" + +from __future__ import annotations + +import threading +from collections.abc import Callable +from pathlib import Path +from typing import Any + +from automation_file.core.manifest import ManifestException, verify_manifest +from automation_file.exceptions import FileAutomationException +from automation_file.logging_config import file_automation_logger +from automation_file.notify import NotificationManager, notification_manager + +OnDrift = Callable[[dict[str, Any]], None] + + +class IntegrityMonitor: + """Periodically verify a manifest and fire alerts on drift.""" + + def __init__( + self, + root: str | Path, + manifest_path: str | Path, + *, + interval: float = 60.0, + on_drift: OnDrift | None = None, + manager: NotificationManager | None = None, + alert_on_extra: bool = False, + ) -> None: + if interval <= 0: + raise FileAutomationException("interval must be positive") + self._root = Path(root) + self._manifest_path = Path(manifest_path) + self._interval = float(interval) + self._on_drift = on_drift + self._manager = manager or notification_manager + self._alert_on_extra = bool(alert_on_extra) + self._stop = threading.Event() + self._thread: threading.Thread | None = None + self._last_summary: dict[str, Any] | None = None + + @property + def last_summary(self) -> dict[str, Any] | None: + return self._last_summary + + def start(self) -> None: + """Arm the monitor. 
The first verification runs on the next tick.""" + if self._thread is not None and self._thread.is_alive(): + return + self._stop.clear() + thread = threading.Thread(target=self._run, name="fa-integrity-monitor", daemon=True) + thread.start() + self._thread = thread + file_automation_logger.info( + "integrity_monitor: watching %s against %s (interval=%.1fs)", + self._root, + self._manifest_path, + self._interval, + ) + + def stop(self, timeout: float = 5.0) -> None: + self._stop.set() + thread = self._thread + self._thread = None + if thread is not None and thread.is_alive(): + thread.join(timeout=timeout) + + def check_once(self) -> dict[str, Any]: + """Run one verification pass and return the summary.""" + try: + summary = verify_manifest(self._root, self._manifest_path) + except (ManifestException, FileAutomationException) as err: + file_automation_logger.error("integrity_monitor: verify failed: %r", err) + summary = { + "matched": [], + "missing": [], + "modified": [], + "extra": [], + "ok": False, + "error": repr(err), + } + self._last_summary = summary + if self._is_drift(summary): + self._handle_drift(summary) + return summary + + def _is_drift(self, summary: dict[str, Any]) -> bool: + if summary.get("error"): + return True + if summary.get("missing") or summary.get("modified"): + return True + return bool(self._alert_on_extra and summary.get("extra")) + + def _handle_drift(self, summary: dict[str, Any]) -> None: + file_automation_logger.warning( + "integrity_monitor: drift detected missing=%d modified=%d extra=%d", + len(summary.get("missing") or []), + len(summary.get("modified") or []), + len(summary.get("extra") or []), + ) + if self._on_drift is not None: + try: + self._on_drift(summary) + except FileAutomationException as err: + file_automation_logger.error("integrity_monitor: on_drift raised: %r", err) + body = _format_body(summary) + try: + self._manager.notify( + subject=f"integrity drift: {self._root}", + body=body, + level="error", + ) + except 
FileAutomationException as err: + file_automation_logger.error("integrity_monitor: notify failed: %r", err) + + def _run(self) -> None: + while not self._stop.is_set(): + self._stop.wait(self._interval) + if self._stop.is_set(): + break + self.check_once() + + +def _format_body(summary: dict[str, Any]) -> str: + parts: list[str] = [] + if summary.get("error"): + parts.append(f"error: {summary['error']}") + for key in ("missing", "modified", "extra"): + items = summary.get(key) or [] + if items: + preview = ", ".join(items[:5]) + suffix = f" (+{len(items) - 5} more)" if len(items) > 5 else "" + parts.append(f"{key}: {preview}{suffix}") + return "\n".join(parts) if parts else "no drift detected" diff --git a/automation_file/core/manifest.py b/automation_file/core/manifest.py new file mode 100644 index 0000000..be409dc --- /dev/null +++ b/automation_file/core/manifest.py @@ -0,0 +1,183 @@ +"""Directory-tree manifests — JSON snapshot of every file's checksum. + +A manifest is a simple JSON document recording every file under a root, +its size, and a streaming digest (SHA-256 by default). Two operations:: + + write_manifest(root, manifest_path) # snapshot now + verify_manifest(root, manifest_path) # verify the tree still matches + +Use cases: release-artifact verification, backup integrity checks, detecting +tampering on a sync target, or pre-flight checks before a rename / move. + +The document shape is intentionally small and human-readable:: + + { + "version": 1, + "algorithm": "sha256", + "root": "/abs/path/at/snapshot/time", + "created_at": "2026-04-21T10:15:30+00:00", + "files": { + "a.txt": {"size": 3, "checksum": "..."}, + "nested/b.txt": {"size": 1024, "checksum": "..."} + } + } + +Paths in the ``files`` mapping are POSIX-style (forward-slash) relative +paths so the manifest round-trips across Windows and Unix.
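The notification body flattens the drift summary into a short preview per category; a standalone sketch of that formatting (the helper name and sample file names are illustrative):

```python
from typing import Any

def format_drift(summary: dict[str, Any]) -> str:
    # Show at most five paths per category, with a "+N more" suffix.
    parts: list[str] = []
    for key in ("missing", "modified", "extra"):
        items = summary.get(key) or []
        if items:
            preview = ", ".join(items[:5])
            suffix = f" (+{len(items) - 5} more)" if len(items) > 5 else ""
            parts.append(f"{key}: {preview}{suffix}")
    return "\n".join(parts) if parts else "no drift detected"

body = format_drift({
    "missing": ["gone.txt"],
    "modified": [f"f{i}.txt" for i in range(7)],
    "extra": [],
})
```

Truncating the preview keeps alert payloads bounded even when thousands of files drift at once.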
+""" + +from __future__ import annotations + +import datetime as dt +import json +import os +from pathlib import Path +from typing import Any + +from automation_file.core.checksum import file_checksum +from automation_file.exceptions import DirNotExistsException, FileAutomationException +from automation_file.logging_config import file_automation_logger + +_MANIFEST_VERSION = 1 +_DEFAULT_ALGO = "sha256" + + +class ManifestException(FileAutomationException): + """Raised for invalid manifest documents or unreadable manifest paths.""" + + +def write_manifest( + root: str | os.PathLike[str], + manifest_path: str | os.PathLike[str], + *, + algorithm: str = _DEFAULT_ALGO, +) -> dict[str, Any]: + """Write a manifest for every file under ``root``. Returns the manifest dict.""" + root_path = Path(root) + if not root_path.is_dir(): + raise DirNotExistsException(str(root_path)) + files: dict[str, dict[str, Any]] = {} + for relative in _walk_files(root_path): + absolute = root_path / relative + files[_posix_rel(relative)] = { + "size": absolute.stat().st_size, + "checksum": file_checksum(absolute, algorithm=algorithm), + } + manifest: dict[str, Any] = { + "version": _MANIFEST_VERSION, + "algorithm": algorithm, + "root": str(root_path.resolve()), + "created_at": dt.datetime.now(dt.timezone.utc).isoformat(), + "files": files, + } + Path(manifest_path).parent.mkdir(parents=True, exist_ok=True) + Path(manifest_path).write_text(json.dumps(manifest, indent=2), encoding="utf-8") + file_automation_logger.info( + "write_manifest: %d files under %s -> %s", len(files), root_path, manifest_path + ) + return manifest + + +def verify_manifest( + root: str | os.PathLike[str], + manifest_path: str | os.PathLike[str], +) -> dict[str, Any]: + """Verify every file recorded in ``manifest_path`` still matches under ``root``. 
+ + Returns a summary dict:: + + { + "matched": ["a.txt"], + "missing": ["gone.txt"], + "modified": ["changed.txt"], + "extra": ["new.txt"], # present under root, not in manifest + "ok": False, + } + + ``ok`` is True iff ``missing`` and ``modified`` are both empty (extras + are reported but do not fail verification — mirror ``sync_dir``'s + default non-deleting posture). + """ + root_path = Path(root) + if not root_path.is_dir(): + raise DirNotExistsException(str(root_path)) + manifest = _load_manifest(manifest_path) + algorithm = manifest.get("algorithm", _DEFAULT_ALGO) + recorded = manifest.get("files") or {} + + summary: dict[str, Any] = { + "matched": [], + "missing": [], + "modified": [], + "extra": [], + "ok": False, + } + + for rel, meta in recorded.items(): + _verify_one(root_path, rel, meta, algorithm, summary) + + recorded_keys = set(recorded.keys()) + for rel_path in _walk_files(root_path): + posix = _posix_rel(rel_path) + if posix not in recorded_keys: + summary["extra"].append(posix) + + summary["ok"] = not summary["missing"] and not summary["modified"] + file_automation_logger.info( + "verify_manifest: ok=%s matched=%d missing=%d modified=%d extra=%d", + summary["ok"], + len(summary["matched"]), + len(summary["missing"]), + len(summary["modified"]), + len(summary["extra"]), + ) + return summary + + +def _verify_one( + root: Path, + relative: str, + meta: dict[str, Any], + algorithm: str, + summary: dict[str, Any], +) -> None: + target = root / relative + if not target.is_file(): + summary["missing"].append(relative) + return + expected = meta.get("checksum") + size = meta.get("size") + if isinstance(size, int) and target.stat().st_size != size: + summary["modified"].append(relative) + return + if not isinstance(expected, str) or file_checksum(target, algorithm=algorithm) != expected: + summary["modified"].append(relative) + return + summary["matched"].append(relative) + + +def _load_manifest(path: str | os.PathLike[str]) -> dict[str, Any]: + 
manifest_path = Path(path) + if not manifest_path.is_file(): + raise ManifestException(f"manifest not found: {manifest_path}") + try: + data = json.loads(manifest_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as err: + raise ManifestException(f"cannot read manifest {manifest_path}: {err}") from err + if not isinstance(data, dict) or "files" not in data: + raise ManifestException(f"manifest missing 'files' mapping: {manifest_path}") + return data + + +def _walk_files(root: Path) -> list[Path]: + entries: list[Path] = [] + for dirpath, dirnames, filenames in os.walk(root, followlinks=False): + dirnames.sort() + base = Path(dirpath) + for name in sorted(filenames): + entries.append((base / name).relative_to(root)) + return entries + + +def _posix_rel(relative: Path) -> str: + return str(relative).replace("\\", "/") diff --git a/automation_file/core/metrics.py b/automation_file/core/metrics.py new file mode 100644 index 0000000..f9d7c7f --- /dev/null +++ b/automation_file/core/metrics.py @@ -0,0 +1,65 @@ +"""Prometheus metrics — per-action counters and duration histogram. + +The module exposes two metrics that are updated from +:class:`~automation_file.core.action_executor.ActionExecutor` on every +call: + +* ``automation_file_actions_total{action, status}`` — counter incremented + with ``status="ok"`` or ``status="error"`` per action. +* ``automation_file_action_duration_seconds{action}`` — histogram of wall + time spent inside the registered callable. + +:func:`render` returns the wire-format text and matching ``Content-Type`` +suitable for a ``GET /metrics`` handler. :func:`record_action` is the +single write path — failures are swallowed so a broken metrics backend +can never abort a real action. 
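End to end, the snapshot-then-verify cycle can be sketched with the stdlib alone — the `files` shape matches the docstring above, minus the metadata keys, and the helper name is illustrative:

```python
import hashlib
import os
import tempfile
from pathlib import Path

def snapshot(root: Path) -> dict:
    files = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in sorted(filenames):
            path = Path(dirpath) / name
            rel = path.relative_to(root).as_posix()   # POSIX-style keys
            files[rel] = {"size": path.stat().st_size,
                          "checksum": hashlib.sha256(path.read_bytes()).hexdigest()}
    return files

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.txt").write_text("abc")
    manifest = snapshot(root)
    (root / "a.txt").write_text("abd")                # same-size tamper
    modified = [rel for rel, meta in manifest.items()
                if hashlib.sha256((root / rel).read_bytes()).hexdigest() != meta["checksum"]]
```

The tamper here preserves size and so would slip past a size-only check — which is why `_verify_one` falls through to the checksum comparison after the cheap size test.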
+""" + +from __future__ import annotations + +from prometheus_client import CONTENT_TYPE_LATEST, REGISTRY, Counter, Histogram, generate_latest + +from automation_file.logging_config import file_automation_logger + +_DURATION_BUCKETS = ( + 0.005, + 0.01, + 0.025, + 0.05, + 0.1, + 0.25, + 0.5, + 1.0, + 2.5, + 5.0, + 10.0, + 30.0, + 60.0, +) + +ACTION_COUNT = Counter( + "automation_file_actions_total", + "Total actions executed, partitioned by outcome.", + labelnames=("action", "status"), +) +ACTION_DURATION = Histogram( + "automation_file_action_duration_seconds", + "Time spent inside a registered action callable.", + labelnames=("action",), + buckets=_DURATION_BUCKETS, +) + + +def record_action(action: str, duration_seconds: float, ok: bool) -> None: + """Record one action execution. Never raises.""" + status = "ok" if ok else "error" + try: + ACTION_COUNT.labels(action=action, status=status).inc() + ACTION_DURATION.labels(action=action).observe(max(0.0, float(duration_seconds))) + except Exception as err: # pragma: no cover - defensive + file_automation_logger.error("metrics.record_action failed: %r", err) + + +def render() -> tuple[bytes, str]: + """Return ``(payload, content_type)`` for a ``/metrics`` response.""" + return generate_latest(REGISTRY), CONTENT_TYPE_LATEST diff --git a/automation_file/core/plugins.py b/automation_file/core/plugins.py new file mode 100644 index 0000000..297d2d2 --- /dev/null +++ b/automation_file/core/plugins.py @@ -0,0 +1,86 @@ +"""Entry-point plugin discovery. + +Third-party packages can register additional actions with +``automation_file`` without the library having to import them directly. + +A plugin advertises itself in its ``pyproject.toml``:: + + [project.entry-points."automation_file.actions"] + my_plugin = "my_plugin:register" + +where ``register`` is a zero-argument callable returning a +``Mapping[str, Callable]`` — the same shape you would pass to +:func:`automation_file.add_command_to_executor`. 
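In Prometheus's exposition format, histogram buckets are cumulative: an observation counts toward every `le` bound at or above its value. A stdlib-only sketch of that bucketing over the duration buckets above (no `prometheus_client` needed):

```python
import bisect

BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0)

# One raw count per bucket; trailing slot is the +Inf bucket.
counts = [0] * (len(BUCKETS) + 1)

def observe(value: float) -> None:
    # bisect_left finds the first bound >= value, i.e. the smallest "le" bucket.
    counts[bisect.bisect_left(BUCKETS, value)] += 1

for duration in (0.003, 0.07, 0.07, 42.0):
    observe(duration)

# Render-time cumulative totals, as the text format reports them.
cumulative, running = [], 0
for count in counts:
    running += count
    cumulative.append(running)
```

`prometheus_client` performs this accumulation internally; `generate_latest` just serialises the cumulative view.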
+ +:func:`load_entry_point_plugins` is invoked by +:func:`automation_file.core.action_registry.build_default_registry` so +installed plugins populate every freshly-built registry automatically. +Plugin failures are logged and swallowed — one broken plugin must not +break the library for everyone else. +""" + +from __future__ import annotations + +from collections.abc import Callable, Mapping +from importlib.metadata import EntryPoint, entry_points +from typing import Any + +from automation_file.logging_config import file_automation_logger + +__all__ = ["ENTRY_POINT_GROUP", "load_entry_point_plugins"] + +ENTRY_POINT_GROUP = "automation_file.actions" + + +def load_entry_point_plugins( + register: Callable[[Mapping[str, Callable[..., Any]]], None], +) -> int: + """Discover and register every ``automation_file.actions`` entry point. + + ``register`` receives one ``{name: callable}`` mapping per plugin and + is responsible for storing it (typically + :meth:`ActionRegistry.register_many`). Returns the number of plugins + that registered successfully. 
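The plugin contract — a zero-argument factory returning a name-to-callable mapping, handed to a `register` callback — can be exercised without `importlib.metadata`; everything here (plugin name, action name) is illustrative:

```python
from collections.abc import Callable, Mapping
from typing import Any

registry: dict[str, Callable[..., Any]] = {}

def register(mapping: Mapping[str, Callable[..., Any]]) -> None:
    registry.update(mapping)

def my_plugin_factory() -> Mapping[str, Callable[..., Any]]:
    # What a third-party package's entry-point target would return.
    return {"FA_shout": lambda text: text.upper()}

# Mimic load_entry_point_plugins for one discovered entry point.
mapping = my_plugin_factory()
loaded = 0
if isinstance(mapping, Mapping):            # non-mappings are rejected, not raised
    register(mapping)
    loaded += 1
```

In the real path, `entry.load()` resolves `my_plugin:register` to the factory; the rest of the flow is identical.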
+ """ + loaded = 0 + for entry in _iter_entry_points(): + try: + factory = entry.load() + except Exception as err: # pylint: disable=broad-except + file_automation_logger.error( + "plugin load failed: %s (%s): %r", entry.name, entry.value, err + ) + continue + try: + mapping = factory() + except Exception as err: # pylint: disable=broad-except + file_automation_logger.error( + "plugin factory raised: %s (%s): %r", entry.name, entry.value, err + ) + continue + if not isinstance(mapping, Mapping): + file_automation_logger.error( + "plugin %s returned %s, expected Mapping", + entry.name, + type(mapping).__name__, + ) + continue + try: + register(mapping) + except Exception as err: # pylint: disable=broad-except + file_automation_logger.error("plugin register failed: %s: %r", entry.name, err) + continue + file_automation_logger.info( + "plugin registered: %s -> %d commands", entry.name, len(mapping) + ) + loaded += 1 + return loaded + + +def _iter_entry_points() -> list[EntryPoint]: + try: + return list(entry_points(group=ENTRY_POINT_GROUP)) + except TypeError: + # importlib.metadata before 3.10 used a different API; the project + # targets 3.10+, so this branch exists only as defensive padding. + return list(entry_points().get(ENTRY_POINT_GROUP, [])) diff --git a/automation_file/core/progress.py b/automation_file/core/progress.py new file mode 100644 index 0000000..92ea7c4 --- /dev/null +++ b/automation_file/core/progress.py @@ -0,0 +1,161 @@ +"""Transfer progress + cancellation primitives. + +Long-running transfers (HTTP downloads, S3 uploads/downloads, …) accept a +named handle from the shared :data:`progress_registry`. The registry keeps a +:class:`ProgressReporter` (bytes transferred, optional total) and a +:class:`CancellationToken` per name so the GUI or a JSON action can observe +progress or cancel mid-flight. + +Instrumentation is opt-in: callers pass ``progress_name="