diff --git a/src/onegov/people/cli.py b/src/onegov/people/cli.py index 4625963f5b..4d172da8b3 100644 --- a/src/onegov/people/cli.py +++ b/src/onegov/people/cli.py @@ -9,6 +9,7 @@ from onegov.core.cli import command_group from onegov.core.cli import abort +from onegov.people.collections import PersonCollection from onegov.people.models import Person from openpyxl import load_workbook from openpyxl import Workbook @@ -212,6 +213,297 @@ def _import(request: CoreRequest, app: Framework) -> None: return _import +# Column headers that identify each Horw export format +_HORW_PERSONEN_HEADER = ( + 'Name', 'Vorname', 'E-Mail', 'Internet', 'Funktion', + 'Strasse', 'Nr.', 'PLZ', 'Ort', 'Telefon Arbeit', + 'Fax Arbeit', 'Mobile Arbeit', 'Oberinstanzen', 'Instanzen', 'Behörden', +) + +_HORW_BEHOERDEN_HEADER = ( + 'Behörde', 'Strasse', 'Nr.', 'Postfach', 'PLZ', 'Ort', + 'Telefon', 'E-Mail', 'URL / Homepage', 'Kategorie', + 'Funktion', 'Name', 'Vorname', 'Präsentation', 'E-Mail', + 'Internet', 'Strasse', 'Nr.', 'Postfach', 'PLZ', 'Ort', + 'Telefon Arbeit', 'Mobile Arbeit', 'Telefon Privat', 'Mobile Privat', + 'Partei', 'Fraktion Einwohnerrat', 'Kategorie', +) + + +def _v(value: object) -> str | None: + if value is None: + return None + if isinstance(value, float) and value.is_integer(): + value = int(value) + s = str(value).strip() + return s or None + + +def _join_address(*parts: object) -> str | None: + joined = ' '.join(s for p in parts if (s := _v(p))) + return joined or None + + +def _read_excel_rows(path: str) -> list[tuple[object, ...]]: + """Read all rows from an .xls or .xlsx file as tuples of cell values.""" + if path.lower().endswith('.xls'): + import xlrd + book = xlrd.open_workbook(path) + sheet = book.sheet_by_index(0) + return [tuple(sheet.row_values(i)) for i in range(sheet.nrows)] + else: + book = load_workbook(path, data_only=True) + sheet = book.active + if TYPE_CHECKING: + from openpyxl.worksheet.worksheet import Worksheet + assert isinstance(sheet, Worksheet) + return [tuple(cell.value for cell in row) for row in sheet.rows] + + +def _parse_horw_personen_row(v: tuple[object, ...]) -> dict[str, object]: + # Name | Vorname | E-Mail | Internet | Funktion | Strasse | Nr. | + # PLZ | Ort | Telefon Arbeit | Fax Arbeit | Mobile Arbeit | + # Oberinstanzen | Instanzen | Behörden + return { + 'last_name': _v(v[0]) or '', + 'first_name': _v(v[1]) or '', + 'email': _v(v[2]), + 'website': _v(v[3]), + 'function': _v(v[4]), + 'location_address': _join_address(v[5], v[6]), + 'location_code_city': _join_address(v[7], v[8]), + 'phone': _v(v[9]), + # v[10] = Fax Arbeit (no model field) + 'phone_direct': _v(v[11]) if len(v) > 11 else None, + '_horw_org': _v(v[12]) if len(v) > 12 else None, # Oberinstanzen + '_horw_sub_org': _v(v[13]) if len(v) > 13 else None, # Instanzen + # v[14] = Behörden (not used for person fields) + } + + +def _parse_horw_behoerden_row(v: tuple[object, ...]) -> dict[str, object]: + # Cols 0-9: Behörde/agency data (skipped) + # 10: Funktion | 11: Name | 12: Vorname | 13: Präsentation | + # 14: E-Mail | 15: Internet | 16: Strasse | 17: Nr. | 18: Postfach | + # 19: PLZ | 20: Ort | 21: Telefon Arbeit | 22: Mobile Arbeit | + # 23: Telefon Privat | 24: Mobile Privat | + # 25: Partei | 26: Fraktion Einwohnerrat | 27: Kategorie + function = ' '.join(filter(None, (_v(v[10]), _v(v[0])))) + return { + 'last_name': _v(v[11]) or '', + 'first_name': _v(v[12]) or '', + 'function': function or None, + 'notes': _v(v[13]), + 'email': _v(v[14]), + 'website': _v(v[15]), + 'location_address': _join_address(v[16], v[17]), + 'location_code_city': _join_address(v[19], v[20]), + 'phone': _v(v[21]), + 'phone_direct': _v(v[22]), + # v[23] = Telefon Privat, v[24] = Mobile Privat (skip) + 'political_party': _v(v[25]) if len(v) > 25 else None, + 'parliamentary_group': _v(v[26]) if len(v) > 26 else None, + # v[27] = Kategorie (skip) + } + + +def _upsert_horw_person( + people: PersonCollection, + first_name: str, + last_name: str, + extra: dict[str, object], +) -> None: + """Find existing person by email or name and update, or create new.""" + person = None + email = extra.get('email') + if email: + person = ( + people.query() + .filter(people.model_class.email == email) + .first() + ) + if person is None: + person = ( + people.query() + .filter(people.model_class.first_name == first_name) + .filter(people.model_class.last_name == last_name) + .first() + ) + if person is not None: + for key, value in extra.items(): + if key == 'function': + existing = person.function or '' + new = str(value) if value else '' + if new and new not in existing: + person.function = ( + f'{existing}; {new}' if existing else new + ) + else: + setattr(person, key, value) + else: + people.add(first_name=first_name, last_name=last_name, **extra) + + +@cli.command('import-horw') +@click.argument('file', type=click.Path(exists=True)) +@click.option('--dry-run', is_flag=True, default=False) +def import_horw( + file: str, + dry_run: bool, +) -> Callable[[CoreRequest, Framework], None]: + """ Imports people from a Horw municipality Excel export. + + Detects the file format automatically from the header row. Supports + both the Personen export (one person per row) and the Behörden export + (one membership per row, people deduplicated by email then name). + + Example: + + onegov-people --select /onegov_town6/horw import-horw export.xls + + """ + + def _import(request: CoreRequest, app: Framework) -> None: + session = app.session() + people = PersonCollection(session) + + all_rows = _read_excel_rows(file) + if not all_rows: + click.secho('Empty file', fg='red') + return + + header = all_rows[0] + + if header == _HORW_PERSONEN_HEADER: + parse_row = _parse_horw_personen_row + is_personen = True + deduplicate = False + click.secho('Detected format: Personen', fg='yellow') + elif header == _HORW_BEHOERDEN_HEADER: + parse_row = _parse_horw_behoerden_row + is_personen = False + deduplicate = True + click.secho('Detected format: Behörden', fg='yellow') + else: + click.secho('Unknown file format. Header columns:', fg='red') + for col in header: + click.secho(f' {col!r}', fg='red') + return + + # Build valid org → sub-orgs lookup from the configured hierarchy + hierarchy = getattr( + getattr(app, 'org', None), 'organisation_hierarchy', None + ) or [] + valid_orgs: dict[str, set[str]] = {} + for item in hierarchy: + if isinstance(item, dict): + for top, subs in item.items(): + valid_orgs[top] = set(subs) + elif isinstance(item, str): + valid_orgs[item] = set() + + seen: set[tuple[str, str]] = set() + count = 0 + errors = 0 + + for row_num, values in enumerate(all_rows[1:], start=2): + fields = parse_row(values) + + first_name = fields['first_name'] + last_name = fields['last_name'] + assert isinstance(first_name, str) + assert isinstance(last_name, str) + + if not first_name and not last_name: + continue + + email = str(fields.get('email') or '') + key = (email, f'{last_name} {first_name}') + if deduplicate and key in seen: + continue + seen.add(key) + + extra = { + k: v for k, v in fields.items() + if k not in ('first_name', 'last_name', '_horw_org', + '_horw_sub_org') + and v is not None + } + + if is_personen: + raw_org = fields.get('_horw_org') + raw_sub_org = fields.get('_horw_sub_org') + if isinstance(raw_org, str) and raw_org: + top_orgs = [s for s in ( + p.strip() for p in raw_org.split(',') + ) if s] + # validate top-level orgs; collect valid ones in order + valid_tops: list[str] = [] + for top in top_orgs: + if valid_orgs and top not in valid_orgs: + click.secho( + f'Row {row_num} ({last_name} {first_name}): ' + f'org {top!r} not in hierarchy', + fg='red') + errors += 1 + else: + valid_tops.append(top) + # assign each sub-org to its parent top-level org + subs_by_top: dict[str, list[str]] = { + t: [] for t in valid_tops + } + if valid_tops and isinstance(raw_sub_org, str) and raw_sub_org: + all_valid_subs: set[str] = set().union( + *(valid_orgs.get(t, set()) for t in valid_tops) + ) + # try the full value first (handles names with commas), + # fall back to comma-splitting + if raw_sub_org in all_valid_subs: + sub_candidates = [raw_sub_org] + else: + sub_candidates = [ + s for s in ( + p.strip() for p in raw_sub_org.split(',') + ) if s + ] + for sub in sub_candidates: + parent = next( + (t for t in valid_tops + if sub in valid_orgs.get(t, set())), + None + ) + if parent is None: + click.secho( + f'Row {row_num} ({last_name} ' + f'{first_name}): sub-org {sub!r} ' + f'not in hierarchy', + fg='red') + errors += 1 + else: + subs_by_top[parent].append(sub) + # build flat list: each org immediately followed by its subs + orgs: list[str] = [] + for top in valid_tops: + orgs.append(top) + orgs.extend(f'-{s}' for s in subs_by_top[top]) + if orgs: + extra['organisations_multiple'] = orgs + + _upsert_horw_person(people, first_name, last_name, extra) + count += 1 + + if dry_run: + transaction.abort() + click.secho( + f'Dry run: would import {count} person(s)', fg='yellow') + else: + click.secho(f'Imported {count} person(s)', fg='green') + if errors: + click.secho(f'{errors} org/sub-org error(s)', fg='red') + + return _import + + @cli.command('list') def list_people() -> Callable[[CoreRequest, Framework], None]: