|
1 | 1 | """Team mapping configuration for developer-to-team assignment.""" |
2 | 2 |
|
| 3 | +import logging |
3 | 4 | import re |
| 5 | +from datetime import date, timedelta |
4 | 6 | from pathlib import Path |
5 | | -from typing import Dict, Optional |
| 7 | +from typing import Dict, List, Optional |
6 | 8 |
|
7 | 9 | import yaml |
8 | 10 |
|
| 11 | +logger = logging.getLogger(__name__) |
| 12 | + |
9 | 13 |
|
10 | 14 | def _parse_team_assignments_text(content: str) -> Dict[str, str]: |
11 | 15 | """ |
@@ -159,3 +163,152 @@ def get_team_for_repo(owner: str, repo: str, mapping: Optional[Dict[str, str]] = |
159 | 163 | Kept for backward compatibility; returns empty string. |
160 | 164 | """ |
161 | 165 | return "" |
| 166 | + |
| 167 | + |
| 168 | +# --------------------------------------------------------------------------- |
| 169 | +# Developer tenure (start/end dates) for headcount calculations |
| 170 | +# --------------------------------------------------------------------------- |
| 171 | + |
| 172 | +_TENURE_FILENAME = "developer-tenure.yaml" |
| 173 | + |
| 174 | + |
| 175 | +def _parse_date_field(val: object) -> Optional[date]: |
| 176 | + if not val: |
| 177 | + return None |
| 178 | + try: |
| 179 | + return date.fromisoformat(str(val)) |
| 180 | + except (ValueError, TypeError): |
| 181 | + return None |
| 182 | + |
| 183 | + |
| 184 | +def _parse_leaves(raw_leaves: object) -> List[Dict[str, date]]: |
| 185 | + """Parse a list of ``{from: ..., to: ...}`` leave ranges.""" |
| 186 | + if not isinstance(raw_leaves, list): |
| 187 | + return [] |
| 188 | + result: List[Dict[str, date]] = [] |
| 189 | + for entry in raw_leaves: |
| 190 | + if not isinstance(entry, dict): |
| 191 | + continue |
| 192 | + leave_from = _parse_date_field(entry.get("from")) |
| 193 | + leave_to = _parse_date_field(entry.get("to")) |
| 194 | + if leave_from and leave_to: |
| 195 | + result.append({"from": leave_from, "to": leave_to}) |
| 196 | + return result |
| 197 | + |
| 198 | + |
| 199 | +def load_developer_tenure( |
| 200 | + cwd: Optional[Path] = None, |
| 201 | +) -> Dict[str, dict]: |
| 202 | + """Load developer start/end dates and leave ranges from developer-tenure.yaml. |
| 203 | +
|
| 204 | + Returns dict keyed by GitHub username:: |
| 205 | +
|
| 206 | + {"alice": {"start": date(2024,1,1), "end": None, "leaves": [...]}, ...} |
| 207 | +
|
| 208 | + Each leave entry is ``{"from": date, "to": date}``. |
| 209 | + """ |
| 210 | + base = cwd or Path.cwd() |
| 211 | + path = base / _TENURE_FILENAME |
| 212 | + if not path.exists(): |
| 213 | + logger.warning("%s not found – headcount will fall back to CSV-derived counts", _TENURE_FILENAME) |
| 214 | + return {} |
| 215 | + try: |
| 216 | + data = yaml.safe_load(path.read_text(encoding="utf-8")) |
| 217 | + except Exception as exc: |
| 218 | + logger.warning("Failed to parse %s: %s", _TENURE_FILENAME, exc) |
| 219 | + return {} |
| 220 | + if not isinstance(data, dict): |
| 221 | + return {} |
| 222 | + devs_raw = data.get("developers", data) |
| 223 | + if not isinstance(devs_raw, dict): |
| 224 | + return {} |
| 225 | + |
| 226 | + result: Dict[str, dict] = {} |
| 227 | + for username, info in devs_raw.items(): |
| 228 | + if not isinstance(info, dict): |
| 229 | + continue |
| 230 | + start_date = _parse_date_field(info.get("start")) |
| 231 | + if start_date is None: |
| 232 | + continue |
| 233 | + result[str(username).strip()] = { |
| 234 | + "start": start_date, |
| 235 | + "end": _parse_date_field(info.get("end")), |
| 236 | + "leaves": _parse_leaves(info.get("leaves")), |
| 237 | + } |
| 238 | + return result |
| 239 | + |
| 240 | + |
| 241 | +def _is_on_leave(week_start: date, week_end: date, leaves: List[Dict[str, date]]) -> bool: |
| 242 | + """True if any leave range fully covers the week (from <= week_start and to >= week_end).""" |
| 243 | + for lv in leaves: |
| 244 | + if lv["from"] <= week_start and lv["to"] >= week_end: |
| 245 | + return True |
| 246 | + return False |
| 247 | + |
| 248 | + |
| 249 | +def get_active_headcount( |
| 250 | + week_start: date, |
| 251 | + team: Optional[str] = None, |
| 252 | + tenure: Optional[Dict[str, dict]] = None, |
| 253 | + team_mapping: Optional[Dict[str, str]] = None, |
| 254 | +) -> int: |
| 255 | + """Return the number of developers active during the week starting at *week_start*. |
| 256 | +
|
| 257 | + A developer is active if ``start <= week_end`` and (``end`` is None or |
| 258 | + ``end >= week_start``) and they are not on leave for the entire week. |
| 259 | +
|
| 260 | + Args: |
| 261 | + week_start: Monday of the ISO week. |
| 262 | + team: If given, only count developers belonging to this team. |
| 263 | + tenure: Pre-loaded tenure dict (from :func:`load_developer_tenure`). |
| 264 | + team_mapping: Pre-loaded team mapping (from :func:`load_team_mapping`). |
| 265 | + """ |
| 266 | + if tenure is None: |
| 267 | + tenure = load_developer_tenure() |
| 268 | + if team_mapping is None: |
| 269 | + team_mapping = load_team_mapping() |
| 270 | + |
| 271 | + week_end = week_start + timedelta(days=6) |
| 272 | + count = 0 |
| 273 | + for dev, info in tenure.items(): |
| 274 | + start = info["start"] |
| 275 | + end = info.get("end") |
| 276 | + if start is None or start > week_end: |
| 277 | + continue |
| 278 | + if end is not None and end < week_start: |
| 279 | + continue |
| 280 | + if team is not None and team_mapping.get(dev, "") != team: |
| 281 | + continue |
| 282 | + if _is_on_leave(week_start, week_end, info.get("leaves", [])): |
| 283 | + continue |
| 284 | + count += 1 |
| 285 | + return count |
| 286 | + |
| 287 | + |
| 288 | +def get_weekly_headcounts( |
| 289 | + weeks: List[date], |
| 290 | + teams: Optional[List[str]] = None, |
| 291 | + cwd: Optional[Path] = None, |
| 292 | +) -> Dict[str, List[int]]: |
| 293 | + """Compute headcount for each week, for 'All Teams' and each team. |
| 294 | +
|
| 295 | + Returns:: |
| 296 | +
|
| 297 | + {"All Teams": [12, 13, ...], "Core": [5, 5, ...], ...} |
| 298 | + """ |
| 299 | + tenure = load_developer_tenure(cwd) |
| 300 | + team_mapping = load_team_mapping(cwd) |
| 301 | + |
| 302 | + if teams is None: |
| 303 | + teams = sorted({t for t in team_mapping.values() if t and t != "Bots"}) |
| 304 | + |
| 305 | + result: Dict[str, List[int]] = {"All Teams": []} |
| 306 | + for t in teams: |
| 307 | + result[t] = [] |
| 308 | + |
| 309 | + for w in weeks: |
| 310 | + result["All Teams"].append(get_active_headcount(w, team=None, tenure=tenure, team_mapping=team_mapping)) |
| 311 | + for t in teams: |
| 312 | + result[t].append(get_active_headcount(w, team=t, tenure=tenure, team_mapping=team_mapping)) |
| 313 | + |
| 314 | + return result |
0 commit comments