Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 292 additions & 0 deletions src/onegov/people/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from onegov.core.cli import command_group
from onegov.core.cli import abort
from onegov.people.collections import PersonCollection
from onegov.people.models import Person
from openpyxl import load_workbook
from openpyxl import Workbook
Expand Down Expand Up @@ -212,6 +213,297 @@
return _import


# Exact header rows of the two known Horw exports. Each tuple lists the
# column titles in file order.

# Personen export: 15 columns, one person per row
_HORW_PERSONEN_HEADER = (
    'Name',             # 0
    'Vorname',          # 1
    'E-Mail',           # 2
    'Internet',         # 3
    'Funktion',         # 4
    'Strasse',          # 5
    'Nr.',              # 6
    'PLZ',              # 7
    'Ort',              # 8
    'Telefon Arbeit',   # 9
    'Fax Arbeit',       # 10
    'Mobile Arbeit',    # 11
    'Oberinstanzen',    # 12
    'Instanzen',        # 13
    'Behörden',         # 14
)

# Behörden export: 28 columns. Columns 0-9 describe the agency, columns
# 10-27 the person; several titles therefore appear twice.
_HORW_BEHOERDEN_HEADER = (
    'Behörde',                  # 0
    'Strasse',                  # 1
    'Nr.',                      # 2
    'Postfach',                 # 3
    'PLZ',                      # 4
    'Ort',                      # 5
    'Telefon',                  # 6
    'E-Mail',                   # 7
    'URL / Homepage',           # 8
    'Kategorie',                # 9
    'Funktion',                 # 10
    'Name',                     # 11
    'Vorname',                  # 12
    'Präsentation',             # 13
    'E-Mail',                   # 14
    'Internet',                 # 15
    'Strasse',                  # 16
    'Nr.',                      # 17
    'Postfach',                 # 18
    'PLZ',                      # 19
    'Ort',                      # 20
    'Telefon Arbeit',           # 21
    'Mobile Arbeit',            # 22
    'Telefon Privat',           # 23
    'Mobile Privat',            # 24
    'Partei',                   # 25
    'Fraktion Einwohnerrat',    # 26
    'Kategorie',                # 27
)


def _v(value: object) -> str | None:
    """Return a cell value as stripped text, or ``None`` if it is blank.

    Floats that hold a whole number are rendered without the trailing
    ``.0`` (``6048.0`` becomes ``'6048'``). Every other value is passed
    through ``str()`` before surrounding whitespace is removed.

    :param value: The raw cell value; may be ``None``.
    :return: The non-empty stripped text, or ``None`` when the value is
        ``None`` or only whitespace.
    """
    if value is None:
        return None
    if isinstance(value, float) and value.is_integer():
        text = str(int(value))
    else:
        text = str(value)
    text = text.strip()
    return text if text else None


def _join_address(*parts: object) -> str | None:
    """Join the non-blank parts with single spaces.

    Each part is normalised with ``_v`` first; parts that come back blank
    are left out.

    :param parts: Raw cell values, e.g. street and house number.
    :return: The joined text, or ``None`` if no part had any content.
    """
    cleaned = [text for text in map(_v, parts) if text]
    return ' '.join(cleaned) or None


def _read_excel_rows(path: str) -> list[tuple[object, ...]]:
    """Return every row of a spreadsheet as a tuple of cell values.

    Paths ending in ``.xls`` (case-insensitive) are read with ``xlrd``
    from the first sheet. Anything else is read with ``openpyxl`` from the
    workbook's active sheet, opened with ``data_only=True``.

    :param path: Path of the ``.xls`` or ``.xlsx`` file.
    :return: One tuple per row, header row included.
    """
    if not path.lower().endswith('.xls'):
        workbook = load_workbook(path, data_only=True)
        worksheet = workbook.active
        if TYPE_CHECKING:
            # only narrows the type for the checker; never runs
            from openpyxl.worksheet.worksheet import Worksheet
            assert isinstance(worksheet, Worksheet)
        return [
            tuple(cell.value for cell in cells)
            for cells in worksheet.rows
        ]

    # xlrd is imported here so it is only needed for legacy .xls files
    import xlrd
    legacy_book = xlrd.open_workbook(path)
    first_sheet = legacy_book.sheet_by_index(0)
    return [
        tuple(first_sheet.row_values(index))
        for index in range(first_sheet.nrows)
    ]


def _parse_horw_personen_row(v: tuple[object, ...]) -> dict[str, object]:
    """Map one data row of the Horw *Personen* export to person fields.

    Column layout::

        0 Name | 1 Vorname | 2 E-Mail | 3 Internet | 4 Funktion |
        5 Strasse | 6 Nr. | 7 PLZ | 8 Ort | 9 Telefon Arbeit |
        10 Fax Arbeit | 11 Mobile Arbeit | 12 Oberinstanzen |
        13 Instanzen | 14 Behörden

    Rows shorter than 15 cells are padded with ``None``, so a truncated
    row yields blank fields instead of raising ``IndexError``.

    :param v: The cell values of one row.
    :return: A dict with ``last_name`` and ``first_name`` (``''`` when
        blank), the optional contact fields (``None`` when blank) and
        the raw ``_horw_org`` / ``_horw_sub_org`` values.
    """
    # Pad once so every index below is safe; longer rows are unaffected
    # because a negative repeat count yields an empty tuple.
    v = tuple(v) + (None,) * (15 - len(v))
    return {
        'last_name': _v(v[0]) or '',
        'first_name': _v(v[1]) or '',
        'email': _v(v[2]),
        'website': _v(v[3]),
        'function': _v(v[4]),
        'location_address': _join_address(v[5], v[6]),
        'location_code_city': _join_address(v[7], v[8]),
        'phone': _v(v[9]),
        # v[10] = Fax Arbeit (no model field)
        'phone_direct': _v(v[11]),
        '_horw_org': _v(v[12]),  # Oberinstanzen
        '_horw_sub_org': _v(v[13]),  # Instanzen
        # v[14] = Behörden (not used for person fields)
    }


def _parse_horw_behoerden_row(v: tuple[object, ...]) -> dict[str, object]:
    """Map one data row of the Horw *Behörden* export to person fields.

    Column layout::

        0-9   agency data (only column 0, Behörde, is used)
        10 Funktion | 11 Name | 12 Vorname | 13 Präsentation |
        14 E-Mail | 15 Internet | 16 Strasse | 17 Nr. | 18 Postfach |
        19 PLZ | 20 Ort | 21 Telefon Arbeit | 22 Mobile Arbeit |
        23 Telefon Privat | 24 Mobile Privat |
        25 Partei | 26 Fraktion Einwohnerrat | 27 Kategorie

    Rows shorter than 28 cells are padded with ``None``, so a truncated
    row yields blank fields instead of raising ``IndexError``.

    :param v: The cell values of one row.
    :return: A dict with ``last_name`` and ``first_name`` (``''`` when
        blank) and the optional person fields (``None`` when blank).
        ``function`` is the Funktion cell followed by the Behörde cell.
    """
    # Pad once so every index below is safe; longer rows are unaffected
    # because a negative repeat count yields an empty tuple.
    v = tuple(v) + (None,) * (28 - len(v))
    # "<Funktion> <Behörde>", skipping whichever part is blank
    function = ' '.join(filter(None, (_v(v[10]), _v(v[0]))))
    return {
        'last_name': _v(v[11]) or '',
        'first_name': _v(v[12]) or '',
        'function': function or None,
        'notes': _v(v[13]),
        'email': _v(v[14]),
        'website': _v(v[15]),
        'location_address': _join_address(v[16], v[17]),
        # v[18] = Postfach (skip)
        'location_code_city': _join_address(v[19], v[20]),
        'phone': _v(v[21]),
        'phone_direct': _v(v[22]),
        # v[23] = Telefon Privat, v[24] = Mobile Privat (skip)
        'political_party': _v(v[25]),
        'parliamentary_group': _v(v[26]),
        # v[27] = Kategorie (skip)
    }


def _upsert_horw_person(
    people: PersonCollection,
    first_name: str,
    last_name: str,
    extra: dict[str, object],
) -> None:
    """Update the matching person with *extra*, or add a new one.

    The person is looked up by ``extra['email']`` when one is given, and
    otherwise (or if that finds nobody) by exact first and last name. The
    first match wins.

    On an existing person every key of *extra* is assigned as an
    attribute, except ``function``: a new function is appended to the
    existing text, separated by ``'; '``, unless it is already present as
    a whole entry.

    :param people: The collection used to query and add people.
    :param first_name: First name used for the name lookup and on create.
    :param last_name: Last name used for the name lookup and on create.
    :param extra: Further person fields, keyed by attribute name.
    """
    model = people.model_class
    person = None
    email = extra.get('email')
    if email:
        person = people.query().filter(model.email == email).first()
    if person is None:
        person = (
            people.query()
            .filter(model.first_name == first_name)
            .filter(model.last_name == last_name)
            .first()
        )

    if person is None:
        people.add(first_name=first_name, last_name=last_name, **extra)
        return

    for key, value in extra.items():
        if key != 'function':
            setattr(person, key, value)
            continue

        new = str(value) if value else ''
        if not new:
            continue
        existing = person.function or ''
        # Compare against whole entries rather than substrings, so that
        # e.g. 'Mitglied Kommission' is not mistaken for already being
        # part of 'Mitglied Kommission Bildung'.
        entries = {entry.strip() for entry in existing.split(';')}
        if new == existing or new in entries:
            continue
        person.function = f'{existing}; {new}' if existing else new


def _split_horw_names(raw: str) -> list[str]:
    """Split a comma-separated cell into stripped, non-empty names."""
    return [name for name in (part.strip() for part in raw.split(',')) if name]


def _resolve_horw_orgs(
    raw_org: object,
    raw_sub_org: object,
    valid_orgs: dict[str, set[str]],
) -> tuple[list[str], list[str]]:
    """Resolve the org and sub-org cells of a Personen row.

    :param raw_org: The Oberinstanzen cell (comma-separated org names).
    :param raw_sub_org: The Instanzen cell (one sub-org name, or several
        separated by commas).
    :param valid_orgs: Maps each known org to its known sub-orgs. When it
        is empty, org names are accepted without validation.
    :return: ``(orgs, problems)``. ``orgs`` is a flat list in which each
        accepted org is immediately followed by its sub-orgs, the latter
        prefixed with ``-``. ``problems`` holds one message per rejected
        org or sub-org, in the order they were encountered.
    """
    problems: list[str] = []
    if not (isinstance(raw_org, str) and raw_org):
        return [], problems

    # validate top-level orgs; collect valid ones in order
    valid_tops: list[str] = []
    for top in _split_horw_names(raw_org):
        if valid_orgs and top not in valid_orgs:
            problems.append(f'org {top!r} not in hierarchy')
        else:
            valid_tops.append(top)

    # assign each sub-org to its parent top-level org
    subs_by_top: dict[str, list[str]] = {top: [] for top in valid_tops}
    if valid_tops and isinstance(raw_sub_org, str) and raw_sub_org:
        all_valid_subs: set[str] = set().union(
            *(valid_orgs.get(top, set()) for top in valid_tops)
        )
        # try the full value first (handles names with commas),
        # fall back to comma-splitting
        if raw_sub_org in all_valid_subs:
            sub_candidates = [raw_sub_org]
        else:
            sub_candidates = _split_horw_names(raw_sub_org)
        for sub in sub_candidates:
            parent = next(
                (
                    top for top in valid_tops
                    if sub in valid_orgs.get(top, set())
                ),
                None
            )
            if parent is None:
                problems.append(f'sub-org {sub!r} not in hierarchy')
            else:
                subs_by_top[parent].append(sub)

    # build flat list: each org immediately followed by its subs
    orgs: list[str] = []
    for top in valid_tops:
        orgs.append(top)
        orgs.extend(f'-{sub}' for sub in subs_by_top[top])
    return orgs, problems


@cli.command('import-horw')
@click.argument('file', type=click.Path(exists=True))
@click.option('--dry-run', is_flag=True, default=False)
def import_horw(
    file: str,
    dry_run: bool,
) -> Callable[[CoreRequest, Framework], None]:
    """ Imports people from a Horw municipality Excel export.

    Detects the file format automatically from the header row. Supports
    both the Personen export (one person per row) and the Behörden export
    (one membership per row, people deduplicated by email then name).

    Example:

        onegov-people --select /onegov_town6/horw import-horw export.xls

    """

    def _import(request: CoreRequest, app: Framework) -> None:
        session = app.session()
        people = PersonCollection(session)

        all_rows = _read_excel_rows(file)
        if not all_rows:
            click.secho('Empty file', fg='red')
            return

        header = all_rows[0]

        # The header row must match one of the known layouts exactly
        if header == _HORW_PERSONEN_HEADER:
            parse_row = _parse_horw_personen_row
            is_personen = True
            deduplicate = False
            click.secho('Detected format: Personen', fg='yellow')
        elif header == _HORW_BEHOERDEN_HEADER:
            parse_row = _parse_horw_behoerden_row
            is_personen = False
            deduplicate = True
            click.secho('Detected format: Behörden', fg='yellow')
        else:
            click.secho('Unknown file format. Header columns:', fg='red')
            for col in header:
                click.secho(f'  {col!r}', fg='red')
            return

        # Build valid org → sub-orgs lookup from the configured hierarchy
        hierarchy = getattr(
            getattr(app, 'org', None), 'organisation_hierarchy', None
        ) or []
        valid_orgs: dict[str, set[str]] = {}
        for item in hierarchy:
            if isinstance(item, dict):
                for top, subs in item.items():
                    valid_orgs[top] = set(subs)
            elif isinstance(item, str):
                valid_orgs[item] = set()

        seen: set[tuple[str, str]] = set()
        count = 0
        errors = 0

        # Data rows start on spreadsheet row 2 (row 1 is the header)
        for row_num, values in enumerate(all_rows[1:], start=2):
            fields = parse_row(values)

            first_name = fields['first_name']
            last_name = fields['last_name']
            assert isinstance(first_name, str)
            assert isinstance(last_name, str)

            # rows without any name are treated as blank
            if not first_name and not last_name:
                continue

            # NOTE(review): a repeated key skips the whole row, so later
            # Behörden rows of the same person never reach the function
            # merge in _upsert_horw_person - confirm this is intended.
            email = str(fields.get('email') or '')
            key = (email, f'{last_name} {first_name}')
            if deduplicate and key in seen:
                continue
            seen.add(key)

            # names are passed separately; the _horw_* keys are only
            # inputs for the organisation lookup below
            extra = {
                k: v for k, v in fields.items()
                if k not in ('first_name', 'last_name', '_horw_org',
                             '_horw_sub_org')
                and v is not None
            }

            if is_personen:
                orgs, problems = _resolve_horw_orgs(
                    fields.get('_horw_org'),
                    fields.get('_horw_sub_org'),
                    valid_orgs,
                )
                for problem in problems:
                    click.secho(
                        f'Row {row_num} ({last_name} {first_name}): '
                        f'{problem}',
                        fg='red')
                    errors += 1
                if orgs:
                    extra['organisations_multiple'] = orgs

            _upsert_horw_person(people, first_name, last_name, extra)
            count += 1

        if dry_run:
            transaction.abort()
            click.secho(
                f'Dry run: would import {count} person(s)', fg='yellow')
        else:
            click.secho(f'Imported {count} person(s)', fg='green')
        if errors:
            click.secho(f'{errors} org/sub-org error(s)', fg='red')

    return _import


@cli.command('list')
def list_people() -> Callable[[CoreRequest, Framework], None]:

Expand Down
Loading