diff --git a/skills/byted-arkclaw-call-records-simulate/.gitignore b/skills/byted-arkclaw-call-records-simulate/.gitignore new file mode 100644 index 00000000..b500fd9b --- /dev/null +++ b/skills/byted-arkclaw-call-records-simulate/.gitignore @@ -0,0 +1,5 @@ +tts_env/ +output/ +__pycache__/ +*.pyc +.DS_Store diff --git a/skills/byted-arkclaw-call-records-simulate/SKILL.md b/skills/byted-arkclaw-call-records-simulate/SKILL.md new file mode 100644 index 00000000..655fb1a5 --- /dev/null +++ b/skills/byted-arkclaw-call-records-simulate/SKILL.md @@ -0,0 +1,178 @@ +--- +name: byted-arkclaw-call-records-simulate +version: "1.0.0" +description: "通话记录模拟与语音合成技能。根据自然语言需求或结构化场景生成对话 JSON,并调用 edge-tts 合成为 MP3。适用于需要构造招聘邀约、电话销售、面试邀约等模拟通话录音,或为 ASR 测试和演示准备语音样本时。" +--- + +# 通话记录模拟(`byted-arkclaw-call-records-simulate`) + +根据调用者的自然语言提示,构建结构化的**通话记录 JSON**,再调用 `edge-tts` 将对话渲染为单个合成的 MP3 音频文件,用于 Demo、ASR 测试、语音数据集构造等场景。 + +## 输入与输出 + +### 输入 + +- 业务场景 +- 主叫 / 被叫角色设定 +- 预期通话结果 +- 轮次、时长、语气等补充要求 +- 若需复用已有候选人信息,只向调用者索要必要字段或文件路径,不要求先把资料复制到当前 skill 目录 + +### 输出 + +- `materials/*.json`:结构化通话脚本 +- `output/*.mp3`:合成后的模拟录音 +- 若已知候选人邮箱,可在 `materials/*.json` 中保留 `candidate_email` 字段,供后续复试邮件邀约链路使用 + +## 在总流程中的位置 + +- 测试与演示阶段:用于模拟招聘电话、验证 ASR 与 CRM 链路 +- 不负责候选人正式评估与 CRM 决策,只负责生成可控的模拟通话样本 + +## 设计模式 + +本 skill 主要采用: +- **Prompt → Structured Data**:将自然语言提示转换为标准化的 `materials/*.json` 通话记录 +- **Tool Wrapper**:封装 `edge-tts` Python SDK 的异步流式接口 +- **Pipeline**:`构建 JSON` → `校验` → `TTS 合成` → `输出 MP3` + +## 核心脚本与配置 + +所有功能脚本位于 `scripts/` 目录: +- `scripts/env_init.sh`:环境初始化(创建虚拟环境、安装 `edge-tts`) +- `scripts/generate_record.py`:根据用户提示生成通话记录 JSON 骨架(支持交互式/命令行两种模式) +- `scripts/tts_processor.py`:读取 JSON 素材,按角色逐句调用 `edge-tts` 合成,并拼接为单个 MP3 + +### 目录约定 + +``` +byted-arkclaw-call-records-simulate/ +├── SKILL.md +├── checklist.md +├── evals/ +│ └── evals.json +├── references/ +│ └── voices.md # 可用语音模型清单 +├── scripts/ +│ ├── env_init.sh +│ ├── generate_record.py +│ └── tts_processor.py +├── materials/ # 通话记录 JSON 素材 +│ ├── 
interview_accept.json +│ └── interview_rejection.json +└── output/ # 合成后的 MP3 输出目录 +``` + +## Gotchas + +- **网络依赖**:`edge-tts` 需访问 Microsoft Edge 在线 TTS 服务,离线或代理受限环境会失败(表现为 `WebSocket 403/无法连接`)。 +- **声音区分**:同一段对话中,**不同角色务必使用不同的 voice**,否则 ASR 的 speaker diarization 无法区分说话人。推荐男女声组合(如 `zh-CN-YunxiNeural` + `zh-CN-XiaoxiaoNeural`)。 +- **文本合规**:不得在合成音频中编造真实姓名、真实电话号码、真实公司内部信息;应使用化名 + 脱敏号码(如 `138****1234`)。 +- **拼接方式**:本 skill 采用**逐句追加二进制流**的方式拼接 MP3(`edge-tts` 输出为单一 codec 的 MP3 片段,直接 `bytes` 拼接即可被播放器解码)。若需严格的无缝编辑,请改用 `ffmpeg concat`。 + +## 工作流(严格按步骤执行) + +复制此清单并跟踪进度: + +```text +执行进度: +- [ ] Step 0: 前置检查 +- [ ] Step 1: 环境初始化 +- [ ] Step 2: 解析用户提示 → 生成通话记录 JSON +- [ ] Step 3: 用户确认 JSON +- [ ] Step 4: 调用 edge-tts 合成音频 +- [ ] Step 5: 结果呈现 +``` + +### Step 0: 前置检查(⚠️ 必须在第一轮对话中完成) + +1. **网络**:确认当前环境能访问 `speech.platform.bing.com`(edge-tts 后端),否则立即提醒用户需联网/切代理。 +2. **Python**:要求 `python3 ≥ 3.9`。 +3. **明确关键信息**:若用户提示缺少以下任一项,必须追问: + - 业务场景(猎头邀约 / 催收 / 售后回访 / 客服咨询 / 面试初筛 …) + - 主叫 / 被叫角色设定(性别、身份、姓氏) + - 预期结果(接受 / 拒绝 / 待定 / 投诉 …) + - 预计时长或对话轮次(默认 8–12 轮,约 1 分钟) +4. 
**输出文件名**:最终音频统一使用 `虚拟手机号-被叫人标识.mp3` 命名,如 `13111111111-陈先生.mp3`、`13999999999-刘女士.mp3`;若没有明确名字,则按性别退化为 `女士` / `先生`。 + +### Step 1: 环境初始化 + +```bash +source "$(dirname "$0")/scripts/env_init.sh" +``` + +该脚本会: +- 在 skill 根目录创建/复用 `tts_env/` 虚拟环境 +- 安装 / 校验 `edge-tts` 依赖 +- 导出 `CALL_SIM_WORKDIR` 指向 skill 根目录 + +### Step 2: 解析用户提示 → 生成通话记录 JSON + +调用 `generate_record.py` 将结构化参数落盘为 `materials/.json`: + +```bash +python scripts/generate_record.py \ + --name "FDE 工程师面试邀约(接受版)" \ + --scenario interview_invite \ + --outcome accept \ + --caller "猎头(张):zh-CN-XiaoxiaoNeural" \ + --callee "候选人(陈):zh-CN-YunxiNeural" \ + --candidate-email "chen@example.com" \ + --duration "约1分钟" \ + --out materials/fde_interview_accept.json +``` + +**Agent 责任**:根据用户自然语言提示,构造符合以下 schema 的对话内容并写入 `conversations` 字段。允许 Agent 在 `generate_record.py` 产出的骨架基础上,通过编辑 JSON 注入具体台词(推荐:先跑一次脚本生成骨架,再用文本编辑写入台词)。 + +**通话记录 JSON Schema**: +```json +{ + "name": "对话名称", + "duration": "预计时长", + "output_file": "虚拟手机号-被叫人标识.mp3(如 13111111111-陈先生.mp3)", + "scenario": "场景标签(可选)", + "outcome": "accept | reject | pending | complaint | ...(可选)", + "candidate_email": "候选人邮箱(可选,用于后续邮件邀约)", + "conversations": [ + { + "role": "角色名(如 猎头(张))", + "text": "具体台词", + "voice": "zh-CN-XiaoxiaoNeural" + } + ] +} +``` + +### Step 3: 用户确认 JSON(⚠️ 必须获得用户确认) + +在生成 JSON 之后、合成音频之前,**必须**将 JSON 主要内容(至少 name / duration / conversations 轮次与台词摘要)回显给用户,并明确暂停等待确认。得到"继续 / 确认 / OK"类指令后才能进入合成步骤。 + +### Step 4: 调用 edge-tts 合成音频 + +```bash +python scripts/tts_processor.py \ + --material materials/fde_interview_accept.json \ + --output ./output +``` + +脚本将: +1. 读取 JSON +2. 对每个 `conversations[i]` 调用 `edge_tts.Communicate(text, voice).stream()` +3. 逐句追加拼接为完整 MP3,写入 `output/<虚拟手机号>-<被叫人标识>.mp3` + +### Step 5: 结果呈现 + +向用户输出: +- 通话记录 JSON 的路径(`materials/.json`) +- 合成音频路径(`output/<虚拟手机号>-<被叫人标识>.mp3`) +- 总对话轮次、预计时长、使用的 voice 列表 +- 提醒下游可配合 `byted-arkclaw-local-batch-asr` 等 ASR skill 做回环测试 + +## 审查标准 + +执行完成后,Agent 应自检: +1. 
`materials/.json` 是否符合 schema,`conversations` 非空且每轮都有 `role / text / voice` +2. 是否已让用户确认 JSON 内容后再发起合成 +3. `output/<虚拟手机号>-<被叫人标识>.mp3` 是否成功生成且可用播放器播放 +4. 文案是否避免真实姓名、真实电话、敏感信息 +5. 不同角色是否使用了不同的 voice diff --git a/skills/byted-arkclaw-call-records-simulate/checklist.md b/skills/byted-arkclaw-call-records-simulate/checklist.md new file mode 100644 index 00000000..fa712be4 --- /dev/null +++ b/skills/byted-arkclaw-call-records-simulate/checklist.md @@ -0,0 +1,11 @@ +# byted-arkclaw-call-records-simulate 自检清单(Skill Hub) + +- SKILL.md 顶部包含 YAML frontmatter(name / version / description) +- description 用英文关键词覆盖通话记录 / TTS / 合成 / 猎头 / 催收 / 售后 / ASR 测试等场景 +- scripts/ 下同时包含 `env_init.sh`、`generate_record.py`、`tts_processor.py` +- `tts_processor.py` 对 JSON 做 schema 校验(`name` / `output_file` / 非空 `conversations`、每轮含 `role/text/voice`) +- 工作流中强制"先生成 JSON → 用户确认 → 再合成音频"两步走 +- references/voices.md 说明男女搭配原则,避免同 voice 无法区分说话人 +- evals/evals.json 至少覆盖:面试邀约、催收、ASR 测试素材、直接合成已有素材 4 类 +- 不在文档或代码中编造真实姓名、真实号码、敏感信息 +- skill 目录内不提交 `tts_env/`、`output/*.mp3`、`__pycache__/`、`.DS_Store` 等生成物(通过 .gitignore) diff --git a/skills/byted-arkclaw-call-records-simulate/references/voices.md b/skills/byted-arkclaw-call-records-simulate/references/voices.md new file mode 100644 index 00000000..476fec8e --- /dev/null +++ b/skills/byted-arkclaw-call-records-simulate/references/voices.md @@ -0,0 +1,28 @@ +# edge-tts 常用中文语音模型 + +> 完整列表可通过 `edge-tts --list-voices | grep zh-` 获取。以下为通话记录场景最常用的几个,按"主叫 / 被叫"搭配推荐。 + +## 普通话(zh-CN) + +| voice | 性别 | 音色特征 | 推荐角色 | +|-------|------|----------|----------| +| `zh-CN-XiaoxiaoNeural` | 女 | 标准、亲和,偏客服/猎头 | 主叫(销售 / 猎头 / 客服) | +| `zh-CN-XiaoyiNeural` | 女 | 温柔、偏年轻 | 被叫(候选人 / 客户) | +| `zh-CN-YunxiNeural` | 男 | 标准、沉稳 | 主叫 / 被叫(商务) | +| `zh-CN-YunyangNeural` | 男 | 浑厚、偏新闻播报 | 主叫(外呼通知 / 政务) | +| `zh-CN-YunjianNeural` | 男 | 自然、略松弛 | 被叫(技术候选人 / 客户) | +| `zh-CN-XiaochenNeural` | 女 | 清亮、利落 | 主叫(信贷 / 催收) | + +## 其他中文 + +| voice | 说明 | +|-------|------| +| `zh-HK-HiuGaaiNeural` / 
`zh-HK-WanLungNeural` | 粤语男女声 | +| `zh-TW-HsiaoChenNeural` / `zh-TW-YunJheNeural` | 台湾国语男女声 | + +## 搭配建议 + +- **男女组合更利于 ASR 说话人分离**:主叫 / 被叫各取一方性别。 +- **同性别搭配**:若必须同性别,至少在 voice 上区分(例如 `XiaoxiaoNeural` + `XiaoyiNeural`),避免让下游 speaker diarization 误判为同一人。 +- **严肃场景**:使用 `YunyangNeural` / `YunxiNeural` 主叫,语感更权威。 +- **轻松场景**:使用 `XiaoyiNeural` / `YunjianNeural`,语感更日常。 diff --git a/skills/byted-arkclaw-call-records-simulate/scripts/env_init.sh b/skills/byted-arkclaw-call-records-simulate/scripts/env_init.sh new file mode 100755 index 00000000..94592268 --- /dev/null +++ b/skills/byted-arkclaw-call-records-simulate/scripts/env_init.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +# ============================================================================== +# byted-arkclaw-call-records-simulate 环境初始化脚本 +# 用法: +# 方式 A(推荐,确保在 skill 根目录执行): +# cd path/to/byted-arkclaw-call-records-simulate && source scripts/env_init.sh +# 方式 B: +# source path/to/byted-arkclaw-call-records-simulate/scripts/env_init.sh +# ============================================================================== + +# 兼容性地解析脚本自身目录: +# 优先使用 BASH_SOURCE,退化到 $0,最后退到当前目录。 +_src="${BASH_SOURCE[0]:-$0}" +if [ -z "$_src" ] || [ "$_src" = "bash" ] || [ "$_src" = "-bash" ]; then + # 被 source 且拿不到路径时,假定 PWD 即为 skill 根目录 + SKILL_ROOT="$(pwd)" +else + SCRIPT_DIR="$(cd "$(dirname "$_src")" && pwd)" + SKILL_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" +fi + +# 如果解析出来的 SKILL_ROOT 里没有 SKILL.md,则回退到 PWD +if [ ! -f "${SKILL_ROOT}/SKILL.md" ] && [ -f "$(pwd)/SKILL.md" ]; then + SKILL_ROOT="$(pwd)" +fi + +echo "📁 SKILL_ROOT=${SKILL_ROOT}" + +# 1. 创建/复用虚拟环境 +if [ ! -d "${SKILL_ROOT}/tts_env" ]; then + echo "📦 首次初始化,创建虚拟环境 ${SKILL_ROOT}/tts_env ..." + python3 -m venv "${SKILL_ROOT}/tts_env" +fi + +# 2. 激活虚拟环境 +# shellcheck disable=SC1091 +source "${SKILL_ROOT}/tts_env/bin/activate" + +# 3. 校验/安装依赖 +if ! python -c "import edge_tts" >/dev/null 2>&1; then + echo "📦 安装 edge-tts ..." 
+ pip install --quiet --upgrade pip + pip install --quiet edge-tts +fi + +# 4. 导出工作目录 & 准备目录 +export CALL_SIM_WORKDIR="${SKILL_ROOT}" +mkdir -p "${SKILL_ROOT}/materials" "${SKILL_ROOT}/output" + +echo "✅ 环境就绪: CALL_SIM_WORKDIR=${CALL_SIM_WORKDIR}" +echo " - Python: $(python --version)" +echo " - edge-tts: $(python -c 'import edge_tts, sys; print(edge_tts.__version__)' 2>/dev/null || echo 'installed')" diff --git a/skills/byted-arkclaw-call-records-simulate/scripts/generate_record.py b/skills/byted-arkclaw-call-records-simulate/scripts/generate_record.py new file mode 100755 index 00000000..6fa01c0e --- /dev/null +++ b/skills/byted-arkclaw-call-records-simulate/scripts/generate_record.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""根据用户提示生成通话记录 JSON 骨架。 + +两种使用方式: + +1. 命令行直连(推荐给 Agent 使用): + + python scripts/generate_record.py \ + --name "FDE 面试邀约" \ + --scenario interview_invite \ + --outcome accept \ + --caller "猎头(张):zh-CN-XiaoxiaoNeural" \ + --callee "候选人(陈):zh-CN-YunxiNeural" \ + --duration "约1分钟" \ + --output-file 13111111111-陈先生.mp3 \ + --turns 10 \ + --out materials/fde_interview_accept.json + + 脚本会产出带占位台词(``)的 JSON 骨架,Agent 负责再写入实际台词。 + +2. 
交互式: + + python scripts/generate_record.py --interactive --out materials/my.json +""" + +import argparse +import json +import random +import re +import sys +from pathlib import Path + + +VALID_OUTCOMES = {"accept", "reject", "pending", "complaint", "followup", "other"} +PHONE_PREFIXES = ("13", "15", "17", "18", "19") +FEMALE_VOICE_HINTS = ("Xiaoxiao", "Xiaoyi", "Xiaochen", "Xiaohan", "Xiaomeng") +MALE_VOICE_HINTS = ("Yunxi", "Yunyang", "Yunjian", "Yunhao", "Yunze") + + +def _parse_role_voice(raw: str, role_name: str) -> dict: + """解析 "角色名:voice" 字符串。""" + if ":" not in raw: + raise ValueError( + f"--{role_name} 需要形如 '角色名:zh-CN-XiaoxiaoNeural' 的格式,收到: {raw}" + ) + role, voice = raw.split(":", 1) + role = role.strip() + voice = voice.strip() + if not role or not voice: + raise ValueError(f"--{role_name} 角色名与 voice 均不能为空: {raw}") + return {"role": role, "voice": voice} + + +def _generate_virtual_phone_filename() -> str: + """生成 11 位虚拟手机号。""" + prefix = random.choice(PHONE_PREFIXES) + suffix = "".join(random.choices("0123456789", k=9)) + return f"{prefix}{suffix}" + + +def _infer_gender(callee_role: str, callee_voice: str) -> str: + """根据角色名和 voice 推断性别,返回 female/male/unknown。""" + role = callee_role.strip() + if re.search(r"(女士|小姐|女生|女性|女)", role): + return "female" + if re.search(r"(先生|男生|男性|男)", role): + return "male" + + for hint in FEMALE_VOICE_HINTS: + if hint in callee_voice: + return "female" + for hint in MALE_VOICE_HINTS: + if hint in callee_voice: + return "male" + return "unknown" + + +def _extract_callee_name(callee_role: str, callee_voice: str) -> str: + """从被叫角色中提取用于文件名的姓名标签。""" + role = callee_role.strip() + gender = _infer_gender(callee_role, callee_voice) + match = re.search(r"[((]([^))]+)[))]", role) + if match: + label = re.sub(r"\s+", "", match.group(1)) + else: + # 没有括号时,尽量去掉常见身份前缀,保留剩余姓名部分 + label = re.sub( + r"^(被叫|候选人|客户|用户|联系人|面试者|接听人|对方)[::\s-]*", + "", + role, + ).strip() + label = re.sub(r"\s+", "", label) + + if re.search(r"(女士|先生|小姐)$", 
label): + return label + if re.fullmatch(r"[\u4e00-\u9fff]{1,2}", label): + if gender == "female": + return f"{label}女士" + if gender == "male": + return f"{label}先生" + return label + if label: + return label + if gender == "female": + return "女士" + if gender == "male": + return "先生" + return "未知" + + +def _normalize_output_file(output_file: "str | None", callee_role: str, callee_voice: str) -> str: + """将输出文件名规范为 `手机号-被叫人姓名.mp3`。""" + callee_name = _extract_callee_name(callee_role, callee_voice) + if output_file: + file_name = Path(output_file).name + if re.fullmatch(rf"1\d{{10}}-{re.escape(callee_name)}\.mp3", file_name): + return file_name + return f"{_generate_virtual_phone_filename()}-{callee_name}.mp3" + + +def build_skeleton( + name: str, + scenario: str, + outcome: str, + caller: dict, + callee: dict, + duration: str, + output_file: str, + turns: int, + candidate_email: str = "", +) -> dict: + """构造通话记录骨架,caller/callee 交替发言。""" + if turns < 2: + raise ValueError("turns 至少为 2") + if outcome not in VALID_OUTCOMES: + raise ValueError(f"outcome 必须属于 {sorted(VALID_OUTCOMES)},收到: {outcome}") + + conversations = [] + for i in range(turns): + speaker = caller if i % 2 == 0 else callee + conversations.append( + { + "role": speaker["role"], + "text": f"", + "voice": speaker["voice"], + } + ) + + material = { + "name": name, + "duration": duration, + "output_file": _normalize_output_file( + output_file, callee["role"], callee["voice"] + ), + "scenario": scenario, + "outcome": outcome, + "conversations": conversations, + } + if candidate_email: + material["candidate_email"] = candidate_email.strip() + return material + + +def _interactive() -> dict: + print("== 进入交互式生成模式 ==") + name = input("对话名称: ").strip() + scenario = input("场景标签 (如 interview_invite / collection / aftersales): ").strip() or "general" + outcome = input(f"预期结果 {sorted(VALID_OUTCOMES)}: ").strip() or "other" + caller_raw = input("主叫 '角色名:voice' (如 猎头(张):zh-CN-XiaoxiaoNeural): ").strip() + callee_raw = 
input("被叫 '角色名:voice' (如 候选人(陈):zh-CN-YunxiNeural): ").strip() + duration = input("预计时长 (默认 约1分钟): ").strip() or "约1分钟" + output_file = input("输出文件名(默认自动生成,如 13111111111-陈.mp3): ").strip() + candidate_email = input("候选人邮箱(可选,用于后续邮件邀约): ").strip() + turns_raw = input("对话轮次 (默认 10): ").strip() or "10" + + return build_skeleton( + name=name, + scenario=scenario, + outcome=outcome, + caller=_parse_role_voice(caller_raw, "caller"), + callee=_parse_role_voice(callee_raw, "callee"), + duration=duration, + output_file=output_file, + turns=int(turns_raw), + candidate_email=candidate_email, + ) + + +def main() -> int: + parser = argparse.ArgumentParser(description="通话记录 JSON 骨架生成器") + parser.add_argument("--interactive", action="store_true", help="进入交互式输入") + parser.add_argument("--name", help="对话名称") + parser.add_argument("--scenario", default="general", help="场景标签") + parser.add_argument( + "--outcome", + default="other", + choices=sorted(VALID_OUTCOMES), + help="预期结果", + ) + parser.add_argument("--caller", help="主叫 '角色名:voice'") + parser.add_argument("--callee", help="被叫 '角色名:voice'") + parser.add_argument("--duration", default="约1分钟", help="预计时长") + parser.add_argument("--output-file", help="输出 mp3 文件名") + parser.add_argument("--candidate-email", default="", help="候选人邮箱(可选,用于后续邮件邀约)") + parser.add_argument("--turns", type=int, default=10, help="对话轮次(偶数更均衡)") + parser.add_argument("--out", required=True, help="输出 JSON 路径") + args = parser.parse_args() + + try: + if args.interactive: + material = _interactive() + else: + missing = [k for k in ("name", "caller", "callee") if not getattr(args, k.replace("-", "_"), None)] + if missing: + parser.error(f"非交互式模式下必须提供: --{', --'.join(missing)}") + material = build_skeleton( + name=args.name, + scenario=args.scenario, + outcome=args.outcome, + caller=_parse_role_voice(args.caller, "caller"), + callee=_parse_role_voice(args.callee, "callee"), + duration=args.duration, + output_file=args.output_file, + turns=args.turns, + 
candidate_email=args.candidate_email, + ) + except ValueError as e: + print(f"❌ 参数错误: {e}", file=sys.stderr) + return 2 + + out_path = Path(args.out) + out_path.parent.mkdir(parents=True, exist_ok=True) + with open(out_path, "w", encoding="utf-8") as f: + json.dump(material, f, ensure_ascii=False, indent=2) + print(f"✅ 通话记录骨架已写入: {out_path}") + print(f"📞 输出音频文件名: {material['output_file']}") + print("👉 下一步: 手动/Agent 填充 conversations[*].text 中的 占位台词") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/skills/byted-arkclaw-call-records-simulate/scripts/tts_processor.py b/skills/byted-arkclaw-call-records-simulate/scripts/tts_processor.py new file mode 100755 index 00000000..f7d0a4a4 --- /dev/null +++ b/skills/byted-arkclaw-call-records-simulate/scripts/tts_processor.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""基于 edge-tts 的通话记录合成工具。 + +读取符合 schema 的通话记录 JSON,按 conversations 顺序逐句合成 MP3 并拼接为单一文件。 + +用法: + python scripts/tts_processor.py --material materials/xxx.json [--output ./output] +""" + +import argparse +import asyncio +import json +import random +import re +import sys +from pathlib import Path + +import edge_tts + + +REQUIRED_CONV_KEYS = {"role", "text", "voice"} +REQUIRED_TOP_KEYS = {"name", "output_file", "conversations"} +PHONE_PREFIXES = ("13", "15", "17", "18", "19") +FEMALE_VOICE_HINTS = ("Xiaoxiao", "Xiaoyi", "Xiaochen", "Xiaohan", "Xiaomeng") +MALE_VOICE_HINTS = ("Yunxi", "Yunyang", "Yunjian", "Yunhao", "Yunze") + + +def _validate_material(material: dict, json_path: Path) -> None: + missing = REQUIRED_TOP_KEYS - material.keys() + if missing: + raise ValueError(f"素材 {json_path} 缺少顶层字段: {sorted(missing)}") + convs = material["conversations"] + if not isinstance(convs, list) or not convs: + raise ValueError(f"素材 {json_path} 的 conversations 必须为非空列表") + for idx, conv in enumerate(convs): + miss = REQUIRED_CONV_KEYS - conv.keys() + if miss: + raise ValueError( + f"素材 {json_path} conversations[{idx}] 缺少字段: 
{sorted(miss)}" + ) + if not conv["text"].strip(): + raise ValueError(f"素材 {json_path} conversations[{idx}].text 为空") + + +def _generate_virtual_phone_filename() -> str: + prefix = random.choice(PHONE_PREFIXES) + suffix = "".join(random.choices("0123456789", k=9)) + return f"{prefix}{suffix}" + + +def _infer_gender(role: str, voice: str) -> str: + if re.search(r"(女士|小姐|女生|女性|女)", role): + return "female" + if re.search(r"(先生|男生|男性|男)", role): + return "male" + + for hint in FEMALE_VOICE_HINTS: + if hint in voice: + return "female" + for hint in MALE_VOICE_HINTS: + if hint in voice: + return "male" + return "unknown" + + +def _extract_callee_name(material: dict) -> str: + conversations = material.get("conversations", []) + if len(conversations) >= 2: + callee = conversations[1] + elif conversations: + callee = conversations[-1] + else: + callee = {} + + role = callee.get("role", "").strip() + voice = callee.get("voice", "") + gender = _infer_gender(role, voice) + + match = re.search(r"[((]([^))]+)[))]", role) + if match: + label = re.sub(r"\s+", "", match.group(1)) + else: + label = re.sub( + r"^(被叫|候选人|客户|用户|联系人|面试者|接听人|对方)[::\s-]*", + "", + role, + ).strip() + label = re.sub(r"\s+", "", label) + + if re.search(r"(女士|先生|小姐)$", label): + return label + if re.fullmatch(r"[\u4e00-\u9fff]{1,2}", label): + if gender == "female": + return f"{label}女士" + if gender == "male": + return f"{label}先生" + return label + if label: + return label + if gender == "female": + return "女士" + if gender == "male": + return "先生" + return "未知" + + +def _normalize_output_file(output_file: str, material: dict) -> str: + file_name = Path(output_file).name + callee_name = _extract_callee_name(material) + if re.fullmatch(rf"1\d{{10}}-{re.escape(callee_name)}\.mp3", file_name): + return file_name + return f"{_generate_virtual_phone_filename()}-{callee_name}.mp3" + + +async def generate_audio_from_json(json_path: str, output_dir: str = "./output") -> str: + """从通话记录 JSON 合成 MP3 并返回输出路径。""" + 
json_file = Path(json_path) + with open(json_file, "r", encoding="utf-8") as f: + material = json.load(f) + + _validate_material(material, json_file) + + print(f"📖 正在处理素材: {material['name']}") + if material.get("duration"): + print(f"⏱️ 预计时长: {material['duration']}") + if material.get("scenario"): + print(f"🏷️ 场景: {material['scenario']} 结果: {material.get('outcome', 'n/a')}") + + final_audio = bytearray() + total = len(material["conversations"]) + for i, conv in enumerate(material["conversations"], start=1): + print(f"🎙️ [{i}/{total}] {conv['role']} ({conv['voice']}) ...") + communicate = edge_tts.Communicate(conv["text"], conv["voice"]) + async for chunk in communicate.stream(): + if chunk["type"] == "audio": + final_audio.extend(chunk["data"]) + + output_dir_path = Path(output_dir) + output_dir_path.mkdir(parents=True, exist_ok=True) + normalized_output_name = _normalize_output_file(material["output_file"], material) + if normalized_output_name != material["output_file"]: + print(f"📞 输出文件名已规范为虚拟手机号: {normalized_output_name}") + output_file = output_dir_path / normalized_output_name + with open(output_file, "wb") as f: + f.write(bytes(final_audio)) + + size_kb = output_file.stat().st_size / 1024 + print(f"\n✅ 完成!音频文件已保存为: {output_file} ({size_kb:.1f} KB)") + return str(output_file) + + +def main() -> int: + parser = argparse.ArgumentParser(description="通用通话记录 TTS 合成工具 (edge-tts)") + parser.add_argument("--material", "-m", required=True, help="通话记录 JSON 路径") + parser.add_argument("--output", "-o", default="./output", help="音频输出目录") + args = parser.parse_args() + + try: + asyncio.run(generate_audio_from_json(args.material, args.output)) + except FileNotFoundError as e: + print(f"❌ 文件不存在: {e}", file=sys.stderr) + return 2 + except ValueError as e: + print(f"❌ 素材校验失败: {e}", file=sys.stderr) + return 3 + except Exception as e: # noqa: BLE001 + print(f"❌ 合成失败: {e}", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git 
a/skills/byted-arkclaw-jd-resume-match/.gitignore b/skills/byted-arkclaw-jd-resume-match/.gitignore new file mode 100644 index 00000000..bd9474bf --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/.gitignore @@ -0,0 +1,5 @@ +.venv/ +output/ +__pycache__/ +*.pyc +.DS_Store diff --git a/skills/byted-arkclaw-jd-resume-match/SKILL.md b/skills/byted-arkclaw-jd-resume-match/SKILL.md new file mode 100644 index 00000000..4e833e34 --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/SKILL.md @@ -0,0 +1,159 @@ +--- +name: byted-arkclaw-jd-resume-match +version: "1.0.0" +description: "对 1 个 JD PDF 与多位候选人简历做批量文本抽取和人岗匹配评估,并可结合 ASR 转写更新画像后写入 CRM。适用于招聘初筛建档和通话后复评。" +--- + +# JD 与简历匹配评估(`byted-arkclaw-jd-resume-match`) + +这个 skill 支持两个原子场景: +- **批量初筛**:`1 个 JD PDF + 多位候选人简历 PDF` +- **单候选人复评**:`1 个 JD PDF + 1 份简历 PDF + 1 份通话转写结果` + +它先用本地脚本对 PDF 做文本抽取,再把 JD、简历、转写结果整理为统一 `bundle.json`。随后由 AI 对候选人与 JD 的匹配度、优劣势、是否建议进入下一步电话沟通进行总结,形成可审查的结构化 `assessment.json`,最后写入 `byted-arkclaw-local-hr-crm`。 + +## 全局唯一 Key + +- 候选人必须以**电话号码**作为全局唯一 key +- 初筛建档和通话后补充录入都必须关联到同一个电话号码 +- 初筛建档阶段,`prepare_match_bundle.py` 必须从**简历 PDF 转文本结果**中识别手机号 +- `--phone-source`、ASR `source`、文件名只作为辅助线索,不是初筛建档的主来源 +- 若任一候选人无法从简历文本中解析出手机号,分析包生成应直接失败,而不是生成无法入库的候选人记录 + +## 核心脚本 + +- `scripts/env_init.sh`:初始化 Python 环境并安装 PDF 文本抽取依赖 +- `scripts/extract_pdf_text.py`:对单个 PDF 做文本抽取,优先直接提取,必要时走 OCR 回退 +- `scripts/prepare_match_bundle.py`:整合 `1 个 JD + 多份简历 + 可选转写结果`,输出统一分析包 +- `scripts/upsert_crm_profile.py`:把 AI 产出的单候选人或多候选人结构化画像写入 `byted-arkclaw-local-hr-crm` + +## 输入与输出 + +### 输入 + +- 必填:`1 个 JD PDF` +- 初筛模式:`多位候选人简历 PDF` +- 复评模式:`单候选人简历 PDF + 通话录音 ASR 转写结果` + +## 文件来源规则 + +- JD、简历、转写文件的位置由调用者提供 +- 只需要向调用者索要文件路径,并直接使用这些路径 +- 不要求调用者先把文件上传、复制或重命名到 skill 目录下 +- 只有在调用者明确要求落地中间产物时,才在当前 skill 下写 `output/` + +### 输出 + +- `bundle.json`:批量或单候选人的统一分析输入包 +- `assessment.json`:AI 产出的结构化评估结果 +- CRM 入库结果:候选人初筛结论或最终复评结果 + +## 在总流程中的位置 + +- 初筛阶段:负责把 `JD + 多份简历` 转为结构化候选人初筛结果,并写入 CRM 初步建档 +- 通话后阶段:负责结合 `JD + 简历 + 录音转写` 
更新同一候选人的匹配结论、优劣势和最终建议 +- 两个阶段都必须复用同一个手机号,确保更新的是同一候选人档案 + +## 触发条件 + +在以下场景调用本 skill: +- 用户提供岗位 JD PDF 和候选人简历 PDF,希望评估匹配度 +- 用户提供 `1 个 JD PDF + 多位候选人简历 PDF`,希望做批量初筛 +- 用户希望结合通话录音识别结果,形成完整候选人画像 +- 用户希望把评估结论、依据和转写原文写入 CRM 供后续审查 + +## 工作流 + +### Step 1: 初始化环境 + +```bash +cd byted-arkclaw-jd-resume-match +source ./scripts/env_init.sh +``` + +### Step 2: 生成分析输入包 + +批量初筛: + +```bash +python ./scripts/prepare_match_bundle.py \ + --jd-pdf \ + --resume-dir \ + --output ./output/resume_screen_bundle.json \ + --screening-stage resume_screened +``` + +单候选人复评: + +```bash +python ./scripts/prepare_match_bundle.py \ + --jd-pdf \ + --resume-pdf \ + --transcript \ + --phone-source 13999999999-刘女士.mp3 \ + --output ./output/call_review_bundle.json \ + --screening-stage call_completed +``` + +说明: +- `--resume-pdf` 可重复传入多次 +- `--resume-dir` 会扫描目录中的 PDF +- `--resume-manifest` 支持从清单文件批量读取简历路径 +- `--transcript` 支持 `txt`、`json`、`meta.json`、`summary.json` +- Agent 应先向调用者索要 JD、简历、转写文件路径,而不是要求文件先上传到 skill 目录 +- 多候选人模式下,`--transcript` 和 `--phone-source` 可不传;如需传入,数量需与简历数量一致,或只传 1 个供所有候选人复用 +- 每位候选人必须能解析出手机号,推荐方式是: + - 保证简历正文中包含可识别的手机号 + - `--phone-source` 可用于补充姓名或在通话后复评时核对候选人身份 + - ASR `source` 可用于通话阶段回查原始录音 + +### Step 3: AI 生成评估结论 + +Agent 读取 `bundle.json` 后,必须输出一个 `assessment.json`,初筛模式与复评模式都至少包括: +- 候选人既往项目经验 +- 技术能力总结 +- 学历水平 +- 工作年限是否符合 JD +- JD 匹配分 +- 候选人优势与劣势 +- 是否建议进入下一步电话沟通,或电话后是否建议推进 +- AI 判断结论 +- AI 判断依据 +- 若存在通话录音,则附通话转写原文全文,不能只保留摘要 + +字段规范见:`references/assessment-schema.md` + +### Step 4: 写入 CRM + +单候选人: + +```bash +python ./scripts/upsert_crm_profile.py \ + --profile-json ./output/assessment.json \ + --phone-source 13999999999-刘女士.mp3 +``` + +多候选人: + +```bash +python ./scripts/upsert_crm_profile.py \ + --profile-json ./output/assessment.json +``` + +批量模式下,脚本会优先从每位候选人的 `phone_source`、`candidate_hint.source_file`、`candidate_hint.phone` 中恢复候选人标识。 + +## 输出物 + +- `bundle.json`:原始抽取文本 + 候选人列表 + transcript + 候选人识别线索 +- `assessment.json`:AI 结构化评估结果,可为单候选人对象或候选人列表 +- CRM JSON:最终入库后的候选人画像 + 
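The batch-mode note above says the candidate identifier is recovered from `phone_source`, `candidate_hint.source_file`, and `candidate_hint.phone`, in that order. A minimal sketch of that lookup follows. It is not taken from `upsert_crm_profile.py`, whose body is not shown here; the helper names `phone_from_source` and `resolve_phone`, and the assumption that `candidate_hint` is a nested dict, are hypothetical.

```python
import re
from typing import Optional

# Hypothetical pattern: an 11-digit number starting with "1", not embedded in a longer digit run.
PHONE_RE = re.compile(r"(?<!\d)1\d{10}(?!\d)")


def phone_from_source(value: str) -> Optional[str]:
    """Return the first 11-digit mobile number found in a string such as a file name."""
    match = PHONE_RE.search(value)
    return match.group(0) if match else None


def resolve_phone(candidate: dict) -> Optional[str]:
    """Try the documented lookup order and return the first phone number found."""
    hint = candidate.get("candidate_hint") or {}  # assumed to be a nested dict
    for value in (
        candidate.get("phone_source"),
        hint.get("source_file"),
        hint.get("phone"),
    ):
        phone = phone_from_source(str(value or ""))
        if phone:
            return phone
    return None
```

With this sketch, a file name such as `13999999999-刘女士.mp3` would resolve to the key `13999999999`, while a candidate with no recoverable number returns `None` so the caller can fail early instead of writing an unkeyed CRM record.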
+## 关键要求 + +- 结论必须区分“录音事实”和“AI 推断” +- `ai_match_evidence` 必须写明依据来自简历、JD、通话中的哪些信息 +- `transcript_text` 必须以全量原始转写文本入库,供后续审查 +- 初筛模式必须给出 `screening_decision`、`screening_reason`、`strengths_summary`、`weaknesses_summary` +- 通话后复评必须补充 `final_match_score`、`final_recommendation` +- `assessment.json` 中必须保留 `phone` 字段,保证后续 CRM 更新命中同一候选人 +- 学历、工作年限、项目经验等关键信息不明确时,写“待确认”而不是编造 diff --git a/skills/byted-arkclaw-jd-resume-match/checklist.md b/skills/byted-arkclaw-jd-resume-match/checklist.md new file mode 100644 index 00000000..2a7aec57 --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/checklist.md @@ -0,0 +1,7 @@ +# byted-arkclaw-jd-resume-match 自检清单 + +- `SKILL.md` frontmatter 包含 `name` 和中文 `description` +- `scripts/` 下包含环境初始化、PDF抽取、分析包生成、CRM入库脚本 +- 支持输入 JD PDF、简历 PDF、ASR 转写结果 +- 输出 `bundle.json` 与 AI 生成的 `assessment.json` +- CRM 写入包含转写原文、项目经验、技术能力、学历、年限匹配、AI结论与依据 diff --git a/skills/byted-arkclaw-jd-resume-match/references/assessment-schema.md b/skills/byted-arkclaw-jd-resume-match/references/assessment-schema.md new file mode 100644 index 00000000..3085f12e --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/references/assessment-schema.md @@ -0,0 +1,74 @@ +# 评估结果 Schema + +AI 在读取 `bundle.json` 后,应输出一个 `assessment.json`。 + +## 单候选人结构 + +推荐结构如下: + +```json +{ + "phone": "13999999999", + "candidate_name": "刘女士", + "email": "liu@example.com", + "is_qualified": true, + "screening_stage": "resume_screened", + "screening_decision": "建议沟通", + "screening_reason": "核心技术栈与 JD 高度相关,项目经验匹配度较高。", + "strengths_summary": "后端架构经验扎实,做过高并发与消息队列项目。", + "weaknesses_summary": "当前跳槽意愿一般,录音中表达了稳定诉求。", + "gender": "女", + "industry": "互联网", + "current_position": "高级后端工程师", + "years_of_exp": 8, + "job_switch_intent": "中", + "candidate_focus": "薪资、团队稳定性、远程办公", + "notes": "录音中表示当前工作稳定,但可长期保持联系。", + "transcript_text": "完整通话转写原文", + "project_experience": "负责支付网关、风控平台、微服务治理等项目。", + "technical_capability": "Java、Spring Cloud、MySQL、Kafka、云原生部署。", + "education_level": "本科", + "jd_years_match": "符合", 
+ "jd_match_score": 78, + "final_match_score": 74, + "final_recommendation": "建议推进一面", + "ai_match_conclusion": "整体匹配度较高,技术栈与项目背景较贴近 JD。", + "ai_match_evidence": "简历显示 8 年后端经验,项目中使用微服务与消息队列;录音中候选人关注稳定性且暂无强烈跳槽意愿。", + "last_call_date": "2026-04-25" +} +``` + +## 多候选人批量结构 + +```json +{ + "candidates": [ + { + "phone_source": "13999999999-刘女士.mp3", + "candidate_name": "刘女士", + "screening_stage": "resume_screened", + "screening_decision": "建议沟通", + "screening_reason": "..." + } + ] +} +``` + +## 字段原则 + +- `phone` 是全局唯一 key,初筛建档时应来自简历文本抽取结果,通话后更新也必须使用同一个手机号 +- `email` 建议来自简历文本抽取结果,用于后续邮件邀约复试 +- `screening_stage` 推荐值:`resume_screened` / `call_pending` / `call_completed` / `final_reviewed` +- `screening_decision` 推荐值:`建议沟通` / `建议补充信息` / `建议淘汰` +- `screening_reason` 说明为何给出初筛结论 +- `strengths_summary` / `weaknesses_summary` 必须概括候选人优劣势 +- `transcript_text` 必须保留原始转写全文,供后续审查,不能写成摘要版 +- `project_experience` 总结既往项目经历,强调与 JD 相关的项目 +- `technical_capability` 总结技术栈、系统能力、工程能力 +- `education_level` 只写从简历或对话中明确得到的学历 +- `jd_years_match` 推荐值:`符合` / `部分符合` / `不符合` / `待确认` +- `jd_match_score` 建议为 `0-100` 整数 +- `final_match_score` 建议为电话沟通后的最终分,范围 `0-100` +- `final_recommendation` 推荐值:`建议推进` / `保留观察` / `不推荐推进` +- `ai_match_conclusion` 给出一句总结性判断 +- `ai_match_evidence` 必须写出依据来源,区分“简历信息”和“通话信息” diff --git a/skills/byted-arkclaw-jd-resume-match/scripts/env_init.sh b/skills/byted-arkclaw-jd-resume-match/scripts/env_init.sh new file mode 100755 index 00000000..a4cbfd19 --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/scripts/env_init.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +set -e + +if [ -n "${ZSH_VERSION:-}" ]; then + SCRIPT_PATH="${(%):-%N}" +else + SCRIPT_PATH="${BASH_SOURCE[0]}" +fi + +SCRIPT_DIR="$(cd "$(dirname "${SCRIPT_PATH}")" && pwd)" +SKILL_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" +ROOT_VENV="$(cd "${SKILL_ROOT}/.." 
&& pwd)/.venv" + +reuse_venv() { + local path="$1" + if [ -d "$path" ] && [ -x "$path/bin/python" ]; then + source "$path/bin/activate" + if python - <<'PY' >/dev/null 2>&1 +import importlib.util +mods = ["pypdf", "fitz", "yaml"] +raise SystemExit(0 if all(importlib.util.find_spec(m) for m in mods) else 1) +PY + then + return 0 + fi + fi + return 1 +} + +if ! reuse_venv "$ROOT_VENV"; then + if [ ! -d "${SKILL_ROOT}/.venv" ]; then + python3 -m venv "${SKILL_ROOT}/.venv" + fi + source "${SKILL_ROOT}/.venv/bin/activate" + python -m pip install -U pip setuptools wheel + python -m pip install pypdf pymupdf pyyaml pillow +fi + +export JD_RESUME_MATCH_ROOT="$SKILL_ROOT" +mkdir -p "$SKILL_ROOT/output" + +echo "✅ byted-arkclaw-jd-resume-match 环境初始化完成" +echo "- skill root: $JD_RESUME_MATCH_ROOT" +echo "- python: $(command -v python)" +if command -v tesseract >/dev/null 2>&1; then + echo "- OCR fallback: tesseract 可用" +else + echo "- OCR fallback: 未检测到 tesseract,仅保证可提取可复制文本 PDF" +fi diff --git a/skills/byted-arkclaw-jd-resume-match/scripts/extract_pdf_text.py b/skills/byted-arkclaw-jd-resume-match/scripts/extract_pdf_text.py new file mode 100755 index 00000000..bb72d7c9 --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/scripts/extract_pdf_text.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 + +from pathlib import Path +import sys + +SCRIPT_DIR = Path(__file__).resolve().parent +if str(SCRIPT_DIR) not in sys.path: + sys.path.insert(0, str(SCRIPT_DIR)) + +from jd_resume_match_runtime.pdf_extract import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/byted-arkclaw-jd-resume-match/scripts/jd_resume_match_runtime/__init__.py b/skills/byted-arkclaw-jd-resume-match/scripts/jd_resume_match_runtime/__init__.py new file mode 100644 index 00000000..da6f400e --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/scripts/jd_resume_match_runtime/__init__.py @@ -0,0 +1,4 @@ +from .pdf_extract import extract_pdf_text +from .transcript import 
load_transcript_payload + +__all__ = ["extract_pdf_text", "load_transcript_payload"] diff --git a/skills/byted-arkclaw-jd-resume-match/scripts/jd_resume_match_runtime/pdf_extract.py b/skills/byted-arkclaw-jd-resume-match/scripts/jd_resume_match_runtime/pdf_extract.py new file mode 100644 index 00000000..eac1041b --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/scripts/jd_resume_match_runtime/pdf_extract.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +import json +import shutil +import subprocess +import tempfile +from pathlib import Path +from typing import Any + +from pypdf import PdfReader + + +def _direct_extract(pdf_path: Path) -> tuple[str, list[dict[str, Any]]]: + reader = PdfReader(str(pdf_path)) + pages = [] + texts = [] + for idx, page in enumerate(reader.pages, start=1): + text = (page.extract_text() or "").strip() + pages.append({"page": idx, "text_length": len(text)}) + if text: + texts.append(text) + return "\n\n".join(texts).strip(), pages + + +def _ocr_extract(pdf_path: Path) -> tuple[str, list[str]]: + if not shutil.which("tesseract"): + return "", ["未检测到 tesseract,无法执行 OCR 回退。"] + + try: + import fitz + except Exception: + return "", ["未安装 PyMuPDF,无法执行 OCR 回退。"] + + warnings = [] + texts = [] + with tempfile.TemporaryDirectory(prefix="jd-resume-ocr-") as tmp: + doc = fitz.open(str(pdf_path)) + for index, page in enumerate(doc, start=1): + image_path = Path(tmp) / f"page_{index}.png" + pix = page.get_pixmap(matrix=fitz.Matrix(2, 2), alpha=False) + pix.save(str(image_path)) + try: + result = subprocess.run( + ["tesseract", str(image_path), "stdout"], + capture_output=True, + text=True, + check=True, + ) + texts.append(result.stdout.strip()) + except subprocess.CalledProcessError as exc: + warnings.append(f"第 {index} 页 OCR 失败: {exc.stderr.strip() or exc}") + return "\n\n".join(t for t in texts if t).strip(), warnings + + +def extract_pdf_text(pdf_path: str | Path) -> dict[str, Any]: + path = Path(pdf_path).expanduser().resolve() + 
if not path.exists(): + raise FileNotFoundError(f"PDF 不存在: {path}") + + direct_text, pages = _direct_extract(path) + warnings: list[str] = [] + method = "direct" + text = direct_text + + if len(direct_text) < 120: + ocr_text, ocr_warnings = _ocr_extract(path) + warnings.extend(ocr_warnings) + if len(ocr_text) > len(direct_text): + text = ocr_text + method = "ocr" + + if not text: + warnings.append("未能从 PDF 中提取到有效文本。") + + return { + "path": str(path), + "file_name": path.name, + "method": method, + "page_count": len(pages), + "text_length": len(text), + "warnings": warnings, + "text": text, + } + + +def main() -> int: + import argparse + + parser = argparse.ArgumentParser(description="提取 PDF 文本,必要时回退 OCR") + parser.add_argument("pdf_path") + parser.add_argument("-o", "--output") + args = parser.parse_args() + + payload = extract_pdf_text(args.pdf_path) + if args.output: + Path(args.output).write_text( + json.dumps(payload, ensure_ascii=False, indent=2) + "\n", + encoding="utf-8", + ) + else: + print(json.dumps(payload, ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/byted-arkclaw-jd-resume-match/scripts/jd_resume_match_runtime/transcript.py b/skills/byted-arkclaw-jd-resume-match/scripts/jd_resume_match_runtime/transcript.py new file mode 100644 index 00000000..0bb94524 --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/scripts/jd_resume_match_runtime/transcript.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + + +def _read_text(path: Path) -> str: + return path.read_text(encoding="utf-8").strip() + + +def _segments_to_text(payload: Any) -> str: + if isinstance(payload, list): + parts = [] + for item in payload: + if isinstance(item, dict) and item.get("text"): + parts.append(str(item["text"]).strip()) + return "\n".join(part for part in parts if part).strip() + return "" + + +def 
load_transcript_payload(transcript_path: str | Path) -> dict[str, Any]: + path = Path(transcript_path).expanduser().resolve() + if not path.exists(): + raise FileNotFoundError(f"转写结果不存在: {path}") + + suffix = path.suffix.lower() + warnings: list[str] = [] + + if suffix == ".txt": + return { + "path": str(path), + "format": "txt", + "source": None, + "text": _read_text(path), + "warnings": warnings, + } + + if suffix == ".json": + payload = json.loads(_read_text(path) or "{}") + if isinstance(payload, dict) and payload.get("output_path"): + nested = load_transcript_payload(payload["output_path"]) + nested["source"] = payload.get("source") or nested.get("source") + return nested + + if isinstance(payload, dict) and payload.get("results"): + for item in payload["results"]: + if item.get("status") == "completed" and item.get("output_path"): + nested = load_transcript_payload(item["output_path"]) + nested["source"] = item.get("source") or nested.get("source") + return nested + warnings.append("summary.json 中未找到成功的 transcript 输出。") + + if isinstance(payload, dict) and payload.get("text"): + return { + "path": str(path), + "format": "json", + "source": payload.get("source"), + "text": str(payload["text"]).strip(), + "warnings": warnings, + } + + text = _segments_to_text(payload) + return { + "path": str(path), + "format": "json", + "source": payload.get("source") if isinstance(payload, dict) else None, + "text": text, + "warnings": warnings, + } + + return { + "path": str(path), + "format": suffix.lstrip("."), + "source": None, + "text": _read_text(path), + "warnings": warnings, + } diff --git a/skills/byted-arkclaw-jd-resume-match/scripts/prepare_match_bundle.py b/skills/byted-arkclaw-jd-resume-match/scripts/prepare_match_bundle.py new file mode 100755 index 00000000..bf945e63 --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/scripts/prepare_match_bundle.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import 
json +import re +import sys +from datetime import datetime +from pathlib import Path + +SCRIPT_DIR = Path(__file__).resolve().parent +if str(SCRIPT_DIR) not in sys.path: + sys.path.insert(0, str(SCRIPT_DIR)) + +from jd_resume_match_runtime import extract_pdf_text, load_transcript_payload + +PHONE_NAME_FILE_PATTERN = re.compile( + r"^(1[3-9]\d{9})(?:-([^/\\]+?))?(?:\.[^.]+)?$", + re.IGNORECASE, +) +PHONE_IN_TEXT_PATTERN = re.compile(r"(? dict[str, str]: + raw = (value or "").strip() + if not raw: + return {"phone": "", "candidate_name": ""} + name = Path(raw).name + match = PHONE_NAME_FILE_PATTERN.fullmatch(name) + if not match: + return {"phone": "", "candidate_name": ""} + return { + "phone": match.group(1) or "", + "candidate_name": (match.group(2) or "").strip(), + } + + +def parse_phone_from_resume_text(text: str) -> str: + match = PHONE_IN_TEXT_PATTERN.search(text or "") + if not match: + return "" + return "".join(match.groups()) + + +def parse_email_from_resume_text(text: str) -> str: + match = EMAIL_IN_TEXT_PATTERN.search(text or "") + if not match: + return "" + return match.group(0) + + +def resolve_candidate_identity( + resume_payload: dict, + transcript_payload: dict, + phone_source: str, +) -> tuple[str, dict[str, str]]: + resume_phone = parse_phone_from_resume_text(resume_payload.get("text", "")) + if resume_phone: + fallback_name = parse_phone_source(phone_source).get("candidate_name", "") + if not fallback_name: + fallback_name = parse_phone_source(transcript_payload.get("source") or "").get( + "candidate_name", "" + ) + if not fallback_name: + fallback_name = parse_phone_source(resume_payload.get("path", "")).get( + "candidate_name", "" + ) + return resume_payload.get("path", ""), { + "phone": resume_phone, + "candidate_name": fallback_name, + } + + identity_sources = [ + phone_source, + transcript_payload.get("source") or "", + resume_payload.get("path", ""), + ] + for source in identity_sources: + hint = parse_phone_source(source) + if 
hint.get("phone"): + return source, hint + raise ValueError( + f"候选人简历中缺少可解析的手机号:{resume_payload.get('path', '')}。" + "初筛建档必须从简历文本抽取结果中识别手机号,若抽取失败请检查简历 PDF 文本质量。" + ) + + +def load_manifest(path: str | Path) -> list[str]: + manifest = Path(path).expanduser().resolve() + base_dir = manifest.parent + results = [] + for raw_line in manifest.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + item = Path(line) + if not item.is_absolute(): + item = (base_dir / item).resolve() + results.append(str(item)) + return results + + +def discover_resumes(args: argparse.Namespace) -> list[str]: + resumes: list[str] = [] + for item in args.resume_pdf or []: + resumes.append(str(Path(item).expanduser().resolve())) + + if args.resume_manifest: + resumes.extend(load_manifest(args.resume_manifest)) + + if args.resume_dir: + resume_dir = Path(args.resume_dir).expanduser().resolve() + for file in sorted(resume_dir.glob("*.pdf")): + resumes.append(str(file.resolve())) + + deduped = [] + seen = set() + for item in resumes: + if item not in seen: + seen.add(item) + deduped.append(item) + return deduped + + +def normalize_optional_list(values: list[str] | None, total: int) -> list[str]: + items = list(values or []) + if not items: + return [""] * total + if len(items) == 1 and total > 1: + return items * total + if len(items) != total: + raise ValueError(f"可选参数数量不匹配,期望 1 或 {total},实际为 {len(items)}") + return items + + +def build_candidate_entry( + resume_pdf: str, + transcript_path: str, + phone_source: str, + screening_stage: str, +) -> dict: + resume_payload = extract_pdf_text(resume_pdf) + transcript_payload = ( + load_transcript_payload(transcript_path) if transcript_path else {"path": "", "format": "", "source": None, "text": "", "warnings": []} + ) + source_file, candidate_hint = resolve_candidate_identity( + resume_payload=resume_payload, + transcript_payload=transcript_payload, + phone_source=phone_source, + ) + + 
return { + "phone": candidate_hint["phone"], + "email": parse_email_from_resume_text(resume_payload.get("text", "")), + "resume": resume_payload, + "transcript": transcript_payload, + "candidate_hint": { + "source_file": source_file, + **candidate_hint, + }, + "screening_stage": screening_stage, + } + + +def main() -> int: + parser = argparse.ArgumentParser(description="整理 1 个 JD 与多位候选人的简历/转写结果为统一分析包") + parser.add_argument("--jd-pdf", required=True) + parser.add_argument("--resume-pdf", action="append", help="候选人简历 PDF,可重复传入多次") + parser.add_argument("--resume-dir", help="批量简历目录,扫描其中的 PDF") + parser.add_argument("--resume-manifest", help="简历清单文件,每行一个 PDF 路径") + parser.add_argument("--transcript", action="append", help="候选人对应的转写结果,可不传") + parser.add_argument("--phone-source", action="append", help="候选人电话或原始录音标识,可不传") + parser.add_argument("--screening-stage", default="resume_screened", help="本次分析阶段标记") + parser.add_argument("-o", "--output", required=True) + args = parser.parse_args() + + resumes = discover_resumes(args) + if not resumes: + raise SystemExit("❌ 至少需要提供一个候选人简历:--resume-pdf / --resume-dir / --resume-manifest") + + transcripts = normalize_optional_list(args.transcript, len(resumes)) + phone_sources = normalize_optional_list(args.phone_source, len(resumes)) + + jd_payload = extract_pdf_text(args.jd_pdf) + candidates = [] + for resume_pdf, transcript_path, phone_source in zip(resumes, transcripts, phone_sources): + candidates.append( + build_candidate_entry( + resume_pdf=resume_pdf, + transcript_path=transcript_path, + phone_source=phone_source, + screening_stage=args.screening_stage, + ) + ) + + bundle = { + "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "mode": "batch_candidates", + "jd": jd_payload, + "candidate_count": len(candidates), + "candidates": candidates, + "required_crm_fields": [ + "phone", + "email", + "candidate_name", + "screening_stage", + "screening_decision", + "screening_reason", + "strengths_summary", + 
"weaknesses_summary", + "transcript_text", + "project_experience", + "technical_capability", + "education_level", + "years_of_exp", + "jd_years_match", + "jd_match_score", + "final_match_score", + "final_recommendation", + "ai_match_conclusion", + "ai_match_evidence", + ], + } + + output_path = Path(args.output).expanduser().resolve() + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text( + json.dumps(bundle, ensure_ascii=False, indent=2) + "\n", + encoding="utf-8", + ) + print( + json.dumps( + { + "output": str(output_path), + "jd_text_length": jd_payload["text_length"], + "candidate_count": len(candidates), + "candidates": [ + { + "resume": item["resume"]["file_name"], + "candidate_hint": item["candidate_hint"], + "transcript_length": len(item["transcript"].get("text", "")), + } + for item in candidates + ], + }, + ensure_ascii=False, + indent=2, + ) + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/byted-arkclaw-jd-resume-match/scripts/upsert_crm_profile.py b/skills/byted-arkclaw-jd-resume-match/scripts/upsert_crm_profile.py new file mode 100755 index 00000000..d112143d --- /dev/null +++ b/skills/byted-arkclaw-jd-resume-match/scripts/upsert_crm_profile.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import importlib.util +import json +from pathlib import Path + +ALLOWED_FIELDS = { + "email", + "candidate_name", + "is_qualified", + "gender", + "industry", + "current_position", + "years_of_exp", + "job_switch_intent", + "candidate_focus", + "notes", + "transcript_text", + "project_experience", + "technical_capability", + "education_level", + "jd_years_match", + "jd_match_score", + "screening_stage", + "screening_decision", + "screening_reason", + "strengths_summary", + "weaknesses_summary", + "final_match_score", + "final_recommendation", + "ai_match_conclusion", + "ai_match_evidence", + "last_call_date", +} + + +def 
resolve_full_transcript(item: dict) -> str: + transcript = item.get("transcript") + if isinstance(transcript, dict): + text = transcript.get("text") + if text: + return str(text).strip() + text = item.get("transcript_text") + if text: + return str(text).strip() + return "" + + +def load_crm_module(): + skill_root = Path(__file__).resolve().parents[1] + crm_main = skill_root.parent / "byted-arkclaw-local-hr-crm" / "scripts" / "main.py" + if not crm_main.exists(): + raise FileNotFoundError(f"未找到 CRM 脚本: {crm_main}") + + spec = importlib.util.spec_from_file_location("byted_arkclaw_local_hr_crm_main", crm_main) + if spec is None or spec.loader is None: + raise RuntimeError("无法加载 CRM 模块") + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def main() -> int: + parser = argparse.ArgumentParser(description="将 assessment.json 写入 CRM") + parser.add_argument("--profile-json", required=True) + parser.add_argument("--phone-source", default="") + args = parser.parse_args() + + payload = json.loads( + Path(args.profile_json).expanduser().resolve().read_text(encoding="utf-8") + ) + crm_module = load_crm_module() + + if isinstance(payload, dict) and isinstance(payload.get("crm_payload"), dict): + payload = payload["crm_payload"] + + if isinstance(payload, list): + candidates = payload + elif isinstance(payload, dict) and isinstance(payload.get("candidates"), list): + candidates = payload["candidates"] + else: + candidates = [payload] + + outputs = [] + for item in candidates: + if not isinstance(item, dict): + continue + phone_source = ( + args.phone_source + or item.get("phone") + or item.get("phone_source") + or item.get("source_file") + or (item.get("candidate_hint") or {}).get("source_file", "") + or (item.get("candidate_hint") or {}).get("phone", "") + ) + if not phone_source: + raise SystemExit("❌ 缺少候选人手机号,无法写入 CRM") + + fields = { + key: value for key, value in item.items() if key in ALLOWED_FIELDS and value is not None + 
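} + full_transcript = resolve_full_transcript(item) + if full_transcript: + # Always persist the full original transcript, not an AI summary. + fields["transcript_text"] = full_transcript + outputs.append(crm_module.main("upsert", phone_source, **fields)) + + print("\n\n".join(outputs)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/byted-arkclaw-local-batch-asr/.gitignore b/skills/byted-arkclaw-local-batch-asr/.gitignore new file mode 100644 index 00000000..bd9474bf --- /dev/null +++ b/skills/byted-arkclaw-local-batch-asr/.gitignore @@ -0,0 +1,5 @@ +.venv/ +output/ +__pycache__/ +*.pyc +.DS_Store diff --git a/skills/byted-arkclaw-local-batch-asr/SKILL.md b/skills/byted-arkclaw-local-batch-asr/SKILL.md new file mode 100644 index 00000000..4434d250 --- /dev/null +++ b/skills/byted-arkclaw-local-batch-asr/SKILL.md @@ -0,0 +1,179 @@ +--- +name: byted-arkclaw-local-batch-asr +version: "1.0.0" +description: "Local batch speech-to-text skill built on FunASR. Transcribes audio/video from a single file, a whole directory, or a manifest entirely on the local machine, and exports several text formats. Use it when you need privacy-friendly local ASR, batch transcription, or a replacement for a remote speech-recognition pipeline." +--- + +# Local batch speech transcription (`byted-arkclaw-local-batch-asr`) + +Runs a batch audio/video-to-text pipeline on local `FunASR + PyTorch`. Input can be a single file, a whole directory, or a manifest file list, which makes it a suitable replacement for the remote `byted-las-asr-pro` in the `arkclaw-hiring-workflow` workflow. + +## Input and output + +### Input + +- Local audio or video files +- Or a directory / manifest containing multiple file paths +- Input paths are supplied by the caller and may be absolute or relative; files do not need to be uploaded to a fixed directory of this skill first + +### Output + +- A `transcript.` per file +- A `meta.json` per file +- Batch-level `summary.json`, `summary.csv`, and `result.md` + +## Position in the overall workflow + +- Post-call stage: converts real phone-call recordings into reviewable transcripts +- Does not perform job-candidate matching, strength/weakness assessment, or CRM decisions; its only job is to reliably produce transcript text and batch results + +## Design patterns + +This skill mainly uses: +- **Tool Wrapper**: wraps calls to local Python scripts +- **Pipeline**: pre-checks -> environment initialization -> local batch transcription -> result summary +- **Local-first**: no dependency on external ASR APIs; protecting recording privacy comes first + +## Core scripts and configuration + +- `scripts/env_init.sh`: initializes or reuses the local Python virtual environment, installs dependencies, and sets up an `ffmpeg` entry point +- `scripts/check_format.sh`: local container-format pre-check +- `scripts/transcribe_batch.py`: main batch transcription script +- `scripts/generate_result.md.sh`: generates a Markdown summary from a batch result directory +- `scripts/local_batch_asr_runtime/`: local ASR 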
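runtime module, covering model loading, device detection, and output formatting + +## Capabilities + +- Supported inputs: single file, recursive directory scan, manifest text file +- Supported formats: `wav/mp3/m4a/flac/aac/mp4/avi/mkv/mov` +- Supported outputs: `txt/json/srt/ass/md` +- Summary files: `summary.json`, `summary.csv`, `result.md` +- Best-effort speaker diarization: if the current model or result does not support it, output automatically falls back to single-speaker text +- Checkpoint-style batch processing: failed files are recorded in the summary and do not block the rest of the run + +## Workflow (follow the steps strictly in order) + +Copy this checklist and track progress: + +```text +Progress: +- [ ] Step 0: Pre-checks +- [ ] Step 1: Initialize the environment +- [ ] Step 2: Prepare inputs +- [ ] Step 3: Local batch transcription +- [ ] Step 4: Summarize results +- [ ] Step 5: Present results +``` + +### Step 0: Pre-checks + +1. Confirm the input is a locally accessible path: a single file, a directory, or a manifest file. +2. Preferably check file extensions with `scripts/check_format.sh`. +3. If the input is a directory, confirm whether a recursive scan is needed and whether the file count should be limited. +4. If results will later be imported into the CRM, keep the source file names so the phone number and name can be extracted from them. + +### Step 1: Initialize the environment + +```bash +source "$(dirname "$0")/scripts/env_init.sh" +workdir="$LOCAL_BATCH_ASR_WORKDIR" +``` + +The script: +- Creates and uses `.venv` under the current skill +- Installs `funasr`, `modelscope`, `torch`, `torchaudio`, `imageio-ffmpeg`, `librosa` +- Automatically creates an `ffmpeg` executable entry point + +### Step 2: Prepare inputs + +#### Single file + +```bash +./scripts/check_format.sh +``` + +#### Directory batch + +```bash +find -type f | sed 's#^#- #' +``` + +#### Manifest list + +Each line of `manifest.txt` is an absolute or relative path supplied by the caller: + +```text +./calls/a.wav +./calls/b.mp3 +./calls/call.mp4 +``` + +### Step 3: Local batch transcription + +#### Single file + +```bash +source ./scripts/env_init.sh +python ./scripts/transcribe_batch.py -f txt +``` + +#### Whole directory + +```bash +source ./scripts/env_init.sh +python ./scripts/transcribe_batch.py --recursive -f txt -o ./output/run_001 +``` + +#### Manifest batch + +```bash +source ./scripts/env_init.sh +python ./scripts/transcribe_batch.py --manifest -f json -o ./output/run_manifest +``` + +### Step 4: Summarize results + +```bash +./scripts/generate_result.md.sh ./output/run_001 > ./output/run_001/result.md +``` + +Output directory layout: + +```text +./output/run_001/ +├── summary.json +├── summary.csv +├── result.md +└── files/ + ├── / + │ ├── transcript.txt + │ └── meta.json +``` + +### Step 5: Present results + +Show the user: +1. Number of succeeded/failed files +2. Output directory path +3. Paths to `summary.csv` and `summary.json` +4. A short text preview +5. 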
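If needed, continue by importing the results into `byted-arkclaw-local-hr-crm` + +## Gotchas + +- The first run downloads models, which takes a long time and uses considerable disk space. +- The speaker diarization capability of local `FunASR` depends on model and timestamp support; the current implementation is "best effort + automatic fallback". +- If you only need stable text, default to `txt` or `json` output. +- If no system `ffmpeg` is available, the script provides a local binary entry point through `imageio-ffmpeg`. + +## References + +- `references/output-formats.md`: output structure and summary file descriptions + +## Review criteria + +After execution, the Agent should self-check: +1. `scripts/env_init.sh` initializes the environment successfully +2. `scripts/transcribe_batch.py` handles both single-file and directory input +3. The result directory contains `summary.json` / `summary.csv` +4. Generated artifacts such as `.venv`, `output`, and `__pycache__` are not committed to the skill directory diff --git a/skills/byted-arkclaw-local-batch-asr/checklist.md b/skills/byted-arkclaw-local-batch-asr/checklist.md new file mode 100644 index 00000000..60e4694f --- /dev/null +++ b/skills/byted-arkclaw-local-batch-asr/checklist.md @@ -0,0 +1,11 @@ +# byted-arkclaw-local-batch-asr self-check checklist (Skill Hub) + +- The top of `SKILL.md` contains YAML frontmatter (`name` / `version` / `description`) +- `description` states clearly that this is local batch ASR and names the applicable scenarios and trigger conditions +- `scripts/` contains at least `env_init.sh`, `check_format.sh`, `transcribe_batch.py`, and `generate_result.md.sh` +- All execution-related code lives under `scripts/`; no standalone runtime code directory sits in the skill root +- `SKILL.md` makes the skill's directory convention explicit: `SKILL.md` is the entry point, `scripts/` holds code, `references/` holds reference material +- The documentation states the replacement relationship with `byted-las-asr-pro` and the capability differences +- Single-file, directory, and manifest input are all supported +- Both `summary.json` and `summary.csv` summaries can be generated +- Generated artifacts such as `.venv/`, `output/`, `__pycache__/`, and `.DS_Store` are not committed to the skill directory diff --git a/skills/byted-arkclaw-local-batch-asr/references/output-formats.md b/skills/byted-arkclaw-local-batch-asr/references/output-formats.md new file mode 100644 index 00000000..12ec8ec1 --- /dev/null +++ b/skills/byted-arkclaw-local-batch-asr/references/output-formats.md @@ -0,0 +1,49 @@ +# byted-arkclaw-local-batch-asr output reference + +## Per-file output + +Each input file gets its own directory: + +```text +output//files// +├── transcript. 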
+└── meta.json +``` + +## `meta.json` fields + +```json +{ + "source": "/abs/path/to/audio.mp3", + "status": "completed", + "format": "txt", + "output_path": "/abs/path/to/transcript.txt", + "speaker_count": 1, + "segments": 1, + "error": null +} +``` + +## Batch summary output + +### `summary.json` + +- Contains the run configuration, success/failure counts, and the per-file processing results + +### `summary.csv` + +Fields: +- `source` +- `status` +- `format` +- `output_path` +- `speaker_count` +- `segments` +- `error` + +## Recommended formats + +- `txt`: best for manual review before CRM import +- `json`: best for downstream structured processing +- `srt`: suited to video subtitles +- `md`: suited to interview/call minutes diff --git a/skills/byted-arkclaw-local-batch-asr/scripts/check_format.sh b/skills/byted-arkclaw-local-batch-asr/scripts/check_format.sh new file mode 100755 index 00000000..e1750ef9 --- /dev/null +++ b/skills/byted-arkclaw-local-batch-asr/scripts/check_format.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# ============================================================================== +# Audio/video container format pre-check +# Usage: scripts/check_format.sh +# ============================================================================== + +FILE_PATH="$1" +if [ -z "$FILE_PATH" ]; then + echo "❌ Error: please provide a file path" + exit 1 +fi + +EXT=$(echo "$FILE_PATH" | awk -F. 
'{print tolower($NF)}') +ALLOWED_FORMATS="wav mp3 m4a flac aac mp4 avi mkv mov ogg" + +if [[ " $ALLOWED_FORMATS " =~ " $EXT " ]]; then + echo "✅ Format check passed: $EXT" + exit 0 +fi + +echo "⚠️ Warning: file extension '$EXT' is not in the recommended list" +echo " Recommended formats: $ALLOWED_FORMATS" +exit 1 diff --git a/skills/byted-arkclaw-local-batch-asr/scripts/env_init.sh b/skills/byted-arkclaw-local-batch-asr/scripts/env_init.sh new file mode 100755 index 00000000..3ef441ab --- /dev/null +++ b/skills/byted-arkclaw-local-batch-asr/scripts/env_init.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +# ============================================================================== +# byted-arkclaw-local-batch-asr environment initialization script +# Usage: source scripts/env_init.sh +# ============================================================================== + +set -e + +if [ -n "${ZSH_VERSION:-}" ]; then + SCRIPT_PATH="${(%):-%N}" +else + SCRIPT_PATH="${BASH_SOURCE[0]}" +fi + +SCRIPT_DIR="$(cd "$(dirname "${SCRIPT_PATH}")" && pwd)" +SKILL_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" + +if [ ! 
-d "${SKILL_ROOT}/.venv" ]; then + python3 -m venv "${SKILL_ROOT}/.venv" +fi + +source "${SKILL_ROOT}/.venv/bin/activate" +python -m pip install -U pip setuptools wheel +python -m pip install funasr modelscope imageio-ffmpeg librosa torch torchaudio + +FFMPEG_PATH=$(python -c 'import imageio_ffmpeg; print(imageio_ffmpeg.get_ffmpeg_exe())') +ln -sf "$FFMPEG_PATH" "$(dirname "$(command -v python)")/ffmpeg" + +export LOCAL_BATCH_ASR_ROOT="$SKILL_ROOT" +export LOCAL_BATCH_ASR_WORKDIR="${SKILL_ROOT}/output/$(date +%Y%m%d_%H%M%S)" +mkdir -p "$LOCAL_BATCH_ASR_WORKDIR" + +echo "✅ byted-arkclaw-local-batch-asr environment initialized" +echo "- skill root: $LOCAL_BATCH_ASR_ROOT" +echo "- workdir: $LOCAL_BATCH_ASR_WORKDIR" +echo "- python: $(command -v python)" diff --git a/skills/byted-arkclaw-local-batch-asr/scripts/generate_result.md.sh b/skills/byted-arkclaw-local-batch-asr/scripts/generate_result.md.sh new file mode 100755 index 00000000..a8d0a0b0 --- /dev/null +++ b/skills/byted-arkclaw-local-batch-asr/scripts/generate_result.md.sh @@ -0,0 +1,62 @@ +#!/bin/bash +# ============================================================================== +# Generate a markdown summary from a batch run directory +# Usage: scripts/generate_result.md.sh +# ============================================================================== + +RUN_DIR="$1" +if [ -z "$RUN_DIR" ]; then + echo "❌ Error: please provide run_dir" + exit 1 +fi + +SUMMARY_JSON="$RUN_DIR/summary.json" +SUMMARY_CSV="$RUN_DIR/summary.csv" + +if [ ! 
-f "$SUMMARY_JSON" ]; then + echo "❌ 错误: 未找到 $SUMMARY_JSON" + exit 1 +fi + +SUCCESS_COUNT=$(python3 - < str: + try: + import torch + + if torch.cuda.is_available(): + return "cuda" + + if ( + platform.system() == "Darwin" + and hasattr(torch.backends, "mps") + and torch.backends.mps.is_available() + ): + return "mps" + except Exception: + pass + + return "cpu" diff --git a/skills/byted-arkclaw-local-batch-asr/scripts/local_batch_asr_runtime/transcriber.py b/skills/byted-arkclaw-local-batch-asr/scripts/local_batch_asr_runtime/transcriber.py new file mode 100644 index 00000000..e46df4be --- /dev/null +++ b/skills/byted-arkclaw-local-batch-asr/scripts/local_batch_asr_runtime/transcriber.py @@ -0,0 +1,361 @@ +import json +import re +import subprocess +import tempfile +from collections import OrderedDict +from pathlib import Path +from typing import Any + +import imageio_ffmpeg + +from .core.device import get_device_with_fallback + +SUPPORTED_FORMATS = ("mp3", "wav", "m4a", "flac", "aac", "ogg") +SUPPORTED_VIDEO_FORMATS = ("mp4", "avi", "mkv", "mov") +OUTPUT_FORMATS = ("txt", "json", "srt", "ass", "md") + +_MODEL_CACHE: dict[tuple[str, bool], Any] = {} + + +def _format_seconds(seconds: float, for_srt: bool = False, for_ass: bool = False) -> str: + total_ms = max(0, int(round(seconds * 1000))) + hours = total_ms // 3_600_000 + minutes = (total_ms % 3_600_000) // 60_000 + secs = (total_ms % 60_000) // 1000 + ms = total_ms % 1000 + + if for_srt: + return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}" + + if for_ass: + centiseconds = ms // 10 + return f"{hours}:{minutes:02d}:{secs:02d}.{centiseconds:02d}" + + return f"{hours:02d}:{minutes:02d}:{secs:02d}.{ms:03d}" + + +def _normalize_time(value: Any) -> float: + if value is None: + return 0.0 + if isinstance(value, (list, tuple)) and value: + value = value[0] + try: + numeric = float(value) + except (TypeError, ValueError): + return 0.0 + return numeric / 1000.0 if numeric > 1000 else numeric + + +def 
_speaker_label(raw_value: Any) -> str: + if raw_value in (None, "", -1): + return "Speaker A" + if isinstance(raw_value, str): + value = raw_value.strip() + if value.lower().startswith("speaker "): + return value + if value.lower().startswith("spk"): + suffix = value.split("-", 1)[-1].split("_", 1)[-1] + return f"Speaker {suffix.upper()}" + return value + if isinstance(raw_value, (int, float)): + return f"Speaker {chr(ord('A') + int(raw_value))}" + return "Speaker A" + + +def _clean_text(text: str) -> str: + cleaned = re.sub(r"<\s*\|.*?\|\s*>", " ", text) + cleaned = re.sub(r"\s+", " ", cleaned) + return cleaned.strip() + + +def _extract_time_pair(sentence: Any) -> tuple[float, float]: + if not isinstance(sentence, dict): + return 0.0, 0.0 + start_raw = sentence.get("start") + if start_raw is None: + start_raw = sentence.get("start_time") + end_raw = sentence.get("end") + if end_raw is None: + end_raw = sentence.get("end_time") + timestamp = sentence.get("timestamp") + if (start_raw is None or end_raw is None) and isinstance(timestamp, (list, tuple)) and timestamp: + start_raw = timestamp[0] + end_raw = timestamp[-1] + return _normalize_time(start_raw), _normalize_time(end_raw) + + +def _prepare_wav(input_path: Path, tmp_dir: Path) -> Path: + output_path = tmp_dir / "prepared.wav" + ffmpeg_path = imageio_ffmpeg.get_ffmpeg_exe() + cmd = [ + ffmpeg_path, + "-y", + "-i", + str(input_path), + "-vn", + "-ac", + "1", + "-ar", + "16000", + str(output_path), + ] + subprocess.run(cmd, check=True, capture_output=True) + return output_path + + +def _load_model(diarize: bool): + key = (get_device_with_fallback(), diarize) + if key in _MODEL_CACHE: + return _MODEL_CACHE[key] + + from funasr import AutoModel + + kwargs = { + "model": "iic/SenseVoiceSmall", + "vad_model": "fsmn-vad", + "punc_model": "ct-punc", + "device": key[0], + "disable_update": True, + } + if diarize: + kwargs["spk_model"] = "cam++" + + model = AutoModel(**kwargs) + _MODEL_CACHE[key] = model + return model + 
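Review note on `_normalize_time` earlier in this file: it treats any numeric value greater than 1000 as milliseconds and everything else as seconds. The standalone sketch below copies that rule so its behaviour can be checked in isolation. The name `normalize_time` and the sample values are illustrative assumptions, not part of the patch.

```python
from typing import Any


def normalize_time(value: Any) -> float:
    """Standalone copy of the patch's `_normalize_time` rule, for illustration only."""
    if value is None:
        return 0.0
    # A non-empty list/tuple such as [start, end] contributes its first element.
    if isinstance(value, (list, tuple)) and value:
        value = value[0]
    try:
        numeric = float(value)
    except (TypeError, ValueError):
        return 0.0
    # Heuristic from the patch: values above 1000 are assumed to be milliseconds.
    return numeric / 1000.0 if numeric > 1000 else numeric


# Hypothetical inputs chosen to exercise each branch of the rule.
samples = {
    "milliseconds": normalize_time(2500),
    "seconds": normalize_time(12.5),
    "pair": normalize_time([3000, 4000]),
    "invalid": normalize_time("n/a"),
    "ambiguous": normalize_time(1500),
    "early_ms": normalize_time(430),
}
for label, result in samples.items():
    print(f"{label}: {result}")
```

Two cases look worth confirming against the units the model actually returns, which the patch does not state. If timestamps arrive in milliseconds, any offset of 1000 ms or less (such as 430) is kept as-is and read as seconds. If they arrive in seconds, any offset beyond 1000 s is divided down. An explicit unit argument would avoid both, at the cost of a slightly wider helper signature.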
+ +def _normalize_segments(raw_result: Any, diarize: bool) -> list[dict[str, Any]]: + if isinstance(raw_result, list) and raw_result: + item = raw_result[0] + elif isinstance(raw_result, dict): + item = raw_result + else: + item = {} + + sentence_info = item.get("sentence_info") or item.get("sentence_infos") or [] + segments: list[dict[str, Any]] = [] + for sentence in sentence_info: + start, end = _extract_time_pair(sentence) + text = _clean_text((sentence.get("text") or "").strip()) + speaker = _speaker_label( + sentence.get("speaker") + or sentence.get("speaker_id") + or sentence.get("spk") + or sentence.get("spkid") + ) + if text: + segments.append( + { + "text": text, + "start": start, + "end": max(end, start), + "speaker_id": speaker if diarize else "Speaker A", + "confidence": sentence.get("confidence"), + "is_overlap": bool(sentence.get("is_overlap", False)), + "words": sentence.get("words", []), + } + ) + + if segments: + return segments + + text = _clean_text((item.get("text") or "").strip()) + if text: + return [ + { + "text": text, + "start": 0.0, + "end": 0.0, + "speaker_id": "Speaker A", + "confidence": item.get("confidence"), + "is_overlap": False, + "words": item.get("words", []), + } + ] + + raise RuntimeError("ASR did not return any transcript text.") + + +def _write_txt(segments: list[dict[str, Any]], output_path: Path) -> None: + lines = [] + for segment in segments: + overlap = "[OVERLAP] " if segment["is_overlap"] else "" + lines.append(f"{overlap}[{_format_seconds(segment['start'])}] {segment['speaker_id']}: {segment['text']}") + output_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + +def _write_json(segments: list[dict[str, Any]], output_path: Path) -> None: + payload = [] + for segment in segments: + payload.append( + { + "text": segment["text"], + "start": int(round(segment["start"] * 1000)), + "end": int(round(segment["end"] * 1000)), + "confidence": segment["confidence"], + "speaker_id": segment["speaker_id"], + 
"is_overlap": segment["is_overlap"], + "words": segment["words"], + } + ) + output_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") + + +def _write_srt(segments: list[dict[str, Any]], output_path: Path) -> None: + blocks = [] + for idx, segment in enumerate(segments, start=1): + end = segment["end"] if segment["end"] > segment["start"] else segment["start"] + 2 + blocks.append( + "\n".join( + [ + str(idx), + f"{_format_seconds(segment['start'], for_srt=True)} --> {_format_seconds(end, for_srt=True)}", + f"[{segment['speaker_id']}] {segment['text']}", + ] + ) + ) + output_path.write_text("\n\n".join(blocks) + "\n", encoding="utf-8") + + +def _write_ass(segments: list[dict[str, Any]], output_path: Path) -> None: + speakers = list(OrderedDict((seg["speaker_id"], None) for seg in segments).keys()) + colors = ["&H00FFFF", "&H00FFFF00", "&H00FF00FF", "&H0000FF00", "&H0000A5FF"] + styles = [ + "Style: Default,Arial,16,&H00FFFFFF,&H000000FF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,2,2,2,10,10,10,1" + ] + style_map = {"Default": "Default"} + for idx, speaker in enumerate(speakers): + style_name = speaker.replace(" ", "") + style_map[speaker] = style_name + styles.append( + "Style: " + f"{style_name},Arial,16,{colors[idx % len(colors)]},&H000000FF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,2,2,2,10,10,10,1" + ) + + dialogues = [] + for segment in segments: + end = segment["end"] if segment["end"] > segment["start"] else segment["start"] + 2 + text = segment["text"].replace("\\", "\\\\").replace("\n", "\\N") + dialogues.append( + "Dialogue: 0," + f"{_format_seconds(segment['start'], for_ass=True)}," + f"{_format_seconds(end, for_ass=True)}," + f"{style_map[segment['speaker_id']]},,0,0,0,,{text}" + ) + + content = "\n".join( + [ + "[Script Info]", + "Title: Transcription", + "ScriptType: v4.00+", + "PlayResX: 1920", + "PlayResY: 1080", + "", + "[V4+ Styles]", + "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, 
OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding", + *styles, + "", + "[Events]", + "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text", + *dialogues, + "", + ] + ) + output_path.write_text(content, encoding="utf-8") + + +def _write_md(segments: list[dict[str, Any]], output_path: Path) -> None: + grouped: OrderedDict[str, list[dict[str, Any]]] = OrderedDict() + for segment in segments: + grouped.setdefault(segment["speaker_id"], []).append(segment) + + lines = [] + for speaker, items in grouped.items(): + lines.append(f"## {speaker}") + lines.append("") + for item in items: + prefix = "[OVERLAP] " if item["is_overlap"] else "" + lines.append(f"- [{_format_seconds(item['start'])[:8]}] {prefix}{item['text']}") + lines.append("") + output_path.write_text("\n".join(lines).rstrip() + "\n", encoding="utf-8") + + +def _write_output(segments: list[dict[str, Any]], output_path: Path, format_name: str) -> None: + writers = { + "txt": _write_txt, + "json": _write_json, + "srt": _write_srt, + "ass": _write_ass, + "md": _write_md, + } + writers[format_name](segments, output_path) + + +def _run_generate(model: Any, prepared_audio: Path, with_timestamps: bool) -> Any: + kwargs = { + "input": str(prepared_audio), + "batch_size_s": 60, + "merge_vad": True, + } + if with_timestamps: + kwargs["sentence_timestamp"] = True + return model.generate(**kwargs) + + +def transcribe( + input_file: str | Path, + output_dir: str | Path | None = None, + format: str = "txt", + diarize: bool = True, + progress_callback=None, +) -> dict[str, Any]: + input_path = Path(input_file).expanduser().resolve() + if not input_path.exists(): + raise FileNotFoundError(f"File not found: {input_path}") + if format not in OUTPUT_FORMATS: + raise ValueError(f"Unsupported output format: {format}") + + suffix = input_path.suffix.lower().lstrip(".") + if suffix not 
in SUPPORTED_FORMATS and suffix not in SUPPORTED_VIDEO_FORMATS: + raise ValueError(f"Unsupported media format: {input_path.suffix}") + + out_dir = Path(output_dir).expanduser().resolve() if output_dir else input_path.parent + out_dir.mkdir(parents=True, exist_ok=True) + output_path = out_dir / f"transcript.{format}" + + if progress_callback: + progress_callback(5, 100) + + with tempfile.TemporaryDirectory(prefix="byted-arkclaw-local-batch-asr-") as tmp: + prepared_audio = _prepare_wav(input_path, Path(tmp)) + if progress_callback: + progress_callback(20, 100) + + model = _load_model(diarize=diarize) + try: + raw_result = _run_generate(model, prepared_audio, with_timestamps=diarize) + except Exception: + if not diarize: + raise + model = _load_model(diarize=False) + raw_result = _run_generate(model, prepared_audio, with_timestamps=False) + diarize = False + + if progress_callback: + progress_callback(80, 100) + + segments = _normalize_segments(raw_result, diarize=diarize) + _write_output(segments, output_path, format) + + if progress_callback: + progress_callback(100, 100) + + speakers = list(OrderedDict((seg["speaker_id"], None) for seg in segments).keys()) + return { + "text": "\n".join(seg["text"] for seg in segments), + "output_path": str(output_path), + "segments": segments, + "speakers": speakers, + "diarization_enabled": diarize, + } diff --git a/skills/byted-arkclaw-local-batch-asr/scripts/transcribe_batch.py b/skills/byted-arkclaw-local-batch-asr/scripts/transcribe_batch.py new file mode 100755 index 00000000..34689ede --- /dev/null +++ b/skills/byted-arkclaw-local-batch-asr/scripts/transcribe_batch.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +"""Local batch transcription runner for byted-arkclaw-local-batch-asr.""" + +from __future__ import annotations + +import argparse +import csv +import json +import sys +from pathlib import Path + +SCRIPT_DIR = Path(__file__).resolve().parent +SKILL_ROOT = SCRIPT_DIR.parent +if str(SCRIPT_DIR) not in sys.path: + 
sys.path.insert(0, str(SCRIPT_DIR)) + +from local_batch_asr_runtime import SUPPORTED_FORMATS, SUPPORTED_VIDEO_FORMATS, transcribe + +MEDIA_SUFFIXES = set(SUPPORTED_FORMATS) | set(SUPPORTED_VIDEO_FORMATS) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Batch transcribe local audio/video files with FunASR." + ) + parser.add_argument("input_path", nargs="?", help="Single file or directory to process") + parser.add_argument("-o", "--output-dir", help="Output run directory") + parser.add_argument("-f", "--format", choices=["txt", "json", "srt", "ass", "md"], default="txt") + parser.add_argument("--manifest", help="Text file with one input path per line") + parser.add_argument("--recursive", action="store_true", help="Recursively scan directories") + parser.add_argument("--pattern", default="*", help="Filename glob when scanning directories") + parser.add_argument("--limit", type=int, default=0, help="Stop after N matched files (0 = unlimited)") + parser.add_argument("--no-diarize", action="store_true", help="Disable speaker diarization attempt") + parser.add_argument("--continue-on-error", action="store_true", help="Continue even if one file fails") + return parser.parse_args() + + +def discover_inputs(args: argparse.Namespace) -> list[Path]: + if args.manifest: + manifest_path = Path(args.manifest).expanduser().resolve() + base_dir = manifest_path.parent + results = [] + for raw_line in manifest_path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + candidate = Path(line) + if not candidate.is_absolute(): + candidate = (base_dir / candidate).resolve() + results.append(candidate) + return results + + if not args.input_path: + raise ValueError("input_path and --manifest cannot both be empty") + + input_path = Path(args.input_path).expanduser().resolve() + if input_path.is_file(): + return [input_path] + + if not input_path.is_dir(): + raise 
FileNotFoundError(f"Input path not found: {input_path}") + + iterator = input_path.rglob(args.pattern) if args.recursive else input_path.glob(args.pattern) + results = [] + for item in sorted(iterator): + if item.is_file() and item.suffix.lower().lstrip(".") in MEDIA_SUFFIXES: + results.append(item.resolve()) + if args.limit and len(results) >= args.limit: + break + return sorted(results) + + +def safe_stem(path: Path) -> str: + return "".join(ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in path.stem) or "file" + + +def write_summary(run_dir: Path, results: list[dict]) -> None: + summary = { + "run_dir": str(run_dir), + "success_count": sum(1 for item in results if item["status"] == "completed"), + "failure_count": sum(1 for item in results if item["status"] != "completed"), + "results": results, + } + (run_dir / "summary.json").write_text( + json.dumps(summary, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + + with (run_dir / "summary.csv").open("w", encoding="utf-8", newline="") as fh: + writer = csv.DictWriter( + fh, + fieldnames=["source", "status", "format", "output_path", "speaker_count", "segments", "error"], + ) + writer.writeheader() + for item in results: + writer.writerow( + { + "source": item.get("source", ""), + "status": item.get("status", ""), + "format": item.get("format", ""), + "output_path": item.get("output_path", ""), + "speaker_count": item.get("speaker_count", 0), + "segments": item.get("segments", 0), + "error": item.get("error", ""), + } + ) + + +def main() -> int: + args = parse_args() + inputs = discover_inputs(args) + if not inputs: + print(json.dumps({"error": "No supported media files found."}, ensure_ascii=False)) + return 1 + + run_dir = Path(args.output_dir).expanduser().resolve() if args.output_dir else (SKILL_ROOT / "output" / "run_latest") + files_dir = run_dir / "files" + files_dir.mkdir(parents=True, exist_ok=True) + + results: list[dict] = [] + exit_code = 0 + total = len(inputs) + + for index, input_path in 
enumerate(inputs, start=1): + print(f"[{index}/{total}] Processing {input_path}") + file_dir = files_dir / safe_stem(input_path) + file_dir.mkdir(parents=True, exist_ok=True) + + try: + result = transcribe( + input_path, + output_dir=file_dir, + format=args.format, + diarize=not args.no_diarize, + ) + record = { + "source": str(input_path), + "status": "completed", + "format": args.format, + "output_path": result["output_path"], + "speaker_count": len(result.get("speakers", [])), + "segments": len(result.get("segments", [])), + "error": None, + } + except Exception as exc: + exit_code = 1 + record = { + "source": str(input_path), + "status": "failed", + "format": args.format, + "output_path": None, + "speaker_count": 0, + "segments": 0, + "error": str(exc), + } + if not args.continue_on_error: + results.append(record) + (file_dir / "meta.json").write_text( + json.dumps(record, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + write_summary(run_dir, results) + print(json.dumps({"error": str(exc), "source": str(input_path)}, ensure_ascii=False)) + return exit_code + + results.append(record) + (file_dir / "meta.json").write_text( + json.dumps(record, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + + write_summary(run_dir, results) + print( + json.dumps( + { + "run_dir": str(run_dir), + "success_count": sum(1 for item in results if item["status"] == "completed"), + "failure_count": sum(1 for item in results if item["status"] != "completed"), + "summary_json": str(run_dir / "summary.json"), + "summary_csv": str(run_dir / "summary.csv"), + }, + ensure_ascii=False, + indent=2, + ) + ) + return exit_code + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/byted-arkclaw-local-hr-crm/.gitignore b/skills/byted-arkclaw-local-hr-crm/.gitignore new file mode 100644 index 00000000..ecaeade1 --- /dev/null +++ b/skills/byted-arkclaw-local-hr-crm/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +crm_data.json +backups/ +*.pyc diff 
--git a/skills/byted-arkclaw-local-hr-crm/SKILL.md b/skills/byted-arkclaw-local-hr-crm/SKILL.md new file mode 100644 index 00000000..a033fd2b --- /dev/null +++ b/skills/byted-arkclaw-local-hr-crm/SKILL.md @@ -0,0 +1,251 @@ +--- +name: byted-arkclaw-local-hr-crm +displayName: 候选人CRM数据库 +description: 基于 JSON 文件的候选人 CRM 工具,支持初筛建档、通话后更新、查询和导出。适用于保存简历画像、ASR 转写原文、匹配结论及其依据。 +version: "1.0.0" +category: 数据管理/CRM +author: 系统 +icon: 📋 +parameters: + - name: action + type: string + required: true + description: "操作类型:upsert(新增或更新候选人)/query(查询候选人)/list(列出全部候选人)/export(导出Markdown报表)" + - name: phone + type: string + required: false + description: "候选人电话号码(upsert和query时必填),兼容 `13812341234`、`13812341234-刘`、`13812341234-刘.mp3`" + - name: candidate_name + type: string + required: false + description: "候选人姓名,未显式传入时可从文件名中解析" + - name: email + type: string + required: false + description: "候选人邮箱,优先从简历文本中抽取,用于后续邮件邀约" + - name: is_qualified + type: bool + required: false + description: "是否为有效候选人" + - name: gender + type: string + required: false + description: "候选人性别:男/女/未知" + - name: industry + type: string + required: false + description: "所在行业,如:互联网、金融、制造业、教育" + - name: current_position + type: string + required: false + description: "当前职位,如:高级Java工程师、产品总监" + - name: years_of_exp + type: int + required: false + description: "工作年限" + - name: job_switch_intent + type: string + required: false + description: "跳槽意向:高/中/低" + - name: candidate_focus + type: string + required: false + description: "候选人关注重点,如:薪资涨幅、晋升空间、工作地点、远程办公" + - name: notes + type: string + required: false + description: "备注信息" + - name: transcript_text + type: string + required: false + description: "音频转写原文,入库后用于后续审查与复核" + - name: project_experience + type: string + required: false + description: "候选人既往项目经验总结" + - name: technical_capability + type: string + required: false + description: "候选人技术能力总结" + - name: education_level + type: string + required: false + description: "学历水平,如本科/硕士/博士" + - name: jd_years_match + 
type: string + required: false + description: "工作年限是否符合JD要求,如符合/部分符合/不符合/待确认" + - name: jd_match_score + type: int + required: false + description: "JD匹配分,范围0-100" + - name: ai_match_conclusion + type: string + required: false + description: "AI 对候选人与 JD 匹配度的结论" + - name: ai_match_evidence + type: string + required: false + description: "AI 判断依据,需写明来自简历、JD、录音的哪些信息" +--- +# 候选人CRM数据库 Skill + +## 功能说明 +基于本地 JSON 文件实现轻量级候选人关系管理,支持候选人档案的新增、更新、查询与报表导出。以电话号码为唯一 key,适用于招聘猎头场景下的候选人画像存储与检索;支持保存邮箱,供后续邮件邀约复试使用,并可把音频转写原文、项目经验、技术能力、学历、初筛结论、最终结论与依据一并写入数据库供后续审查。 + +## 输入与输出 + +### 输入 + +- 初筛建档阶段: + - 候选人标识:`phone`,且它是全局唯一 key + - 简历侧字段:`email`、`project_experience`、`technical_capability`、`education_level`、`years_of_exp` + - 初筛判断字段:`screening_stage`、`screening_decision`、`screening_reason`、`strengths_summary`、`weaknesses_summary`、`jd_match_score` +- 通话后更新阶段: + - 必须复用同一个 `phone` + - 录音转写字段:`transcript_text` + - 通话补充字段:`job_switch_intent`、`candidate_focus`、`notes` + - 最终判断字段:`final_match_score`、`final_recommendation`、`ai_match_conclusion`、`ai_match_evidence` + +### 输出 + +- `upsert`:返回单个候选人的最新画像摘要 +- `query`:返回候选人的完整画像与转写原文 +- `list`:返回候选人列表概览 +- `export`:返回招聘者可读的 Markdown 报表 + +## 在总流程中的位置 + +- 初筛阶段:接收 `byted-arkclaw-jd-resume-match` 产出的结构化画像,完成候选人初步建档 +- 通话后阶段:接收 `byted-arkclaw-local-batch-asr` 与 AI 复评结果,对同一候选人做增量更新 +- 无论是初步建档还是后续补录,CRM 都只以电话号码为唯一 key,不允许用文件名或姓名替代唯一标识 + +## 设计模式 +本 skill 主要采用: +- **Tool Wrapper**:封装 Python 脚本调用 +- **数据持久化**:本地 JSON 文件存储 + +## 核心脚本 +所有功能脚本位于 `scripts/` 目录: +- `scripts/main.py`: 主脚本,提供 `upsert`/`query`/`list`/`export` 四种操作 + +## 配置说明 +CRM 数据文件路径及字段默认值,详见 `config.yaml` + +## 触发条件 +- 通话录音分析完成后,需要录入候选人数据 +- JD 与简历批量初筛完成后,需要对候选人做初步建档 +- 用户查询某候选人信息:「查一下 13812341234 的候选人画像」 +- 需要导出候选人列表或生成汇总报表 + +## 使用方法 + +### 前置准备 +确保 Python 3 环境可用,并先执行 `pip install -r requirements.txt` 安装 `pyyaml`(`scripts/main.py` 依赖它读取 `config.yaml`);其余功能仅使用标准库。 + +### 调用示例 + +#### 1. 
新增或更新候选人 (upsert) +```bash +# 完整字段示例 +python scripts/main.py --action upsert --phone 13812341234 --candidate_name 刘女士 --email liu@example.com --is_qualified true --gender 男 --industry 互联网 --current_position 高级Java工程师 --years_of_exp 8 --job_switch_intent 高 --candidate_focus 薪资涨幅、技术栈匹配 --notes "目前在职,期望薪资涨幅30%" --transcript_text "您好,请问是刘女士吗?..." --project_experience "负责支付平台和风控平台建设" --technical_capability "Java、Spring Cloud、MySQL、Kafka" --education_level 本科 --jd_years_match 符合 --jd_match_score 82 --ai_match_conclusion "技术和项目背景较贴合 JD" --ai_match_evidence "简历中有 8 年后端经验,录音中确认做过云原生部署" + +# 部分字段示例(仅更新特定字段) +python scripts/main.py --action upsert --phone 13812341234 --job_switch_intent 中 + +# 从音频文件名解析电话号码与姓名 +python scripts/main.py --action upsert --phone 13999999999-刘女士.mp3 --gender 女 --industry 金融 --current_position 风控经理 +``` + +#### 2. 查询候选人 (query) +```bash +python scripts/main.py --action query --phone 13812341234 +python scripts/main.py --action query --phone 13999999999-刘女士.mp3 +``` + +#### 3. 列出全部候选人 (list) +```bash +python scripts/main.py --action list +``` + +#### 4. 
导出 Markdown 报表 (export) +```bash +python scripts/main.py --action export +``` + +## 参数说明 +| 参数名 | 类型 | 必填 | 说明 | +|--------|------|------|------| +| action | string | 是 | 操作类型:`upsert`/`query`/`list`/`export` | +| phone | string | 条件必填 | 候选人电话号码,兼容 `手机号` / `手机号-姓名` / `手机号-姓名.mp3` | +| candidate_name | string | 否 | 候选人姓名;若未提供,可从文件名解析 | +| email | string | 否 | 候选人邮箱,通常来自简历文本抽取 | +| is_qualified | bool | 否 | 是否为有效候选人 | +| gender | string | 否 | 候选人性别:`男`/`女`/`未知` | +| industry | string | 否 | 所在行业 | +| current_position | string | 否 | 当前职位 | +| years_of_exp | int | 否 | 工作年限 | +| job_switch_intent | string | 否 | 跳槽意向:`高`/`中`/`低` | +| candidate_focus | string | 否 | 关注重点 | +| notes | string | 否 | 备注信息 | +| transcript_text | string | 否 | 音频转写原文,保存到数据库供后续审查 | +| project_experience | string | 否 | 候选人既往项目经验总结 | +| technical_capability | string | 否 | 候选人技术能力总结 | +| education_level | string | 否 | 学历水平 | +| jd_years_match | string | 否 | 工作年限是否符合 JD | +| jd_match_score | int | 否 | JD 匹配分,范围 `0-100` | +| ai_match_conclusion | string | 否 | AI 对候选人与 JD 匹配度的结论 | +| ai_match_evidence | string | 否 | AI 判断依据 | + +## 返回示例 + +### upsert +``` +✅ 候选人 138****1234 数据已更新 + 姓名: 刘女士 | 邮箱: liu@example.com | 有效候选人: 是 + 性别: 男 | 行业: 互联网 | 职位: 高级Java工程师 + 工作年限: 8年 + 跳槽意向: 高 + 关注重点: 薪资涨幅、技术栈匹配 + 项目经验: 负责支付平台和风控平台建设 + 技术能力: Java、Spring Cloud、MySQL、Kafka + 学历水平: 本科 | 年限匹配: 符合 | 匹配分: 82分 + AI结论: 技术和项目背景较贴合 JD + AI依据: 简历中有 8 年后端经验,录音中确认做过云原生部署 + 转写原文: 已保存 + 更新时间: 2026-04-19 10:00 +``` + +### query +``` +📋 候选人 138****1234 档案 + 姓名: 刘女士 | 邮箱: liu@example.com | 有效候选人: 是 | 性别: 男 + 行业: 互联网 | 职位: 高级Java工程师 | 工作年限: 8年 + 跳槽意向: 高 + 关注重点: 薪资涨幅、技术栈匹配 + 项目经验: 负责支付平台和风控平台建设 + 技术能力: Java、Spring Cloud、MySQL、Kafka + 学历水平: 本科 + JD年限匹配: 符合 | JD匹配分: 82分 + AI结论: 技术和项目背景较贴合 JD + AI依据: 简历中有 8 年后端经验,录音中确认做过云原生部署 + 最近通话: 2026-04-19 + 备注: 目前在职,期望薪资涨幅30%,可接受北京上海 + 转写原文: + 您好,请问是刘女士吗?我是猎头顾问... 
+``` + +### list +``` +📊 全部候选人列表 (2人) + [1] 138****1234 - 刘女士 - 男 - 互联网 - 高级Java工程师 - 8年 - 高意向 + [2] 139****5678 - 王女士 - 女 - 金融 - 风控经理 - 5年 - 低意向 +``` + +### export +```markdown +| 电话号码 | 姓名 | 邮箱 | 有效候选人 | 性别 | 行业 | 职位 | 工作年限 | 学历 | JD年限匹配 | JD匹配分 | AI结论 | 最近通话 | +|----------|------|------|-----------|------|------|------|---------|------|------------|----------|--------|----------| +| 138****1234 | 刘女士 | liu@example.com | 是 | 男 | 互联网 | 高级Java工程师 | 8 | 本科 | 符合 | 82 | 技术和项目背景较贴合 JD | 2026-04-19 | +| 139****5678 | 王女士 | wang@example.com | 否 | 女 | 金融 | 风控经理 | 5 | 本科 | 部分符合 | 64 | 经验方向相关但技术栈有偏差 | 2026-04-18 | +``` diff --git a/skills/byted-arkclaw-local-hr-crm/requirements.txt b/skills/byted-arkclaw-local-hr-crm/requirements.txt new file mode 100644 index 00000000..3aecde93 --- /dev/null +++ b/skills/byted-arkclaw-local-hr-crm/requirements.txt @@ -0,0 +1 @@ +pyyaml>=6.0 diff --git a/skills/byted-arkclaw-local-hr-crm/scripts/main.py b/skills/byted-arkclaw-local-hr-crm/scripts/main.py new file mode 100644 index 00000000..50efd70e --- /dev/null +++ b/skills/byted-arkclaw-local-hr-crm/scripts/main.py @@ -0,0 +1,543 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import json +import logging +import os +import re +from datetime import datetime + +import yaml + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# 脚本所在目录 +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +# Skill 根目录(scripts 目录的上一级) +SKILL_ROOT = os.path.dirname(SCRIPT_DIR) + +CRM_CONFIG = { + 'data_path': './crm_data.json', + 'backup_enabled': True, + 'max_backups': 5 +} + +VALID_GENDERS = ['男', '女', '未知'] +VALID_INTENTS = ['高', '中', '低'] +PHONE_PATTERN = re.compile(r'^1[3-9]\d{9}$') +PHONE_NAME_FILE_PATTERN = re.compile(r'^(1[3-9]\d{9})(?:-([^/\\]+?))?(?:\.(?:mp3|wav|m4a))?$', re.IGNORECASE) + + +def load_config(): + config_path = os.path.join(SKILL_ROOT, 'config.yaml') + if not os.path.exists(config_path): + 
logger.warning(f"配置文件不存在: {config_path},使用默认配置") + return + try: + with open(config_path, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + if not config: + return + if 'crm' in config: + CRM_CONFIG.update(config['crm']) + logger.info("配置文件加载成功") + except Exception as e: + logger.warning(f"配置文件加载失败: {e},使用默认配置") + + +load_config() + + +def _get_data_path() -> str: + data_path = CRM_CONFIG['data_path'] + if not os.path.isabs(data_path): + data_path = os.path.join(SKILL_ROOT, data_path) + return data_path + + +def _load_crm_data() -> dict: + data_path = _get_data_path() + if not os.path.exists(data_path): + return {} + try: + with open(data_path, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, IOError) as e: + logger.error(f"CRM数据文件读取失败: {e}") + return {} + + +def _save_crm_data(data: dict): + data_path = _get_data_path() + + if CRM_CONFIG.get('backup_enabled') and os.path.exists(data_path): + _create_backup(data_path) + + os.makedirs(os.path.dirname(data_path) or '.', exist_ok=True) + with open(data_path, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + logger.info(f"CRM数据已保存,共{len(data)}条记录") + + +def _create_backup(data_path: str): + backup_dir = os.path.join(os.path.dirname(data_path), 'backups') + os.makedirs(backup_dir, exist_ok=True) + + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + backup_path = os.path.join(backup_dir, f"crm_data_{timestamp}.json") + + try: + with open(data_path, 'r', encoding='utf-8') as src: + content = src.read() + with open(backup_path, 'w', encoding='utf-8') as dst: + dst.write(content) + except IOError as e: + logger.warning(f"备份失败: {e}") + return + + max_backups = CRM_CONFIG.get('max_backups', 5) + backups = sorted( + [f for f in os.listdir(backup_dir) if f.startswith('crm_data_') and f.endswith('.json')] + ) + while len(backups) > max_backups: + old = backups.pop(0) + try: + os.remove(os.path.join(backup_dir, old)) + except IOError: + pass + + +def 
_mask_phone(phone: str) -> str: + if len(phone) >= 7: + return phone[:3] + '****' + phone[-4:] + return phone + + +def _validate_phone(phone: str) -> bool: + return bool(phone and PHONE_PATTERN.match(phone)) + + +def _parse_phone_input(phone_input: str) -> tuple[str, str]: + """解析 phone 参数,兼容纯手机号、手机号-姓名、手机号-姓名.mp3。""" + value = (phone_input or '').strip() + if not value: + return '', '' + + file_name = os.path.basename(value) + match = PHONE_NAME_FILE_PATTERN.fullmatch(file_name) + if match: + phone = match.group(1) + candidate_name = (match.group(2) or '').strip() + return phone, candidate_name + + return value, '' + + +def upsert_candidate(phone: str, **fields) -> str: + phone, parsed_name = _parse_phone_input(phone) + if not _validate_phone(phone): + return f"❌ 电话号码格式无效: {phone},请提供11位手机号" + + data = _load_crm_data() + now = datetime.now().strftime('%Y-%m-%d %H:%M') + is_new = phone not in data + + if is_new: + record = { + 'phone': phone, + 'candidate_name': '', + 'email': '', + 'is_qualified': False, + 'gender': '未知', + 'industry': '', + 'current_position': '', + 'years_of_exp': None, + 'job_switch_intent': '', + 'candidate_focus': '', + 'notes': '', + 'transcript_text': '', + 'project_experience': '', + 'technical_capability': '', + 'education_level': '', + 'jd_years_match': '', + 'jd_match_score': None, + 'screening_stage': '', + 'screening_decision': '', + 'screening_reason': '', + 'strengths_summary': '', + 'weaknesses_summary': '', + 'final_match_score': None, + 'final_recommendation': '', + 'ai_match_conclusion': '', + 'ai_match_evidence': '', + 'created_at': now, + 'updated_at': now, + 'last_call_date': now[:10] + } + else: + record = data[phone] + record['updated_at'] = now + + if parsed_name and not record.get('candidate_name'): + record['candidate_name'] = parsed_name + + if 'candidate_name' in fields and fields['candidate_name']: + record['candidate_name'] = str(fields['candidate_name']).strip() + + if 'email' in fields and fields['email']: + 
record['email'] = str(fields['email']).strip() + + if 'is_qualified' in fields and fields['is_qualified'] is not None: + record['is_qualified'] = bool(fields['is_qualified']) + + if 'gender' in fields and fields['gender']: + gender = fields['gender'] + if gender in VALID_GENDERS: + record['gender'] = gender + else: + logger.warning(f"无效性别值: {gender},保留原值") + + if 'industry' in fields and fields['industry']: + record['industry'] = str(fields['industry']) + + if 'current_position' in fields and fields['current_position']: + record['current_position'] = str(fields['current_position']) + + if 'years_of_exp' in fields and fields['years_of_exp'] is not None: + try: + years = int(fields['years_of_exp']) + if 0 <= years < 100: + record['years_of_exp'] = years + else: + logger.warning(f"工作年限超出合理范围: {years}") + except (ValueError, TypeError): + logger.warning(f"无效工作年限值: {fields['years_of_exp']}") + + if 'job_switch_intent' in fields and fields['job_switch_intent']: + intent = fields['job_switch_intent'] + if intent in VALID_INTENTS: + record['job_switch_intent'] = intent + else: + logger.warning(f"无效跳槽意向值: {intent},保留原值") + + if 'candidate_focus' in fields and fields['candidate_focus']: + record['candidate_focus'] = str(fields['candidate_focus']) + + if 'notes' in fields and fields['notes']: + record['notes'] = str(fields['notes']) + + if 'transcript_text' in fields and fields['transcript_text']: + record['transcript_text'] = str(fields['transcript_text']).strip() + + if 'project_experience' in fields and fields['project_experience']: + record['project_experience'] = str(fields['project_experience']).strip() + + if 'technical_capability' in fields and fields['technical_capability']: + record['technical_capability'] = str(fields['technical_capability']).strip() + + if 'education_level' in fields and fields['education_level']: + record['education_level'] = str(fields['education_level']).strip() + + if 'jd_years_match' in fields and fields['jd_years_match']: + 
record['jd_years_match'] = str(fields['jd_years_match']).strip() + + if 'jd_match_score' in fields and fields['jd_match_score'] is not None: + try: + score = int(fields['jd_match_score']) + if 0 <= score <= 100: + record['jd_match_score'] = score + else: + logger.warning(f"JD匹配分超出合理范围: {score}") + except (ValueError, TypeError): + logger.warning(f"无效JD匹配分值: {fields['jd_match_score']}") + + if 'screening_stage' in fields and fields['screening_stage']: + record['screening_stage'] = str(fields['screening_stage']).strip() + + if 'screening_decision' in fields and fields['screening_decision']: + record['screening_decision'] = str(fields['screening_decision']).strip() + + if 'screening_reason' in fields and fields['screening_reason']: + record['screening_reason'] = str(fields['screening_reason']).strip() + + if 'strengths_summary' in fields and fields['strengths_summary']: + record['strengths_summary'] = str(fields['strengths_summary']).strip() + + if 'weaknesses_summary' in fields and fields['weaknesses_summary']: + record['weaknesses_summary'] = str(fields['weaknesses_summary']).strip() + + if 'final_match_score' in fields and fields['final_match_score'] is not None: + try: + score = int(fields['final_match_score']) + if 0 <= score <= 100: + record['final_match_score'] = score + else: + logger.warning(f"最终匹配分超出合理范围: {score}") + except (ValueError, TypeError): + logger.warning(f"无效最终匹配分值: {fields['final_match_score']}") + + if 'final_recommendation' in fields and fields['final_recommendation']: + record['final_recommendation'] = str(fields['final_recommendation']).strip() + + if 'ai_match_conclusion' in fields and fields['ai_match_conclusion']: + record['ai_match_conclusion'] = str(fields['ai_match_conclusion']).strip() + + if 'ai_match_evidence' in fields and fields['ai_match_evidence']: + record['ai_match_evidence'] = str(fields['ai_match_evidence']).strip() + + if 'last_call_date' in fields and fields['last_call_date']: + record['last_call_date'] = 
str(fields['last_call_date']) + + data[phone] = record + _save_crm_data(data) + + masked = _mask_phone(phone) + action_word = "新增" if is_new else "更新" + qualified_str = "是" if record['is_qualified'] else "否" + exp_str = f"{record['years_of_exp']}年" if record['years_of_exp'] is not None else "未知" + name_str = record.get('candidate_name') or '未知' + transcript_flag = "已保存" if record.get('transcript_text') else "未保存" + jd_score_str = ( + f"{record['jd_match_score']}分" + if record.get('jd_match_score') is not None + else "未评估" + ) + final_score_str = ( + f"{record['final_match_score']}分" + if record.get('final_match_score') is not None + else "未评估" + ) + + return ( + f"✅ 候选人 {masked} 数据已{action_word}\n" + f" 姓名: {name_str} | 邮箱: {record.get('email') or '无'} | 有效候选人: {qualified_str}\n" + f" 性别: {record['gender']} | 行业: {record['industry'] or '未知'} | 职位: {record['current_position'] or '未知'}\n" + f" 工作年限: {exp_str}\n" + f" 流程阶段: {record['screening_stage'] or '未设置'} | 初筛结论: {record['screening_decision'] or '未评估'}\n" + f" 初筛依据: {record['screening_reason'] or '无'}\n" + f" 跳槽意向: {record['job_switch_intent'] or '未评估'}\n" + f" 关注重点: {record['candidate_focus'] or '无'}\n" + f" 项目经验: {record['project_experience'] or '无'}\n" + f" 技术能力: {record['technical_capability'] or '无'}\n" + f" 学历水平: {record['education_level'] or '未知'} | 年限匹配: {record['jd_years_match'] or '未评估'} | 匹配分: {jd_score_str}\n" + f" 候选人优势: {record['strengths_summary'] or '无'}\n" + f" 候选人劣势: {record['weaknesses_summary'] or '无'}\n" + f" 最终推荐: {record['final_recommendation'] or '未评估'} | 最终得分: {final_score_str}\n" + f" AI结论: {record['ai_match_conclusion'] or '无'}\n" + f" AI依据: {record['ai_match_evidence'] or '无'}\n" + f" 转写原文: {transcript_flag}\n" + f" 更新时间: {record['updated_at']}" + ) + + +def query_candidate(phone: str) -> str: + phone, _ = _parse_phone_input(phone) + if not _validate_phone(phone): + return f"❌ 电话号码格式无效: {phone},请提供11位手机号" + + data = _load_crm_data() + if phone not in data: + return f"📋 未找到电话号码 
{_mask_phone(phone)} 的候选人档案" + + r = data[phone] + masked = _mask_phone(phone) + qualified_str = "是" if r.get('is_qualified') else "否" + exp_str = f"{r['years_of_exp']}年" if r.get('years_of_exp') is not None else "未知" + name_str = r.get('candidate_name') or '未知' + jd_score_str = ( + f"{r['jd_match_score']}分" + if r.get('jd_match_score') is not None + else "未评估" + ) + final_score_str = ( + f"{r['final_match_score']}分" + if r.get('final_match_score') is not None + else "未评估" + ) + transcript_text = r.get('transcript_text') or '' + transcript_block = ( + f"\n 转写原文:\n{_indent_multiline(transcript_text, ' ')}" + if transcript_text + else "\n 转写原文: 无" + ) + + return ( + f"📋 候选人 {masked} 档案\n" + f" 姓名: {name_str} | 邮箱: {r.get('email') or '无'} | 有效候选人: {qualified_str} | 性别: {r.get('gender', '未知')}\n" + f" 行业: {r.get('industry') or '未知'} | 职位: {r.get('current_position') or '未知'} | 工作年限: {exp_str}\n" + f" 流程阶段: {r.get('screening_stage') or '未设置'} | 初筛结论: {r.get('screening_decision') or '未评估'}\n" + f" 初筛依据: {r.get('screening_reason') or '无'}\n" + f" 跳槽意向: {r.get('job_switch_intent') or '未评估'}\n" + f" 关注重点: {r.get('candidate_focus') or '无'}\n" + f" 项目经验: {r.get('project_experience') or '无'}\n" + f" 技术能力: {r.get('technical_capability') or '无'}\n" + f" 学历水平: {r.get('education_level') or '未知'}\n" + f" JD年限匹配: {r.get('jd_years_match') or '未评估'} | JD匹配分: {jd_score_str}\n" + f" 候选人优势: {r.get('strengths_summary') or '无'}\n" + f" 候选人劣势: {r.get('weaknesses_summary') or '无'}\n" + f" 最终推荐: {r.get('final_recommendation') or '未评估'} | 最终得分: {final_score_str}\n" + f" AI结论: {r.get('ai_match_conclusion') or '无'}\n" + f" AI依据: {r.get('ai_match_evidence') or '无'}\n" + f" 最近通话: {r.get('last_call_date', '无记录')}\n" + f" 备注: {r.get('notes') or '无'}\n" + f" 创建时间: {r.get('created_at', '')} | 更新时间: {r.get('updated_at', '')}" + f"{transcript_block}" + ) + + +def list_candidates() -> str: + data = _load_crm_data() + if not data: + return "📋 CRM 中暂无候选人数据" + + lines = [f"📋 候选人列表(共 {len(data)} 条记录)\n"] + 
for phone, r in sorted(data.items()): + masked = _mask_phone(phone) + flag = "🔴" if r.get('is_qualified') else "🟢" + intent = r.get('job_switch_intent', '') + position = r.get('current_position', '') or '-' + industry = r.get('industry', '') or '-' + name = r.get('candidate_name', '') or '-' + lines.append( + f" {flag} {masked} | {name} | {r.get('email') or '-'} | {r.get('gender', '未知')} | {industry} | {position} | 意向:{intent or '-'}" + ) + return '\n'.join(lines) + + +def export_markdown() -> str: + data = _load_crm_data() + if not data: + return "📋 CRM 中暂无候选人数据,无法导出" + + lines = [ + f"### 候选人CRM报表({datetime.now().strftime('%Y-%m-%d')})\n", + "| 电话号码 | 姓名 | 邮箱 | 阶段 | 初筛结论 | 最终推荐 | 有效候选人 | 职位 | 工作年限 | 学历 | JD匹配分 | 最终得分 | 最近通话 |", + "|----------|------|------|------|----------|----------|-----------|------|---------|------|----------|----------|----------|" + ] + + qualified_count = 0 + for phone, r in sorted(data.items()): + masked = _mask_phone(phone) + is_qualified = r.get('is_qualified', False) + if is_qualified: + qualified_count += 1 + qualified_str = "是" if is_qualified else "否" + exp_str = str(r['years_of_exp']) if r.get('years_of_exp') is not None else "-" + lines.append( + f"| {masked} | {r.get('candidate_name') or '-'} | {r.get('email') or '-'} | {r.get('screening_stage') or '-'} | {r.get('screening_decision') or '-'} | " + f"{r.get('final_recommendation') or '-'} | {qualified_str} | {r.get('current_position') or '-'} | {exp_str} | " + f"{r.get('education_level') or '-'} | {r.get('jd_match_score') if r.get('jd_match_score') is not None else '-'} | " + f"{r.get('final_match_score') if r.get('final_match_score') is not None else '-'} | {r.get('last_call_date', '-')} |" + ) + + lines.append(f"\n> 共 {len(data)} 位候选人,其中有效候选人 {qualified_count} 位") + return '\n'.join(lines) + + +def _indent_multiline(text: str, prefix: str) -> str: + return '\n'.join(f"{prefix}{line}" if line else prefix.rstrip() for line in text.splitlines()) + + +def main(action: str, phone: 
str = '', **kwargs): + try: + if action == 'upsert': + if not phone: + return "❌ upsert 操作需要提供 phone 参数" + return upsert_candidate(phone, **kwargs) + + elif action == 'query': + if not phone: + return "❌ query 操作需要提供 phone 参数" + return query_candidate(phone) + + elif action == 'list': + return list_candidates() + + elif action == 'export': + return export_markdown() + + else: + return f"❌ 不支持的操作类型: {action},可用操作: upsert/query/list/export" + + except Exception as e: + logger.error(f"CRM操作失败: {str(e)}") + return f"❌ CRM操作失败: {str(e)}" + + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser(description='候选人CRM数据库管理工具') + parser.add_argument('--action', required=True, + help='操作类型: upsert/query/list/export') + parser.add_argument('--phone', default='', + help='候选人电话号码,兼容 13812341234 / 13812341234-刘 / 13812341234-刘.mp3') + parser.add_argument('--candidate_name', default=None, + help='候选人姓名,未提供时可从文件名中解析') + parser.add_argument('--email', default=None, + help='候选人邮箱,优先从简历文本抽取') + parser.add_argument('--is_qualified', type=lambda x: x.lower() in ('true', '1', 'yes'), + default=None, help='是否为有效候选人') + parser.add_argument('--gender', default=None, + help='候选人性别: 男/女/未知') + parser.add_argument('--industry', default=None, + help='所在行业') + parser.add_argument('--current_position', default=None, + help='当前职位') + parser.add_argument('--years_of_exp', type=int, default=None, + help='工作年限') + parser.add_argument('--job_switch_intent', default=None, + help='跳槽意向: 高/中/低') + parser.add_argument('--candidate_focus', default=None, + help='关注重点') + parser.add_argument('--notes', default=None, + help='备注信息') + parser.add_argument('--transcript_text', default=None, + help='音频转写原文,用于审查留档') + parser.add_argument('--project_experience', default=None, + help='项目经验总结') + parser.add_argument('--technical_capability', default=None, + help='技术能力总结') + parser.add_argument('--education_level', default=None, + help='学历水平,如本科/硕士/博士') + 
parser.add_argument('--jd_years_match', default=None, + help='工作年限是否符合JD要求,如符合/部分符合/不符合') + parser.add_argument('--jd_match_score', type=int, default=None, + help='JD匹配分,范围0-100') + parser.add_argument('--screening_stage', default=None, + help='流程阶段,如 resume_screened/call_pending/call_completed/final_reviewed') + parser.add_argument('--screening_decision', default=None, + help='初筛结论,如建议沟通/建议补充信息/建议淘汰') + parser.add_argument('--screening_reason', default=None, + help='初筛判断依据') + parser.add_argument('--strengths_summary', default=None, + help='候选人优势总结') + parser.add_argument('--weaknesses_summary', default=None, + help='候选人劣势总结') + parser.add_argument('--final_match_score', type=int, default=None, + help='电话沟通后的最终匹配分,范围0-100') + parser.add_argument('--final_recommendation', default=None, + help='最终推荐结论,如推荐推进/保留观察/不推荐推进') + parser.add_argument('--ai_match_conclusion', default=None, + help='AI 对候选人与JD匹配度的结论') + parser.add_argument('--ai_match_evidence', default=None, + help='AI 判断依据') + args = parser.parse_args() + + fields = {} + for key in [ + 'candidate_name', 'is_qualified', 'gender', 'industry', 'current_position', + 'email', + 'years_of_exp', 'job_switch_intent', 'candidate_focus', 'notes', + 'transcript_text', 'project_experience', 'technical_capability', + 'education_level', 'jd_years_match', 'jd_match_score', + 'screening_stage', 'screening_decision', 'screening_reason', + 'strengths_summary', 'weaknesses_summary', 'final_match_score', + 'final_recommendation', + 'ai_match_conclusion', 'ai_match_evidence' + ]: + val = getattr(args, key) + if val is not None: + fields[key] = val + + result = main(args.action, args.phone, **fields) + print(result)
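
The `--phone` handling in `scripts/main.py` above accepts a bare number, `<phone>-<name>`, or `<phone>-<name>.mp3`. The sketch below is a standalone illustration of that parsing and of the masking used in the output strings. It reuses the regular expression from the diff, but the function names without the leading underscore and the `recordings/` sample path are assumptions made for this example, not part of the PR.

```python
from __future__ import annotations

import os
import re

# Same pattern as PHONE_NAME_FILE_PATTERN in scripts/main.py.
PHONE_NAME_FILE_PATTERN = re.compile(
    r'^(1[3-9]\d{9})(?:-([^/\\]+?))?(?:\.(?:mp3|wav|m4a))?$', re.IGNORECASE
)


def parse_phone_input(phone_input: str) -> tuple[str, str]:
    """Split '<phone>[-<name>][.mp3|.wav|.m4a]' into (phone, name)."""
    value = (phone_input or '').strip()
    # Only the final path component is matched, so a directory prefix is ignored.
    match = PHONE_NAME_FILE_PATTERN.fullmatch(os.path.basename(value))
    if match:
        return match.group(1), (match.group(2) or '').strip()
    # No match: hand the raw value back so the caller's validation can reject it.
    return value, ''


def mask_phone(phone: str) -> str:
    """Keep the first 3 and last 4 characters; leave short strings untouched."""
    return phone[:3] + '****' + phone[-4:] if len(phone) >= 7 else phone


# 'recordings/' is a hypothetical directory used only for illustration.
samples = ['13812341234', '13812341234-刘', 'recordings/13999999999-刘女士.mp3', '12345']
for raw in samples:
    phone, name = parse_phone_input(raw)
    print(repr(raw), '->', (mask_phone(phone), name))
```

Because the name group is non-greedy, the optional extension is stripped before the name is captured, which is why `13999999999-刘女士.mp3` yields the name `刘女士` rather than `刘女士.mp3`. Inputs that do not match are returned unchanged, which is how the real script ends up rejecting them in `_validate_phone`.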