diff --git a/.github/workflows/ci-dev.yml b/.github/workflows/ci-dev.yml
index 74ccaaf..dcf43d3 100644
--- a/.github/workflows/ci-dev.yml
+++ b/.github/workflows/ci-dev.yml
@@ -48,7 +48,8 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip wheel
- pip install -r dev_requirements.txt
+ Copy-Item dev.toml pyproject.toml -Force
+ pip install -e .
pip install pytest pytest-cov
- name: Run pytest with coverage
run: python -m pytest tests/ -v --tb=short --cov=automation_file --cov-report=term-missing --cov-report=xml
diff --git a/.github/workflows/ci-stable.yml b/.github/workflows/ci-stable.yml
index 97f180a..d6911e7 100644
--- a/.github/workflows/ci-stable.yml
+++ b/.github/workflows/ci-stable.yml
@@ -48,7 +48,8 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip wheel
- pip install -r requirements.txt
+ Copy-Item stable.toml pyproject.toml -Force
+ pip install -e .
pip install pytest pytest-cov
- name: Run pytest with coverage
run: python -m pytest tests/ -v --tb=short --cov=automation_file --cov-report=term-missing --cov-report=xml
diff --git a/.idea/FileAutomation.iml b/.idea/FileAutomation.iml
index 673a12b..91f2d42 100644
--- a/.idea/FileAutomation.iml
+++ b/.idea/FileAutomation.iml
@@ -5,7 +5,7 @@
-
+
\ No newline at end of file
diff --git a/README.md b/README.md
index e781ad8..edc387b 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,7 @@
# FileAutomation
+**English** | [繁體中文](README.zh-TW.md) | [简体中文](README.zh-CN.md)
+
A modular automation framework for local file / directory / ZIP operations,
SSRF-validated HTTP downloads, remote storage (Google Drive, S3, Azure Blob,
Dropbox, SFTP), and JSON-driven action execution over embedded TCP / HTTP
@@ -14,7 +16,33 @@ facade.
- JSON action lists executed by a shared `ActionExecutor` — validate, dry-run, parallel
- Loopback-first TCP **and** HTTP servers that accept JSON command batches with optional shared-secret auth
- Reliability primitives: `retry_on_transient` decorator, `Quota` size / time budgets
-- PySide6 GUI (`python -m automation_file ui`) with a tab per backend plus a JSON-action runner
+- **File-watcher triggers** — run an action list whenever a path changes (`FA_watch_*`)
+- **Cron scheduler** — recurring action lists on a stdlib-only 5-field parser (`FA_schedule_*`)
+- **Transfer progress + cancellation** — opt-in `progress_name` hook on HTTP and S3 transfers (`FA_progress_*`)
+- **Fast file search** — OS index fast path (`mdfind` / `locate` / `es.exe`) with a streaming `scandir` fallback (`FA_fast_find`)
+- **Checksums + integrity verification** — streaming `file_checksum` / `verify_checksum` with any `hashlib` algorithm; `download_file(expected_sha256=...)` verifies after transfer (`FA_file_checksum`, `FA_verify_checksum`)
+- **Resumable HTTP downloads** — `download_file(resume=True)` writes to `.part` and sends `Range: bytes=<offset>-` so interrupted transfers continue
+- **Duplicate-file finder** — three-stage size → partial-hash → full-hash pipeline; unique-size files are never hashed (`FA_find_duplicates`)
+- **DAG action executor** — topological scheduling with parallel fan-out and per-branch skip-on-failure (`FA_execute_action_dag`)
+- **Entry-point plugins** — third-party packages register their own `FA_*` actions via `[project.entry-points."automation_file.actions"]`; `build_default_registry()` picks them up automatically
+- **Incremental directory sync** — rsync-style mirror with size+mtime or checksum change detection, optional delete of extras, dry-run (`FA_sync_dir`)
+- **Directory manifests** — JSON snapshot of every file's checksum under a root, with separate missing/modified/extra reporting on verify (`FA_write_manifest`, `FA_verify_manifest`)
+- **Notification sinks** — webhook / Slack / SMTP / Telegram / Discord / Teams / PagerDuty with a fanout manager that does per-sink error isolation and sliding-window dedup; auto-notify on trigger + scheduler failures (`FA_notify_send`, `FA_notify_list`)
+- **Config file + secret providers** — declare notification sinks / defaults in `automation_file.toml`; `${env:…}` and `${file:…}` references resolve through an Env/File/Chained provider abstraction so secrets stay out of the file itself
+- **Config hot reload** — `ConfigWatcher` polls `automation_file.toml` and re-applies sinks / defaults on change without restart
+- **Shell / grep / JSON edit / tar / backup rotation** — `FA_run_shell` (argument-list subprocess with timeout), `FA_grep` (streaming text search), `FA_json_get` / `FA_json_set` / `FA_json_delete` (in-place JSON editing), `FA_create_tar` / `FA_extract_tar`, `FA_rotate_backups`
+- **FTP / FTPS backend** — plain FTP or explicit FTPS via `FTP_TLS.auth()`; auto-registered as `FA_ftp_*`
+- **Cross-backend copy** — `FA_copy_between` moves data between any two backends via `local://`, `s3://`, `drive://`, `azure://`, `dropbox://`, `sftp://`, `ftp://` URIs
+- **Scheduler overlap guard** — running jobs are skipped on the next fire unless `allow_overlap=True`
+- **Server action ACL** — `allowed_actions=(...)` restricts which commands TCP / HTTP servers will dispatch
+- **Variable substitution** — opt-in `${env:VAR}` / `${date:%Y-%m-%d}` / `${uuid}` / `${cwd}` expansion in action arguments via `execute_action(..., substitute=True)`
+- **Conditional execution** — `FA_if_exists` / `FA_if_newer` / `FA_if_size_gt` run a nested action list only when a guard passes
+- **SQLite audit log** — `AuditLog(db_path)` records every action execution with actor / status / duration; query via `recent` / `count` / `purge`
+- **File integrity monitor** — `IntegrityMonitor` polls a tree against a manifest and fires a callback + notification on drift
+- **HTTPActionClient SDK** — typed Python client for the HTTP action server with shared-secret auth, loopback guard, and OPTIONS-based ping
+- **AES-256-GCM file encryption** — `encrypt_file` / `decrypt_file` with `generate_key()` / `key_from_password()` (PBKDF2-HMAC-SHA256); JSON actions `FA_encrypt_file` / `FA_decrypt_file`
+- **Prometheus metrics exporter** — `start_metrics_server()` exposes `automation_file_actions_total{action,status}` counters and `automation_file_action_duration_seconds{action}` histograms
+- PySide6 GUI (`python -m automation_file ui`) with a tab per backend, the JSON-action runner, and dedicated tabs for Triggers, Scheduler, and live Progress
- Rich CLI with one-shot subcommands plus legacy JSON-batch flags
- Project scaffolding (`ProjectBuilder`) for executor-based automations
@@ -36,6 +64,12 @@ flowchart LR
Json[json_store]
Retry[retry]
QuotaMod[quota]
+    Progress["progress
+    Token + Reporter"]
+ end
+
+ subgraph Events["event-driven"]
+    TriggerMod["trigger
+    watchdog file watcher"]
+    SchedulerMod["scheduler
+    cron background thread"]
end
subgraph Local["local"]
@@ -101,6 +135,14 @@ flowchart LR
Registry --> Dropbox
Registry --> SFTP
Registry --> Builder
+ Registry --> TriggerMod
+ Registry --> SchedulerMod
+ Registry --> Progress
+
+ TriggerMod --> Executor
+ SchedulerMod --> Executor
+ Http --> Progress
+ S3 --> Progress
Http --> UrlVal
Http --> Retry
@@ -130,7 +172,7 @@ Requirements:
- Python 3.10+
- Bundled dependencies: `google-api-python-client`, `google-auth-oauthlib`,
`requests`, `tqdm`, `boto3`, `azure-storage-blob`, `dropbox`, `paramiko`,
- `PySide6`
+ `PySide6`, `watchdog`
## Usage
@@ -240,6 +282,401 @@ All backends (`s3`, `azure_blob`, `dropbox_api`, `sftp`) expose the same five
operations: `upload_file`, `upload_dir`, `download_file`, `delete_*`, `list_*`.
SFTP uses `paramiko.RejectPolicy` — unknown hosts are rejected, not auto-added.
+### File-watcher triggers
+Run an action list whenever a filesystem event fires on a watched path:
+
+```python
+from automation_file import watch_start, watch_stop
+
+watch_start(
+ name="inbox-sweeper",
+ path="/data/inbox",
+ action_list=[["FA_copy_all_file_to_dir", {"source_dir": "/data/inbox",
+ "target_dir": "/data/processed"}]],
+ events=["created", "modified"],
+ recursive=False,
+)
+# later:
+watch_stop("inbox-sweeper")
+```
+
+`FA_watch_start` / `FA_watch_stop` / `FA_watch_stop_all` / `FA_watch_list`
+surface the same lifecycle to JSON action lists.
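+
+The same watcher, expressed as a JSON action list (argument names assumed
+to mirror the Python keyword arguments above):
+
+```json
+[["FA_watch_start", {"name": "inbox-sweeper",
+                     "path": "/data/inbox",
+                     "action_list": [["FA_copy_all_file_to_dir",
+                                      {"source_dir": "/data/inbox",
+                                       "target_dir": "/data/processed"}]],
+                     "events": ["created", "modified"],
+                     "recursive": false}]]
+```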
+
+### Cron scheduler
+Recurring action lists on a stdlib-only 5-field cron parser:
+
+```python
+from automation_file import schedule_add
+
+schedule_add(
+ name="nightly-snapshot",
+ cron_expression="0 2 * * *", # every day at 02:00 local time
+ action_list=[["FA_zip_dir", {"dir_we_want_to_zip": "/data",
+ "zip_name": "/backup/data_nightly"}]],
+)
+```
+
+Supports `*`, exact values, `a-b` ranges, comma lists, and `*/n` step
+syntax with `jan..dec` / `sun..sat` aliases. JSON actions:
+`FA_schedule_add`, `FA_schedule_remove`, `FA_schedule_remove_all`,
+`FA_schedule_list`.
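+
+The same job as a JSON action (argument names assumed to mirror the Python
+keyword arguments):
+
+```json
+[["FA_schedule_add", {"name": "nightly-snapshot",
+                      "cron_expression": "0 2 * * *",
+                      "action_list": [["FA_zip_dir",
+                                       {"dir_we_want_to_zip": "/data",
+                                        "zip_name": "/backup/data_nightly"}]]}]]
+```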
+
+### Transfer progress + cancellation
+HTTP and S3 transfers accept an opt-in `progress_name` kwarg:
+
+```python
+from automation_file import download_file, progress_cancel
+
+download_file("https://example.com/big.bin", "big.bin",
+ progress_name="big-download")
+
+# From another thread or the GUI:
+progress_cancel("big-download")
+```
+
+The shared `progress_registry` exposes live snapshots via `progress_list()`
+and the `FA_progress_list` / `FA_progress_cancel` / `FA_progress_clear` JSON
+actions. The GUI's **Progress** tab polls the registry every half second.
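+
+A sketch of a JSON batch that lists live transfers and cancels one by name
+(the `name` parameter is an assumption, not confirmed against the registry):
+
+```json
+[["FA_progress_list", {}],
+ ["FA_progress_cancel", {"name": "big-download"}]]
+```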
+
+### Fast file search
+Query an OS index when available (`mdfind` on macOS, `locate` / `plocate` on
+Linux, Everything's `es.exe` on Windows) and fall back to a streaming
+`os.scandir` walk otherwise. No extra dependencies.
+
+```python
+from automation_file import fast_find, scandir_find, has_os_index
+
+# Uses the OS indexer when available, scandir fallback otherwise.
+results = fast_find("/var/log", "*.log", limit=100)
+
+# Force the portable path (skip the OS indexer).
+results = fast_find("/data", "report_*.csv", use_index=False)
+
+# Streaming — stop early without scanning the whole tree.
+for path in scandir_find("/data", "*.csv"):
+ if "2026" in path:
+ break
+```
+
+`FA_fast_find` exposes the same function to JSON action lists:
+
+```json
+[["FA_fast_find", {"root": "/var/log", "pattern": "*.log", "limit": 50}]]
+```
+
+### Checksums + integrity verification
+Stream any `hashlib` algorithm; `verify_checksum` compares with
+`hmac.compare_digest` (constant-time):
+
+```python
+from automation_file import file_checksum, verify_checksum
+
+digest = file_checksum("bundle.tar.gz") # sha256 by default
+verify_checksum("bundle.tar.gz", digest) # -> True
+verify_checksum("bundle.tar.gz", "deadbeef...", algorithm="blake2b")
+```
+
+Also available as `FA_file_checksum` / `FA_verify_checksum` JSON actions.
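+
+A sketch of the JSON form — the parameter names here are illustrative, not
+confirmed against the registry:
+
+```json
+[["FA_file_checksum", {"file_path": "bundle.tar.gz"}],
+ ["FA_verify_checksum", {"file_path": "bundle.tar.gz",
+                         "expected": "3b0c44298fc1..."}]]
+```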
+
+### Resumable HTTP downloads
+`download_file(resume=True)` writes to `.part` and sends
+`Range: bytes=<offset>-` on the next attempt. Pair with `expected_sha256=` for
+integrity verification once the transfer completes:
+
+```python
+from automation_file import download_file
+
+download_file(
+ "https://example.com/big.bin",
+ "big.bin",
+ resume=True,
+ expected_sha256="3b0c44298fc1...",
+)
+```
+
+### Duplicate-file finder
+Three-stage pipeline: size bucket → 64 KiB partial hash → full hash.
+Unique-size files are never hashed:
+
+```python
+from automation_file import find_duplicates
+
+groups = find_duplicates("/data", min_size=1024)
+# list[list[str]] — each inner list is a set of identical files, sorted
+# by size descending.
+```
+
+`FA_find_duplicates` runs the same search from JSON.
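+
+For example (the `root` / `min_size` argument names are assumed to mirror
+the Python signature):
+
+```json
+[["FA_find_duplicates", {"root": "/data", "min_size": 1024}]]
+```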
+
+### Incremental directory sync
+`sync_dir` mirrors `src` into `dst` by copying only files that are new or
+changed. Change detection is `(size, mtime)` by default; pass
+`compare="checksum"` when mtime is unreliable. Extras under `dst` are left
+alone by default — pass `delete=True` to prune them (and `dry_run=True` to
+preview):
+
+```python
+from automation_file import sync_dir
+
+summary = sync_dir("/data/src", "/data/dst", delete=True)
+# summary: {"copied": [...], "skipped": [...], "deleted": [...],
+# "errors": [...], "dry_run": False}
+```
+
+Symlinks are re-created as symlinks rather than followed, so a link
+pointing outside the tree can't blow up the mirror. JSON action:
+`FA_sync_dir`.
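+
+As a JSON action (the `delete` / `dry_run` flags are assumed to pass
+straight through to `sync_dir`):
+
+```json
+[["FA_sync_dir", {"src": "/data/src", "dst": "/data/dst",
+                  "delete": true, "dry_run": true}]]
+```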
+
+### Directory manifests
+Write a JSON manifest of every file's checksum under a tree and verify the
+tree hasn't changed later:
+
+```python
+from automation_file import write_manifest, verify_manifest
+
+write_manifest("/release/payload", "/release/MANIFEST.json")
+
+# Later…
+result = verify_manifest("/release/payload", "/release/MANIFEST.json")
+if not result["ok"]:
+ raise SystemExit(f"manifest mismatch: {result}")
+```
+
+`result` reports `matched`, `missing`, `modified`, and `extra` lists
+separately. Extras don't fail verification (mirrors `sync_dir`'s
+non-deleting default); `missing` or `modified` do. JSON actions:
+`FA_write_manifest`, `FA_verify_manifest`.
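+
+A sketch of the JSON form (`root` / `manifest_path` names borrowed from the
+`IntegrityMonitor` constructor; not confirmed against the registry):
+
+```json
+[["FA_write_manifest", {"root": "/release/payload",
+                        "manifest_path": "/release/MANIFEST.json"}],
+ ["FA_verify_manifest", {"root": "/release/payload",
+                         "manifest_path": "/release/MANIFEST.json"}]]
+```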
+
+### Notifications
+Push one-off messages or auto-notify on trigger/scheduler failures via
+webhook, Slack, or SMTP:
+
+```python
+from automation_file import (
+ SlackSink, WebhookSink, EmailSink,
+ notification_manager, notify_send,
+)
+
+notification_manager.register(SlackSink("https://hooks.slack.com/services/T/B/X"))
+notify_send("deploy complete", body="rev abc123", level="info")
+```
+
+Every sink implements the same `send(subject, body, level)` contract. The
+fanout `NotificationManager` provides per-sink error isolation (one broken
+sink doesn't block the others), sliding-window dedup so a stuck trigger
+can't flood a channel, and SSRF validation on every webhook/Slack URL.
+Scheduler and trigger dispatchers auto-notify on failure at
+`level="error"` — registering a sink is all that's needed. JSON actions:
+`FA_notify_send`, `FA_notify_list`.
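+
+A one-off send from JSON (field names follow the `send(subject, body, level)`
+sink contract; the exact JSON keys are an assumption):
+
+```json
+[["FA_notify_send", {"subject": "deploy complete",
+                     "body": "rev abc123",
+                     "level": "info"}]]
+```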
+
+### Config file and secret providers
+Declare sinks and defaults once in `automation_file.toml`. Secret
+references resolve at load time from environment variables or a file root
+(Docker / K8s style):
+
+```toml
+# automation_file.toml
+
+[secrets]
+file_root = "/run/secrets"
+
+[defaults]
+dedup_seconds = 120
+
+[[notify.sinks]]
+type = "slack"
+name = "team-alerts"
+webhook_url = "${env:SLACK_WEBHOOK}"
+
+[[notify.sinks]]
+type = "email"
+name = "ops-email"
+host = "smtp.example.com"
+port = 587
+sender = "alerts@example.com"
+recipients = ["ops@example.com"]
+username = "${env:SMTP_USER}"
+password = "${file:smtp_password}"
+```
+
+```python
+from automation_file import AutomationConfig, notification_manager
+
+config = AutomationConfig.load("automation_file.toml")
+config.apply_to(notification_manager)
+```
+
+Unresolved `${…}` references raise `SecretNotFoundException` rather than
+silently becoming empty strings. Custom provider chains can be built from
+`ChainedSecretProvider` / `EnvSecretProvider` / `FileSecretProvider` and
+passed as `AutomationConfig.load(path, provider=…)`.
+
+### Variable substitution in action lists
+Opt in with `substitute=True` and `${…}` references expand at dispatch time:
+
+```python
+from automation_file import execute_action
+
+execute_action(
+ [["FA_create_file", {"file_path": "reports/${date:%Y-%m-%d}/${uuid}.txt"}]],
+ substitute=True,
+)
+```
+
+Supports `${env:VAR}`, `${date:FMT}` (strftime), `${uuid}`, `${cwd}`. Unknown
+names raise `SubstitutionException` — no silent empty strings.
+
+### Conditional execution
+Run a nested action list only when a path-based guard passes:
+
+```json
+[
+ ["FA_if_exists", {"path": "/data/in/job.json",
+ "then": [["FA_copy_file", {"source": "/data/in/job.json",
+ "target": "/data/processed/job.json"}]]}],
+ ["FA_if_newer", {"source": "/src", "target": "/dst",
+ "then": [["FA_sync_dir", {"src": "/src", "dst": "/dst"}]]}],
+ ["FA_if_size_gt", {"path": "/logs/app.log", "size": 10485760,
+ "then": [["FA_run_shell", {"command": ["logrotate", "/logs/app.log"]}]]}]
+]
+```
+
+### SQLite audit log
+`AuditLog` writes one row per action with short-lived connections and a
+module-level lock:
+
+```python
+from automation_file import AuditLog
+
+audit = AuditLog("audit.sqlite3")
+audit.record(action="FA_copy_file", actor="ops",
+ status="ok", duration_ms=12, detail={"src": "a", "dst": "b"})
+
+for row in audit.recent(limit=50):
+ print(row["timestamp"], row["action"], row["status"])
+```
+
+### File integrity monitor
+Poll a tree against a manifest and fire a callback + notification on drift:
+
+```python
+from automation_file import IntegrityMonitor, notification_manager, write_manifest
+
+write_manifest("/srv/site", "/srv/MANIFEST.json")
+
+mon = IntegrityMonitor(
+ root="/srv/site",
+ manifest_path="/srv/MANIFEST.json",
+ interval=60.0,
+ manager=notification_manager,
+ on_drift=lambda summary: print("drift:", summary),
+)
+mon.start()
+```
+
+Manifest-load errors are surfaced as drift, so tampering and configuration
+problems go through the same visible code path.
+
+### AES-256-GCM file encryption
+Authenticated encryption with a self-describing envelope. Derive a key from
+a password or generate one directly:
+
+```python
+from automation_file import encrypt_file, decrypt_file, key_from_password
+
+key = key_from_password("correct horse battery staple", salt=b"app-salt-v1")
+encrypt_file("secret.pdf", "secret.pdf.enc", key, associated_data=b"v1")
+decrypt_file("secret.pdf.enc", "secret.pdf", key, associated_data=b"v1")
+```
+
+Tampering is detected via GCM's authentication tag and reported as
+`CryptoException("authentication failed")`. JSON actions:
+`FA_encrypt_file`, `FA_decrypt_file`.
+
+### HTTPActionClient Python SDK
+Typed client for the HTTP action server; enforces loopback by default and
+carries the shared secret for you:
+
+```python
+from automation_file import HTTPActionClient
+
+with HTTPActionClient("http://127.0.0.1:9944", shared_secret="s3cr3t") as client:
+ client.ping() # OPTIONS /actions
+ result = client.execute([["FA_create_dir", {"dir_path": "x"}]])
+```
+
+Auth failures map to `HTTPActionClientException` with `kind="unauthorized"`;
+a 404 response means the server is reachable but does not expose `/actions`.
+
+### Prometheus metrics exporter
+`ActionExecutor` records one counter row and one histogram sample per
+action. Serve them on a loopback `/metrics` endpoint:
+
+```python
+from automation_file import start_metrics_server
+
+server = start_metrics_server(host="127.0.0.1", port=9945)
+# curl http://127.0.0.1:9945/metrics
+```
+
+Exports `automation_file_actions_total{action,status}` and
+`automation_file_action_duration_seconds{action}`. Non-loopback binds
+require `allow_non_loopback=True` explicitly.
+
+### DAG action executor
+Run actions in dependency order; independent branches fan out across a
+thread pool. Each node is `{"id": ..., "action": [...], "depends_on":
+[...]}`:
+
+```python
+from automation_file import execute_action_dag
+
+execute_action_dag([
+ {"id": "fetch", "action": ["FA_download_file",
+ ["https://example.com/src.tar.gz", "src.tar.gz"]]},
+ {"id": "verify", "action": ["FA_verify_checksum",
+ ["src.tar.gz", "3b0c44298fc1..."]],
+ "depends_on": ["fetch"]},
+ {"id": "unpack", "action": ["FA_unzip_file", ["src.tar.gz", "src"]],
+ "depends_on": ["verify"]},
+])
+```
+
+If `verify` raises, `unpack` is marked `skipped` by default. Pass
+`fail_fast=False` to run dependents regardless. JSON action:
+`FA_execute_action_dag`.
+
+### Entry-point plugins
+Third-party packages advertise actions via `pyproject.toml`:
+
+```toml
+[project.entry-points."automation_file.actions"]
+my_plugin = "my_plugin:register"
+```
+
+where `register` is a zero-argument callable returning a
+`dict[str, Callable]`. Once installed in the same environment, the
+commands show up in every freshly-built registry:
+
+```python
+# my_plugin/__init__.py
+def greet(name: str) -> str:
+ return f"hello {name}"
+
+def register() -> dict:
+ return {"FA_greet": greet}
+```
+
+```python
+# after `pip install my_plugin`
+from automation_file import execute_action
+execute_action([["FA_greet", {"name": "world"}]])
+```
+
+Plugin failures are logged and swallowed — one broken plugin cannot
+break the library.
+
### GUI
```bash
python -m automation_file ui # or: python main_ui.py
@@ -250,7 +687,7 @@ from automation_file import launch_ui
launch_ui()
```
-Tabs: Local, HTTP, Google Drive, S3, Azure Blob, Dropbox, SFTP, JSON actions,
+Tabs: Home, Local, Transfer, Progress, JSON actions, Triggers, Scheduler,
Servers. A persistent log panel at the bottom streams every result and error.
### Scaffold an executor-based project
diff --git a/README.zh-CN.md b/README.zh-CN.md
new file mode 100644
index 0000000..c0ae731
--- /dev/null
+++ b/README.zh-CN.md
@@ -0,0 +1,726 @@
+# FileAutomation
+
+[English](README.md) | [繁體中文](README.zh-TW.md) | **简体中文**
+
+一套模块化的自动化框架,涵盖本地文件 / 目录 / ZIP 操作、经 SSRF 验证的 HTTP
+下载、远程存储(Google Drive、S3、Azure Blob、Dropbox、SFTP),以及通过内嵌
+TCP / HTTP 服务器执行的 JSON 驱动动作。内附 PySide6 GUI,每个功能都有对应
+页签。所有公开 API 均由顶层 `automation_file` facade 统一导出。
+
+- 本地文件 / 目录 / ZIP 操作,内置路径穿越防护(`safe_join`)
+- 经 SSRF 验证的 HTTP 下载,支持重试与大小 / 时间上限
+- Google Drive CRUD(上传、下载、搜索、删除、分享、文件夹)
+- 一等公民的 S3、Azure Blob、Dropbox、SFTP 后端 — 默认安装
+- JSON 动作清单由共享的 `ActionExecutor` 执行 — 支持验证、干跑、并行
+- Loopback 优先的 TCP **与** HTTP 服务器,接受 JSON 指令批量并可选 shared-secret 验证
+- 可靠性原语:`retry_on_transient` 装饰器、`Quota` 大小 / 时间预算
+- **文件监听触发** — 当路径变动时执行动作清单(`FA_watch_*`)
+- **Cron 调度器** — 仅用标准库的 5 字段解析器执行周期性动作清单(`FA_schedule_*`)
+- **传输进度 + 取消** — HTTP 与 S3 传输可选的 `progress_name` 钩子(`FA_progress_*`)
+- **快速文件搜索** — OS 索引快速路径(`mdfind` / `locate` / `es.exe`)搭配流式 `scandir` 回退(`FA_fast_find`)
+- **校验和 + 完整性验证** — 流式 `file_checksum` / `verify_checksum`,支持任何 `hashlib` 算法;`download_file(expected_sha256=...)` 在下载完成后立即验证(`FA_file_checksum`、`FA_verify_checksum`)
+- **可续传 HTTP 下载** — `download_file(resume=True)` 写入 `.part` 并发送 `Range: bytes=<offset>-`,让中断的传输继续而非从头开始
+- **重复文件查找器** — 三阶段 size → 部分哈希 → 完整哈希管线;大小唯一的文件完全不会被哈希(`FA_find_duplicates`)
+- **DAG 动作执行器** — 按依赖顺序拓扑调度,独立分支并行展开,失败时其后代默认标记为跳过(`FA_execute_action_dag`)
+- **Entry-point 插件** — 第三方包通过 `[project.entry-points."automation_file.actions"]` 注册自定义 `FA_*` 动作;`build_default_registry()` 会自动加载
+- **增量目录同步** — rsync 风格镜像,支持 size+mtime 或 checksum 变更检测,可选删除多余文件,支持干跑(`FA_sync_dir`)
+- **目录 manifest** — 以 JSON 快照记录树下每个文件的校验和,验证时分开报告 missing / modified / extra(`FA_write_manifest`、`FA_verify_manifest`)
+- **通知 sink** — webhook / Slack / SMTP / Telegram / Discord / Teams / PagerDuty,fanout 管理器做单 sink 错误隔离与滑动窗口去重;trigger + scheduler 失败时自动通知(`FA_notify_send`、`FA_notify_list`)
+- **配置文件 + 密钥提供者** — 在 `automation_file.toml` 声明通知 sink / 默认值;`${env:…}` 与 `${file:…}` 引用通过 Env / File / Chained 提供者抽象解析,让密钥不留在配置文件中
+- **配置热加载** — `ConfigWatcher` 轮询 `automation_file.toml`,变更时即时应用 sink / 默认值,无需重启
+- **Shell / grep / JSON 编辑 / tar / 备份轮转** — `FA_run_shell`(参数列表式 subprocess,含超时)、`FA_grep`(流式文本搜索)、`FA_json_get` / `FA_json_set` / `FA_json_delete`(原地 JSON 编辑)、`FA_create_tar` / `FA_extract_tar`、`FA_rotate_backups`
+- **FTP / FTPS 后端** — 纯 FTP 或通过 `FTP_TLS.auth()` 的显式 FTPS;自动注册为 `FA_ftp_*`
+- **跨后端复制** — `FA_copy_between` 通过 `local://`、`s3://`、`drive://`、`azure://`、`dropbox://`、`sftp://`、`ftp://` URI 在任意两个后端之间搬运数据
+- **调度器重叠防护** — 正在执行的作业在下次触发时会被跳过,除非显式传入 `allow_overlap=True`
+- **服务器动作 ACL** — `allowed_actions=(...)` 限制 TCP / HTTP 服务器可派发的命令
+- **变量替换** — 动作参数中可选使用 `${env:VAR}` / `${date:%Y-%m-%d}` / `${uuid}` / `${cwd}`,通过 `execute_action(..., substitute=True)` 展开
+- **条件执行** — `FA_if_exists` / `FA_if_newer` / `FA_if_size_gt` 仅在路径守卫通过时执行嵌套动作清单
+- **SQLite 审计日志** — `AuditLog(db_path)` 为每个动作记录 actor / status / duration;通过 `recent` / `count` / `purge` 查询
+- **文件完整性监控** — `IntegrityMonitor` 按 manifest 轮询整棵树,检测到 drift 时触发 callback + 通知
+- **HTTPActionClient SDK** — HTTP 动作服务器的类型化 Python 客户端,具 shared-secret 认证、loopback 守护与 OPTIONS ping
+- **AES-256-GCM 文件加密** — `encrypt_file` / `decrypt_file` 搭配 `generate_key()` / `key_from_password()`(PBKDF2-HMAC-SHA256);JSON 动作 `FA_encrypt_file` / `FA_decrypt_file`
+- **Prometheus metrics 导出器** — `start_metrics_server()` 提供 `automation_file_actions_total{action,status}` 计数器与 `automation_file_action_duration_seconds{action}` 直方图
+- PySide6 GUI(`python -m automation_file ui`)每个后端一个页签,含 JSON 动作执行器,另有 Triggers、Scheduler、实时 Progress 专属页签
+- 功能丰富的 CLI,包含一次性子命令与旧式 JSON 批量标志
+- 项目脚手架(`ProjectBuilder`)协助构建以 executor 为核心的自动化项目
+
+## 架构
+
+```mermaid
+flowchart LR
+ User[User / CLI / JSON batch]
+
+ subgraph Facade["automation_file (facade)"]
+    Public["Public API
+    execute_action, execute_action_parallel,
+    validate_action, driver_instance,
+    start_autocontrol_socket_server,
+    start_http_action_server, Quota,
+    retry_on_transient, safe_join, ..."]
+ end
+
+ subgraph Core["core"]
+    Registry[("ActionRegistry
+    FA_* commands")]
+ Executor[ActionExecutor]
+ Callback[CallbackExecutor]
+ Loader[PackageLoader]
+ Json[json_store]
+ Retry[retry]
+ QuotaMod[quota]
+    Progress["progress
+    Token + Reporter"]
+ end
+
+ subgraph Events["event-driven"]
+    TriggerMod["trigger
+    watchdog file watcher"]
+    SchedulerMod["scheduler
+    cron background thread"]
+ end
+
+ subgraph Local["local"]
+ FileOps[file_ops]
+ DirOps[dir_ops]
+ ZipOps[zip_ops]
+ Safe[safe_paths]
+ end
+
+ subgraph Remote["remote"]
+ UrlVal[url_validator]
+ Http[http_download]
+    Drive["google_drive
+    client + *_ops"]
+ S3["s3"]
+ Azure["azure_blob"]
+ Dropbox["dropbox_api"]
+ SFTP["sftp"]
+ end
+
+ subgraph Server["server"]
+ TCP[TCPActionServer]
+ HTTP[HTTPActionServer]
+ end
+
+ subgraph UI["ui (PySide6)"]
+ Launcher[launch_ui]
+    MainWindow["MainWindow
+    9-tab control surface"]
+ end
+
+ subgraph Project["project / utils"]
+ Builder[ProjectBuilder]
+ Templates[templates]
+ Discovery[file_discovery]
+ end
+
+ User --> Public
+ User --> Launcher
+ Launcher --> MainWindow
+ MainWindow --> Public
+ Public --> Executor
+ Public --> Callback
+ Public --> Loader
+ Public --> TCP
+ Public --> HTTP
+
+ Executor --> Registry
+ Executor --> Retry
+ Executor --> QuotaMod
+ Callback --> Registry
+ Loader --> Registry
+ TCP --> Executor
+ HTTP --> Executor
+ Executor --> Json
+
+ Registry --> FileOps
+ Registry --> DirOps
+ Registry --> ZipOps
+ Registry --> Safe
+ Registry --> Http
+ Registry --> Drive
+ Registry --> S3
+ Registry --> Azure
+ Registry --> Dropbox
+ Registry --> SFTP
+ Registry --> Builder
+ Registry --> TriggerMod
+ Registry --> SchedulerMod
+ Registry --> Progress
+
+ TriggerMod --> Executor
+ SchedulerMod --> Executor
+ Http --> Progress
+ S3 --> Progress
+
+ Http --> UrlVal
+ Http --> Retry
+ Builder --> Templates
+ Builder --> Discovery
+```
+
+`build_default_registry()` 构建的 `ActionRegistry` 是所有 `FA_*` 命令的唯一
+权威来源。`ActionExecutor`、`CallbackExecutor`、`PackageLoader`、
+`TCPActionServer`、`HTTPActionServer` 都通过同一份共享 registry(以
+`executor.registry` 对外公开)解析命令。
+
+## 安装
+
+```bash
+pip install automation_file
+```
+
+单次安装即涵盖所有后端(Google Drive、S3、Azure Blob、Dropbox、SFTP)以及
+PySide6 GUI — 日常使用不需要任何 extras。
+
+```bash
+pip install "automation_file[dev]" # ruff, mypy, pre-commit, pytest-cov, build, twine
+```
+
+要求:
+- Python 3.10+
+- 内置依赖:`google-api-python-client`、`google-auth-oauthlib`、
+ `requests`、`tqdm`、`boto3`、`azure-storage-blob`、`dropbox`、`paramiko`、
+ `PySide6`、`watchdog`
+
+## 使用方式
+
+### 执行 JSON 动作清单
+```python
+from automation_file import execute_action
+
+execute_action([
+ ["FA_create_file", {"file_path": "test.txt"}],
+ ["FA_copy_file", {"source": "test.txt", "target": "copy.txt"}],
+])
+```
+
+### 验证、干跑、并行
+```python
+from automation_file import execute_action, execute_action_parallel, validate_action
+
+# Fail-fast:只要有任何命令名称未知就在执行前中止。
+execute_action(actions, validate_first=True)
+
+# Dry-run:只记录会被调用的内容,不真的执行。
+execute_action(actions, dry_run=True)
+
+# Parallel:通过 thread pool 并行执行独立动作。
+execute_action_parallel(actions, max_workers=4)
+
+# 手动验证 — 返回解析后的名称列表。
+names = validate_action(actions)
+```
+
+### 初始化 Google Drive 并上传
+```python
+from automation_file import driver_instance, drive_upload_to_drive
+
+driver_instance.later_init("token.json", "credentials.json")
+drive_upload_to_drive("example.txt")
+```
+
+### 经验证的 HTTP 下载(含重试)
+```python
+from automation_file import download_file
+
+download_file("https://example.com/file.zip", "file.zip")
+```
+
+### 启动 loopback TCP 服务器(可选 shared-secret 验证)
+```python
+from automation_file import start_autocontrol_socket_server
+
+server = start_autocontrol_socket_server(
+ host="127.0.0.1", port=9943, shared_secret="optional-secret",
+)
+```
+
+设定 `shared_secret` 时,客户端每个包都必须以 `AUTH <shared_secret>\n` 为前缀。
+若要绑定非 loopback 地址必须明确传入 `allow_non_loopback=True`。
+
+### 启动 HTTP 动作服务器
+```python
+from automation_file import start_http_action_server
+
+server = start_http_action_server(
+ host="127.0.0.1", port=9944, shared_secret="optional-secret",
+)
+
+# curl -H 'Authorization: Bearer optional-secret' \
+# -d '[["FA_create_dir",{"dir_path":"x"}]]' \
+# http://127.0.0.1:9944/actions
+```
+
+### Retry 与 quota 原语
+```python
+from automation_file import retry_on_transient, Quota
+
+@retry_on_transient(max_attempts=5, backoff_base=0.5)
+def flaky_network_call(): ...
+
+quota = Quota(max_bytes=50 * 1024 * 1024, max_seconds=30.0)
+with quota.time_budget("bulk-upload"):
+ bulk_upload_work()
+```
+
+### 路径穿越防护
+```python
+from automation_file import safe_join
+
+target = safe_join("/data/jobs", user_supplied_path)
+# 若解析后的路径逃出 /data/jobs 会抛出 PathTraversalException。
+```
+
+### Cloud / SFTP 后端
+每个后端都会由 `build_default_registry()` 自动注册,因此 `FA_s3_*`、
+`FA_azure_blob_*`、`FA_dropbox_*`、`FA_sftp_*` 动作开箱即用 — 不需要另外调用
+`register_*_ops`。
+
+```python
+from automation_file import execute_action, s3_instance
+
+s3_instance.later_init(region_name="us-east-1")
+
+execute_action([
+ ["FA_s3_upload_file", {"local_path": "report.csv", "bucket": "reports", "key": "report.csv"}],
+])
+```
+
+所有后端(`s3`、`azure_blob`、`dropbox_api`、`sftp`)都提供相同的五组操作:
+`upload_file`、`upload_dir`、`download_file`、`delete_*`、`list_*`。
+SFTP 使用 `paramiko.RejectPolicy` — 未知主机会被拒绝,不会自动加入。
+
+### 文件监听触发
+每当被监听路径发生文件系统事件,就执行动作清单:
+
+```python
+from automation_file import watch_start, watch_stop
+
+watch_start(
+ name="inbox-sweeper",
+ path="/data/inbox",
+ action_list=[["FA_copy_all_file_to_dir", {"source_dir": "/data/inbox",
+ "target_dir": "/data/processed"}]],
+ events=["created", "modified"],
+ recursive=False,
+)
+# 稍后:
+watch_stop("inbox-sweeper")
+```
+
+`FA_watch_start` / `FA_watch_stop` / `FA_watch_stop_all` / `FA_watch_list`
+让 JSON 动作清单能使用相同的生命周期。
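+
+同一个 watcher 的 JSON 形式(参数名假定与上方 Python 关键字参数一致):
+
+```json
+[["FA_watch_start", {"name": "inbox-sweeper",
+                     "path": "/data/inbox",
+                     "action_list": [["FA_copy_all_file_to_dir",
+                                      {"source_dir": "/data/inbox",
+                                       "target_dir": "/data/processed"}]],
+                     "events": ["created", "modified"],
+                     "recursive": false}]]
+```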
+
+### Cron 调度器
+以纯标准库的 5 字段 cron 解析器执行周期性动作清单:
+
+```python
+from automation_file import schedule_add
+
+schedule_add(
+ name="nightly-snapshot",
+ cron_expression="0 2 * * *", # 每天本地时间 02:00
+ action_list=[["FA_zip_dir", {"dir_we_want_to_zip": "/data",
+ "zip_name": "/backup/data_nightly"}]],
+)
+```
+
+支持 `*`、确切值、`a-b` 范围、逗号列表、`*/n` 步进语法,以及 `jan..dec` /
+`sun..sat` 别名。JSON 动作:`FA_schedule_add`、`FA_schedule_remove`、
+`FA_schedule_remove_all`、`FA_schedule_list`。
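+
+同一个作业的 JSON 形式(参数名假定与 Python 关键字参数一致):
+
+```json
+[["FA_schedule_add", {"name": "nightly-snapshot",
+                      "cron_expression": "0 2 * * *",
+                      "action_list": [["FA_zip_dir",
+                                       {"dir_we_want_to_zip": "/data",
+                                        "zip_name": "/backup/data_nightly"}]]}]]
+```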
+
+### 传输进度 + 取消
+HTTP 与 S3 传输支持可选的 `progress_name` 关键字参数:
+
+```python
+from automation_file import download_file, progress_cancel
+
+download_file("https://example.com/big.bin", "big.bin",
+ progress_name="big-download")
+
+# 从另一个线程或 GUI:
+progress_cancel("big-download")
+```
+
+共享的 `progress_registry` 通过 `progress_list()` 以及 `FA_progress_list` /
+`FA_progress_cancel` / `FA_progress_clear` JSON 动作提供实时快照。GUI 的
+**Progress** 页签每半秒轮询一次 registry。
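+
+一个示意性的 JSON 批次,列出实时传输并按名称取消其中一个(`name` 参数名
+为假定,未与 registry 核对):
+
+```json
+[["FA_progress_list", {}],
+ ["FA_progress_cancel", {"name": "big-download"}]]
+```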
+
+### 快速文件搜索
+若 OS 索引器可用就直接查询(macOS 的 `mdfind`、Linux 的 `locate` /
+`plocate`、Windows 的 Everything `es.exe`),否则回退到以流式 `os.scandir`
+遍历。不需要额外依赖。
+
+```python
+from automation_file import fast_find, scandir_find, has_os_index
+
+# 可用时使用 OS 索引器,否则回退到 scandir。
+results = fast_find("/var/log", "*.log", limit=100)
+
+# 强制使用可移植路径(跳过 OS 索引器)。
+results = fast_find("/data", "report_*.csv", use_index=False)
+
+# 流式 — 不需要遍历整棵树就能提前停止。
+for path in scandir_find("/data", "*.csv"):
+ if "2026" in path:
+ break
+```
+
+`FA_fast_find` 将同一个函数提供给 JSON 动作清单:
+
+```json
+[["FA_fast_find", {"root": "/var/log", "pattern": "*.log", "limit": 50}]]
+```
+
+### 校验和 + 完整性验证
+流式处理任何 `hashlib` 算法;`verify_checksum` 使用 `hmac.compare_digest`
+(常数时间)比较摘要:
+
+```python
+from automation_file import file_checksum, verify_checksum
+
+digest = file_checksum("bundle.tar.gz") # 默认为 sha256
+verify_checksum("bundle.tar.gz", digest) # -> True
+verify_checksum("bundle.tar.gz", "deadbeef...", algorithm="blake2b")
+```
+
+同时以 `FA_file_checksum` / `FA_verify_checksum` 提供 JSON 动作。
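+
+对应的 JSON 动作清单示意(此处采用 `[name, args]` 位置参数形式,假定参数顺序与上方 Python 签名一致):
+
+```json
+[
+    ["FA_file_checksum", ["bundle.tar.gz"]],
+    ["FA_verify_checksum", ["bundle.tar.gz", "3b0c44298fc1..."]]
+]
+```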
+
+### 可续传 HTTP 下载
+`download_file(resume=True)` 会写入 `.part` 并在下次尝试时发送
+`Range: bytes=<offset>-`(从已有的部分文件末尾续传)。配合
+`expected_sha256=` 可在下载完成后立即验证完整性:
+
+```python
+from automation_file import download_file
+
+download_file(
+ "https://example.com/big.bin",
+ "big.bin",
+ resume=True,
+ expected_sha256="3b0c44298fc1...",
+)
+```
+
+### 重复文件查找器
+三阶段管线:按大小分桶 → 64 KiB 部分哈希 → 完整哈希。大小唯一的文件完全不会
+被哈希:
+
+```python
+from automation_file import find_duplicates
+
+groups = find_duplicates("/data", min_size=1024)
+# list[list[str]] — 每个内层 list 是一组相同内容的文件,按大小降序排序。
+```
+
+`FA_find_duplicates` 以相同调用提供给 JSON。
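+
+三阶段管线的骨架可以这样示意(与库内实现无关,仅演示淘汰顺序;真实实现会分块读取大文件):
+
+```python
+import hashlib
+import os
+from collections import defaultdict
+
+def find_dup_sketch(root: str, min_size: int = 1) -> list[list[str]]:
+    # 阶段 1:按大小分桶;大小唯一的文件直接淘汰,完全不做哈希。
+    by_size: dict[int, list[str]] = defaultdict(list)
+    for dirpath, _, names in os.walk(root):
+        for name in names:
+            path = os.path.join(dirpath, name)
+            size = os.path.getsize(path)
+            if size >= min_size:
+                by_size[size].append(path)
+
+    def digest(path: str, limit: int | None = None) -> str:
+        # 示意:一次性读入;真实实现应分块喂给 hashlib。
+        h = hashlib.sha256()
+        with open(path, "rb") as f:
+            h.update(f.read(limit) if limit else f.read())
+        return h.hexdigest()
+
+    groups: list[list[str]] = []
+    for paths in by_size.values():
+        if len(paths) < 2:
+            continue
+        # 阶段 2:只哈希前 64 KiB,廉价排除前缀不同的文件。
+        by_partial: dict[str, list[str]] = defaultdict(list)
+        for path in paths:
+            by_partial[digest(path, 64 * 1024)].append(path)
+        # 阶段 3:对幸存者做完整哈希确认。
+        for candidates in by_partial.values():
+            if len(candidates) < 2:
+                continue
+            by_full: dict[str, list[str]] = defaultdict(list)
+            for path in candidates:
+                by_full[digest(path)].append(path)
+            groups.extend(g for g in by_full.values() if len(g) > 1)
+    return groups
+```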
+
+### 增量目录同步
+`sync_dir` 以只复制新增或变更文件的方式将 `src` 镜像到 `dst`。变更检测默认
+为 `(size, mtime)`;当 mtime 不可靠时可传入 `compare="checksum"`。`dst`
+下多余的文件默认保留 — 传入 `delete=True` 才会清理(`dry_run=True` 可先
+预览):
+
+```python
+from automation_file import sync_dir
+
+summary = sync_dir("/data/src", "/data/dst", delete=True)
+# summary: {"copied": [...], "skipped": [...], "deleted": [...],
+# "errors": [...], "dry_run": False}
+```
+
+Symlink 会以 symlink 形式重建而非被跟随,因此指向树外的链接不会拖垮镜像。
+JSON 动作:`FA_sync_dir`。
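+
+JSON 形式示意(假定 `FA_sync_dir` 的 JSON kwargs 与 Python 关键字同名):
+
+```json
+[["FA_sync_dir", {"src": "/data/src", "dst": "/data/dst",
+    "delete": true, "dry_run": true}]]
+```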
+
+### 目录 manifest
+将树下每个文件的校验和写入 JSON manifest,之后再验证树是否变动:
+
+```python
+from automation_file import write_manifest, verify_manifest
+
+write_manifest("/release/payload", "/release/MANIFEST.json")
+
+# 稍后…
+result = verify_manifest("/release/payload", "/release/MANIFEST.json")
+if not result["ok"]:
+ raise SystemExit(f"manifest mismatch: {result}")
+```
+
+`result` 以 `matched`、`missing`、`modified`、`extra` 分别报告列表。
+多余文件(`extra`)不会让验证失败(对齐 `sync_dir` 默认不删除的行为),
+`missing` 与 `modified` 则会。JSON 动作:`FA_write_manifest`、
+`FA_verify_manifest`。
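+
+JSON 形式示意(采用 `[name, args]` 位置参数形式,假定顺序与 Python 签名一致):
+
+```json
+[
+    ["FA_write_manifest", ["/release/payload", "/release/MANIFEST.json"]],
+    ["FA_verify_manifest", ["/release/payload", "/release/MANIFEST.json"]]
+]
+```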
+
+### 通知
+通过 webhook、Slack 或 SMTP 推送一次性消息,或在 trigger / scheduler
+失败时自动通知:
+
+```python
+from automation_file import (
+ SlackSink, WebhookSink, EmailSink,
+ notification_manager, notify_send,
+)
+
+notification_manager.register(SlackSink("https://hooks.slack.com/services/T/B/X"))
+notify_send("deploy complete", body="rev abc123", level="info")
+```
+
+每个 sink 都遵循相同的 `send(subject, body, level)` 合约。Fanout 的
+`NotificationManager` 会做单 sink 错误隔离(一个坏掉的 sink 不会影响
+其他 sink)、滑动窗口去重(避免卡住的 trigger 刷屏),并对每个
+webhook / Slack URL 进行 SSRF 验证。Scheduler 与 trigger 派发器在失败时
+会以 `level="error"` 自动通知 — 只要注册 sink 就能拿到生产环境告警。
+JSON 动作:`FA_notify_send`、`FA_notify_list`。
+
+### 配置文件与密钥提供者
+在 `automation_file.toml` 一次声明 sink 与默认值。密钥引用在加载时由
+环境变量或文件根目录(Docker / K8s 风格)解析:
+
+```toml
+# automation_file.toml
+
+[secrets]
+file_root = "/run/secrets"
+
+[defaults]
+dedup_seconds = 120
+
+[[notify.sinks]]
+type = "slack"
+name = "team-alerts"
+webhook_url = "${env:SLACK_WEBHOOK}"
+
+[[notify.sinks]]
+type = "email"
+name = "ops-email"
+host = "smtp.example.com"
+port = 587
+sender = "alerts@example.com"
+recipients = ["ops@example.com"]
+username = "${env:SMTP_USER}"
+password = "${file:smtp_password}"
+```
+
+```python
+from automation_file import AutomationConfig, notification_manager
+
+config = AutomationConfig.load("automation_file.toml")
+config.apply_to(notification_manager)
+```
+
+未解析的 `${…}` 引用会抛出 `SecretNotFoundException`,而不是默默变成空
+字符串。可组合 `ChainedSecretProvider` / `EnvSecretProvider` /
+`FileSecretProvider` 构建自定义提供者链,并以
+`AutomationConfig.load(path, provider=…)` 传入。
+
+### 动作清单变量替换
+以 `substitute=True` 启用后,`${…}` 引用会在派发时展开:
+
+```python
+from automation_file import execute_action
+
+execute_action(
+ [["FA_create_file", {"file_path": "reports/${date:%Y-%m-%d}/${uuid}.txt"}]],
+ substitute=True,
+)
+```
+
+支持 `${env:VAR}`、`${date:FMT}`(strftime)、`${uuid}`、`${cwd}`。未知名称
+会抛出 `SubstitutionException`,不会默默变成空字符串。
+
+### 条件执行
+只在路径守卫通过时执行嵌套动作清单:
+
+```json
+[
+ ["FA_if_exists", {"path": "/data/in/job.json",
+ "then": [["FA_copy_file", {"source": "/data/in/job.json",
+ "target": "/data/processed/job.json"}]]}],
+ ["FA_if_newer", {"source": "/src", "target": "/dst",
+ "then": [["FA_sync_dir", {"src": "/src", "dst": "/dst"}]]}],
+ ["FA_if_size_gt", {"path": "/logs/app.log", "size": 10485760,
+ "then": [["FA_run_shell", {"command": ["logrotate", "/logs/app.log"]}]]}]
+]
+```
+
+### SQLite 审计日志
+`AuditLog` 以短连接 + 模块级锁为每个动作写入一条记录:
+
+```python
+from automation_file import AuditLog
+
+audit = AuditLog("audit.sqlite3")
+audit.record(action="FA_copy_file", actor="ops",
+ status="ok", duration_ms=12, detail={"src": "a", "dst": "b"})
+
+for row in audit.recent(limit=50):
+ print(row["timestamp"], row["action"], row["status"])
+```
+
+### 文件完整性监控
+按 manifest 轮询整棵树,检测到 drift 时触发 callback + 通知:
+
+```python
+from automation_file import IntegrityMonitor, notification_manager, write_manifest
+
+write_manifest("/srv/site", "/srv/MANIFEST.json")
+
+mon = IntegrityMonitor(
+ root="/srv/site",
+ manifest_path="/srv/MANIFEST.json",
+ interval=60.0,
+ manager=notification_manager,
+ on_drift=lambda summary: print("drift:", summary),
+)
+mon.start()
+```
+
+加载 manifest 时的错误也会被视为 drift,让篡改与配置问题走同一条处理
+路径。
+
+### AES-256-GCM 文件加密
+带认证的加密与自描述封包格式。可由密码派生密钥或直接生成密钥:
+
+```python
+from automation_file import encrypt_file, decrypt_file, key_from_password
+
+key = key_from_password("correct horse battery staple", salt=b"app-salt-v1")
+encrypt_file("secret.pdf", "secret.pdf.enc", key, associated_data=b"v1")
+decrypt_file("secret.pdf.enc", "secret.pdf", key, associated_data=b"v1")
+```
+
+篡改由 GCM 认证 tag 检测,以 `CryptoException("authentication failed")`
+报告。JSON 动作:`FA_encrypt_file`、`FA_decrypt_file`。
+
+### HTTPActionClient Python SDK
+HTTP 动作服务器的类型化客户端;默认强制 loopback,并自动携带 shared
+secret:
+
+```python
+from automation_file import HTTPActionClient
+
+with HTTPActionClient("http://127.0.0.1:9944", shared_secret="s3cr3t") as client:
+ client.ping() # OPTIONS /actions
+ result = client.execute([["FA_create_dir", {"dir_path": "x"}]])
+```
+
+认证失败会转换为 `HTTPActionClientException(kind="unauthorized")`;
+404 则表示服务器存在但未对外提供 `/actions`。
+
+### Prometheus metrics 导出器
+`ActionExecutor` 为每个动作记录一条计数器与一条直方图样本。在 loopback
+`/metrics` 端点提供:
+
+```python
+from automation_file import start_metrics_server
+
+server = start_metrics_server(host="127.0.0.1", port=9945)
+# curl http://127.0.0.1:9945/metrics
+```
+
+导出 `automation_file_actions_total{action,status}` 以及
+`automation_file_action_duration_seconds{action}`。若要绑定非 loopback
+地址必须显式传入 `allow_non_loopback=True`。
+
+### DAG 动作执行器
+按依赖顺序执行动作;独立分支通过线程池并行展开。每个节点的形式为
+`{"id": ..., "action": [...], "depends_on": [...]}`:
+
+```python
+from automation_file import execute_action_dag
+
+execute_action_dag([
+ {"id": "fetch", "action": ["FA_download_file",
+ ["https://example.com/src.tar.gz", "src.tar.gz"]]},
+ {"id": "verify", "action": ["FA_verify_checksum",
+ ["src.tar.gz", "3b0c44298fc1..."]],
+ "depends_on": ["fetch"]},
+ {"id": "unpack", "action": ["FA_unzip_file", ["src.tar.gz", "src"]],
+ "depends_on": ["verify"]},
+])
+```
+
+若 `verify` 抛出异常,默认情况下 `unpack` 会被标记为 `skipped`。传入
+`fail_fast=False` 可让后代节点仍然执行。JSON 动作:`FA_execute_action_dag`。
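+
+"按依赖拓扑排序、同批 ready 节点可并行"的调度核心可以用标准库 `graphlib` 示意(与库内实现无关):
+
+```python
+from graphlib import TopologicalSorter
+
+# 键为节点 id,值为其依赖;"lint" 与 "fetch" 互相独立,可并行执行。
+deps = {
+    "fetch": [],
+    "verify": ["fetch"],
+    "unpack": ["verify"],
+    "lint": [],
+}
+ts = TopologicalSorter(deps)
+ts.prepare()
+batches = []
+while ts.is_active():
+    ready = sorted(ts.get_ready())  # 同一批节点之间没有依赖,可丢进线程池
+    batches.append(ready)
+    ts.done(*ready)
+# batches == [["fetch", "lint"], ["verify"], ["unpack"]]
+```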
+
+### Entry-point 插件
+第三方包通过 `pyproject.toml` 声明动作:
+
+```toml
+[project.entry-points."automation_file.actions"]
+my_plugin = "my_plugin:register"
+```
+
+其中 `register` 是一个零参数的可调用对象,返回 `dict[str, Callable]`。只要
+包被安装在同一个虚拟环境,这些命令就会出现在每次新建的 registry 中:
+
+```python
+# my_plugin/__init__.py
+def greet(name: str) -> str:
+ return f"hello {name}"
+
+def register() -> dict:
+ return {"FA_greet": greet}
+```
+
+```python
+# 执行 `pip install my_plugin` 之后
+from automation_file import execute_action
+execute_action([["FA_greet", {"name": "world"}]])
+```
+
+插件失败(导入错误、factory 异常、返回类型不正确、registry 拒绝)都会被记录
+并吞掉 — 一个坏掉的插件不会影响整个库。
+
+### GUI
+```bash
+python -m automation_file ui # 或:python main_ui.py
+```
+
+```python
+from automation_file import launch_ui
+launch_ui()
+```
+
+页签:Home、Local、Transfer、Progress、JSON actions、Triggers、Scheduler、
+Servers。底部常驻的 log 面板实时流式输出每一笔结果与错误。
+
+### 以 executor 为核心构建项目脚手架
+```python
+from automation_file import create_project_dir
+
+create_project_dir("my_workflow")
+```
+
+## CLI
+
+```bash
+# 子命令(一次性操作)
+python -m automation_file ui
+python -m automation_file zip ./src out.zip --dir
+python -m automation_file unzip out.zip ./restored
+python -m automation_file download https://example.com/file.bin file.bin
+python -m automation_file create-file hello.txt --content "hi"
+python -m automation_file server --host 127.0.0.1 --port 9943
+python -m automation_file http-server --host 127.0.0.1 --port 9944
+python -m automation_file drive-upload my.txt --token token.json --credentials creds.json
+
+# 旧式标志(JSON 动作清单)
+python -m automation_file --execute_file actions.json
+python -m automation_file --execute_dir ./actions/
+python -m automation_file --execute_str '[["FA_create_dir",{"dir_path":"x"}]]'
+python -m automation_file --create_project ./my_project
+```
+
+## JSON 动作格式
+
+每一项动作可以是单纯的命令名称、`[name, kwargs]` 组合,或 `[name, args]`
+列表:
+
+```json
+[
+ ["FA_create_file", {"file_path": "test.txt"}],
+ ["FA_drive_upload_to_drive", {"file_path": "test.txt"}],
+ ["FA_drive_search_all_file"]
+]
+```
+
+## 文档
+
+完整 API 文档位于 `docs/`,可用 Sphinx 生成:
+
+```bash
+pip install -r docs/requirements.txt
+sphinx-build -b html docs/source docs/_build/html
+```
+
+架构笔记、代码规范与安全考量请参见 [`CLAUDE.md`](CLAUDE.md)。
diff --git a/README.zh-TW.md b/README.zh-TW.md
new file mode 100644
index 0000000..ee46f63
--- /dev/null
+++ b/README.zh-TW.md
@@ -0,0 +1,726 @@
+# FileAutomation
+
+[English](README.md) | **繁體中文** | [简体中文](README.zh-CN.md)
+
+一套模組化的自動化框架,涵蓋本機檔案 / 目錄 / ZIP 操作、經 SSRF 驗證的 HTTP
+下載、遠端儲存(Google Drive、S3、Azure Blob、Dropbox、SFTP),以及透過內建
+TCP / HTTP 伺服器執行的 JSON 驅動動作。內附 PySide6 GUI,每個功能都有對應
+分頁。所有公開 API 皆由頂層 `automation_file` facade 統一匯出。
+
+- 本機檔案 / 目錄 / ZIP 操作,內建路徑穿越防護(`safe_join`)
+- 經 SSRF 驗證的 HTTP 下載,支援重試與大小 / 時間上限
+- Google Drive CRUD(上傳、下載、搜尋、刪除、分享、資料夾)
+- 一等公民的 S3、Azure Blob、Dropbox、SFTP 後端 — 預設安裝
+- JSON 動作清單由共用的 `ActionExecutor` 執行 — 支援驗證、乾跑、平行
+- Loopback 優先的 TCP **與** HTTP 伺服器,接受 JSON 指令批次並可選 shared-secret 驗證
+- 可靠性原語:`retry_on_transient` 裝飾器、`Quota` 大小 / 時間預算
+- **檔案監看觸發** — 當路徑變動時執行動作清單(`FA_watch_*`)
+- **Cron 排程器** — 僅用標準函式庫的 5 欄位解析器執行週期性動作清單(`FA_schedule_*`)
+- **傳輸進度 + 取消** — HTTP 與 S3 傳輸可選的 `progress_name` 掛鉤(`FA_progress_*`)
+- **快速檔案搜尋** — OS 索引快速路徑(`mdfind` / `locate` / `es.exe`)搭配串流式 `scandir` 備援(`FA_fast_find`)
+- **檢查碼 + 完整性驗證** — 串流式 `file_checksum` / `verify_checksum`,支援任何 `hashlib` 演算法;`download_file(expected_sha256=...)` 於下載完成後立即驗證(`FA_file_checksum`、`FA_verify_checksum`)
+- **可續傳 HTTP 下載** — `download_file(resume=True)` 寫入 `.part` 並傳送 `Range: bytes=<offset>-`,讓中斷的傳輸繼續而非從頭開始
+- **重複檔案尋找器** — 三階段 size → 部分雜湊 → 完整雜湊管線;大小唯一的檔案完全不會被雜湊(`FA_find_duplicates`)
+- **DAG 動作執行器** — 依相依順序拓撲排程,獨立分支平行展開,失敗時其後代預設標記跳過(`FA_execute_action_dag`)
+- **Entry-point 外掛** — 第三方套件透過 `[project.entry-points."automation_file.actions"]` 註冊自訂 `FA_*` 動作;`build_default_registry()` 會自動載入
+- **增量目錄同步** — rsync 風格鏡像,支援 size+mtime 或 checksum 變更偵測,選擇性刪除多餘檔案,支援乾跑(`FA_sync_dir`)
+- **目錄 manifest** — 以 JSON 快照記錄樹下每個檔案的檢查碼,驗證時分開回報 missing / modified / extra(`FA_write_manifest`、`FA_verify_manifest`)
+- **通知 sink** — webhook / Slack / SMTP / Telegram / Discord / Teams / PagerDuty,fanout 管理器做個別 sink 錯誤隔離與滑動視窗去重;trigger + scheduler 失敗時自動通知(`FA_notify_send`、`FA_notify_list`)
+- **設定檔 + 秘密提供者** — 在 `automation_file.toml` 宣告通知 sink / 預設值;`${env:…}` 與 `${file:…}` 參考透過 Env / File / Chained 提供者抽象解析,讓秘密不留在檔案裡
+- **設定熱重載** — `ConfigWatcher` 輪詢 `automation_file.toml`,變更時即時套用 sink / 預設值,無需重啟
+- **Shell / grep / JSON 編輯 / tar / 備份輪替** — `FA_run_shell`(參數列表式 subprocess,含逾時)、`FA_grep`(串流文字搜尋)、`FA_json_get` / `FA_json_set` / `FA_json_delete`(原地 JSON 編輯)、`FA_create_tar` / `FA_extract_tar`、`FA_rotate_backups`
+- **FTP / FTPS 後端** — 純 FTP 或透過 `FTP_TLS.auth()` 的顯式 FTPS;自動註冊為 `FA_ftp_*`
+- **跨後端複製** — `FA_copy_between` 透過 `local://`、`s3://`、`drive://`、`azure://`、`dropbox://`、`sftp://`、`ftp://` URI 在任意兩個後端之間搬運資料
+- **排程器重疊防護** — 正在執行的工作在下次觸發時會被跳過,除非明確傳入 `allow_overlap=True`
+- **伺服器動作 ACL** — `allowed_actions=(...)` 限制 TCP / HTTP 伺服器可派送的指令
+- **變數替換** — 動作參數中可選使用 `${env:VAR}` / `${date:%Y-%m-%d}` / `${uuid}` / `${cwd}`,透過 `execute_action(..., substitute=True)` 展開
+- **條件式執行** — `FA_if_exists` / `FA_if_newer` / `FA_if_size_gt` 僅在路徑守護通過時執行巢狀動作清單
+- **SQLite 稽核日誌** — `AuditLog(db_path)` 為每個動作記錄 actor / status / duration;以 `recent` / `count` / `purge` 查詢
+- **檔案完整性監控** — `IntegrityMonitor` 依 manifest 輪詢整棵樹,偵測到 drift 時觸發 callback + 通知
+- **HTTPActionClient SDK** — HTTP 動作伺服器的型別化 Python 客戶端,具 shared-secret 驗證、loopback 防護與 OPTIONS ping
+- **AES-256-GCM 檔案加密** — `encrypt_file` / `decrypt_file` 搭配 `generate_key()` / `key_from_password()`(PBKDF2-HMAC-SHA256);JSON 動作 `FA_encrypt_file` / `FA_decrypt_file`
+- **Prometheus metrics 匯出器** — `start_metrics_server()` 提供 `automation_file_actions_total{action,status}` 計數器與 `automation_file_action_duration_seconds{action}` 直方圖
+- PySide6 GUI(`python -m automation_file ui`)每個後端一個分頁,含 JSON 動作執行器,另有 Triggers、Scheduler、即時 Progress 專屬分頁
+- 功能豐富的 CLI,包含一次性子指令與舊式 JSON 批次旗標
+- 專案鷹架(`ProjectBuilder`)協助建立以 executor 為核心的自動化專案
+
+## 架構
+
+```mermaid
+flowchart LR
+ User[User / CLI / JSON batch]
+
+ subgraph Facade["automation_file (facade)"]
+ Public["Public API
+ execute_action, execute_action_parallel,
+ validate_action, driver_instance,
+ start_autocontrol_socket_server,
+ start_http_action_server, Quota,
+ retry_on_transient, safe_join, ..."]
+ end
+
+ subgraph Core["core"]
+ Registry[(ActionRegistry
+ FA_* commands)]
+ Executor[ActionExecutor]
+ Callback[CallbackExecutor]
+ Loader[PackageLoader]
+ Json[json_store]
+ Retry[retry]
+ QuotaMod[quota]
+ Progress[progress
+ Token + Reporter]
+ end
+
+ subgraph Events["event-driven"]
+ TriggerMod["trigger
+ watchdog file watcher"]
+ SchedulerMod["scheduler
+ cron background thread"]
+ end
+
+ subgraph Local["local"]
+ FileOps[file_ops]
+ DirOps[dir_ops]
+ ZipOps[zip_ops]
+ Safe[safe_paths]
+ end
+
+ subgraph Remote["remote"]
+ UrlVal[url_validator]
+ Http[http_download]
+ Drive["google_drive
+ client + *_ops"]
+ S3["s3"]
+ Azure["azure_blob"]
+ Dropbox["dropbox_api"]
+ SFTP["sftp"]
+ end
+
+ subgraph Server["server"]
+ TCP[TCPActionServer]
+ HTTP[HTTPActionServer]
+ end
+
+ subgraph UI["ui (PySide6)"]
+ Launcher[launch_ui]
+ MainWindow["MainWindow
+ 9-tab control surface"]
+ end
+
+ subgraph Project["project / utils"]
+ Builder[ProjectBuilder]
+ Templates[templates]
+ Discovery[file_discovery]
+ end
+
+ User --> Public
+ User --> Launcher
+ Launcher --> MainWindow
+ MainWindow --> Public
+ Public --> Executor
+ Public --> Callback
+ Public --> Loader
+ Public --> TCP
+ Public --> HTTP
+
+ Executor --> Registry
+ Executor --> Retry
+ Executor --> QuotaMod
+ Callback --> Registry
+ Loader --> Registry
+ TCP --> Executor
+ HTTP --> Executor
+ Executor --> Json
+
+ Registry --> FileOps
+ Registry --> DirOps
+ Registry --> ZipOps
+ Registry --> Safe
+ Registry --> Http
+ Registry --> Drive
+ Registry --> S3
+ Registry --> Azure
+ Registry --> Dropbox
+ Registry --> SFTP
+ Registry --> Builder
+ Registry --> TriggerMod
+ Registry --> SchedulerMod
+ Registry --> Progress
+
+ TriggerMod --> Executor
+ SchedulerMod --> Executor
+ Http --> Progress
+ S3 --> Progress
+
+ Http --> UrlVal
+ Http --> Retry
+ Builder --> Templates
+ Builder --> Discovery
+```
+
+`build_default_registry()` 建立的 `ActionRegistry` 是所有 `FA_*` 指令的唯一
+權威來源。`ActionExecutor`、`CallbackExecutor`、`PackageLoader`、
+`TCPActionServer`、`HTTPActionServer` 都透過同一份共用 registry(以
+`executor.registry` 對外公開)解析指令。
+
+## 安裝
+
+```bash
+pip install automation_file
+```
+
+單一安裝即涵蓋所有後端(Google Drive、S3、Azure Blob、Dropbox、SFTP)以及
+PySide6 GUI — 日常使用不需要任何 extras。
+
+```bash
+pip install "automation_file[dev]" # ruff, mypy, pre-commit, pytest-cov, build, twine
+```
+
+需求:
+- Python 3.10+
+- 內建相依套件:`google-api-python-client`、`google-auth-oauthlib`、
+ `requests`、`tqdm`、`boto3`、`azure-storage-blob`、`dropbox`、`paramiko`、
+ `PySide6`、`watchdog`
+
+## 使用方式
+
+### 執行 JSON 動作清單
+```python
+from automation_file import execute_action
+
+execute_action([
+ ["FA_create_file", {"file_path": "test.txt"}],
+ ["FA_copy_file", {"source": "test.txt", "target": "copy.txt"}],
+])
+```
+
+### 驗證、乾跑、平行
+```python
+from automation_file import execute_action, execute_action_parallel, validate_action
+
+# Fail-fast:只要有任何指令名稱未知就在執行前中止。
+execute_action(actions, validate_first=True)
+
+# Dry-run:只記錄會被呼叫的內容,不真的執行。
+execute_action(actions, dry_run=True)
+
+# Parallel:透過 thread pool 平行執行獨立動作。
+execute_action_parallel(actions, max_workers=4)
+
+# 手動驗證 — 回傳解析後的名稱清單。
+names = validate_action(actions)
+```
+
+### 初始化 Google Drive 並上傳
+```python
+from automation_file import driver_instance, drive_upload_to_drive
+
+driver_instance.later_init("token.json", "credentials.json")
+drive_upload_to_drive("example.txt")
+```
+
+### 經驗證的 HTTP 下載(含重試)
+```python
+from automation_file import download_file
+
+download_file("https://example.com/file.zip", "file.zip")
+```
+
+### 啟動 loopback TCP 伺服器(可選 shared-secret 驗證)
+```python
+from automation_file import start_autocontrol_socket_server
+
+server = start_autocontrol_socket_server(
+ host="127.0.0.1", port=9943, shared_secret="optional-secret",
+)
+```
+
+設定 `shared_secret` 時,客戶端每個封包都必須以 `AUTH <secret>\n` 為前綴。
+若要綁定非 loopback 位址必須明確傳入 `allow_non_loopback=True`。
+
+### 啟動 HTTP 動作伺服器
+```python
+from automation_file import start_http_action_server
+
+server = start_http_action_server(
+ host="127.0.0.1", port=9944, shared_secret="optional-secret",
+)
+
+# curl -H 'Authorization: Bearer optional-secret' \
+# -d '[["FA_create_dir",{"dir_path":"x"}]]' \
+# http://127.0.0.1:9944/actions
+```
+
+### Retry 與 quota 原語
+```python
+from automation_file import retry_on_transient, Quota
+
+@retry_on_transient(max_attempts=5, backoff_base=0.5)
+def flaky_network_call(): ...
+
+quota = Quota(max_bytes=50 * 1024 * 1024, max_seconds=30.0)
+with quota.time_budget("bulk-upload"):
+ bulk_upload_work()
+```
+
+### 路徑穿越防護
+```python
+from automation_file import safe_join
+
+target = safe_join("/data/jobs", user_supplied_path)
+# 若解析後的路徑跳脫 /data/jobs 會拋出 PathTraversalException。
+```
+
+### Cloud / SFTP 後端
+每個後端都會由 `build_default_registry()` 自動註冊,因此 `FA_s3_*`、
+`FA_azure_blob_*`、`FA_dropbox_*`、`FA_sftp_*` 動作開箱即用 — 不需要另外呼叫
+`register_*_ops`。
+
+```python
+from automation_file import execute_action, s3_instance
+
+s3_instance.later_init(region_name="us-east-1")
+
+execute_action([
+ ["FA_s3_upload_file", {"local_path": "report.csv", "bucket": "reports", "key": "report.csv"}],
+])
+```
+
+所有後端(`s3`、`azure_blob`、`dropbox_api`、`sftp`)都對外提供相同的五組
+操作:`upload_file`、`upload_dir`、`download_file`、`delete_*`、`list_*`。
+SFTP 使用 `paramiko.RejectPolicy` — 未知主機會被拒絕,不會自動加入。
+
+### 檔案監看觸發
+每當被監看路徑發生檔案系統事件,就執行動作清單:
+
+```python
+from automation_file import watch_start, watch_stop
+
+watch_start(
+ name="inbox-sweeper",
+ path="/data/inbox",
+ action_list=[["FA_copy_all_file_to_dir", {"source_dir": "/data/inbox",
+ "target_dir": "/data/processed"}]],
+ events=["created", "modified"],
+ recursive=False,
+)
+# 稍後:
+watch_stop("inbox-sweeper")
+```
+
+`FA_watch_start` / `FA_watch_stop` / `FA_watch_stop_all` / `FA_watch_list`
+讓 JSON 動作清單能使用相同的生命週期。
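+
+一個等價的 JSON 動作清單示意(假定 `FA_watch_start` 接受與上方 Python 呼叫相同的關鍵字參數):
+
+```json
+[["FA_watch_start", {"name": "inbox-sweeper", "path": "/data/inbox",
+    "events": ["created", "modified"],
+    "action_list": [["FA_copy_all_file_to_dir",
+        {"source_dir": "/data/inbox", "target_dir": "/data/processed"}]]}]]
+```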
+
+### Cron 排程器
+以純標準函式庫的 5 欄位 cron 解析器執行週期性動作清單:
+
+```python
+from automation_file import schedule_add
+
+schedule_add(
+ name="nightly-snapshot",
+ cron_expression="0 2 * * *", # 每天本地時間 02:00
+ action_list=[["FA_zip_dir", {"dir_we_want_to_zip": "/data",
+ "zip_name": "/backup/data_nightly"}]],
+)
+```
+
+支援 `*`、確切值、`a-b` 範圍、逗號清單、`*/n` 步進語法,以及 `jan..dec` /
+`sun..sat` 別名。JSON 動作:`FA_schedule_add`、`FA_schedule_remove`、
+`FA_schedule_remove_all`、`FA_schedule_list`。
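+
+上述欄位語法可以用一個與函式庫實作無關的極簡匹配器來示意(純標準函式庫,僅演示規則本身):
+
+```python
+# 極簡 cron 欄位匹配示意:支援 *、確切值、a-b 範圍、逗號清單、*/n 步進與月份 / 星期別名。
+ALIASES = {name: i + 1 for i, name in enumerate(
+    "jan feb mar apr may jun jul aug sep oct nov dec".split())}
+ALIASES.update({name: i for i, name in enumerate(
+    "sun mon tue wed thu fri sat".split())})
+
+def _num(token: str) -> int:
+    # 數字直接解析;jan..dec / sun..sat 別名查表。
+    return int(token) if token.isdigit() else ALIASES[token.lower()]
+
+def field_matches(field: str, value: int) -> bool:
+    # 逗號清單:任一部分匹配即算匹配。
+    for part in field.split(","):
+        body, _, step = part.partition("/")
+        n = int(step) if step else 1
+        if body == "*":
+            hit = value % n == 0
+        elif "-" in body:
+            lo, hi = (_num(t) for t in body.split("-"))
+            hit = lo <= value <= hi and (value - lo) % n == 0
+        else:
+            hit = value == _num(body)
+        if hit:
+            return True
+    return False
+```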
+
+### 傳輸進度 + 取消
+HTTP 與 S3 傳輸支援可選的 `progress_name` 關鍵字參數:
+
+```python
+from automation_file import download_file, progress_cancel
+
+download_file("https://example.com/big.bin", "big.bin",
+ progress_name="big-download")
+
+# 從另一個執行緒或 GUI:
+progress_cancel("big-download")
+```
+
+共用的 `progress_registry` 透過 `progress_list()` 以及 `FA_progress_list` /
+`FA_progress_cancel` / `FA_progress_clear` JSON 動作提供即時快照。GUI 的
+**Progress** 分頁每半秒輪詢一次 registry。
+
+### 快速檔案搜尋
+若 OS 索引器可用就直接查詢(macOS 的 `mdfind`、Linux 的 `locate` /
+`plocate`、Windows 的 Everything `es.exe`),否則退回以串流 `os.scandir`
+走訪。不需要額外相依套件。
+
+```python
+from automation_file import fast_find, scandir_find, has_os_index
+
+# 可用時使用 OS 索引器,否則退回 scandir。
+results = fast_find("/var/log", "*.log", limit=100)
+
+# 強制使用可攜路徑(跳過 OS 索引器)。
+results = fast_find("/data", "report_*.csv", use_index=False)
+
+# 串流 — 不需走訪整棵樹就能提早停止。
+for path in scandir_find("/data", "*.csv"):
+ if "2026" in path:
+ break
+```
+
+`FA_fast_find` 將同一個函式提供給 JSON 動作清單:
+
+```json
+[["FA_fast_find", {"root": "/var/log", "pattern": "*.log", "limit": 50}]]
+```
+
+### 檢查碼 + 完整性驗證
+串流式處理任何 `hashlib` 演算法;`verify_checksum` 以 `hmac.compare_digest`
+(常數時間)比對摘要:
+
+```python
+from automation_file import file_checksum, verify_checksum
+
+digest = file_checksum("bundle.tar.gz") # 預設為 sha256
+verify_checksum("bundle.tar.gz", digest) # -> True
+verify_checksum("bundle.tar.gz", "deadbeef...", algorithm="blake2b")
+```
+
+同時以 `FA_file_checksum` / `FA_verify_checksum` 提供 JSON 動作。
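+
+對應的 JSON 動作清單示意(此處採用 `[name, args]` 位置參數形式,假定參數順序與上方 Python 簽名一致):
+
+```json
+[
+    ["FA_file_checksum", ["bundle.tar.gz"]],
+    ["FA_verify_checksum", ["bundle.tar.gz", "3b0c44298fc1..."]]
+]
+```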
+
+### 可續傳 HTTP 下載
+`download_file(resume=True)` 會寫入 `.part` 並於下次嘗試傳送
+`Range: bytes=<offset>-`(從既有部分檔案的末端續傳)。搭配
+`expected_sha256=` 可在下載完成後立刻驗證完整性:
+
+```python
+from automation_file import download_file
+
+download_file(
+ "https://example.com/big.bin",
+ "big.bin",
+ resume=True,
+ expected_sha256="3b0c44298fc1...",
+)
+```
+
+### 重複檔案尋找器
+三階段管線:依大小分桶 → 64 KiB 部分雜湊 → 完整雜湊。大小唯一的檔案完全不會
+被雜湊:
+
+```python
+from automation_file import find_duplicates
+
+groups = find_duplicates("/data", min_size=1024)
+# list[list[str]] — 每個內層 list 是一組相同內容的檔案,以大小遞減排序。
+```
+
+`FA_find_duplicates` 以相同呼叫提供給 JSON。
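+
+三階段管線的骨架可以這樣示意(與函式庫實作無關,僅演示淘汰順序;真實實作會分塊讀取大檔案):
+
+```python
+import hashlib
+import os
+from collections import defaultdict
+
+def find_dup_sketch(root: str, min_size: int = 1) -> list[list[str]]:
+    # 階段 1:依大小分桶;大小唯一的檔案直接淘汰,完全不做雜湊。
+    by_size: dict[int, list[str]] = defaultdict(list)
+    for dirpath, _, names in os.walk(root):
+        for name in names:
+            path = os.path.join(dirpath, name)
+            size = os.path.getsize(path)
+            if size >= min_size:
+                by_size[size].append(path)
+
+    def digest(path: str, limit: int | None = None) -> str:
+        # 示意:一次性讀入;真實實作應分塊餵給 hashlib。
+        h = hashlib.sha256()
+        with open(path, "rb") as f:
+            h.update(f.read(limit) if limit else f.read())
+        return h.hexdigest()
+
+    groups: list[list[str]] = []
+    for paths in by_size.values():
+        if len(paths) < 2:
+            continue
+        # 階段 2:只雜湊前 64 KiB,廉價排除前綴不同的檔案。
+        by_partial: dict[str, list[str]] = defaultdict(list)
+        for path in paths:
+            by_partial[digest(path, 64 * 1024)].append(path)
+        # 階段 3:對倖存者做完整雜湊確認。
+        for candidates in by_partial.values():
+            if len(candidates) < 2:
+                continue
+            by_full: dict[str, list[str]] = defaultdict(list)
+            for path in candidates:
+                by_full[digest(path)].append(path)
+            groups.extend(g for g in by_full.values() if len(g) > 1)
+    return groups
+```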
+
+### 增量目錄同步
+`sync_dir` 以只複製新增或變更檔案的方式將 `src` 鏡像至 `dst`。變更偵測預設
+為 `(size, mtime)`;當 mtime 不可信時可傳入 `compare="checksum"`。`dst`
+下多餘的檔案預設保留 — 傳入 `delete=True` 才會清除(`dry_run=True` 可先
+預覽):
+
+```python
+from automation_file import sync_dir
+
+summary = sync_dir("/data/src", "/data/dst", delete=True)
+# summary: {"copied": [...], "skipped": [...], "deleted": [...],
+# "errors": [...], "dry_run": False}
+```
+
+Symlink 會以 symlink 形式重建而非被跟隨,因此指向樹外的連結不會拖垮鏡像。
+JSON 動作:`FA_sync_dir`。
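+
+JSON 形式示意(假定 `FA_sync_dir` 的 JSON kwargs 與 Python 關鍵字同名):
+
+```json
+[["FA_sync_dir", {"src": "/data/src", "dst": "/data/dst",
+    "delete": true, "dry_run": true}]]
+```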
+
+### 目錄 manifest
+將樹下每個檔案的檢查碼寫入 JSON manifest,之後再驗證樹是否變動:
+
+```python
+from automation_file import write_manifest, verify_manifest
+
+write_manifest("/release/payload", "/release/MANIFEST.json")
+
+# 稍後…
+result = verify_manifest("/release/payload", "/release/MANIFEST.json")
+if not result["ok"]:
+ raise SystemExit(f"manifest mismatch: {result}")
+```
+
+`result` 以 `matched`、`missing`、`modified`、`extra` 分別回報清單。
+多餘檔案(`extra`)不會讓驗證失敗(對齊 `sync_dir` 預設不刪除的行為),
+`missing` 與 `modified` 則會。JSON 動作:`FA_write_manifest`、
+`FA_verify_manifest`。
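+
+JSON 形式示意(採用 `[name, args]` 位置參數形式,假定順序與 Python 簽名一致):
+
+```json
+[
+    ["FA_write_manifest", ["/release/payload", "/release/MANIFEST.json"]],
+    ["FA_verify_manifest", ["/release/payload", "/release/MANIFEST.json"]]
+]
+```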
+
+### 通知
+透過 webhook、Slack 或 SMTP 推送一次性訊息,或在 trigger / scheduler
+失敗時自動通知:
+
+```python
+from automation_file import (
+ SlackSink, WebhookSink, EmailSink,
+ notification_manager, notify_send,
+)
+
+notification_manager.register(SlackSink("https://hooks.slack.com/services/T/B/X"))
+notify_send("deploy complete", body="rev abc123", level="info")
+```
+
+每個 sink 都遵循相同的 `send(subject, body, level)` 合約。Fanout 的
+`NotificationManager` 會進行個別 sink 錯誤隔離(一個壞掉的 sink 不會影響
+其他 sink)、滑動視窗去重(避免卡住的 trigger 洗版),以及對每個
+webhook / Slack URL 做 SSRF 驗證。Scheduler 與 trigger 派送器在失敗時
+會以 `level="error"` 自動通知 — 只要註冊 sink 就能取得生產環境告警。
+JSON 動作:`FA_notify_send`、`FA_notify_list`。
+
+### 設定檔與秘密提供者
+在 `automation_file.toml` 一次宣告 sink 與預設值。秘密參考在載入時由
+環境變數或檔案根目錄(Docker / K8s 風格)解析:
+
+```toml
+# automation_file.toml
+
+[secrets]
+file_root = "/run/secrets"
+
+[defaults]
+dedup_seconds = 120
+
+[[notify.sinks]]
+type = "slack"
+name = "team-alerts"
+webhook_url = "${env:SLACK_WEBHOOK}"
+
+[[notify.sinks]]
+type = "email"
+name = "ops-email"
+host = "smtp.example.com"
+port = 587
+sender = "alerts@example.com"
+recipients = ["ops@example.com"]
+username = "${env:SMTP_USER}"
+password = "${file:smtp_password}"
+```
+
+```python
+from automation_file import AutomationConfig, notification_manager
+
+config = AutomationConfig.load("automation_file.toml")
+config.apply_to(notification_manager)
+```
+
+未解析的 `${…}` 參考會拋出 `SecretNotFoundException`,而非默默變成空字串。
+可組合 `ChainedSecretProvider` / `EnvSecretProvider` /
+`FileSecretProvider` 建立自訂提供者鏈,並以
+`AutomationConfig.load(path, provider=…)` 傳入。
+
+### 動作清單變數替換
+以 `substitute=True` 啟用後,`${…}` 參考會在派送時展開:
+
+```python
+from automation_file import execute_action
+
+execute_action(
+ [["FA_create_file", {"file_path": "reports/${date:%Y-%m-%d}/${uuid}.txt"}]],
+ substitute=True,
+)
+```
+
+支援 `${env:VAR}`、`${date:FMT}`(strftime)、`${uuid}`、`${cwd}`。未知名稱
+會拋出 `SubstitutionException`,不會默默變成空字串。
+
+### 條件式執行
+只在路徑守護通過時執行巢狀動作清單:
+
+```json
+[
+ ["FA_if_exists", {"path": "/data/in/job.json",
+ "then": [["FA_copy_file", {"source": "/data/in/job.json",
+ "target": "/data/processed/job.json"}]]}],
+ ["FA_if_newer", {"source": "/src", "target": "/dst",
+ "then": [["FA_sync_dir", {"src": "/src", "dst": "/dst"}]]}],
+ ["FA_if_size_gt", {"path": "/logs/app.log", "size": 10485760,
+ "then": [["FA_run_shell", {"command": ["logrotate", "/logs/app.log"]}]]}]
+]
+```
+
+### SQLite 稽核日誌
+`AuditLog` 以短連線 + 模組層級 lock 為每個動作寫入一筆紀錄:
+
+```python
+from automation_file import AuditLog
+
+audit = AuditLog("audit.sqlite3")
+audit.record(action="FA_copy_file", actor="ops",
+ status="ok", duration_ms=12, detail={"src": "a", "dst": "b"})
+
+for row in audit.recent(limit=50):
+ print(row["timestamp"], row["action"], row["status"])
+```
+
+### 檔案完整性監控
+依 manifest 輪詢整棵樹,偵測到 drift 時觸發 callback + 通知:
+
+```python
+from automation_file import IntegrityMonitor, notification_manager, write_manifest
+
+write_manifest("/srv/site", "/srv/MANIFEST.json")
+
+mon = IntegrityMonitor(
+ root="/srv/site",
+ manifest_path="/srv/MANIFEST.json",
+ interval=60.0,
+ manager=notification_manager,
+ on_drift=lambda summary: print("drift:", summary),
+)
+mon.start()
+```
+
+載入 manifest 時的錯誤也會被視為 drift,讓竄改與設定問題走同一條處理
+路徑。
+
+### AES-256-GCM 檔案加密
+具驗證的加密與自述式封包格式。可由密碼衍生金鑰或直接產生金鑰:
+
+```python
+from automation_file import encrypt_file, decrypt_file, key_from_password
+
+key = key_from_password("correct horse battery staple", salt=b"app-salt-v1")
+encrypt_file("secret.pdf", "secret.pdf.enc", key, associated_data=b"v1")
+decrypt_file("secret.pdf.enc", "secret.pdf", key, associated_data=b"v1")
+```
+
+竄改由 GCM 驗證 tag 偵測,以 `CryptoException("authentication failed")`
+回報。JSON 動作:`FA_encrypt_file`、`FA_decrypt_file`。
+
+### HTTPActionClient Python SDK
+HTTP 動作伺服器的型別化客戶端;預設強制 loopback,並自動附帶 shared
+secret:
+
+```python
+from automation_file import HTTPActionClient
+
+with HTTPActionClient("http://127.0.0.1:9944", shared_secret="s3cr3t") as client:
+ client.ping() # OPTIONS /actions
+ result = client.execute([["FA_create_dir", {"dir_path": "x"}]])
+```
+
+驗證失敗會轉成 `HTTPActionClientException(kind="unauthorized")`;
+404 則表示伺服器存在但未對外提供 `/actions`。
+
+### Prometheus metrics 匯出器
+`ActionExecutor` 為每個動作記錄一筆計數器與一筆直方圖樣本。在 loopback
+`/metrics` 端點提供:
+
+```python
+from automation_file import start_metrics_server
+
+server = start_metrics_server(host="127.0.0.1", port=9945)
+# curl http://127.0.0.1:9945/metrics
+```
+
+匯出 `automation_file_actions_total{action,status}` 以及
+`automation_file_action_duration_seconds{action}`。若要綁定非 loopback
+位址必須明確傳入 `allow_non_loopback=True`。
+
+### DAG 動作執行器
+依相依關係執行動作;獨立分支會透過執行緒池平行展開。每個節點形式為
+`{"id": ..., "action": [...], "depends_on": [...]}`:
+
+```python
+from automation_file import execute_action_dag
+
+execute_action_dag([
+ {"id": "fetch", "action": ["FA_download_file",
+ ["https://example.com/src.tar.gz", "src.tar.gz"]]},
+ {"id": "verify", "action": ["FA_verify_checksum",
+ ["src.tar.gz", "3b0c44298fc1..."]],
+ "depends_on": ["fetch"]},
+ {"id": "unpack", "action": ["FA_unzip_file", ["src.tar.gz", "src"]],
+ "depends_on": ["verify"]},
+])
+```
+
+若 `verify` 拋出例外,預設情況下 `unpack` 會被標記為 `skipped`。傳入
+`fail_fast=False` 可讓後代節點仍然執行。JSON 動作:`FA_execute_action_dag`。
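+
+「依相依拓撲排序、同批 ready 節點可平行」的排程核心可以用標準函式庫 `graphlib` 示意(與函式庫實作無關):
+
+```python
+from graphlib import TopologicalSorter
+
+# 鍵為節點 id,值為其相依;"lint" 與 "fetch" 互相獨立,可平行執行。
+deps = {
+    "fetch": [],
+    "verify": ["fetch"],
+    "unpack": ["verify"],
+    "lint": [],
+}
+ts = TopologicalSorter(deps)
+ts.prepare()
+batches = []
+while ts.is_active():
+    ready = sorted(ts.get_ready())  # 同一批節點之間沒有相依,可交給執行緒池
+    batches.append(ready)
+    ts.done(*ready)
+# batches == [["fetch", "lint"], ["verify"], ["unpack"]]
+```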
+
+### Entry-point 外掛
+第三方套件以 `pyproject.toml` 宣告動作:
+
+```toml
+[project.entry-points."automation_file.actions"]
+my_plugin = "my_plugin:register"
+```
+
+其中 `register` 是一個零參數的可呼叫物件,回傳 `dict[str, Callable]`。只要
+套件被安裝於同一個虛擬環境,這些指令就會出現在每次新建的 registry:
+
+```python
+# my_plugin/__init__.py
+def greet(name: str) -> str:
+ return f"hello {name}"
+
+def register() -> dict:
+ return {"FA_greet": greet}
+```
+
+```python
+# 執行 `pip install my_plugin` 之後
+from automation_file import execute_action
+execute_action([["FA_greet", {"name": "world"}]])
+```
+
+外掛失敗(匯入錯誤、factory 例外、回傳型別不正確、registry 拒絕)都會被記錄
+並吞掉 — 一個壞掉的外掛不會影響整個函式庫。
+
+### GUI
+```bash
+python -m automation_file ui # 或:python main_ui.py
+```
+
+```python
+from automation_file import launch_ui
+launch_ui()
+```
+
+分頁:Home、Local、Transfer、Progress、JSON actions、Triggers、Scheduler、
+Servers。底部常駐的 log 面板即時串流每一筆結果與錯誤。
+
+### 以 executor 為核心建立專案鷹架
+```python
+from automation_file import create_project_dir
+
+create_project_dir("my_workflow")
+```
+
+## CLI
+
+```bash
+# 子指令(一次性操作)
+python -m automation_file ui
+python -m automation_file zip ./src out.zip --dir
+python -m automation_file unzip out.zip ./restored
+python -m automation_file download https://example.com/file.bin file.bin
+python -m automation_file create-file hello.txt --content "hi"
+python -m automation_file server --host 127.0.0.1 --port 9943
+python -m automation_file http-server --host 127.0.0.1 --port 9944
+python -m automation_file drive-upload my.txt --token token.json --credentials creds.json
+
+# 舊式旗標(JSON 動作清單)
+python -m automation_file --execute_file actions.json
+python -m automation_file --execute_dir ./actions/
+python -m automation_file --execute_str '[["FA_create_dir",{"dir_path":"x"}]]'
+python -m automation_file --create_project ./my_project
+```
+
+## JSON 動作格式
+
+每一項動作可以是單純的指令名稱、`[name, kwargs]` 組合,或 `[name, args]`
+清單:
+
+```json
+[
+ ["FA_create_file", {"file_path": "test.txt"}],
+ ["FA_drive_upload_to_drive", {"file_path": "test.txt"}],
+ ["FA_drive_search_all_file"]
+]
+```
+
+## 文件
+
+完整 API 文件位於 `docs/`,可用 Sphinx 產生:
+
+```bash
+pip install -r docs/requirements.txt
+sphinx-build -b html docs/source docs/_build/html
+```
+
+架構筆記、程式碼慣例與安全考量請參見 [`CLAUDE.md`](CLAUDE.md)。
diff --git a/automation_file/__init__.py b/automation_file/__init__.py
index 7419549..3a620d1 100644
--- a/automation_file/__init__.py
+++ b/automation_file/__init__.py
@@ -9,6 +9,7 @@
from typing import TYPE_CHECKING, Any
+from automation_file.client import HTTPActionClient, HTTPActionClientException
from automation_file.core.action_executor import (
ActionExecutor,
add_command_to_executor,
@@ -19,11 +20,54 @@
validate_action,
)
from automation_file.core.action_registry import ActionRegistry, build_default_registry
+from automation_file.core.audit import AuditException, AuditLog
from automation_file.core.callback_executor import CallbackExecutor
+from automation_file.core.checksum import (
+ ChecksumMismatchException,
+ file_checksum,
+ verify_checksum,
+)
+from automation_file.core.config import AutomationConfig, ConfigException
+from automation_file.core.config_watcher import ConfigWatcher
+from automation_file.core.crypto import (
+ CryptoException,
+ decrypt_file,
+ encrypt_file,
+ generate_key,
+ key_from_password,
+)
+from automation_file.core.dag_executor import execute_action_dag
+from automation_file.core.fim import IntegrityMonitor
from automation_file.core.json_store import read_action_json, write_action_json
+from automation_file.core.manifest import ManifestException, verify_manifest, write_manifest
+from automation_file.core.metrics import ACTION_COUNT, ACTION_DURATION, record_action
+from automation_file.core.metrics import render as render_metrics
from automation_file.core.package_loader import PackageLoader
+from automation_file.core.progress import (
+ CancellationToken,
+ CancelledException,
+ ProgressRegistry,
+ ProgressReporter,
+ progress_cancel,
+ progress_clear,
+ progress_list,
+ progress_registry,
+ register_progress_ops,
+)
from automation_file.core.quota import Quota
from automation_file.core.retry import retry_on_transient
+from automation_file.core.secrets import (
+ ChainedSecretProvider,
+ EnvSecretProvider,
+ FileSecretProvider,
+ SecretException,
+ SecretNotFoundException,
+ SecretProvider,
+ default_provider,
+ resolve_secret_refs,
+)
+from automation_file.core.substitution import SubstitutionException, substitute
+from automation_file.local.conditional import if_exists, if_newer, if_size_gt
from automation_file.local.dir_ops import copy_dir, create_dir, remove_dir_tree, rename_dir
from automation_file.local.file_ops import (
copy_all_file_to_dir,
@@ -33,7 +77,16 @@
remove_file,
rename_file,
)
+from automation_file.local.json_edit import (
+ JsonEditException,
+ json_delete,
+ json_get,
+ json_set,
+)
from automation_file.local.safe_paths import is_within, safe_join
+from automation_file.local.shell_ops import ShellException, run_shell
+from automation_file.local.sync_ops import SyncException, sync_dir
+from automation_file.local.tar_ops import TarException, create_tar, extract_tar
from automation_file.local.zip_ops import (
read_zip_file,
set_zip_password,
@@ -44,17 +97,40 @@
zip_file_info,
zip_info,
)
+from automation_file.notify import (
+ DiscordSink,
+ EmailSink,
+ NotificationException,
+ NotificationManager,
+ NotificationSink,
+ PagerDutySink,
+ SlackSink,
+ TeamsSink,
+ TelegramSink,
+ WebhookSink,
+ notification_manager,
+ notify_send,
+ register_notify_ops,
+)
from automation_file.project.project_builder import ProjectBuilder, create_project_dir
from automation_file.remote.azure_blob import (
AzureBlobClient,
azure_blob_instance,
register_azure_blob_ops,
)
+from automation_file.remote.cross_backend import CrossBackendException, copy_between
from automation_file.remote.dropbox_api import (
DropboxClient,
dropbox_instance,
register_dropbox_ops,
)
+from automation_file.remote.ftp import (
+ FTPClient,
+ FTPConnectOptions,
+ FTPException,
+ ftp_instance,
+ register_ftp_ops,
+)
from automation_file.remote.google_drive.client import GoogleDriveClient, driver_instance
from automation_file.remote.google_drive.delete_ops import drive_delete_file
from automation_file.remote.google_drive.download_ops import (
@@ -82,12 +158,39 @@
from automation_file.remote.s3 import S3Client, register_s3_ops, s3_instance
from automation_file.remote.sftp import SFTPClient, register_sftp_ops, sftp_instance
from automation_file.remote.url_validator import validate_http_url
+from automation_file.scheduler import (
+ CronExpression,
+ ScheduledJob,
+ Scheduler,
+ register_scheduler_ops,
+ schedule_add,
+ schedule_list,
+ schedule_remove,
+ schedule_remove_all,
+ scheduler,
+)
+from automation_file.server.action_acl import ActionACL, ActionNotPermittedException
from automation_file.server.http_server import HTTPActionServer, start_http_action_server
+from automation_file.server.metrics_server import MetricsServer, start_metrics_server
from automation_file.server.tcp_server import (
TCPActionServer,
start_autocontrol_socket_server,
)
+from automation_file.trigger import (
+ FileWatcher,
+ TriggerManager,
+ register_trigger_ops,
+ trigger_manager,
+ watch_list,
+ watch_start,
+ watch_stop,
+ watch_stop_all,
+)
+from automation_file.utils.deduplicate import find_duplicates
+from automation_file.utils.fast_find import fast_find, has_os_index, scandir_find
from automation_file.utils.file_discovery import get_dir_files_as_list
+from automation_file.utils.grep import GrepException, grep_files, iter_grep
+from automation_file.utils.rotate import RotateException, rotate_backups
if TYPE_CHECKING:
from automation_file.ui.launcher import (
@@ -117,9 +220,12 @@ def __getattr__(name: str) -> Any:
"build_default_registry",
"execute_action",
"execute_action_parallel",
+ "execute_action_dag",
"execute_files",
"validate_action",
"retry_on_transient",
+ "substitute",
+ "SubstitutionException",
"add_command_to_executor",
"read_action_json",
"write_action_json",
@@ -137,6 +243,17 @@ def __getattr__(name: str) -> Any:
"create_dir",
"remove_dir_tree",
"rename_dir",
+ "sync_dir",
+ "SyncException",
+ "create_tar",
+ "extract_tar",
+ "TarException",
+ "run_shell",
+ "ShellException",
+ "json_get",
+ "json_set",
+ "json_delete",
+ "JsonEditException",
"zip_dir",
"zip_file",
"zip_info",
@@ -147,6 +264,9 @@ def __getattr__(name: str) -> Any:
"unzip_all",
"safe_join",
"is_within",
+ "if_exists",
+ "if_newer",
+ "if_size_gt",
# Remote
"download_file",
"validate_http_url",
@@ -178,14 +298,109 @@ def __getattr__(name: str) -> Any:
"SFTPClient",
"sftp_instance",
"register_sftp_ops",
+ "FTPClient",
+ "FTPConnectOptions",
+ "FTPException",
+ "ftp_instance",
+ "register_ftp_ops",
+ "CrossBackendException",
+ "copy_between",
# Server / Project / Utils
"TCPActionServer",
"start_autocontrol_socket_server",
"HTTPActionServer",
"start_http_action_server",
+ "HTTPActionClient",
+ "HTTPActionClientException",
+ "ActionACL",
+ "ActionNotPermittedException",
"ProjectBuilder",
"create_project_dir",
"get_dir_files_as_list",
+ "fast_find",
+ "scandir_find",
+ "has_os_index",
+ "file_checksum",
+ "verify_checksum",
+ "ChecksumMismatchException",
+ "find_duplicates",
+ "grep_files",
+ "iter_grep",
+ "GrepException",
+ "rotate_backups",
+ "RotateException",
+ "ManifestException",
+ "write_manifest",
+ "verify_manifest",
+ "AuditException",
+ "AuditLog",
+ "IntegrityMonitor",
+ "CryptoException",
+ "encrypt_file",
+ "decrypt_file",
+ "generate_key",
+ "key_from_password",
+ "ACTION_COUNT",
+ "ACTION_DURATION",
+ "record_action",
+ "render_metrics",
+ "MetricsServer",
+ "start_metrics_server",
+ # Triggers
+ "FileWatcher",
+ "TriggerManager",
+ "register_trigger_ops",
+ "trigger_manager",
+ "watch_start",
+ "watch_stop",
+ "watch_stop_all",
+ "watch_list",
+ # Scheduler
+ "CronExpression",
+ "ScheduledJob",
+ "Scheduler",
+ "register_scheduler_ops",
+ "schedule_add",
+ "schedule_list",
+ "schedule_remove",
+ "schedule_remove_all",
+ "scheduler",
+ # Progress / cancellation
+ "CancellationToken",
+ "CancelledException",
+ "ProgressRegistry",
+ "ProgressReporter",
+ "progress_cancel",
+ "progress_clear",
+ "progress_list",
+ "progress_registry",
+ "register_progress_ops",
+ # Notifications
+ "DiscordSink",
+ "EmailSink",
+ "NotificationException",
+ "NotificationManager",
+ "NotificationSink",
+ "PagerDutySink",
+ "SlackSink",
+ "TeamsSink",
+ "TelegramSink",
+ "WebhookSink",
+ "notification_manager",
+ "notify_send",
+ "register_notify_ops",
+ # Config / secrets
+ "AutomationConfig",
+ "ConfigException",
+ "ConfigWatcher",
+ "ChainedSecretProvider",
+ "EnvSecretProvider",
+ "FileSecretProvider",
+ "SecretException",
+ "SecretNotFoundException",
+ "SecretProvider",
+ "default_provider",
+ "resolve_secret_refs",
# UI (lazy-loaded)
"launch_ui",
]
diff --git a/automation_file/client/__init__.py b/automation_file/client/__init__.py
new file mode 100644
index 0000000..3678359
--- /dev/null
+++ b/automation_file/client/__init__.py
@@ -0,0 +1,7 @@
+"""Client SDK for talking to a running ``HTTPActionServer``."""
+
+from __future__ import annotations
+
+from automation_file.client.http_client import HTTPActionClient, HTTPActionClientException
+
+__all__ = ["HTTPActionClient", "HTTPActionClientException"]
diff --git a/automation_file/client/http_client.py b/automation_file/client/http_client.py
new file mode 100644
index 0000000..d5a56b2
--- /dev/null
+++ b/automation_file/client/http_client.py
@@ -0,0 +1,132 @@
+"""Python SDK for :class:`~automation_file.server.http_server.HTTPActionServer`.
+
+``HTTPActionClient(base_url, *, shared_secret=None)`` wraps a single
+``requests.Session`` and exposes ``execute(actions)`` which POSTs the JSON
+action list to ``/actions``. The client is intentionally thin:
+it handles auth header assembly, response-code checking, and error
+translation, but makes no attempt to mirror ``ActionExecutor``'s API
+surface — callers pass the same action-list shape they would pass to
+``execute_action``.
+"""
+
+from __future__ import annotations
+
+from types import TracebackType
+from typing import Any
+
+import requests
+
+from automation_file.exceptions import FileAutomationException
+from automation_file.logging_config import file_automation_logger
+from automation_file.remote.url_validator import validate_http_url
+
+_DEFAULT_TIMEOUT = 30.0
+_ACTIONS_PATH = "/actions"
+
+
+class HTTPActionClientException(FileAutomationException):
+ """Raised when the server rejects a request or the response is malformed."""
+
+
+class HTTPActionClient:
+ """Synchronous SDK for a running :class:`HTTPActionServer`."""
+
+ def __init__(
+ self,
+ base_url: str,
+ *,
+ shared_secret: str | None = None,
+ timeout: float = _DEFAULT_TIMEOUT,
+ verify_loopback: bool = False,
+ ) -> None:
+ stripped = base_url.rstrip("/")
+ if not stripped:
+ raise HTTPActionClientException("base_url must be non-empty")
+ if verify_loopback:
+ validate_http_url(stripped)
+ self._base_url = stripped
+ self._shared_secret = shared_secret
+ self._timeout = float(timeout)
+ self._session = requests.Session()
+
+ @property
+ def base_url(self) -> str:
+ return self._base_url
+
+ def execute(self, actions: list | dict) -> Any:
+ """POST ``actions`` to ``/actions`` and return the decoded JSON body."""
+ if not isinstance(actions, (list, dict)):
+ raise HTTPActionClientException(
+ f"actions must be list or dict, got {type(actions).__name__}"
+ )
+ url = f"{self._base_url}{_ACTIONS_PATH}"
+ headers = {"Content-Type": "application/json"}
+ if self._shared_secret:
+ headers["Authorization"] = f"Bearer {self._shared_secret}"
+ try:
+ response = self._session.post(
+ url,
+ json=actions,
+ headers=headers,
+ timeout=self._timeout,
+ allow_redirects=False,
+ )
+ except requests.RequestException as err:
+ raise HTTPActionClientException(f"request to {url} failed: {err}") from err
+ return _decode_response(response)
+
+ def ping(self) -> bool:
+ """Best-effort reachability probe — returns True if the server responds."""
+ url = f"{self._base_url}{_ACTIONS_PATH}"
+ try:
+ response = self._session.request(
+ "OPTIONS", url, timeout=min(self._timeout, 5.0), allow_redirects=False
+ )
+ except requests.RequestException:
+ return False
+ # The server only handles POST /actions; OPTIONS yields 501 which
+ # still proves it's reachable. 401/403 also prove reachability.
+ return response.status_code < 500 or response.status_code == 501
+
+ def close(self) -> None:
+ self._session.close()
+
+ def __enter__(self) -> HTTPActionClient:
+ return self
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc: BaseException | None,
+ tb: TracebackType | None,
+ ) -> None:
+ self.close()
+
+
+def _decode_response(response: requests.Response) -> Any:
+ status = response.status_code
+ if status == 401:
+ raise HTTPActionClientException("unauthorized: missing or invalid shared secret")
+ if status == 403:
+ body = _safe_body(response)
+ raise HTTPActionClientException(f"forbidden: {body}")
+ if status == 404:
+ raise HTTPActionClientException("server does not expose /actions")
+ if status >= 400:
+ body = _safe_body(response)
+ raise HTTPActionClientException(f"server returned HTTP {status}: {body}")
+ try:
+ return response.json()
+ except ValueError as err:
+ file_automation_logger.error("http_client: bad JSON response: %r", err)
+ raise HTTPActionClientException(f"server returned invalid JSON: {err}") from err
+
+
+def _safe_body(response: requests.Response) -> str:
+ try:
+ data = response.json()
+ except ValueError:
+ return response.text[:200]
+ if isinstance(data, dict) and "error" in data:
+ return str(data["error"])
+ return str(data)[:200]
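The request/response contract the client assumes (POST a JSON action list to `/actions` with an optional `Bearer` header) can be exercised end-to-end with nothing but the stdlib. The echo server below is an illustrative stand-in, not the real `HTTPActionServer`:

```python
# Minimal sketch of the POST /actions round trip the client wraps, using
# only the stdlib (http.server + urllib) instead of requests. The /actions
# path and Bearer header mirror the module above; the echo handler and
# SECRET value are assumptions for demonstration only.
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import Request, urlopen

SECRET = "s3cret"

class EchoActionsHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        if self.path != "/actions":
            self.send_error(404)
            return
        if self.headers.get("Authorization") != f"Bearer {SECRET}":
            self.send_error(401)
            return
        length = int(self.headers.get("Content-Length", 0))
        actions = json.loads(self.rfile.read(length))
        body = json.dumps({"received": len(actions)}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # keep test output quiet
        pass

server = HTTPServer(("127.0.0.1", 0), EchoActionsHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]

request = Request(
    f"http://127.0.0.1:{port}/actions",
    data=json.dumps([["FA_create_dir", {"dir_path": "demo"}]]).encode(),
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {SECRET}",
    },
)
with urlopen(request, timeout=5) as response:
    result = json.loads(response.read())
server.shutdown()
print(result)  # {'received': 1}
```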
diff --git a/automation_file/core/action_executor.py b/automation_file/core/action_executor.py
index b7c7972..f9aadad 100644
--- a/automation_file/core/action_executor.py
+++ b/automation_file/core/action_executor.py
@@ -15,12 +15,15 @@
from __future__ import annotations
+import time
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from automation_file.core.action_registry import ActionRegistry, build_default_registry
from automation_file.core.json_store import read_action_json
+from automation_file.core.metrics import record_action
+from automation_file.core.substitution import substitute as substitute_payload
from automation_file.exceptions import ExecuteActionException, ValidationException
from automation_file.logging_config import file_automation_logger
@@ -95,14 +98,19 @@ def execute_action(
action_list: list | Mapping[str, Any],
dry_run: bool = False,
validate_first: bool = False,
+ substitute: bool = False,
) -> dict[str, Any]:
"""Execute every action; return ``{"execute: ": result|repr(error)}``.
``dry_run=True`` logs and records the resolved name without invoking the
command. ``validate_first=True`` runs :meth:`validate` before touching
- any action so a typo aborts the whole batch up-front.
+ any action so a typo aborts the whole batch up-front. ``substitute=True``
+ expands ``${env:...}`` / ``${date:...}`` / ``${uuid}`` / ``${cwd}``
+ placeholders inside every string in the payload.
"""
actions = self._coerce(action_list)
+ if substitute:
+ actions = substitute_payload(actions) # type: ignore[assignment]
if validate_first:
self.validate(actions)
results: dict[str, Any] = {}
@@ -144,20 +152,15 @@ def add_command_to_executor(self, command_dict: Mapping[str, Any]) -> None:
# Internals ---------------------------------------------------------
def _run_one(self, action: list, dry_run: bool) -> Any:
+ name = _safe_action_name(action)
+ if dry_run:
+ return self._run_dry(action)
+ started = time.monotonic()
+ ok = False
try:
- if dry_run:
- name, kind, payload = self._parse_action(action)
- if self.registry.resolve(name) is None:
- raise ExecuteActionException(f"unknown action: {name!r}")
- file_automation_logger.info(
- "dry_run: %s kind=%s payload=%r",
- name,
- kind,
- payload,
- )
- return f"dry_run:{name}"
value = self._execute_event(action)
file_automation_logger.info("execute_action: %s", action)
+ ok = True
return value
except ExecuteActionException as error:
file_automation_logger.error("execute_action malformed: %r", error)
@@ -165,6 +168,24 @@ def _run_one(self, action: list, dry_run: bool) -> Any:
except Exception as error: # pylint: disable=broad-except
file_automation_logger.error("execute_action runtime error: %r", error)
return repr(error)
+ finally:
+ record_action(name, time.monotonic() - started, ok)
+
+ def _run_dry(self, action: list) -> Any:
+ try:
+ name, kind, payload = self._parse_action(action)
+ if self.registry.resolve(name) is None:
+ raise ExecuteActionException(f"unknown action: {name!r}")
+ except ExecuteActionException as error:
+ file_automation_logger.error("execute_action malformed: %r", error)
+ return repr(error)
+ file_automation_logger.info(
+ "dry_run: %s kind=%s payload=%r",
+ name,
+ kind,
+ payload,
+ )
+ return f"dry_run:{name}"
@staticmethod
def _coerce(action_list: list | Mapping[str, Any]) -> list:
@@ -182,6 +203,12 @@ def _coerce(action_list: list | Mapping[str, Any]) -> list:
return action_list
+def _safe_action_name(action: Any) -> str:
+ if isinstance(action, list) and action and isinstance(action[0], str):
+ return action[0]
+ return "unknown"
+
+
# Default shared executor — built once, mutated in place by plugins.
executor: ActionExecutor = ActionExecutor()
@@ -190,12 +217,14 @@ def execute_action(
action_list: list | Mapping[str, Any],
dry_run: bool = False,
validate_first: bool = False,
+ substitute: bool = False,
) -> dict[str, Any]:
"""Module-level shim that delegates to the shared executor."""
return executor.execute_action(
action_list,
dry_run=dry_run,
validate_first=validate_first,
+ substitute=substitute,
)
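The `automation_file.core.substitution` module itself is not part of this diff, so the sketch below is an assumption about its behaviour: walk the payload recursively and expand `${env:...}` / `${uuid}` / `${cwd}` placeholders inside every string (`${date:...}` is omitted for brevity):

```python
# Hedged sketch of what execute_action(..., substitute=True) does to a
# payload before dispatch. The regex and recursion here are assumptions
# about the real substitution module's behaviour, not its source.
import os
import re
import uuid

_PLACEHOLDER = re.compile(r"\$\{(env:[^}]+|uuid|cwd)\}")

def expand(text: str) -> str:
    def repl(match: re.Match) -> str:
        token = match.group(1)
        if token.startswith("env:"):
            return os.environ.get(token[4:], "")
        if token == "uuid":
            return uuid.uuid4().hex
        return os.getcwd()  # token == "cwd"
    return _PLACEHOLDER.sub(repl, text)

def substitute(value):
    """Recursively expand placeholders in strings, lists, and dicts."""
    if isinstance(value, str):
        return expand(value)
    if isinstance(value, list):
        return [substitute(item) for item in value]
    if isinstance(value, dict):
        return {key: substitute(item) for key, item in value.items()}
    return value

os.environ["DEMO_ROOT"] = "/srv/data"
action = ["FA_create_dir", {"dir_path": "${env:DEMO_ROOT}/out"}]
print(substitute(action))  # ['FA_create_dir', {'dir_path': '/srv/data/out'}]
```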
diff --git a/automation_file/core/action_registry.py b/automation_file/core/action_registry.py
index 26d882a..bc11948 100644
--- a/automation_file/core/action_registry.py
+++ b/automation_file/core/action_registry.py
@@ -66,7 +66,16 @@ def event_dict(self) -> dict[str, Command]:
def _local_commands() -> dict[str, Command]:
- from automation_file.local import dir_ops, file_ops, zip_ops
+ from automation_file.local import (
+ conditional,
+ dir_ops,
+ file_ops,
+ json_edit,
+ shell_ops,
+ sync_ops,
+ tar_ops,
+ zip_ops,
+ )
return {
# Files
@@ -81,6 +90,16 @@ def _local_commands() -> dict[str, Command]:
"FA_create_dir": dir_ops.create_dir,
"FA_remove_dir_tree": dir_ops.remove_dir_tree,
"FA_rename_dir": dir_ops.rename_dir,
+ "FA_sync_dir": sync_ops.sync_dir,
+ # Shell
+ "FA_run_shell": shell_ops.run_shell,
+ # JSON edit
+ "FA_json_get": json_edit.json_get,
+ "FA_json_set": json_edit.json_set,
+ "FA_json_delete": json_edit.json_delete,
+ # Tar
+ "FA_create_tar": tar_ops.create_tar,
+ "FA_extract_tar": tar_ops.extract_tar,
# Zip
"FA_zip_dir": zip_ops.zip_dir,
"FA_zip_file": zip_ops.zip_file,
@@ -90,6 +109,10 @@ def _local_commands() -> dict[str, Command]:
"FA_unzip_file": zip_ops.unzip_file,
"FA_read_zip_file": zip_ops.read_zip_file,
"FA_unzip_all": zip_ops.unzip_all,
+ # Conditional dispatch
+ "FA_if_exists": conditional.if_exists,
+ "FA_if_newer": conditional.if_newer,
+ "FA_if_size_gt": conditional.if_size_gt,
}
@@ -129,9 +152,42 @@ def _http_commands() -> dict[str, Command]:
return {"FA_download_file": http_download.download_file}
+def _utils_commands() -> dict[str, Command]:
+ from automation_file.core import checksum, crypto, manifest
+ from automation_file.remote import cross_backend
+ from automation_file.utils import deduplicate, fast_find, grep, rotate
+
+ return {
+ "FA_fast_find": fast_find.fast_find,
+ "FA_file_checksum": checksum.file_checksum,
+ "FA_verify_checksum": checksum.verify_checksum,
+ "FA_find_duplicates": deduplicate.find_duplicates,
+ "FA_execute_action_dag": _lazy_execute_action_dag,
+ "FA_write_manifest": manifest.write_manifest,
+ "FA_verify_manifest": manifest.verify_manifest,
+ "FA_grep": grep.grep_files,
+ "FA_rotate_backups": rotate.rotate_backups,
+ "FA_copy_between": cross_backend.copy_between,
+ "FA_encrypt_file": crypto.encrypt_file,
+ "FA_decrypt_file": crypto.decrypt_file,
+ }
+
+
+def _lazy_execute_action_dag(
+ nodes: list,
+ max_workers: int = 4,
+ fail_fast: bool = True,
+) -> dict[str, Any]:
+ """Deferred import shim so the registry module doesn't depend on the DAG executor."""
+ from automation_file.core.dag_executor import execute_action_dag
+
+ return execute_action_dag(nodes, max_workers=max_workers, fail_fast=fail_fast)
+
+
def _register_cloud_backends(registry: ActionRegistry) -> None:
from automation_file.remote.azure_blob import register_azure_blob_ops
from automation_file.remote.dropbox_api import register_dropbox_ops
+ from automation_file.remote.ftp import register_ftp_ops
from automation_file.remote.s3 import register_s3_ops
from automation_file.remote.sftp import register_sftp_ops
@@ -139,16 +195,58 @@ def _register_cloud_backends(registry: ActionRegistry) -> None:
register_azure_blob_ops(registry)
register_dropbox_ops(registry)
register_sftp_ops(registry)
+ register_ftp_ops(registry)
+
+
+def _register_trigger_ops(registry: ActionRegistry) -> None:
+ from automation_file.trigger import register_trigger_ops
+
+ register_trigger_ops(registry)
+
+
+def _register_scheduler_ops(registry: ActionRegistry) -> None:
+ from automation_file.scheduler import register_scheduler_ops
+
+ register_scheduler_ops(registry)
+
+
+def _register_progress_ops(registry: ActionRegistry) -> None:
+ from automation_file.core.progress import register_progress_ops
+
+ register_progress_ops(registry)
+
+
+def _register_notify_ops(registry: ActionRegistry) -> None:
+ from automation_file.notify import register_notify_ops
+
+ register_notify_ops(registry)
def build_default_registry() -> ActionRegistry:
- """Return a registry pre-populated with every built-in ``FA_*`` action."""
+ """Return a registry pre-populated with every built-in ``FA_*`` action.
+
+ After the built-ins are registered, any third-party package advertising
+ an ``automation_file.actions`` entry point is loaded so its commands
+ land in the same registry. Plugins may override built-in names.
+ """
registry = ActionRegistry()
registry.register_many(_local_commands())
registry.register_many(_http_commands())
+ registry.register_many(_utils_commands())
registry.register_many(_drive_commands())
_register_cloud_backends(registry)
+ _register_trigger_ops(registry)
+ _register_scheduler_ops(registry)
+ _register_progress_ops(registry)
+ _register_notify_ops(registry)
+ _load_plugins(registry)
file_automation_logger.info(
"action_registry: built default registry with %d commands", len(registry)
)
return registry
+
+
+def _load_plugins(registry: ActionRegistry) -> None:
+ from automation_file.core.plugins import load_entry_point_plugins
+
+ load_entry_point_plugins(registry.register_many)
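A third-party package would hook into `_load_plugins` by advertising the `automation_file.actions` entry-point group named in the `build_default_registry` docstring. The package, module, and function names below are illustrative:

```toml
# pyproject.toml of a hypothetical plugin package. The entry-point group
# "automation_file.actions" comes from the docstring above; everything
# else here is a made-up example. The referenced function would return a
# dict of {"FA_*": callable} suitable for registry.register_many.
[project.entry-points."automation_file.actions"]
my_plugin = "my_plugin.actions:get_commands"
```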
diff --git a/automation_file/core/audit.py b/automation_file/core/audit.py
new file mode 100644
index 0000000..ce573e1
--- /dev/null
+++ b/automation_file/core/audit.py
@@ -0,0 +1,140 @@
+"""SQLite-backed audit log for executed actions.
+
+``AuditLog(db_path)`` opens (or creates) a single-table SQLite database and
+appends one row per action execution. Rows carry the timestamp, action name,
+a JSON-encoded snapshot of the payload, the result / error repr, and the
+duration in milliseconds.
+
+Writes open a short-lived connection per call, and each connection lives
+and dies on the calling thread, so the log is safe to share between
+background worker threads and the scheduler. Readers call :meth:`AuditLog.recent` to pull the most
+recent N rows.
+
+The module deliberately avoids buffering / background queues: every row is
+persisted synchronously with an ``INSERT`` inside a ``with connect(..)`` so
+a crash at most loses the currently-executing action.
+"""
+
+from __future__ import annotations
+
+import json
+import sqlite3
+import threading
+import time
+from contextlib import closing
+from pathlib import Path
+from typing import Any
+
+from automation_file.exceptions import FileAutomationException
+from automation_file.logging_config import file_automation_logger
+
+_SCHEMA = """
+CREATE TABLE IF NOT EXISTS audit (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ts REAL NOT NULL,
+ action TEXT NOT NULL,
+ payload TEXT NOT NULL,
+ result TEXT,
+ error TEXT,
+ duration_ms REAL NOT NULL
+);
+CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit (ts DESC);
+"""
+
+
+class AuditException(FileAutomationException):
+ """Raised when the audit log cannot be opened or written."""
+
+
+class AuditLog:
+ """Synchronous SQLite audit log."""
+
+ def __init__(self, db_path: str | Path) -> None:
+ self._db_path = Path(db_path)
+ self._lock = threading.Lock()
+ try:
+ self._db_path.parent.mkdir(parents=True, exist_ok=True)
+ with closing(self._connect()) as conn:
+ conn.executescript(_SCHEMA)
+ conn.commit()
+ except (OSError, sqlite3.DatabaseError) as err:
+ raise AuditException(f"cannot open audit log {self._db_path}: {err}") from err
+
+ def record(
+ self,
+ action: str,
+ payload: Any,
+ *,
+ result: Any = None,
+ error: BaseException | None = None,
+ duration_ms: float = 0.0,
+ ) -> None:
+ """Append a single audit row. Never raises — failures are logged only."""
+ row = (
+ time.time(),
+ action,
+ _safe_json(payload),
+ _safe_json(result) if result is not None else None,
+ repr(error) if error is not None else None,
+ float(duration_ms),
+ )
+ try:
+ with self._lock, closing(self._connect()) as conn:
+ conn.execute(
+ "INSERT INTO audit (ts, action, payload, result, error, duration_ms)"
+ " VALUES (?, ?, ?, ?, ?, ?)",
+ row,
+ )
+ conn.commit()
+ except sqlite3.DatabaseError as err:
+ file_automation_logger.error("audit.record failed: %r", err)
+
+ def recent(self, limit: int = 100) -> list[dict[str, Any]]:
+ """Return the newest ``limit`` rows, newest first."""
+ if limit <= 0:
+ return []
+ with closing(self._connect()) as conn:
+ cursor = conn.execute(
+ "SELECT id, ts, action, payload, result, error, duration_ms"
+ " FROM audit ORDER BY ts DESC LIMIT ?",
+ (limit,),
+ )
+ rows = cursor.fetchall()
+ return [
+ {
+ "id": row[0],
+ "ts": row[1],
+ "action": row[2],
+ "payload": json.loads(row[3]) if row[3] else None,
+ "result": json.loads(row[4]) if row[4] else None,
+ "error": row[5],
+ "duration_ms": row[6],
+ }
+ for row in rows
+ ]
+
+ def count(self) -> int:
+ with closing(self._connect()) as conn:
+ cursor = conn.execute("SELECT COUNT(*) FROM audit")
+ (total,) = cursor.fetchone()
+ return int(total)
+
+ def purge(self, older_than_seconds: float) -> int:
+ """Delete rows older than ``older_than_seconds`` and return the row count."""
+ if older_than_seconds <= 0:
+ raise AuditException("older_than_seconds must be positive")
+ cutoff = time.time() - older_than_seconds
+ with self._lock, closing(self._connect()) as conn:
+ cursor = conn.execute("DELETE FROM audit WHERE ts < ?", (cutoff,))
+ conn.commit()
+ return int(cursor.rowcount)
+
+ def _connect(self) -> sqlite3.Connection:
+ return sqlite3.connect(self._db_path, timeout=5.0)
+
+
+def _safe_json(value: Any) -> str:
+ try:
+ return json.dumps(value, default=repr, ensure_ascii=False)
+ except (TypeError, ValueError):
+ return json.dumps(repr(value), ensure_ascii=False)
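The single-table design is easy to inspect without importing the package. This stand-alone round trip reuses the same schema and SQL statements as `AuditLog` against a throwaway database:

```python
# Stand-alone demo of the audit-row round trip: same schema and SQL as
# AuditLog above, run against a temporary on-disk database. This mirrors
# record() / recent() without importing automation_file.
import json
import sqlite3
import tempfile
import time
from pathlib import Path

db_path = Path(tempfile.mkdtemp()) / "audit.sqlite3"
conn = sqlite3.connect(db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS audit (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ts REAL NOT NULL,
    action TEXT NOT NULL,
    payload TEXT NOT NULL,
    result TEXT,
    error TEXT,
    duration_ms REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit (ts DESC);
""")
conn.execute(
    "INSERT INTO audit (ts, action, payload, result, error, duration_ms)"
    " VALUES (?, ?, ?, ?, ?, ?)",
    (time.time(), "FA_create_dir", json.dumps({"dir_path": "demo"}),
     json.dumps(True), None, 1.2),
)
conn.commit()

rows = conn.execute(
    "SELECT action, payload, duration_ms FROM audit ORDER BY ts DESC LIMIT 10"
).fetchall()
conn.close()
print(rows[0][0])  # FA_create_dir
```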
diff --git a/automation_file/core/checksum.py b/automation_file/core/checksum.py
new file mode 100644
index 0000000..69b5080
--- /dev/null
+++ b/automation_file/core/checksum.py
@@ -0,0 +1,72 @@
+"""File checksum + integrity verification helpers.
+
+Streaming hashes so multi-GB files don't blow up memory. The hash algorithm
+is whatever :func:`hashlib.new` understands (``sha256``, ``sha1``, ``md5``,
+``blake2b``, ...). :func:`file_checksum` returns the hex digest;
+:func:`verify_checksum` does a constant-time compare against the expected
+value so callers can't leak timing on the check.
+"""
+
+from __future__ import annotations
+
+import hashlib
+import hmac
+import os
+from pathlib import Path
+
+from automation_file.exceptions import FileAutomationException, FileNotExistsException
+from automation_file.logging_config import file_automation_logger
+
+__all__ = [
+ "ChecksumMismatchException",
+ "file_checksum",
+ "verify_checksum",
+]
+
+_DEFAULT_ALGO = "sha256"
+_DEFAULT_CHUNK = 1024 * 1024 # 1 MiB
+
+
+class ChecksumMismatchException(FileAutomationException):
+ """Raised when a computed digest does not match the expected value."""
+
+
+def file_checksum(
+ path: str | os.PathLike[str],
+ algorithm: str = _DEFAULT_ALGO,
+ chunk_size: int = _DEFAULT_CHUNK,
+) -> str:
+ """Return the hex digest of ``path`` under ``algorithm``.
+
+ Reads the file in ``chunk_size`` blocks so the memory cost is bounded
+ regardless of file size. Raises :class:`FileNotExistsException` when the
+ path is missing and :class:`ValueError` for unknown algorithms.
+ """
+ target = Path(path)
+ if not target.is_file():
+ raise FileNotExistsException(f"checksum target missing: {target}")
+ try:
+ digest = hashlib.new(algorithm)
+ except ValueError as err:
+ raise ValueError(f"unknown hash algorithm: {algorithm!r}") from err
+ with target.open("rb") as handle:
+ for block in iter(lambda: handle.read(chunk_size), b""):
+ digest.update(block)
+ return digest.hexdigest()
+
+
+def verify_checksum(
+ path: str | os.PathLike[str],
+ expected: str,
+ algorithm: str = _DEFAULT_ALGO,
+ chunk_size: int = _DEFAULT_CHUNK,
+) -> bool:
+ """Return True iff ``path``'s digest matches ``expected`` (case-insensitive).
+
+ Uses :func:`hmac.compare_digest` so the match check is constant-time.
+ """
+ actual = file_checksum(path, algorithm=algorithm, chunk_size=chunk_size)
+ matched = hmac.compare_digest(actual.lower(), expected.lower())
+ if not matched:
+ file_automation_logger.warning("verify_checksum mismatch: %s algo=%s", path, algorithm)
+ return matched
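The streaming-digest and constant-time-compare pattern above can be demonstrated stand-alone against a throwaway file, mirroring `file_checksum` / `verify_checksum` without importing the package:

```python
# Self-contained demo of the pattern used by file_checksum and
# verify_checksum: hash the file in bounded-size chunks, then compare
# digests with hmac.compare_digest (case-insensitive, constant-time).
import hashlib
import hmac
import tempfile
from pathlib import Path

target = Path(tempfile.mkdtemp()) / "payload.bin"
target.write_bytes(b"hello world\n" * 1000)

# Stream the file in 1 MiB blocks so memory use stays bounded.
digest = hashlib.new("sha256")
with target.open("rb") as handle:
    for block in iter(lambda: handle.read(1024 * 1024), b""):
        digest.update(block)
actual = digest.hexdigest()

# Constant-time, case-insensitive comparison against an expected digest.
expected = hashlib.sha256(b"hello world\n" * 1000).hexdigest().upper()
matched = hmac.compare_digest(actual.lower(), expected.lower())
print(matched)  # True
```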
diff --git a/automation_file/core/config.py b/automation_file/core/config.py
new file mode 100644
index 0000000..6b2b1f6
--- /dev/null
+++ b/automation_file/core/config.py
@@ -0,0 +1,202 @@
+"""TOML-based configuration for automation_file.
+
+Callers describe notification sinks, secret-provider roots, and scheduler
+defaults in a single ``automation_file.toml`` file. :class:`AutomationConfig`
+loads it, resolves ``${env:…}`` / ``${file:…}`` references via the secret
+provider chain, and exposes helpers to materialise runtime objects (sinks,
+etc.) without the caller poking at the raw dict.
+
+Minimal example::
+
+ [secrets]
+ file_root = "/run/secrets"
+
+ [[notify.sinks]]
+ type = "slack"
+ name = "team-alerts"
+ webhook_url = "${env:SLACK_WEBHOOK}"
+
+ [[notify.sinks]]
+ type = "email"
+ name = "ops-email"
+ host = "smtp.example.com"
+ port = 587
+ sender = "alerts@example.com"
+ recipients = ["ops@example.com"]
+ username = "${env:SMTP_USER}"
+ password = "${file:smtp_password}"
+
+ [defaults]
+ dedup_seconds = 120
+
+Only the sections the caller uses need to appear; everything is optional.
+"""
+
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+from typing import Any
+
+if sys.version_info >= (3, 11):
+ import tomllib
+else: # pragma: no cover - exercised only on Python 3.10 runners
+ import tomli as tomllib
+
+from automation_file.core.secrets import (
+ ChainedSecretProvider,
+ default_provider,
+ resolve_secret_refs,
+)
+from automation_file.exceptions import FileAutomationException
+from automation_file.logging_config import file_automation_logger
+from automation_file.notify.manager import NotificationManager
+from automation_file.notify.sinks import (
+ EmailSink,
+ NotificationException,
+ NotificationSink,
+ SlackSink,
+ WebhookSink,
+)
+
+
+class ConfigException(FileAutomationException):
+ """Raised when the config file is missing, unparseable, or malformed."""
+
+
+class AutomationConfig:
+ """Parsed, secret-resolved view of an ``automation_file.toml`` document."""
+
+ def __init__(self, data: dict[str, Any], *, source: Path | None = None) -> None:
+ self._data = data
+ self._source = source
+
+ @classmethod
+ def load(
+ cls,
+ path: str | Path,
+ *,
+ provider: ChainedSecretProvider | None = None,
+ ) -> AutomationConfig:
+ """Parse ``path`` as TOML, resolve secret refs, and return the config."""
+ config_path = Path(path)
+ if not config_path.is_file():
+ raise ConfigException(f"config file not found: {config_path}")
+ try:
+ raw = tomllib.loads(config_path.read_text(encoding="utf-8"))
+ except (OSError, tomllib.TOMLDecodeError) as err:
+ raise ConfigException(f"cannot parse {config_path}: {err}") from err
+ secrets_section = raw.get("secrets") or {}
+ file_root = secrets_section.get("file_root")
+ effective_provider = provider or default_provider(file_root)
+ resolved = resolve_secret_refs(raw, effective_provider)
+ file_automation_logger.info("config loaded from %s", config_path)
+ return cls(resolved, source=config_path)
+
+ @property
+ def source(self) -> Path | None:
+ return self._source
+
+ @property
+ def raw(self) -> dict[str, Any]:
+ """Return a shallow copy of the resolved document."""
+ return dict(self._data)
+
+ def section(self, name: str) -> dict[str, Any]:
+ """Return one top-level section as a dict (empty if absent)."""
+ value = self._data.get(name)
+ if value is None:
+ return {}
+ if not isinstance(value, dict):
+ raise ConfigException(f"section {name!r} must be a table, got {type(value).__name__}")
+ return value
+
+ def notification_sinks(self) -> list[NotificationSink]:
+ """Instantiate every sink declared under ``[[notify.sinks]]``."""
+ notify_section = self.section("notify")
+ entries = notify_section.get("sinks") or []
+ if not isinstance(entries, list):
+ raise ConfigException("'notify.sinks' must be an array of tables")
+ sinks: list[NotificationSink] = []
+ for entry in entries:
+ if not isinstance(entry, dict):
+ raise ConfigException("each 'notify.sinks' entry must be a table")
+ sinks.append(_build_sink(entry))
+ return sinks
+
+ def apply_to(self, manager: NotificationManager) -> int:
+ """Register every configured sink into ``manager``. Returns the count.
+
+ Existing registrations are preserved; duplicates by name are replaced
+ (see :meth:`NotificationManager.register`).
+ """
+ count = 0
+ for sink in self.notification_sinks():
+ manager.register(sink)
+ count += 1
+ defaults = self.section("defaults")
+ if "dedup_seconds" in defaults:
+ try:
+ manager.dedup_seconds = float(defaults["dedup_seconds"])
+ except (TypeError, ValueError) as err:
+ raise ConfigException(
+ f"defaults.dedup_seconds must be a number, got {defaults['dedup_seconds']!r}"
+ ) from err
+ return count
+
+
+def _build_sink(entry: dict[str, Any]) -> NotificationSink:
+ sink_type = entry.get("type")
+ if not isinstance(sink_type, str):
+ raise ConfigException("each sink entry needs a 'type' string")
+ builder = _SINK_BUILDERS.get(sink_type)
+ if builder is None:
+ raise ConfigException(
+ f"unknown sink type {sink_type!r} (expected one of {sorted(_SINK_BUILDERS)})"
+ )
+ try:
+ return builder(entry)
+ except NotificationException:
+ raise
+ except (TypeError, ValueError) as err:
+ raise ConfigException(
+ f"invalid config for sink {entry.get('name') or sink_type!r}: {err}"
+ ) from err
+
+
+def _build_webhook(entry: dict[str, Any]) -> WebhookSink:
+ return WebhookSink(
+ url=entry["url"],
+ name=entry.get("name", "webhook"),
+ timeout=float(entry.get("timeout", 10.0)),
+ extra_headers=entry.get("extra_headers"),
+ )
+
+
+def _build_slack(entry: dict[str, Any]) -> SlackSink:
+ return SlackSink(
+ webhook_url=entry["webhook_url"],
+ name=entry.get("name", "slack"),
+ timeout=float(entry.get("timeout", 10.0)),
+ )
+
+
+def _build_email(entry: dict[str, Any]) -> EmailSink:
+ return EmailSink(
+ host=entry["host"],
+ port=int(entry["port"]),
+ sender=entry["sender"],
+ recipients=list(entry["recipients"]),
+ username=entry.get("username"),
+ password=entry.get("password"),
+ use_tls=bool(entry.get("use_tls", True)),
+ name=entry.get("name", "email"),
+ timeout=float(entry.get("timeout", 10.0)),
+ )
+
+
+_SINK_BUILDERS = {
+ "webhook": _build_webhook,
+ "slack": _build_slack,
+ "email": _build_email,
+}
diff --git a/automation_file/core/config_watcher.py b/automation_file/core/config_watcher.py
new file mode 100644
index 0000000..80c7679
--- /dev/null
+++ b/automation_file/core/config_watcher.py
@@ -0,0 +1,96 @@
+"""Polling hot-reload for ``automation_file.toml``.
+
+``ConfigWatcher`` runs a small background thread that checks the config file's
+mtime + size every ``interval`` seconds. When either changes it reloads via
+:meth:`AutomationConfig.load` and fires the user-supplied ``on_change``
+callback with the fresh config. Reload errors are logged but do not
+terminate the watcher; the file is re-read after its next visible change.
+
+We use polling rather than watchdog here: it's one file, cross-platform, and
+the reload cadence is inherently human-scale (seconds, not milliseconds).
+"""
+
+from __future__ import annotations
+
+import threading
+from collections.abc import Callable
+from pathlib import Path
+
+from automation_file.core.config import AutomationConfig, ConfigException
+from automation_file.core.secrets import ChainedSecretProvider
+from automation_file.exceptions import FileAutomationException
+from automation_file.logging_config import file_automation_logger
+
+OnChange = Callable[[AutomationConfig], None]
+
+
+class ConfigWatcher:
+ """Polls ``path`` and invokes ``on_change`` whenever the file changes."""
+
+ def __init__(
+ self,
+ path: str | Path,
+ on_change: OnChange,
+ *,
+ interval: float = 2.0,
+ provider: ChainedSecretProvider | None = None,
+ ) -> None:
+ if interval <= 0:
+ raise ConfigException("interval must be positive")
+ self._path = Path(path)
+ self._on_change = on_change
+ self._interval = interval
+ self._provider = provider
+ self._stop = threading.Event()
+ self._thread: threading.Thread | None = None
+ self._last_fingerprint: tuple[float, int] | None = None
+
+ def start(self) -> AutomationConfig:
+ """Load the config once, arm the watcher, and return the initial load."""
+ config = AutomationConfig.load(self._path, provider=self._provider)
+ self._last_fingerprint = self._fingerprint()
+ self._stop.clear()
+ thread = threading.Thread(target=self._run, name="fa-config-watcher", daemon=True)
+ thread.start()
+ self._thread = thread
+ file_automation_logger.info(
+ "config_watcher: watching %s (interval=%.2fs)", self._path, self._interval
+ )
+ return config
+
+ def stop(self, timeout: float = 5.0) -> None:
+ self._stop.set()
+ thread = self._thread
+ self._thread = None
+ if thread is not None and thread.is_alive():
+ thread.join(timeout=timeout)
+
+ def check_once(self) -> bool:
+ """Reload if the file changed since the last call. Returns True on reload."""
+ fingerprint = self._fingerprint()
+ if fingerprint == self._last_fingerprint:
+ return False
+ self._last_fingerprint = fingerprint
+ try:
+ config = AutomationConfig.load(self._path, provider=self._provider)
+ except FileAutomationException as err:
+ file_automation_logger.error("config_watcher: reload failed: %r", err)
+ return False
+ try:
+ self._on_change(config)
+        except Exception as err:  # pylint: disable=broad-except
+ file_automation_logger.error("config_watcher: on_change raised: %r", err)
+ return True
+
+ def _fingerprint(self) -> tuple[float, int]:
+ try:
+ stat = self._path.stat()
+ except OSError:
+ return (0.0, 0)
+ return (stat.st_mtime, stat.st_size)
+
+ def _run(self) -> None:
+ while not self._stop.is_set():
+ self.check_once()
+ self._stop.wait(self._interval)
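The change-detection step can be sketched in isolation. This is a hedged, stdlib-only illustration of the ``(mtime, size)`` fingerprint the watcher polls; the temp-file paths are throwaway and nothing here imports the package.

```python
import tempfile
from pathlib import Path

def fingerprint(path: Path) -> tuple[float, int]:
    """(mtime, size); (0.0, 0) for a missing file, mirroring the watcher."""
    try:
        stat = path.stat()
    except OSError:
        return (0.0, 0)
    return (stat.st_mtime, stat.st_size)

with tempfile.TemporaryDirectory() as tmp:
    cfg = Path(tmp) / "automation_file.toml"
    cfg.write_text("[defaults]\n", encoding="utf-8")
    first = fingerprint(cfg)
    # A size change is always visible even when mtime granularity is coarse.
    cfg.write_text("[defaults]\ndedup_seconds = 30\n", encoding="utf-8")
    changed = fingerprint(cfg) != first
    missing = fingerprint(Path(tmp) / "nope.toml")
print(changed, missing)  # True (0.0, 0)
```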
diff --git a/automation_file/core/crypto.py b/automation_file/core/crypto.py
new file mode 100644
index 0000000..1a9573e
--- /dev/null
+++ b/automation_file/core/crypto.py
@@ -0,0 +1,174 @@
+"""AES-256-GCM file encryption helpers.
+
+``encrypt_file(source, target, key)`` writes a self-describing envelope::
+
+ magic = b"FA-AESG" 7 bytes
+ version = 0x01 1 byte
+ flags = 0x00 1 byte (reserved)
+ aad_len = uint32 BE 4 bytes
+ nonce = 12 bytes
+    aad = aad_len bytes (may be empty)
+ ciphertext + tag (rest — GCM tag is the trailing 16 bytes)
+
+``decrypt_file`` reads the same format, verifies the tag, and writes the
+plaintext to ``target``. Tampering (bit flips anywhere in the envelope
+except ``aad_len``) surfaces as :class:`CryptoException`.
+
+GCM caps plaintext at roughly 64 GiB per ``(key, nonce)`` pair. Each call to
+``encrypt_file`` generates a fresh nonce, so the limit applies per file
+rather than per key; that is far above typical automation payloads. For
+files approaching that size, split them before calling ``encrypt_file``.
+"""
+
+from __future__ import annotations
+
+import os
+from pathlib import Path
+
+from cryptography.exceptions import InvalidTag
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
+
+from automation_file.exceptions import FileAutomationException
+from automation_file.logging_config import file_automation_logger
+
+_MAGIC = b"FA-AESG"
+_VERSION = 0x01
+_NONCE_SIZE = 12
+_HEADER_FIXED_SIZE = len(_MAGIC) + 2 + 4 # magic + version + flags + aad_len
+_VALID_KEY_SIZES = frozenset({16, 24, 32})
+_DEFAULT_PBKDF2_ITERATIONS = 200_000
+
+
+class CryptoException(FileAutomationException):
+ """Raised when encryption / decryption fails (including on tamper)."""
+
+
+def generate_key(*, bits: int = 256) -> bytes:
+ """Return cryptographically random bytes suitable for AES-GCM."""
+ if bits not in (128, 192, 256):
+ raise CryptoException(f"bits must be 128 / 192 / 256, got {bits}")
+ return os.urandom(bits // 8)
+
+
+def key_from_password(
+ password: str,
+ salt: bytes,
+ *,
+ iterations: int = _DEFAULT_PBKDF2_ITERATIONS,
+ bits: int = 256,
+) -> bytes:
+ """Derive a symmetric key from ``password`` via PBKDF2-HMAC-SHA256."""
+ if not password:
+ raise CryptoException("password must be non-empty")
+ if len(salt) < 16:
+ raise CryptoException("salt must be at least 16 bytes")
+ if bits not in (128, 192, 256):
+ raise CryptoException(f"bits must be 128 / 192 / 256, got {bits}")
+ kdf = PBKDF2HMAC(
+ algorithm=hashes.SHA256(),
+ length=bits // 8,
+ salt=salt,
+ iterations=iterations,
+ )
+ return kdf.derive(password.encode("utf-8"))
+
+
+def encrypt_file(
+ source: str | os.PathLike[str],
+ target: str | os.PathLike[str],
+ key: bytes,
+ *,
+ associated_data: bytes = b"",
+) -> dict[str, int]:
+ """Encrypt ``source`` to ``target`` under AES-GCM. Returns a size summary."""
+ _validate_key(key)
+ if not isinstance(associated_data, (bytes, bytearray)):
+ raise CryptoException("associated_data must be bytes")
+ src = Path(source)
+ if not src.is_file():
+ raise CryptoException(f"source file not found: {src}")
+
+ plaintext = src.read_bytes()
+ nonce = os.urandom(_NONCE_SIZE)
+ aesgcm = AESGCM(bytes(key))
+ ciphertext = aesgcm.encrypt(nonce, plaintext, bytes(associated_data) or None)
+
+ envelope = _build_header(associated_data, nonce) + ciphertext
+ dst = Path(target)
+ dst.parent.mkdir(parents=True, exist_ok=True)
+ dst.write_bytes(envelope)
+ file_automation_logger.info(
+ "encrypt_file: %s -> %s (%d -> %d bytes)",
+ src,
+ dst,
+ len(plaintext),
+ len(envelope),
+ )
+ return {"plaintext_bytes": len(plaintext), "ciphertext_bytes": len(envelope)}
+
+
+def decrypt_file(
+ source: str | os.PathLike[str],
+ target: str | os.PathLike[str],
+ key: bytes,
+) -> dict[str, int]:
+ """Decrypt ``source`` to ``target``. Raises on invalid tag / header."""
+ _validate_key(key)
+ src = Path(source)
+ if not src.is_file():
+ raise CryptoException(f"source file not found: {src}")
+ envelope = src.read_bytes()
+ nonce, aad, ciphertext = _parse_envelope(envelope)
+ aesgcm = AESGCM(bytes(key))
+ try:
+ plaintext = aesgcm.decrypt(nonce, ciphertext, aad or None)
+ except InvalidTag as err:
+ raise CryptoException("authentication failed: wrong key or tampered data") from err
+
+ dst = Path(target)
+ dst.parent.mkdir(parents=True, exist_ok=True)
+ dst.write_bytes(plaintext)
+ file_automation_logger.info(
+ "decrypt_file: %s -> %s (%d -> %d bytes)",
+ src,
+ dst,
+ len(envelope),
+ len(plaintext),
+ )
+ return {"ciphertext_bytes": len(envelope), "plaintext_bytes": len(plaintext)}
+
+
+def _validate_key(key: bytes) -> None:
+ if not isinstance(key, (bytes, bytearray)):
+ raise CryptoException("key must be bytes")
+ if len(key) not in _VALID_KEY_SIZES:
+ raise CryptoException(
+ f"key length must be 16 / 24 / 32 bytes, got {len(key)}",
+ )
+
+
+def _build_header(aad: bytes, nonce: bytes) -> bytes:
+ aad_len = len(aad).to_bytes(4, "big")
+ return _MAGIC + bytes([_VERSION, 0x00]) + aad_len + nonce + bytes(aad)
+
+
+def _parse_envelope(envelope: bytes) -> tuple[bytes, bytes, bytes]:
+ if len(envelope) < _HEADER_FIXED_SIZE + _NONCE_SIZE + 16:
+        raise CryptoException("envelope shorter than fixed header + nonce + GCM tag")
+ if not envelope.startswith(_MAGIC):
+ raise CryptoException("not an AES-GCM envelope (bad magic)")
+ version = envelope[len(_MAGIC)]
+ if version != _VERSION:
+ raise CryptoException(f"unsupported envelope version {version}")
+ aad_len = int.from_bytes(envelope[_HEADER_FIXED_SIZE - 4 : _HEADER_FIXED_SIZE], "big")
+ nonce_start = _HEADER_FIXED_SIZE
+ nonce_end = nonce_start + _NONCE_SIZE
+ aad_end = nonce_end + aad_len
+ if aad_end > len(envelope):
+ raise CryptoException("envelope truncated before aad end")
+ nonce = envelope[nonce_start:nonce_end]
+ aad = envelope[nonce_end:aad_end]
+ ciphertext = envelope[aad_end:]
+ return nonce, aad, ciphertext
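The envelope framing can be exercised without any cryptography at all. This is a pure-stdlib sketch of the byte layout documented above (magic, version, flags, big-endian ``aad_len``, nonce, aad); it performs no encryption, and the names are local to the example.

```python
MAGIC = b"FA-AESG"
NONCE_SIZE = 12

def build_header(aad: bytes, nonce: bytes) -> bytes:
    # magic(7) + version(1) + flags(1) + aad_len(4, big-endian) + nonce + aad
    return MAGIC + bytes([0x01, 0x00]) + len(aad).to_bytes(4, "big") + nonce + aad

def parse_header(blob: bytes) -> tuple[bytes, bytes, bytes]:
    """Return (nonce, aad, rest); raise ValueError on bad framing."""
    fixed = len(MAGIC) + 2 + 4
    if not blob.startswith(MAGIC):
        raise ValueError("bad magic")
    aad_len = int.from_bytes(blob[fixed - 4 : fixed], "big")
    nonce = blob[fixed : fixed + NONCE_SIZE]
    aad = blob[fixed + NONCE_SIZE : fixed + NONCE_SIZE + aad_len]
    rest = blob[fixed + NONCE_SIZE + aad_len :]
    return nonce, aad, rest

nonce = bytes(range(12))
blob = build_header(b"meta", nonce) + b"ciphertext"
nonce2, aad2, rest2 = parse_header(blob)
print(aad2, rest2)  # b'meta' b'ciphertext'
```

In the real envelope ``rest`` is the ciphertext with the 16-byte GCM tag appended; here it is just placeholder bytes.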
diff --git a/automation_file/core/dag_executor.py b/automation_file/core/dag_executor.py
new file mode 100644
index 0000000..7bea7d3
--- /dev/null
+++ b/automation_file/core/dag_executor.py
@@ -0,0 +1,175 @@
+"""DAG executor — run actions in dependency order, parallelising independent branches.
+
+A DAG node is a dict:
+
+.. code-block:: python
+
+ {"id": "build", "action": ["FA_create_dir", {"dir_path": "build"}]}
+ {"id": "zip", "action": ["FA_zip_dir", ...], "depends_on": ["build"]}
+
+The executor performs Kahn-style topological scheduling: every node whose
+dependencies are satisfied becomes runnable and is dispatched to a shared
+thread pool immediately — so diamonds and wide fan-outs run in parallel
+without the caller hand-tuning ``max_workers`` around dependency edges.
+
+If a node raises, its transitive dependents are marked ``skipped`` by
+default (fail-fast semantics). Pass ``fail_fast=False`` to run dependents
+regardless (useful for cleanup steps).
+"""
+
+from __future__ import annotations
+
+import threading
+from collections import defaultdict, deque
+from collections.abc import Mapping
+from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
+from typing import Any
+
+from automation_file.core.action_executor import executor as default_executor
+from automation_file.exceptions import DagException
+
+__all__ = ["execute_action_dag"]
+
+
+def execute_action_dag(
+ nodes: list[Mapping[str, Any]],
+ max_workers: int = 4,
+ fail_fast: bool = True,
+) -> dict[str, Any]:
+ """Run ``nodes`` in topological order, parallelising independent branches.
+
+ Each node is ``{"id": str, "action": [...], "depends_on": [id, ...]}``.
+ ``depends_on`` is optional (default ``[]``). Returns a dict mapping each
+ node id to either the action's return value, the repr of its exception,
+    or a ``"skipped: dep ... failed"`` message when ``fail_fast`` blocks a branch.
+
+ Raises :class:`DagException` for static errors detected before any action
+ runs: duplicate ids, unknown dependencies, or cycles.
+ """
+ graph, indegree = _build_graph(nodes)
+ node_map = {_require_id(node): node for node in nodes}
+ results: dict[str, Any] = {}
+ lock = threading.Lock()
+
+ ready: deque[str] = deque(node_id for node_id, count in indegree.items() if count == 0)
+
+ with ThreadPoolExecutor(max_workers=max_workers) as pool:
+ in_flight: dict[Future[Any], str] = {}
+
+ def submit(node_id: str) -> None:
+ action = node_map[node_id].get("action")
+ if not isinstance(action, list):
+ err = DagException(f"node {node_id!r} missing action list")
+ with lock:
+ results[node_id] = repr(err)
+            # Propagate the failure to dependents even when fail_fast is off,
+            # otherwise their indegree never reaches zero and they stall.
+            for dependent in graph.get(node_id, ()):
+                indegree[dependent] -= 1
+                if fail_fast:
+                    _mark_skipped(dependent, node_id, graph, indegree, results, lock)
+                elif indegree[dependent] == 0 and dependent not in results:
+                    ready.append(dependent)
+            return
+ future = pool.submit(_run_action, action)
+ in_flight[future] = node_id
+
+ while ready or in_flight:
+ while ready:
+ submit(ready.popleft())
+ if not in_flight:
+ break
+ done, _ = wait(list(in_flight), return_when=FIRST_COMPLETED)
+ for future in done:
+ node_id = in_flight.pop(future)
+ failed = False
+ try:
+ value: Any = future.result()
+ except Exception as err: # pylint: disable=broad-except
+ value = repr(err)
+ failed = True
+ with lock:
+ results[node_id] = value
+ for dependent in graph.get(node_id, ()):
+ indegree[dependent] -= 1
+ if failed and fail_fast:
+ _mark_skipped(dependent, node_id, graph, indegree, results, lock)
+ elif indegree[dependent] == 0 and dependent not in results:
+ ready.append(dependent)
+
+ return results
+
+
+def _run_action(action: list) -> Any:
+ # Use the single-action path so exceptions surface as real exceptions
+ # rather than being swallowed by execute_action's per-action try/except.
+ return default_executor._execute_event(action)
+
+
+def _build_graph(
+ nodes: list[Mapping[str, Any]],
+) -> tuple[dict[str, list[str]], dict[str, int]]:
+ graph: dict[str, list[str]] = defaultdict(list)
+ indegree: dict[str, int] = {}
+ ids: set[str] = set()
+
+ for node in nodes:
+ node_id = _require_id(node)
+ if node_id in ids:
+ raise DagException(f"duplicate node id: {node_id!r}")
+ ids.add(node_id)
+ indegree[node_id] = 0
+
+ for node in nodes:
+ node_id = _require_id(node)
+ deps = node.get("depends_on", []) or []
+ if not isinstance(deps, list):
+ raise DagException(f"node {node_id!r} depends_on must be list")
+ for dep in deps:
+ if dep not in ids:
+ raise DagException(f"node {node_id!r} depends on unknown id {dep!r}")
+ if dep == node_id:
+ raise DagException(f"node {node_id!r} depends on itself")
+ graph[dep].append(node_id)
+ indegree[node_id] += 1
+
+ _detect_cycle(ids, graph, dict(indegree))
+ return dict(graph), indegree
+
+
+def _require_id(node: Mapping[str, Any]) -> str:
+ node_id = node.get("id")
+ if not isinstance(node_id, str) or not node_id:
+ raise DagException(f"node missing non-empty 'id': {node!r}")
+ return node_id
+
+
+def _detect_cycle(
+ ids: set[str],
+ graph: dict[str, list[str]],
+ indegree: dict[str, int],
+) -> None:
+ queue: deque[str] = deque(node_id for node_id, count in indegree.items() if count == 0)
+ visited = 0
+ while queue:
+ current = queue.popleft()
+ visited += 1
+ for dependent in graph.get(current, ()):
+ indegree[dependent] -= 1
+ if indegree[dependent] == 0:
+ queue.append(dependent)
+ if visited != len(ids):
+ raise DagException("cycle detected in DAG")
+
+
+def _mark_skipped(
+ dependent: str,
+ reason_id: str,
+ graph: dict[str, list[str]],
+ indegree: dict[str, int],
+ results: dict[str, Any],
+ lock: threading.Lock,
+) -> None:
+ with lock:
+ if dependent in results:
+ return
+ results[dependent] = f"skipped: dep {reason_id!r} failed"
+ for grandchild in graph.get(dependent, ()):
+ indegree[grandchild] -= 1
+ _mark_skipped(grandchild, dependent, graph, indegree, results, lock)
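The Kahn-style scheduling the executor performs reduces, in the sequential case, to an indegree-driven topological sort. A minimal sketch (standalone, not importing the module; node names are illustrative):

```python
from collections import deque

def topo_order(nodes: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm over {node: [dependencies]}; raises on cycles."""
    indegree = {n: len(deps) for n, deps in nodes.items()}
    dependents: dict[str, list[str]] = {n: [] for n in nodes}
    for node, deps in nodes.items():
        for dep in deps:
            dependents[dep].append(node)
    ready = deque(sorted(n for n, d in indegree.items() if d == 0))
    order: list[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for dependent in dependents[current]:
            indegree[dependent] -= 1
            if indegree[dependent] == 0:
                ready.append(dependent)
    if len(order) != len(nodes):
        # Leftover nodes never reached indegree 0: a cycle.
        raise ValueError("cycle detected")
    return order

# A diamond: build -> {test, lint} -> package
diamond = {"build": [], "test": ["build"], "lint": ["build"],
           "package": ["test", "lint"]}
print(topo_order(diamond))  # ['build', 'test', 'lint', 'package']
```

The executor dispatches each node to a thread pool the moment it becomes ready instead of appending to ``order``, which is what lets ``test`` and ``lint`` run concurrently.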
diff --git a/automation_file/core/fim.py b/automation_file/core/fim.py
new file mode 100644
index 0000000..12051ef
--- /dev/null
+++ b/automation_file/core/fim.py
@@ -0,0 +1,151 @@
+"""File integrity monitoring (FIM) — periodic manifest verification.
+
+``IntegrityMonitor(root, manifest_path)`` runs a background thread that
+re-verifies the tree against a previously-written manifest at a fixed
+interval. When drift is detected (missing or modified files) the monitor
+
+1. invokes the optional ``on_drift`` callback with the verification summary,
+2. emits an ``error``-level notification through the supplied
+ :class:`~automation_file.notify.NotificationManager` (defaults to the
+ process-wide singleton), and
+3. logs a single warning describing the counts.
+
+This is the "watchdog" side of manifests: once a baseline has been written
+with :func:`write_manifest`, a monitor keeps checking that the tree still
+matches and alerts when it does not. Extras (new files not in the manifest)
+do not count as drift by default, mirroring the posture of ``verify_manifest``.
+"""
+
+from __future__ import annotations
+
+import threading
+from collections.abc import Callable
+from pathlib import Path
+from typing import Any
+
+from automation_file.core.manifest import ManifestException, verify_manifest
+from automation_file.exceptions import FileAutomationException
+from automation_file.logging_config import file_automation_logger
+from automation_file.notify import NotificationManager, notification_manager
+
+OnDrift = Callable[[dict[str, Any]], None]
+
+
+class IntegrityMonitor:
+ """Periodically verify a manifest and fire alerts on drift."""
+
+ def __init__(
+ self,
+ root: str | Path,
+ manifest_path: str | Path,
+ *,
+ interval: float = 60.0,
+ on_drift: OnDrift | None = None,
+ manager: NotificationManager | None = None,
+ alert_on_extra: bool = False,
+ ) -> None:
+ if interval <= 0:
+ raise FileAutomationException("interval must be positive")
+ self._root = Path(root)
+ self._manifest_path = Path(manifest_path)
+ self._interval = float(interval)
+ self._on_drift = on_drift
+ self._manager = manager or notification_manager
+ self._alert_on_extra = bool(alert_on_extra)
+ self._stop = threading.Event()
+ self._thread: threading.Thread | None = None
+ self._last_summary: dict[str, Any] | None = None
+
+ @property
+ def last_summary(self) -> dict[str, Any] | None:
+ return self._last_summary
+
+ def start(self) -> None:
+ """Arm the monitor. The first verification runs on the next tick."""
+ if self._thread is not None and self._thread.is_alive():
+ return
+ self._stop.clear()
+ thread = threading.Thread(target=self._run, name="fa-integrity-monitor", daemon=True)
+ thread.start()
+ self._thread = thread
+ file_automation_logger.info(
+ "integrity_monitor: watching %s against %s (interval=%.1fs)",
+ self._root,
+ self._manifest_path,
+ self._interval,
+ )
+
+ def stop(self, timeout: float = 5.0) -> None:
+ self._stop.set()
+ thread = self._thread
+ self._thread = None
+ if thread is not None and thread.is_alive():
+ thread.join(timeout=timeout)
+
+ def check_once(self) -> dict[str, Any]:
+ """Run one verification pass and return the summary."""
+ try:
+ summary = verify_manifest(self._root, self._manifest_path)
+ except (ManifestException, FileAutomationException) as err:
+ file_automation_logger.error("integrity_monitor: verify failed: %r", err)
+ summary = {
+ "matched": [],
+ "missing": [],
+ "modified": [],
+ "extra": [],
+ "ok": False,
+ "error": repr(err),
+ }
+ self._last_summary = summary
+ if self._is_drift(summary):
+ self._handle_drift(summary)
+ return summary
+
+ def _is_drift(self, summary: dict[str, Any]) -> bool:
+ if summary.get("error"):
+ return True
+ if summary.get("missing") or summary.get("modified"):
+ return True
+ return bool(self._alert_on_extra and summary.get("extra"))
+
+ def _handle_drift(self, summary: dict[str, Any]) -> None:
+ file_automation_logger.warning(
+ "integrity_monitor: drift detected missing=%d modified=%d extra=%d",
+ len(summary.get("missing") or []),
+ len(summary.get("modified") or []),
+ len(summary.get("extra") or []),
+ )
+ if self._on_drift is not None:
+ try:
+ self._on_drift(summary)
+            except Exception as err:  # pylint: disable=broad-except
+ file_automation_logger.error("integrity_monitor: on_drift raised: %r", err)
+ body = _format_body(summary)
+ try:
+ self._manager.notify(
+ subject=f"integrity drift: {self._root}",
+ body=body,
+ level="error",
+ )
+ except FileAutomationException as err:
+ file_automation_logger.error("integrity_monitor: notify failed: %r", err)
+
+ def _run(self) -> None:
+ while not self._stop.is_set():
+ self._stop.wait(self._interval)
+ if self._stop.is_set():
+ break
+ self.check_once()
+
+
+def _format_body(summary: dict[str, Any]) -> str:
+ parts: list[str] = []
+ if summary.get("error"):
+ parts.append(f"error: {summary['error']}")
+ for key in ("missing", "modified", "extra"):
+ items = summary.get(key) or []
+ if items:
+ preview = ", ".join(items[:5])
+ suffix = f" (+{len(items) - 5} more)" if len(items) > 5 else ""
+ parts.append(f"{key}: {preview}{suffix}")
+ return "\n".join(parts) if parts else "no drift detected"
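The drift decision itself is small enough to restate as a standalone sketch of the monitor's posture (extras only count when opted in; this function is illustrative, not the module's API):

```python
def is_drift(summary: dict, alert_on_extra: bool = False) -> bool:
    """Errors, missing, or modified entries always count as drift;
    extra files only when alert_on_extra is set."""
    if summary.get("error") or summary.get("missing") or summary.get("modified"):
        return True
    return bool(alert_on_extra and summary.get("extra"))

clean = {"missing": [], "modified": [], "extra": ["new.txt"]}
print(is_drift(clean))        # False: extras alone are not drift
print(is_drift(clean, True))  # True once alert_on_extra is enabled
```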
diff --git a/automation_file/core/manifest.py b/automation_file/core/manifest.py
new file mode 100644
index 0000000..be409dc
--- /dev/null
+++ b/automation_file/core/manifest.py
@@ -0,0 +1,183 @@
+"""Directory-tree manifests — JSON snapshot of every file's checksum.
+
+A manifest is a simple JSON document recording every file under a root,
+its size, and a streaming digest (SHA-256 by default). Two operations::
+
+ write_manifest(root, manifest_path) # snapshot now
+ verify_manifest(root, manifest_path) # verify the tree still matches
+
+Use cases: release-artifact verification, backup integrity checks, detecting
+tampering on a sync target, or pre-flight checks before a rename / move.
+
+The document shape is intentionally small and human-readable::
+
+ {
+ "version": 1,
+ "algorithm": "sha256",
+ "root": "/abs/path/at/snapshot/time",
+ "created_at": "2026-04-21T10:15:30+00:00",
+ "files": {
+ "a.txt": {"size": 3, "checksum": "..."},
+        "nested/b.txt": {"size": 1024, "checksum": "..."}
+ }
+ }
+
+Paths in the ``files`` mapping are POSIX-style (forward-slash) relative
+paths so the manifest round-trips across Windows and Unix.
+"""
+
+from __future__ import annotations
+
+import datetime as dt
+import json
+import os
+from pathlib import Path
+from typing import Any
+
+from automation_file.core.checksum import file_checksum
+from automation_file.exceptions import DirNotExistsException, FileAutomationException
+from automation_file.logging_config import file_automation_logger
+
+_MANIFEST_VERSION = 1
+_DEFAULT_ALGO = "sha256"
+
+
+class ManifestException(FileAutomationException):
+ """Raised for invalid manifest documents or unreadable manifest paths."""
+
+
+def write_manifest(
+ root: str | os.PathLike[str],
+ manifest_path: str | os.PathLike[str],
+ *,
+ algorithm: str = _DEFAULT_ALGO,
+) -> dict[str, Any]:
+ """Write a manifest for every file under ``root``. Returns the manifest dict."""
+ root_path = Path(root)
+ if not root_path.is_dir():
+ raise DirNotExistsException(str(root_path))
+ files: dict[str, dict[str, Any]] = {}
+ for relative in _walk_files(root_path):
+ absolute = root_path / relative
+ files[_posix_rel(relative)] = {
+ "size": absolute.stat().st_size,
+ "checksum": file_checksum(absolute, algorithm=algorithm),
+ }
+ manifest: dict[str, Any] = {
+ "version": _MANIFEST_VERSION,
+ "algorithm": algorithm,
+ "root": str(root_path.resolve()),
+ "created_at": dt.datetime.now(dt.timezone.utc).isoformat(),
+ "files": files,
+ }
+ Path(manifest_path).parent.mkdir(parents=True, exist_ok=True)
+ Path(manifest_path).write_text(json.dumps(manifest, indent=2), encoding="utf-8")
+ file_automation_logger.info(
+ "write_manifest: %d files under %s -> %s", len(files), root_path, manifest_path
+ )
+ return manifest
+
+
+def verify_manifest(
+ root: str | os.PathLike[str],
+ manifest_path: str | os.PathLike[str],
+) -> dict[str, Any]:
+ """Verify every file recorded in ``manifest_path`` still matches under ``root``.
+
+ Returns a summary dict::
+
+ {
+ "matched": ["a.txt"],
+ "missing": ["gone.txt"],
+ "modified": ["changed.txt"],
+ "extra": ["new.txt"], # present under root, not in manifest
+ "ok": False,
+ }
+
+ ``ok`` is True iff ``missing`` and ``modified`` are both empty (extras
+    are reported but do not fail verification, mirroring ``sync_dir``'s
+ default non-deleting posture).
+ """
+ root_path = Path(root)
+ if not root_path.is_dir():
+ raise DirNotExistsException(str(root_path))
+ manifest = _load_manifest(manifest_path)
+ algorithm = manifest.get("algorithm", _DEFAULT_ALGO)
+ recorded = manifest.get("files") or {}
+
+ summary: dict[str, Any] = {
+ "matched": [],
+ "missing": [],
+ "modified": [],
+ "extra": [],
+ "ok": False,
+ }
+
+ for rel, meta in recorded.items():
+ _verify_one(root_path, rel, meta, algorithm, summary)
+
+ recorded_keys = set(recorded.keys())
+ for rel_path in _walk_files(root_path):
+ posix = _posix_rel(rel_path)
+ if posix not in recorded_keys:
+ summary["extra"].append(posix)
+
+ summary["ok"] = not summary["missing"] and not summary["modified"]
+ file_automation_logger.info(
+ "verify_manifest: ok=%s matched=%d missing=%d modified=%d extra=%d",
+ summary["ok"],
+ len(summary["matched"]),
+ len(summary["missing"]),
+ len(summary["modified"]),
+ len(summary["extra"]),
+ )
+ return summary
+
+
+def _verify_one(
+ root: Path,
+ relative: str,
+ meta: dict[str, Any],
+ algorithm: str,
+ summary: dict[str, Any],
+) -> None:
+ target = root / relative
+ if not target.is_file():
+ summary["missing"].append(relative)
+ return
+ expected = meta.get("checksum")
+ size = meta.get("size")
+ if isinstance(size, int) and target.stat().st_size != size:
+ summary["modified"].append(relative)
+ return
+ if not isinstance(expected, str) or file_checksum(target, algorithm=algorithm) != expected:
+ summary["modified"].append(relative)
+ return
+ summary["matched"].append(relative)
+
+
+def _load_manifest(path: str | os.PathLike[str]) -> dict[str, Any]:
+ manifest_path = Path(path)
+ if not manifest_path.is_file():
+ raise ManifestException(f"manifest not found: {manifest_path}")
+ try:
+ data = json.loads(manifest_path.read_text(encoding="utf-8"))
+ except (OSError, json.JSONDecodeError) as err:
+ raise ManifestException(f"cannot read manifest {manifest_path}: {err}") from err
+ if not isinstance(data, dict) or "files" not in data:
+ raise ManifestException(f"manifest missing 'files' mapping: {manifest_path}")
+ return data
+
+
+def _walk_files(root: Path) -> list[Path]:
+ entries: list[Path] = []
+ for dirpath, dirnames, filenames in os.walk(root, followlinks=False):
+ dirnames.sort()
+ base = Path(dirpath)
+ for name in sorted(filenames):
+ entries.append((base / name).relative_to(root))
+ return entries
+
+
+def _posix_rel(relative: Path) -> str:
+ return str(relative).replace("\\", "/")
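The snapshot/verify round trip can be sketched with nothing but ``hashlib`` and a temp directory. This checksum-only illustration skips the size short-circuit and the ``extra`` scan of the real implementation; paths and names are local to the example.

```python
import hashlib
import tempfile
from pathlib import Path

def snapshot(root: Path) -> dict:
    """Minimal manifest: POSIX relative path -> sha256 hex digest."""
    files = {}
    for path in sorted(p for p in root.rglob("*") if p.is_file()):
        files[path.relative_to(root).as_posix()] = (
            hashlib.sha256(path.read_bytes()).hexdigest()
        )
    return {"version": 1, "algorithm": "sha256", "files": files}

def verify(root: Path, manifest: dict) -> dict:
    matched, missing, modified = [], [], []
    for rel, digest in manifest["files"].items():
        target = root / rel
        if not target.is_file():
            missing.append(rel)
        elif hashlib.sha256(target.read_bytes()).hexdigest() != digest:
            modified.append(rel)
        else:
            matched.append(rel)
    return {"matched": matched, "missing": missing, "modified": modified,
            "ok": not missing and not modified}

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.txt").write_text("hello", encoding="utf-8")
    manifest = snapshot(root)
    ok_before = verify(root, manifest)["ok"]
    (root / "a.txt").write_text("tampered", encoding="utf-8")
    after = verify(root, manifest)
print(ok_before, after["ok"], after["modified"])
```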
diff --git a/automation_file/core/metrics.py b/automation_file/core/metrics.py
new file mode 100644
index 0000000..f9d7c7f
--- /dev/null
+++ b/automation_file/core/metrics.py
@@ -0,0 +1,65 @@
+"""Prometheus metrics — per-action counters and duration histogram.
+
+The module exposes two metrics that are updated from
+:class:`~automation_file.core.action_executor.ActionExecutor` on every
+call:
+
+* ``automation_file_actions_total{action, status}`` — counter incremented
+ with ``status="ok"`` or ``status="error"`` per action.
+* ``automation_file_action_duration_seconds{action}`` — histogram of wall
+ time spent inside the registered callable.
+
+:func:`render` returns the wire-format text and matching ``Content-Type``
+suitable for a ``GET /metrics`` handler. :func:`record_action` is the
+single write path — failures are swallowed so a broken metrics backend
+can never abort a real action.
+"""
+
+from __future__ import annotations
+
+from prometheus_client import CONTENT_TYPE_LATEST, REGISTRY, Counter, Histogram, generate_latest
+
+from automation_file.logging_config import file_automation_logger
+
+_DURATION_BUCKETS = (
+ 0.005,
+ 0.01,
+ 0.025,
+ 0.05,
+ 0.1,
+ 0.25,
+ 0.5,
+ 1.0,
+ 2.5,
+ 5.0,
+ 10.0,
+ 30.0,
+ 60.0,
+)
+
+ACTION_COUNT = Counter(
+ "automation_file_actions_total",
+ "Total actions executed, partitioned by outcome.",
+ labelnames=("action", "status"),
+)
+ACTION_DURATION = Histogram(
+ "automation_file_action_duration_seconds",
+ "Time spent inside a registered action callable.",
+ labelnames=("action",),
+ buckets=_DURATION_BUCKETS,
+)
+
+
+def record_action(action: str, duration_seconds: float, ok: bool) -> None:
+ """Record one action execution. Never raises."""
+ status = "ok" if ok else "error"
+ try:
+ ACTION_COUNT.labels(action=action, status=status).inc()
+ ACTION_DURATION.labels(action=action).observe(max(0.0, float(duration_seconds)))
+ except Exception as err: # pragma: no cover - defensive
+ file_automation_logger.error("metrics.record_action failed: %r", err)
+
+
+def render() -> tuple[bytes, str]:
+ """Return ``(payload, content_type)`` for a ``/metrics`` response."""
+ return generate_latest(REGISTRY), CONTENT_TYPE_LATEST
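To see why those bucket boundaries matter, here is a stdlib-only sketch of how a histogram observation maps onto an upper bound. Prometheus actually stores cumulative ``le`` counters rather than a single bucket label, so this is a simplification for intuition only.

```python
import bisect

BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
           1.0, 2.5, 5.0, 10.0, 30.0, 60.0)

def bucket_for(duration_seconds: float) -> str:
    """Smallest bucket upper bound covering the observation;
    values above the top boundary land in +Inf."""
    index = bisect.bisect_left(BUCKETS, duration_seconds)
    return str(BUCKETS[index]) if index < len(BUCKETS) else "+Inf"

print(bucket_for(0.03))   # 0.05
print(bucket_for(120.0))  # +Inf
```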
diff --git a/automation_file/core/plugins.py b/automation_file/core/plugins.py
new file mode 100644
index 0000000..297d2d2
--- /dev/null
+++ b/automation_file/core/plugins.py
@@ -0,0 +1,86 @@
+"""Entry-point plugin discovery.
+
+Third-party packages can register additional actions with
+``automation_file`` without the library having to import them directly.
+
+A plugin advertises itself in its ``pyproject.toml``::
+
+ [project.entry-points."automation_file.actions"]
+ my_plugin = "my_plugin:register"
+
+where ``register`` is a zero-argument callable returning a
+``Mapping[str, Callable]`` — the same shape you would pass to
+:func:`automation_file.add_command_to_executor`.
+
+:func:`load_entry_point_plugins` is invoked by
+:func:`automation_file.core.action_registry.build_default_registry` so
+installed plugins populate every freshly-built registry automatically.
+Plugin failures are logged and swallowed — one broken plugin must not
+break the library for everyone else.
+"""
+
+from __future__ import annotations
+
+from collections.abc import Callable, Mapping
+from importlib.metadata import EntryPoint, entry_points
+from typing import Any
+
+from automation_file.logging_config import file_automation_logger
+
+__all__ = ["ENTRY_POINT_GROUP", "load_entry_point_plugins"]
+
+ENTRY_POINT_GROUP = "automation_file.actions"
+
+
+def load_entry_point_plugins(
+ register: Callable[[Mapping[str, Callable[..., Any]]], None],
+) -> int:
+ """Discover and register every ``automation_file.actions`` entry point.
+
+ ``register`` receives one ``{name: callable}`` mapping per plugin and
+ is responsible for storing it (typically
+ :meth:`ActionRegistry.register_many`). Returns the number of plugins
+ that registered successfully.
+ """
+ loaded = 0
+ for entry in _iter_entry_points():
+ try:
+ factory = entry.load()
+ except Exception as err: # pylint: disable=broad-except
+ file_automation_logger.error(
+ "plugin load failed: %s (%s): %r", entry.name, entry.value, err
+ )
+ continue
+ try:
+ mapping = factory()
+ except Exception as err: # pylint: disable=broad-except
+ file_automation_logger.error(
+ "plugin factory raised: %s (%s): %r", entry.name, entry.value, err
+ )
+ continue
+ if not isinstance(mapping, Mapping):
+ file_automation_logger.error(
+ "plugin %s returned %s, expected Mapping",
+ entry.name,
+ type(mapping).__name__,
+ )
+ continue
+ try:
+ register(mapping)
+ except Exception as err: # pylint: disable=broad-except
+ file_automation_logger.error("plugin register failed: %s: %r", entry.name, err)
+ continue
+ file_automation_logger.info(
+ "plugin registered: %s -> %d commands", entry.name, len(mapping)
+ )
+ loaded += 1
+ return loaded
+
+
+def _iter_entry_points() -> list[EntryPoint]:
+ try:
+ return list(entry_points(group=ENTRY_POINT_GROUP))
+ except TypeError:
+ # importlib.metadata before 3.10 used a different API; the project
+ # targets 3.10+, so this branch exists only as defensive padding.
+ return list(entry_points().get(ENTRY_POINT_GROUP, []))
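The swallow-and-continue discipline is the important part of this module, and it can be exercised without ``importlib.metadata`` at all. A standalone sketch with fake factories (no entry points are involved; names are illustrative):

```python
from collections.abc import Mapping

def _raises():
    raise RuntimeError("boom")

def load_plugins(factories: dict, register) -> int:
    """Skip any factory that raises or returns a non-Mapping;
    one broken plugin must not break the rest."""
    loaded = 0
    for factory in factories.values():
        try:
            mapping = factory()
        except Exception:  # deliberately broad, mirroring the module above
            continue
        if not isinstance(mapping, Mapping):
            continue
        register(mapping)
        loaded += 1
    return loaded

registry: dict = {}
count = load_plugins(
    {
        "good": lambda: {"X_hello": print},
        "broken": _raises,
        "wrong_shape": lambda: ["not", "a", "mapping"],
    },
    registry.update,
)
print(count, sorted(registry))  # 1 ['X_hello']
```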
diff --git a/automation_file/core/progress.py b/automation_file/core/progress.py
new file mode 100644
index 0000000..92ea7c4
--- /dev/null
+++ b/automation_file/core/progress.py
@@ -0,0 +1,161 @@
+"""Transfer progress + cancellation primitives.
+
+Long-running transfers (HTTP downloads, S3 uploads/downloads, …) accept a
+named handle from the shared :data:`progress_registry`. The registry keeps a
+:class:`ProgressReporter` (bytes transferred, optional total) and a
+:class:`CancellationToken` per name so the GUI or a JSON action can observe
+progress or cancel mid-flight.
+
+Instrumentation is opt-in: callers pass ``progress_name="