forked from NCIOCPL/cdr-lib
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdocs.py
More file actions
8216 lines (6887 loc) · 301 KB
/
docs.py
File metadata and controls
8216 lines (6887 loc) · 301 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Manage CDR documents
"""
import base64
import copy
import datetime
import re
import sys
import threading
import time
import unicodedata
from urllib.parse import quote as url_quote
from urllib.parse import unquote as url_unquote
import dateutil.parser
from lxml import etree
import requests
from cdrapi.db import Query
class Doc:
"""
Information about an XML document in the CDR repository
Read-only attributes:
active_status - 'A' if the document is active; 'I' if inactive
blob_date - date/time the document's blob was first saved
cdr_id - standard string representation for the document's id
comment - description of this version of the document
creation - `Doc.Action` object for document creation information
denormalized_xml - xml with links resolved
eids - copy of document error location IDs
errors - sequence of `Error` objects
errors_node - legacy DOM node for the document's errors
export_filename - file name used by export publishing jobs
first_pub - date/time the document was first published (if known)
first_pub_knowable - False for really old legacy documents
frag_ids - set of unique IDs for candidate link target in this doc
hard_error_count - number of real errors (not warnings or info messages)
highest_fragment_id - highest cdr:id attribute value in the form _\\d+
id - primary key in the `all_docs` database table for the document
last_publishable_version - integer for most recent pub version
last_saved - when the document was most recently saved
last_version - integer for the most recently created version
last_version_date - when the last version was created
level - what to keep from revision markup filtering
lock - `Doc.Lock` object (or None if the document isn't checked out)
modification - `Doc.Action` object for last document modification info
publishable - True iff the object's version is marked publishable
ready_for_review - True if the document has been marked review ready
resolved - document after going through revision markup filtering
root - parsed tree for the document's XML
session - login session for which the document information was collected
title - string for the title of this version of the document
val_date - when the version's validation status was last determined
val_status - V[alid], I[nvalid], U[nvalidated], or M[alformed]
valid - True iff the document has passed all validation tests
version - optional integer representing the version requested for the doc
has_unversioned_changes - True if the all_docs table was updated
more recently than the doc's latest version
Read/write attributes:
xml - unicode string for the serialized DOM for the document
blob - bytes for a BLOB associated with the document (optional)
doctype - `Doctype` object
"""
# The XML namespace used by CDR documents for links and fragment IDs,
# plus the prefix map handed to XPath calls which need the `cdr:` prefix
# (for example the `//*[@cdr:id]` query in `highest_fragment_id`).
NS = "cips.nci.nih.gov/cdr"
NSMAP = {"cdr": NS}
# Validation status codes (stored in `val_status` columns).
# The `valid` property compares `val_status` against `VALID`.
UNVALIDATED = "U"
VALID = "V"
INVALID = "I"
MALFORMED = "M"
# Status codes indicating whether a document is blocked or deleted.
# `BLOCKED` and `INACTIVE` are two names for the same code.
ACTIVE = "A"
BLOCKED = INACTIVE = "I"
DELETED = "D"
# NOTE(review): `VALIDATION_TEMPLATE` starts out as None; presumably it
# is populated elsewhere on first use -- confirm against the code which
# reads it.
VALIDATION_TEMPLATE = None
VALIDATION = "validation"
# Type and level values for error messages.
# `hard_error_count` treats only LEVEL_ERROR and LEVEL_FATAL as real errors.
LINK = "link"
OTHER = "other"
LEVEL_OTHER = "other"
LEVEL_INFO = "info"
LEVEL_WARNING = "warning"
LEVEL_ERROR = "error"
LEVEL_FATAL = "fatal"
# Value size constraints.
# MAX_LOCATION_LENGTH works out to 4 * 40 = 160 characters.
MAX_TITLE_LEN = 255
MAX_COMMENT_LEN = 255
MAX_SQLSERVER_INDEX_SIZE = 800
MAX_INDEX_ELEMENT_DEPTH = 40
INDEX_POSITION_WIDTH = 4
MAX_LOCATION_LENGTH = INDEX_POSITION_WIDTH * MAX_INDEX_ELEMENT_DEPTH
# Patterns for generating the values for columns in the query term tables.
# HEX_INDEX evaluates to the format string "{:04X}" (zero-padded upper-case
# hexadecimal, INDEX_POSITION_WIDTH digits wide).
HEX_INDEX = f"{{:0{INDEX_POSITION_WIDTH}X}}"
INTEGERS = re.compile(r"\d+")
# Codes indicating which markup revision should be applied.
# `revision_level` falls back on DEFAULT_REVISION_LEVEL when the caller
# does not supply a `level` option.
REVISION_LEVEL_PUBLISHED = 3
REVISION_LEVEL_PUBLISHED_OR_APPROVED = 2
REVISION_LEVEL_PUBLISHED_OR_APPROVED_OR_PROPOSED = 1
DEFAULT_REVISION_LEVEL = REVISION_LEVEL_PUBLISHED
# Optimization for mailer cleanup, avoiding mailers from the Oracle system
LEGACY_MAILER_CUTOFF = 390000
# Error messages for exceptions raised when a version can't be found
# (used by the `version` property for the "last" and "lastp" requests).
NOT_VERSIONED = "document not versioned"
NO_PUBLISHABLE_VERSIONS = "no publishable version found"
def __init__(self, session, **opts):
    """
    Remember the session and the keyword options supplied by the caller

    Nothing is fetched from the database here; the properties of the
    object consult the stored options when they are first used.

    Called by:
      cdr.getDoc()
      client XML wrapper command CdrGetDoc

    Two typical scenarios for invoking this constructor would be
      * pass in the XML for a new document we will then save
      * pass in an ID (and possibly a version) to fetch information
        about an existing document

    There are many variations on these uses. For example, use the
    second option to fetch a document, then make some modifications
    to the XML and then save a new version. Or, assuming you already
    know what the new XML should be, pass in both the ID and the XML
    to the constructor, and then call `doc.save()`.

    Required positional argument:
      session - `Session` object for which `Doc` object is made

    Optional keyword arguments
      id - optional unique identifier for existing CDR document
      doctype - string for the type of this CDR document
      xml - serialized tree for the XML document
      blob - binary large object (BLOB) for the document
      version - legal values are:
        "Current" for current working copy of document
        "LastVersion" or "last" for most recent version of document
        "LastPublishableVersion" or "lastp" for latest publishable ver
        "Label ..." to get version with specified label
        version number integer
        default is current working copy of document from all_docs table
      before - only consider versions created before this date
               or date/time
      level - what to retain when filtering revision markup
              default is DEFAULT_REVISION_LEVEL
    """

    # Start with an empty list of problems found while processing.
    self._errors = []

    # Keep the caller's values for the lazily-evaluated properties.
    self.__opts = opts
    self.__session = session
# ------------------------------------------------------------------
# PROPERTIES START HERE.
# ------------------------------------------------------------------
@property
def active_status(self):
    """
    'A' if the document is active; 'I' if inactive ("blocked")

    The value is read from the `all_docs` table each time the property
    is used (it is not cached).

    Return:
      one of the codes 'A', 'I', or 'D', or None if the document
      has no ID

    Raise:
      AssertionError if no row is found for the document, or if the
      row holds an unrecognized status code
    """

    if not self.id:
        return None
    query = Query("all_docs", "active_status")
    query.where(query.Condition("id", self.id))
    rows = query.execute(self.cursor).fetchall()

    # Raise explicitly (same exception type as before) so that the
    # checks are not stripped when Python runs with -O.
    if not rows:
        raise AssertionError("Document not in database")
    status = rows[0].active_status

    # Compare against the discrete codes: a substring test against
    # "AID" would also let through values such as "" or "AI".
    if status not in (self.ACTIVE, self.INACTIVE, self.DELETED):
        raise AssertionError("Invalid active_status value")
    return status
@property
def blob(self):
    """
    Bytes for the BLOB associated with this version of the document

    The value comes from (in order of preference) a previously cached
    or assigned value, the `blob` option passed to the constructor, or
    the `doc_blob` table. None is returned (and cached) if `has_blob`
    reports that there is no BLOB.

    Raise:
      Exception if the BLOB's row cannot be found
    """

    # Use the value we already have, if any.
    if hasattr(self, "_blob"):
        return self._blob

    # A BLOB supplied by the caller wins over anything in the database.
    if "blob" in self.__opts:
        self._blob = self.__opts["blob"]
        return self._blob

    # Nothing to fetch if there is no BLOB for this version.
    if not self.has_blob:
        self._blob = None
        return self._blob

    # Checking `has_blob` has left the BLOB's primary key in `_blob_id`.
    query = Query("doc_blob", "data")
    query.where(query.Condition("id", self._blob_id))
    rows = query.execute(self.cursor).fetchall()
    if not rows:
        raise Exception("no blob found")
    self._blob = rows[0].data
    return self._blob


@blob.setter
def blob(self, value):
    """
    Replace the cached BLOB bytes

    Pass:
      value - new property value
    """

    self._blob = value
@property
def blob_date(self):
    """
    Date/time the document's blob was last changed

    If the blob has been versioned, find the date of the earliest
    version with this blob. Otherwise, return the date the
    document was last saved, assuming it was saved with a blob
    (if not, return None).

    Return:
      date/time value from the database, or None if the document has
      not been saved or has no blob
    """

    # A document which has never been saved can't have a stored blob.
    if not self.id:
        return None

    # Find the blob used by this version (or by the current document).
    version = self.version
    table = "version_blob_usage" if version else "doc_blob_usage"
    query = Query(table, "blob_id")
    query.where(query.Condition("doc_id", self.id))
    if version:
        query.where(query.Condition("doc_version", version))
    rows = query.execute(self.cursor).fetchall()
    if not rows or not rows[0]:
        return None
    blob_id = rows[0].blob_id

    # Look for the earliest version which used that blob.
    query = Query("version_blob_usage u", "MIN(v.dt) AS dt")
    query.join("doc_version v", "v.id = u.doc_id", "v.num = u.doc_version")
    query.where(query.Condition("doc_id", self.id))
    query.where(query.Condition("u.blob_id", blob_id))
    rows = query.execute(self.cursor).fetchall()

    # An aggregate query returns a row even when nothing matched (with
    # NULL for the value), so testing only for the presence of a row
    # would never let us reach the `last_saved` fallback.
    if rows and rows[0].dt is not None:
        return rows[0].dt
    return self.last_saved
@property
def cdr_id(self):
    """
    Standard string representation of the document ID

    The form is "CDR" followed by the integer ID, zero-padded to ten
    digits. None is returned if the document has no ID.
    """

    doc_id = self.id
    if not doc_id:
        return None
    return f"CDR{doc_id:010d}"
@property
def comment(self):
    """
    Description of this version of the document

    Delegates to the private document-property lookup helper, asking
    for the "comment" value.
    """

    fetch = self.__fetch_document_property
    return fetch("comment")
@property
def creation(self):
    """
    Who first added the document to the repository, and when

    The answer is cached once it has been found.

    Return:
      `Doc.Action` object (or None if the document has never been saved)

    Raise:
      Exception if the audit trail has no creation row for a document
      whose ID is higher than 374
    """

    if not self.id:
        return None
    try:
        return self._creation
    except AttributeError:
        pass

    # Ask the audit trail about the ADD DOCUMENT action.
    columns = "t.dt", "u.id", "u.name", "u.fullname"
    query = Query("audit_trail t", *columns)
    query.join("action a", "a.id = t.action")
    query.join("usr u", "u.id = t.usr")
    query.where(query.Condition("t.document", self.id))
    query.where("a.name = 'ADD DOCUMENT'")
    rows = query.execute(self.cursor).fetchall()
    if rows:
        self._creation = self.Action(rows[0])
        return self._creation

    # A small handful of documents bootstrapped the system without
    # the audit trail on June 22, 2002.
    if self.id > 374:
        raise Exception("No audit trail for document creation")

    # Stand-in for those documents, carrying the same two attributes.
    class Action:
        def __init__(self, when, user):
            self.when = when
            self.user = user

    bootstrap_time = datetime.datetime(2002, 6, 22, 7)
    bootstrap_user = Doc.User(2, "bkline", "Bob Kline")
    self._creation = Action(bootstrap_time, bootstrap_user)
    return self._creation
@property
def cursor(self):
    """
    Database cursor belonging to this document object

    Created from the session's connection the first time it is needed
    (or again if the cached value has been reset to None).
    """

    if getattr(self, "_cursor", None) is None:
        self._cursor = self.session.conn.cursor()
    return self._cursor
@property
def denormalized_xml(self):
    """
    Document XML after the Fast Denormalization Filter has been applied

    Filter, css, and schema documents are returned as they are.
    If filtering fails (as it will if the original XML is malformed)
    the original XML string is used instead. The result is cached.

    Return:
      string for the denormalized XML, or None if there is no XML
    """

    if not self.xml:
        return None
    cached = getattr(self, "_denormalized_xml", None)
    if cached:
        return cached
    if self.is_control_type:
        denormalized = self.xml
    else:
        try:
            result = self.filter("name:Fast Denormalization Filter")
            denormalized = str(result.result_tree)
        except Exception:
            # Deliberate fallback: give the caller the unfiltered XML.
            denormalized = self.xml
    self._denormalized_xml = denormalized
    return self._denormalized_xml
@property
def doctype(self):
    """
    `Doctype` object for the type of this document

    A document type name passed to the constructor takes precedence.
    Otherwise the type is looked up in the row for the specific version
    when the `Doc` object represents one (the document type can change
    from one version to the next), or in the `all_docs` row when it
    does not. The result is cached.

    Return:
      `Doctype` object, or None for a document with no ID and no
      `doctype` option

    Raise:
      Exception if the row for the document or version is not found
    """

    if hasattr(self, "_doctype"):
        return self._doctype
    if "doctype" in self.__opts:
        name = self.__opts["doctype"]
        self._doctype = Doctype(self.session, name=name)
        return self._doctype
    if not self.id:
        self._doctype = None
        return self._doctype

    # Pick the table which matches what this object represents.
    if self.version:
        table = "doc_version"
    else:
        table = "all_docs"
    query = Query(table, "doc_type")
    query.where(query.Condition("id", self.id))
    if self.version:
        query.where(query.Condition("num", self.version))
    rows = query.execute(self.cursor).fetchall()
    if not rows:
        what = "version" if self.version else "document"
        raise Exception(what + " not found")
    self._doctype = Doctype(self.session, id=rows[0].doc_type)
    return self._doctype


@doctype.setter
def doctype(self, value):
    """
    Assign the document type using its name

    Pass:
      value - string for the document type's name
    """

    self._doctype = Doctype(self.session, name=value)
@property
def eids(self):
    """
    Version of the doc which has cdr-eid attributes (None if not set)
    """

    return getattr(self, "_eids", None)
@property
def errors(self):
    """
    Sequence of `Error` objects recorded while processing the document

    An empty list is returned if none have been recorded.
    """

    return getattr(self, "_errors", [])
@property
def errors_node(self):
    """
    DOM node wrapping all of the document's errors/warnings

    Used for reporting errors to clients from the API.

    Return:
      `Errors` element whose `count` attribute holds the number of
      errors and whose children come from each error's `to_node()`
      method; None if there are no errors
    """

    errors = self.errors
    if not errors:
        return None
    wrapper = etree.Element("Errors", count=str(len(errors)))
    for error in errors:
        wrapper.append(error.to_node())
    return wrapper
@property
def export_filename(self):
    """
    File name used for publishing export jobs

    For Media documents the name is the canonical CDR ID with a suffix
    taken from the image (or, failing that, sound) encoding found in
    the document, lowercased, with "jpeg" shortened to "jpg". All other
    documents get "CDR" plus the unpadded integer ID and the suffix
    "xml". The name is cached.

    Raise:
      Exception if a Media document has no encoding element
    """

    if hasattr(self, "_export_filename"):
        return self._export_filename
    if self.doctype.name != "Media":
        suffix = "xml"
        doc_id = f"CDR{self.id:d}"
    else:
        suffix = None
        for media in self.root.findall("PhysicalMedia"):
            for encoding in media.findall("ImageData/ImageEncoding"):
                suffix = self.get_text(encoding)

            # Only fall back on the sound encoding if no image was found.
            if suffix is None:
                for encoding in media.findall("SoundData/SoundEncoding"):
                    suffix = self.get_text(encoding)
        doc_id = self.cdr_id
    if suffix is None:
        raise Exception("Encoding missing or unsupported")
    suffix = suffix.lower()
    if suffix == "jpeg":
        suffix = "jpg"
    self._export_filename = f"{doc_id}.{suffix}"
    return self._export_filename
@property
def first_pub(self):
    """
    Date/time the document was first published if known

    Return:
      value of the `first_pub` column for the document (with the
      microseconds dropped if it is a `datetime` object); None if the
      document has no ID or no row is found for it
    """

    if not self.id:
        return None
    query = Query("document", "first_pub")
    query.where(query.Condition("id", self.id))
    rows = query.execute(self.cursor).fetchall()

    # Don't blow up with an IndexError if the row isn't there.
    if not rows:
        return None
    date = rows[0].first_pub
    if isinstance(date, datetime.datetime):
        return date.replace(microsecond=0)
    return date
@property
def first_pub_knowable(self):
    """
    Flag indicating whether we can know when the doc was first published

    Will be False for really old (pre-CDR) documents, and for documents
    which have no ID. The value is cached.
    """

    try:
        return self._first_pub_knowable
    except AttributeError:
        # Assume the answer is no until the database says otherwise.
        self._first_pub_knowable = False
    if self.id:
        query = Query("document", "first_pub_knowable")
        query.where(query.Condition("id", self.id))
        rows = query.execute(self.cursor).fetchall()
        if rows and rows[0].first_pub_knowable == "Y":
            self._first_pub_knowable = True
    return self._first_pub_knowable
@property
def frag_ids(self):
    """
    Set of unique IDs for candidate link targets (None if not collected)
    """

    return getattr(self, "_frag_ids", None)
@property
def hard_error_count(self):
    """
    Number of recorded problems whose level is "error" or "fatal"

    Warnings and informational messages are not counted.
    """

    serious = self.LEVEL_ERROR, self.LEVEL_FATAL
    return sum(1 for error in self._errors if error.level in serious)
@property
def has_blob(self):
    """
    True if there is a BLOB for this version of the document

    The bytes for the BLOB are not fetched here; only its primary key
    is looked up (and cached in `_blob_id`), unless the BLOB itself has
    already been loaded or assigned.
    """

    # If we already have the BLOB (or know there is none) we're done.
    if hasattr(self, "_blob"):
        return self._blob is not None

    # An unsaved document has nothing stored in the database.
    if not self.id:
        return False

    # Look up the BLOB's key if that hasn't already been done.
    if not hasattr(self, "_blob_id"):
        versioned = self.version
        table = "version_blob_usage" if versioned else "doc_blob_usage"
        query = Query(table, "blob_id")
        query.where(query.Condition("doc_id", self.id))
        if self.version:
            query.where(query.Condition("doc_version", self.version))
        rows = query.execute(self.cursor).fetchall()
        self._blob_id = rows[0].blob_id if rows else None
    return bool(self._blob_id)
@property
def has_unversioned_changes(self):
    """
    Determine whether the document was saved after its last version

    Return:
      False if the document has never been saved; True if it has been
      saved but has no versions, or if the latest version is older than
      the most recent save; otherwise False
    """

    last_saved = self.last_saved
    if last_saved is None:
        return False
    last_version_date = self.last_version_date
    if not last_version_date:
        return True

    # Compare against the value we already fetched: asking for the
    # `last_saved` property again would repeat its database lookups and
    # might even give a different answer than the one tested above.
    return last_version_date < last_saved
@property
def highest_fragment_id(self):
    """
    Find the highest automatically assigned link target ID

    These are stored in `cdr:id` attributes using values starting
    with an underscore character followed by one or more decimal
    digits.

    Return:
      integer for the highest target ID assigned to the document
      (0 if the document can't be parsed or has no such IDs)
    """

    if self.root is None:
        return 0
    nodes = self.root.xpath("//*[@cdr:id]", namespaces=self.NSMAP)
    values = (node.get(Link.CDR_ID) for node in nodes)
    numbers = (
        int(value[1:])
        for value in values
        if value is not None and value.startswith("_")
        and value[1:].isdigit()
    )
    return max(numbers, default=0)
@property
def id(self):
    """
    Unique integer identifier for the CDR document

    Derived (once) by passing the `id` option given to the constructor
    to `extract_id()`; None if that fails for any reason.
    """

    if hasattr(self, "_id"):
        return self._id
    try:
        doc_id = self.extract_id(self.__opts.get("id"))
    except Exception:
        # Anything we can't interpret means "no ID".
        doc_id = None
    self._id = doc_id
    return self._id
@property
def is_content_type(self):
    """
    True if the document has a type which is not a control type

    False if the document type is not known.
    """

    return bool(self.doctype) and not self.is_control_type
@property
def is_control_type(self):
    """
    True iff the document is a Filter, schema, or css document

    False if the document type is not known.
    """

    doctype = self.doctype
    if not doctype:
        return False
    control_types = "Filter", "css", "schema"
    return doctype.name in control_types
@property
def last_publishable_version(self):
    """
    Number of the newest version marked publishable, if any

    Return:
      highest `num` value in the `doc_version` table for rows of this
      document with `publishable = 'Y'`; None if the document has no ID
    """

    doc_id = self.id
    if not doc_id:
        return None
    query = Query("doc_version", "MAX(num) AS n")
    query.where(query.Condition("id", doc_id))
    query.where("publishable = 'Y'")
    rows = query.execute(self.cursor).fetchall()
    if not rows:
        return None
    return rows[0].n
@property
def last_saved(self):
    """
    When the document was most recently saved

    Includes document creation or modification, with or without
    versioning. The time of the last modification is preferred; the
    creation time is used if there hasn't been one.

    Return:
      `when` value of the action found, or None if there is neither
      a modification nor a creation action
    """

    # The creation lookup only happens if no modification was found.
    action = self.modification or self.creation
    return action.when if action else None
@property
def last_valid_version(self):
    """
    Number of the newest version whose validation status is 'V', if any

    Return:
      highest `num` value in the `doc_version` table for rows of this
      document with `val_status = 'V'`; None if the document has no ID
    """

    doc_id = self.id
    if not doc_id:
        return None
    query = Query("doc_version", "MAX(num) AS n")
    query.where(query.Condition("id", doc_id))
    query.where("val_status = 'V'")
    rows = query.execute(self.cursor).fetchall()
    if not rows:
        return None
    return rows[0].n
@property
def last_version(self):
    """
    Number of the most recently saved version, if any; else None

    Return:
      highest `num` value in the `doc_version` table for this document;
      None if the document has no ID
    """

    doc_id = self.id
    if not doc_id:
        return None
    query = Query("doc_version", "MAX(num) AS n")
    query.where(query.Condition("id", doc_id))
    rows = query.execute(self.cursor).fetchall()
    if not rows:
        return None
    return rows[0].n
@property
def last_version_date(self):
    """
    When the most recent version was created, if any; else None

    Return:
      latest `updated_dt` value in the `doc_version` table for this
      document, with the microseconds dropped if it is a `datetime`
      object; None if the document has no ID
    """

    doc_id = self.id
    if not doc_id:
        return None
    query = Query("doc_version", "MAX(updated_dt) as dt")
    query.where(query.Condition("id", doc_id))
    rows = query.execute(self.cursor).fetchall()
    when = rows[0].dt if rows else None
    if not isinstance(when, datetime.datetime):
        return when
    return when.replace(microsecond=0)
@property
def lock(self):
    """
    `Doc.Lock` object if the document is checked out; otherwise None

    This value is deliberately not cached (some other process might
    lock the document in the meantime), so callers working within a
    short span of processing should hold the value in a local variable
    rather than asking for the property repeatedly.
    """

    doc_id = self.id
    if not doc_id:
        return None

    # A lock is a checkout row which hasn't been checked back in.
    query = Query("checkout c", "c.dt_out", "u.id", "u.name", "u.fullname")
    query.join("usr u", "u.id = c.usr")
    query.where(query.Condition("c.id", doc_id))
    query.where("c.dt_in IS NULL")
    rows = query.execute(self.cursor).fetchall()
    if not rows:
        return None
    return self.Lock(rows[0])
@property
def modification(self):
    """
    Who last modified the document, and when

    The audit trail is consulted each time the property is used.

    Return:
      `Doc.Action` object if modification found; otherwise None
    """

    doc_id = self.id
    if not doc_id:
        return None
    columns = "t.dt", "u.id", "u.name", "u.fullname"
    query = Query("audit_trail t", *columns)
    query.join("action a", "a.id = t.action")
    query.join("usr u", "u.id = t.usr")
    query.where(query.Condition("t.document", doc_id))
    query.where("a.name = 'MODIFY DOCUMENT'")

    # We only want the most recent of the modifications.
    query.order("t.dt DESC").limit(1)
    rows = query.execute(self.cursor).fetchall()
    if not rows:
        return None
    return self.Action(rows[0])
@property
def publishable(self):
    """
    True if this is a numbered version marked publishable; else False

    Return:
      True or False for a numbered version; None if the object has no
      document ID or does not represent a specific version

    Raise:
      Exception if the row for the version is not found
    """

    if not (self.id and self.version):
        return None
    query = Query("doc_version", "publishable")
    query.where(query.Condition("id", self.id))
    query.where(query.Condition("num", self.version))
    rows = query.execute(self.cursor).fetchall()
    if rows:
        return rows[0].publishable == "Y"
    message = f"Information for version {self.version} missing"
    raise Exception(message)
@property
def ready_for_review(self):
    """
    True if the document has a row in the `ready_for_review` table
    """

    query = Query("ready_for_review", "doc_id")
    query.where(query.Condition("doc_id", self.id))
    return bool(query.execute(self.cursor).fetchall())
@property
def resolved(self):
    """
    Result of applying revision markup filtering to the document

    None if the document has no parsed tree.
    """

    if self.root is not None:
        return self.__apply_revision_markup()
    return None
@property
def revision_level(self):
    """
    Integer showing what should be retained by revision markup filtering

    The `level` option passed to the constructor is used if it was
    given (and is not empty or zero); otherwise the class default.
    """

    level = self.__opts.get("level")
    return level if level else self.DEFAULT_REVISION_LEVEL
@property
def root(self):
    """
    Parsed tree for the document's XML

    The tree is cached once parsing has succeeded. If the XML can't be
    parsed the failure is logged and None is returned (and another
    attempt will be made the next time the property is used).
    """

    tree = getattr(self, "_root", None)
    if tree is not None:
        return tree
    try:
        self._root = etree.fromstring(self.xml.encode("utf-8"))
    except Exception:
        self.session.logger.exception("can't parse %r", self.xml)
        self._root = None
    return self._root
@property
def session(self):
    """
    `Session` object which was passed to the constructor
    """

    session = self.__session
    return session
@property
def title(self):
    """
    Title of this version of the document

    Delegates to the private document-property lookup helper, asking
    for the "title" value.
    """

    fetch = self.__fetch_document_property
    return fetch("title")
@property
def val_date(self):
    """
    Date/time this version of the document was last validated

    Microseconds are dropped if the stored value is a `datetime`
    object; any other value is returned unchanged.
    """

    when = self.__fetch_document_property("val_date")
    if not isinstance(when, datetime.datetime):
        return when
    return when.replace(microsecond=0)
@property
def val_status(self):
    """
    'V' (valid), 'I' (invalid), 'U' (unvalidated), or 'M' (malformed)

    A status recorded on this object (in `_val_status`) takes
    precedence over the stored value for the document.
    """

    try:
        return self._val_status
    except AttributeError:
        return self.__fetch_document_property("val_status")
@property
def valid(self):
    """
    True iff the document's validation status is the `VALID` code
    """

    status = self.val_status
    return status == self.VALID
@property
def version(self):
    """
    Integer for specific version for all_doc_versions row (or None)

    The value is worked out from the `version` and `before` options
    passed to the constructor, and cached once a number has been found.
    A request for the current working document results in None.

    Return:
      integer version number, or None for the current working document

    Raise:
      Exception if the version specification is not recognized, or if
      "last" or "lastp" is requested for a document which has no
      version (or publishable version, respectively)
    """

    # Pull out the version-related options passed into the constructor.
    self.session.logger.debug("@version: __opts = %s", self.__opts)
    version = self.__opts.get("version")
    cutoff = self.__opts.get("before")

    # If we've done this before, the version integer has been cached
    if not hasattr(self, "_version") or self._version is None:

        # Handle the obvious case first (zero means "no version").
        if str(version).isdigit():
            self._version = int(version) or None

        # If the document hasn't been saved (no ID) it has no version.
        elif not self.id:
            self._version = None

        # Look up any "before this date" versions. The test is folded
        # to lower case so that the documented "LastPublishableVersion"
        # spelling is recognized here, as it is further down.
        elif cutoff:
            lastp = str(version).lower().startswith("lastp")
            self._version = self.__get_version_before(cutoff, lastp)

        # See if this is an object for the current working document.
        elif not version:
            self._version = None

        # At this point we assume version is a string; normalize it.
        else:
            try:
                version = version.lower()
            except Exception:
                raise Exception(f"invalid version {version!r}")

            # Current is an alias for non-versioned copy.
            if version in ("current", "none"):
                self._version = None

            # We have properties for last (published) versions.
            elif version in ("last", "lastversion"):
                version = self.last_version
                if not version:
                    raise Exception(self.NOT_VERSIONED)
                self._version = version
            elif version.startswith("lastp"):
                version = self.last_publishable_version
                if not version:
                    raise Exception(self.NO_PUBLISHABLE_VERSIONS)
                self._version = version
            elif version.startswith("lastv"):
                self._version = self.last_valid_version

            # Version labels have never been used, but you never know!
            elif version.startswith("label "):
                tokens = version.split(" ", 1)
                if len(tokens) != 2:
                    error = "missing token for version specifier"
                    raise Exception(error)
                _prefix, label = tokens
                self._version = self.__get_labeled_version(label)

            # We've run out of valid options. There is no active
            # exception at this point, so log with `error()` instead
            # of `exception()` (which would append an empty traceback).
            else:
                error = f"invalid version spec {version}"
                self.session.logger.error(error)
                raise Exception(error)

    # Return the cached version value.
    return self._version
@property
def xml(self):
    """
    Unicode string for the serialized document

    XML passed to the constructor (decoded as UTF-8 if it isn't already
    a string) is preferred. Failing that, if the document has an ID the
    XML is fetched from the row for the requested version, or from the
    `document` table when no version applies. The value is cached.

    Raise:
      Exception if the document has an ID but no XML is found for it
    """

    try:
        return self._xml
    except AttributeError:
        pass

    # Start with whatever the caller gave us.
    supplied = self.__opts.get("xml")
    self._xml = supplied
    if supplied:
        if not isinstance(supplied, str):
            self._xml = supplied.decode("utf-8")
        return self._xml
    if not self.id:
        return self._xml

    # Nothing usable was passed in, so go to the database.
    if self.version:
        query = Query("doc_version", "xml")
        query.where(query.Condition("num", self.version))
    else:
        query = Query("document", "xml")
    query.where(query.Condition("id", self.id))
    rows = query.execute(self.cursor).fetchall()
    if not rows:
        raise Exception("no xml found")
    self._xml = rows[0].xml
    return self._xml


@xml.setter
def xml(self, value):
    """
    Assign a new value to the `xml` property, coercing to Unicode

    The cached parse tree, denormalized XML, and resolved document
    are all discarded.

    Pass:
      value - new property value
    """

    self._xml = value
    needs_decoding = bool(value) and not isinstance(value, str)
    if needs_decoding:
        self._xml = value.decode("utf-8")

    # Anything derived from the old XML is now stale.
    for name in ("_root", "_denormalized_xml", "_resolved"):
        setattr(self, name, None)
# ------------------------------------------------------------------
# PUBLIC METHODS START HERE.
# ------------------------------------------------------------------
def add_external_mapping(self, usage, value, **opts):
    """
    Insert a row into the external mapping table

    This is used by the XMetaL client when the user wants to
    register a variant phrase found in the document being edited
    for a glossary term.

    Called by:
      cdr.addExternalMapping()
      client XML wrapper command CdrAddExternalMapping

    Required positional arguments:
      usage - string representing the context for the mapping
              (for example, 'Spanish GlossaryTerm Phrases')
      value - string for the value to be mapped to this document

    Optional keyword arguments:
      bogus - if "Y" value does not really map to any document,
              but is instead a known invalid value found in
              (usually imported) data
      mappable - if "N" the value is not an actual field value;
                 often it's a comment explaining why no value
                 which could be mapped to a CDR doc is available

    Return:
      integer primary key for newly inserted mapping table row

    Raise:
      Exception if `usage` or `value` is missing, if the usage name is
      not found, or if the user is not allowed to add mappings for it;
      AssertionError if `bogus` or `mappable` is not "Y" or "N"
      (case is ignored)
    """

    # Make sure we have the required arguments.
    self.session.log(f"add_external_usage({usage!r}, {value!r})")
    if not usage:
        raise Exception("Missing usage name")
    if not value:
        raise Exception("Missing mapping value")

    # Get values for the optional arguments. The checks use an explicit
    # tuple (a substring test against "YN" would accept "YN" itself)
    # and raise directly so that they survive running Python with -O.
    flags = ("Y", "N")
    bogus = (opts.get("bogus") or "N").upper()
    mappable = (opts.get("mappable") or "Y").upper()
    if bogus not in flags:
        raise AssertionError("Bogus 'bogus' option")
    if mappable not in flags:
        raise AssertionError("Invalid 'mappable' options")

    # Find the usage ID and action name.
    query = Query("external_map_usage u", "u.id", "a.name")
    query.join("action a", "a.id = u.auth_action")
    query.where(query.Condition("u.name", usage))
    rows = query.execute(self.cursor).fetchall()
    if not rows:
        raise Exception(f"Unknown usage {usage!r}")
    usage_id, action = list(rows[0])

    # Make sure the user is allowed to add a row for this usage.
    if not self.session.can_do(action):
        message = f"User not allowed to add {usage} mappings"
        raise Exception(message)

    # Add the new mapping row (column names are fixed here; the values
    # are passed as parameters for the placeholders).
    fields = dict(
        usage=usage_id,
        value=str(value),
        doc_id=self.id,
        usr=self.session.user_id,
        last_mod=datetime.datetime.now().replace(microsecond=0),
        bogus=bogus,
        mappable=mappable
    )
    names = sorted(fields)
    args = ", ".join(names), ", ".join(["?"] * len(names))
    values = tuple(fields[name] for name in names)
    insert = "INSERT INTO external_map ({}) VALUES ({})".format(*args)
    self.cursor.execute(insert, values)
    self.session.conn.commit()

    # Report the primary key assigned to the new row.
    self.cursor.execute("SELECT @@IDENTITY AS id")
    return self.cursor.fetchall()[0].id
def add_error(self, message, location=None, **opts):
"""
Add an `Error` object to our list
This is public because `Link` objects call it.
Required positional argument:
message - description of the problem
Optional keyword arguments:
location - where the error was found (None if unavailable)
type - string for type of error (default 'validation')
level - how serious is the problem (default 'error')
"""
# Make sure we've got something to append to.
if not hasattr(self, "_errors"):