"""DIRAC Basic MySQL Class

It provides access to the basic MySQL methods in a multithread-safe mode,
keeping used connections in a python Queue for further reuse.

These are the coded methods:

__init__( host, user, passwd, name, [maxConnsInQueue=10] )
    Initializes the Queue and tries to connect to the DB server,
    using the _connect method.
    "maxConnsInQueue" defines the size of the Queue of open connections
    that are kept for reuse. It also defines the maximum number of open
    connections available from the object.
    maxConnsInQueue = 0 means unlimited and is not supported.

_except( methodName, exception, errorMessage )
    Helper method for exceptions: the "methodName" and the "errorMessage"
    are printed with ERROR level, then the "exception" is printed (with
    full description if it is a MySQL Exception) and S_ERROR is returned
    with the errorMessage and the exception.

_connect()
    Attempts connection to the DB and sets the _connected flag to True upon success.
    Returns S_OK or S_ERROR.

_query( cmd, [conn=conn] )
    Executes the SQL command "cmd".
    Gets a connection from the Queue (or opens a new one if none is available);
    the used connection is put back into the Queue.
    If a connection to the DB is passed as the second argument, this connection
    is used and is not put in the Queue.
    Returns S_OK with the fetchall() output in Value, or S_ERROR upon failure.

_update( cmd, [conn=conn] )
    Executes the SQL command "cmd" and issues a commit.
    Gets a connection from the Queue (or opens a new one if none is available);
    the used connection is put back into the Queue.
    If a connection to the DB is passed as the second argument, this connection
    is used and is not put in the Queue.
    Returns S_OK with the number of updated rows in Value, or S_ERROR upon failure.

_createTables( tableDict )
    Creates a new Table in the DB.

_getConnection()
    Gets a connection from the Queue (or opens a new one if none is available).
    Returns S_OK with the connection in Value, or S_ERROR.
    The calling method is responsible for closing this connection once it is no
    longer needed.

Some high level methods have been added to avoid the need to write SQL
statements in the most common cases. They should be used instead of the low level
_insert, _update methods whenever possible.

buildCondition( self, condDict = None, older = None, newer = None,
                timeStamp = None, orderAttribute = None, limit = False,
                greater = None, smaller = None ):
    Builds an SQL condition statement from the provided condDict and other extra checks on
    a specified time stamp.
    The conditions dictionary specifies, for each attribute, one or a List of possible
    values.
    greater and smaller are dictionaries in which the keys are the names of the fields
    that are required to be >= or < than the corresponding value.
    For compatibility with current usage it uses Exceptions to exit in case of
    invalid arguments.

insertFields( self, tableName, inFields = None, inValues = None, conn = None, inDict = None ):
    Inserts a new row in "tableName", assigning the values "inValues" to the
    fields "inFields".
    Alternatively, inDict can be used.
    String type values will be appropriately escaped.

updateFields( self, tableName, updateFields = None, updateValues = None,
              condDict = None,
              limit = False, conn = None,
              updateDict = None,
              older = None, newer = None,
              timeStamp = None, orderAttribute = None ):
    Updates "updateFields" from "tableName" with "updateValues".
    updateDict is an alternative way to provide the updateFields and updateValues.
    N records can match the condition.
    Returns S_OK( number of updated rows ).
    If limit is not False, the given limit is set.
    String type values will be appropriately escaped.

deleteEntries( self, tableName,
               condDict = None,
               limit = False, conn = None,
               older = None, newer = None,
               timeStamp = None, orderAttribute = None ):
    Deletes rows from "tableName" matching the given condition.
    N records can match the condition.
    If limit is not False, the given limit is set.
    String type values will be appropriately escaped; they can be single values or lists of values.

getFields( self, tableName, outFields = None,
           condDict = None,
           limit = False, conn = None,
           older = None, newer = None,
           timeStamp = None, orderAttribute = None ):
    Selects "outFields" from "tableName" with condDict.
    N records can match the condition.
    Returns S_OK( tuple(Field,Value) ).
    If limit is not False, the given limit is set.
    String type values will be appropriately escaped; they can be single values or lists of values.
    For compatibility with other methods, the condDict keyword argument is added.

getCounters( self, table, attrList, condDict = None, older = None,
             newer = None, timeStamp = None, connection = False ):
    Counts the number of records for each distinct combination of attrList, selected
    with the condition defined by condDict and time stamps.

getDistinctAttributeValues( self, table, attribute, condDict = None, older = None,
                            newer = None, timeStamp = None, connection = False ):
    Gets the distinct values of a table attribute under the specified conditions.
"""
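# The condition building described in the docstring can be sketched in a
# standalone form. ``build_condition`` is a hypothetical, simplified stand-in
# for ``buildCondition``: it handles only the condDict argument (single
# values and lists, rendered as IN clauses), and performs no escaping,
# unlike the real class.

```python
def build_condition(cond_dict):
    """Simplified sketch of condDict handling: each key maps either to a
    single value (rendered as an equality test) or to a list of accepted
    values (rendered as an IN clause). Illustration only, no escaping."""
    clauses = []
    for field, value in cond_dict.items():
        if isinstance(value, (list, tuple)):
            vals = ", ".join(f"'{v}'" for v in value)
            clauses.append(f"`{field}` IN ({vals})")
        else:
            clauses.append(f"`{field}` = '{value}'")
    return (" WHERE " + " AND ".join(clauses)) if clauses else ""
```

# For instance, build_condition({"Status": ["Done", "Failed"]}) yields
# " WHERE `Status` IN ('Done', 'Failed')".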
import collections
import functools
import json
import os
import time
import threading
import MySQLdb
from DIRAC import gLogger
from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Utilities.TimeUtilities import fromString
from DIRAC.Core.Utilities import DErrno
gInstancesCount = 0
MAXCONNECTRETRY = 10
RETRY_SLEEP_DURATION = 5
def _checkFields(inFields, inValues):
    """
    Helper to check the match between inFields and inValues lengths
    """
    if inFields is None and inValues is None:
        return S_OK()
    # Explicit comparison instead of assert, so the check survives python -O
    if len(inFields) != len(inValues):
        return S_ERROR(DErrno.EMYSQL, "Mismatch between inFields and inValues.")
    return S_OK()
def _quotedList(fieldList=None, allowDate=False):
    """
    Quote a list of MySQL Field Names with "`".
    Returns a comma-separated list of quoted Field Names.
    To be used for Table and Field Names.
    """
    if fieldList is None:
        return None
    quotedFields = []
    try:
        for field in fieldList:
            if allowDate and field.startswith("date(") and field.endswith(")"):
                field = field[len("date(") : -len(")")]
                quotedFields.append(f"date(`{field.replace('`', '')}`)")
            else:
                quotedFields.append(f"`{field.replace('`', '')}`")
    except Exception:
        return None
    if not quotedFields:
        return None
    return ", ".join(quotedFields)
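# The quoting performed above can be exercised in isolation with a
# simplified sketch. ``quoted_list`` is a hypothetical name; the date()
# handling and the defensive try/except are omitted.

```python
def quoted_list(field_list):
    """Sketch of the quoting behaviour of _quotedList: embedded backticks
    are stripped and each name is wrapped in backticks, so a field name
    cannot break out of its quoting context."""
    if not field_list:
        return None
    return ", ".join(f"`{name.replace('`', '')}`" for name in field_list)
```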
def captureOptimizerTraces(meth):
    """If enabled, this will dump the optimizer trace for each query performed.
    Obviously, it has a performance cost...

    In order to enable the tracing, the environment variable ``DIRAC_MYSQL_OPTIMIZER_TRACES_PATH``
    should be set and point to an existing directory where the files will be stored.

    It makes sense to enable it when preparing the migration to a newer major version
    of mysql: you run your integration tests (or whatever scenario you prepared) with the old version,
    then the same tests with the new version, and compare the output files.

    The files produced are called "optimizer_trace_<timestamp>_<hash>.json".
    The hash is here to minimize the risk of two writers racing for the same file.
    The timestamp is to maintain the execution order. For easier comparison between two executions,
    you can rename the files with a sequence number:

    .. code-block:: bash

        cd ${DIRAC_MYSQL_OPTIMIZER_TRACES_PATH}
        c=0; for i in $(ls); do newFn=$(echo $i | sed -E "s/_trace_[0-9]+.[0-9]+_(.*)/_trace_${c}_\1/g"); mv $i $newFn; c=$(( c + 1 )); done

    The tool https://github.com/crusaderky/recursive_diff is then useful to compare the files.

    Note that this method is far from pretty:

    * error handling is not really done. Of course, I could add a lot of safety and try/catch and what not,
      but if you are using this, it means you reaaaally want to profile something. And in that case, you want things
      to go smoothly. And if they don't, you want to see it, and you want it to crash.
    * it mangles a bit with the connection pool to be able to capture the traces.

    All the docs related to the optimizer tracing are available here: https://dev.mysql.com/doc/internals/en/optimizer-tracing.html

    The generated file contains one of the following:

    * ``{"EmptyTrace": arguments}``: some methods like "show tables" do not generate a trace
    * A list of dictionaries, one per trace for the specific call:

      * ``{ "Query": <query executed>, "Trace" : <optimizer analysis>}`` if all is fine
      * ``{"Error": <the error>}`` in case something goes wrong. See lower in the code
        for the description of the errors
    """
    optimizerTracingFolder = os.environ.get("DIRAC_MYSQL_OPTIMIZER_TRACES_PATH")

    @functools.wraps(meth)
    def innerMethod(self, *args, **kwargs):
        # First, get a connection to the DB, and enable the tracing
        connection = self._MySQL__connectionPool.get(self._MySQL__dbName)["Value"]
        connection.cursor().execute('SET optimizer_trace="enabled=on";')
        # We also set some options that worked for my use case.
        # You may need to tune these parameters if you have huge traces
        # or more recursive calls.
        # I doubt it though....
        connection.cursor().execute(
            "SET optimizer_trace_offset=0, optimizer_trace_limit=20, optimizer_trace_max_mem_size=131072;"
        )
        # Because we can only trace on a per-session basis, give the same connection object
        # to the actual method
        kwargs["conn"] = connection
        # Execute the method
        res = meth(self, *args, **kwargs)
        # Turn off the tracing
        connection.cursor().execute('SET optimizer_trace="enabled=off";')
        # Get all the traces
        cursor = connection.cursor()
        if cursor.execute("SELECT * FROM INFORMATION_SCHEMA.OPTIMIZER_TRACE;"):
            queryTraces = cursor.fetchall()
        else:
            queryTraces = ()

        # Generate a filename stored in DIRAC_MYSQL_OPTIMIZER_TRACES_PATH
        methHash = hash(f"{args},{kwargs}")
        # optimizer_trace_<timestamp>_<hash>.json
        jsonFn = os.path.join(optimizerTracingFolder, f"optimizer_trace_{time.time()}_{methHash}.json")
        with open(jsonFn, "w") as f:
            # Some calls do not generate a trace, like "show tables"
            if not queryTraces:
                json.dump({"EmptyTrace": args}, f)
            jsonTraces = []
            for trace_query, trace_analysis, trace_missingBytes, trace_privilegeError in queryTraces:
                # if trace_privilegeError is True, it's a permission error
                # https://dev.mysql.com/doc/internals/en/privilege-checking.html
                # It may particularly happen with stored procedures.
                # Although it is not really good practice, for profiling purposes
                # I would go with the no-brainer solution: GRANT ALL ON <yourDB>.* TO 'Dirac'@'%';
                if trace_privilegeError:
                    jsonTraces.append({"Error": f"PrivilegeError: {args}"})
                    continue
                # The memory is not large enough to store all the traces, so the trace is truncated
                # https://dev.mysql.com/doc/internals/en/tracing-memory-usage.html
                if trace_missingBytes:
                    jsonTraces.append({"Error": f"MissingBytes {trace_missingBytes}"})
                    continue
                jsonTraces.append(
                    {"Query": trace_query, "Trace": json.loads(trace_analysis) if trace_analysis else None}
                )
            json.dump(jsonTraces, f, indent=True)
        return res

    if optimizerTracingFolder:
        return innerMethod
    else:
        return meth
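# The decorator above follows a zero-cost-when-disabled pattern: the wrapper
# is only returned when the environment variable is set at decoration time,
# otherwise the original function is handed back untouched. A generic,
# self-contained sketch of that pattern (``env_gated`` and ``DEMO_TRACE``
# are illustrative names, not DIRAC API):

```python
import functools
import os


def env_gated(flag_name):
    """Return a decorator that wraps a function only when the given
    environment variable is set; otherwise the original function is
    returned untouched, so the feature costs nothing when disabled."""

    def decorator(func):
        if not os.environ.get(flag_name):
            return func

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Instrumentation would go here (tracing, timing, ...)
            return func(*args, **kwargs)

        return wrapper

    return decorator


@env_gated("DEMO_TRACE")  # flag unset: add stays undecorated
def add(a, b):
    return a + b
```

# Note the check happens once, at import/decoration time, mirroring how
# captureOptimizerTraces reads DIRAC_MYSQL_OPTIMIZER_TRACES_PATH before
# deciding whether to return innerMethod or the bare method.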
class ConnectionPool:
    """
    Management of connections per thread
    """

    def __init__(self, host, user, passwd, port=3306):
        self.__host = host
        self.__user = user
        self.__passwd = passwd
        self.__port = port
        self.__spares = collections.deque()
        self.__maxSpares = 10
        self.__lastClean = 0
        self.__assigned = {}

    @property
    def __thid(self):
        return threading.current_thread()

    def __newConn(self):
        conn = MySQLdb.connect(host=self.__host, port=self.__port, user=self.__user, passwd=self.__passwd)
        self.__execute(conn, "SET AUTOCOMMIT=1")
        return conn

    def __execute(self, conn, cmd):
        cursor = conn.cursor()
        res = cursor.execute(cmd)
        conn.commit()
        cursor.close()
        return res

    def get(self, dbName, retries=10):
        retries = max(0, min(MAXCONNECTRETRY, retries))
        self.clean()
        return self.__getWithRetry(dbName, retries, retries)

    def __getWithRetry(self, dbName, totalRetries, retriesLeft):
        sleepTime = RETRY_SLEEP_DURATION * (totalRetries - retriesLeft)
        if sleepTime > 0:
            time.sleep(sleepTime)
        try:
            conn, lastName, thid = self.__innerGet()
        except MySQLdb.MySQLError as excp:
            if retriesLeft > 0:
                return self.__getWithRetry(dbName, totalRetries, retriesLeft - 1)
            return S_ERROR(DErrno.EMYSQL, f"Could not connect: {excp}")

        if not self.__ping(conn):
            try:
                self.__assigned.pop(thid)
            except KeyError:
                pass
            if retriesLeft > 0:
                # Also decrement here, so a connection that keeps failing the
                # ping cannot recurse without bound
                return self.__getWithRetry(dbName, totalRetries, retriesLeft - 1)
            return S_ERROR(DErrno.EMYSQL, "Could not connect")

        if lastName != dbName:
            try:
                conn.select_db(dbName)
            except MySQLdb.MySQLError as excp:
                if retriesLeft > 0:
                    return self.__getWithRetry(dbName, totalRetries, retriesLeft - 1)
                return S_ERROR(DErrno.EMYSQL, f"Could not select db {dbName}: {excp}")
            try:
                self.__assigned[thid][1] = dbName
            except KeyError:
                if retriesLeft > 0:
                    return self.__getWithRetry(dbName, totalRetries, retriesLeft - 1)
                return S_ERROR(DErrno.EMYSQL, "Could not connect")
        return S_OK(conn)

    def __ping(self, conn):
        try:
            conn.ping()
            return True
        except Exception:
            return False

    def __innerGet(self):
        thid = self.__thid
        now = time.time()
        if thid in self.__assigned:
            data = self.__assigned[thid]
            data[2] = now
            return data[0], data[1], thid
        # Not cached
        try:
            conn, dbName = self.__spares.pop()
        except IndexError:
            conn = self.__newConn()
            dbName = ""
        self.__assigned[thid] = [conn, dbName, now]
        return conn, dbName, thid

    def __pop(self, thid):
        try:
            data = self.__assigned.pop(thid)
        except KeyError:
            return
        if len(self.__spares) < self.__maxSpares:
            self.__spares.append((data[0], data[1]))
        else:
            try:
                data[0].close()
            except MySQLdb.ProgrammingError as exc:
                gLogger.warn(f"ProgrammingError exception while closing MySQL connection: {exc}")
            except Exception as exc:
                gLogger.warn(f"Exception while closing MySQL connection: {exc}")

    def clean(self, now=False):
        if not now:
            now = time.time()
        self.__lastClean = now
        for thid in list(self.__assigned):
            if not thid.is_alive():
                self.__pop(thid)

    def transactionStart(self, dbName):
        result = self.get(dbName)
        if not result["OK"]:
            return result
        conn = result["Value"]
        try:
            return S_OK(self.__execute(conn, "START TRANSACTION WITH CONSISTENT SNAPSHOT"))
        except MySQLdb.MySQLError as excp:
            return S_ERROR(DErrno.EMYSQL, f"Could not begin transaction: {excp}")

    def transactionCommit(self, dbName):
        result = self.get(dbName)
        if not result["OK"]:
            return result
        conn = result["Value"]
        try:
            result = self.__execute(conn, "COMMIT")
            return S_OK(result)
        except MySQLdb.MySQLError as excp:
            return S_ERROR(DErrno.EMYSQL, f"Could not commit transaction: {excp}")

    def transactionRollback(self, dbName):
        result = self.get(dbName)
        if not result["OK"]:
            return result
        conn = result["Value"]
        try:
            result = self.__execute(conn, "ROLLBACK")
            return S_OK(result)
        except MySQLdb.MySQLError as excp:
            return S_ERROR(DErrno.EMYSQL, f"Could not rollback transaction: {excp}")
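# ConnectionPool keys its cached connections on the thread object itself.
# The core of that per-thread caching can be sketched independently
# (``ThreadCache`` is a hypothetical name; the real class additionally
# tracks the selected database, last-use time, a spares deque, and
# thread liveness):

```python
import threading


class ThreadCache:
    """Each thread gets its own cached object, created lazily by the
    factory on first access, so threads never share the underlying
    resource (here a stand-in for a DB connection)."""

    def __init__(self, factory):
        self._factory = factory
        self._assigned = {}

    def get(self):
        thid = threading.current_thread()
        if thid not in self._assigned:
            self._assigned[thid] = self._factory()
        return self._assigned[thid]
```

# Repeated get() calls from one thread return the same object, while a
# different thread receives its own instance.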
class MySQL:
    """
    Basic multithreaded DIRAC MySQL Client Class
    """

    __initialized = False
    __connectionPools = {}

    def __init__(self, hostName="localhost", userName="dirac", passwd="dirac", dbName="", port=3306, debug=False):
        """
        Set the MySQL connection parameters and try to connect.

        :param debug: unused
        """
        global gInstancesCount
        gInstancesCount += 1

        self._connected = False

        if "log" not in dir(self):
            self.log = gLogger.getSubLogger(self.__class__.__name__)
        self.logger = self.log

        # let the derived class decide what to do if it is not 1
        self._threadsafe = MySQLdb.threadsafety
        # self.log.debug('thread_safe = %s' % self._threadsafe)

        self.__hostName = str(hostName)
        self.__userName = str(userName)
        self.__passwd = str(passwd)
        self.__dbName = str(dbName)
        self.__port = port
        cKey = (self.__hostName, self.__userName, self.__passwd, self.__port)
        if cKey not in MySQL.__connectionPools:
            MySQL.__connectionPools[cKey] = ConnectionPool(*cKey)
        self.__connectionPool = MySQL.__connectionPools[cKey]

        self.__initialized = True
        result = self._connect()
        if not result["OK"]:
            gLogger.error("Cannot connect to the DB", f" {result['Message']}")

    def __del__(self):
        global gInstancesCount
        try:
            gInstancesCount -= 1
        except Exception:
            pass
    def _except(self, methodName, x, err, cmd="", debug=True):
        """
        Print the MySQL error or exception and
        return S_ERROR with the Exception.
        """
        try:
            raise x
        except MySQLdb.Error as e:
            if debug:
                self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", "%d: %s" % (e.args[0], e.args[1]))
            return S_ERROR(DErrno.EMYSQL, "%s: ( %d: %s )" % (err, e.args[0], e.args[1]))
        except Exception as e:
            if debug:
                self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", repr(e))
            return S_ERROR(DErrno.EMYSQL, f"{err}: ({repr(e)})")

    def __isDateTime(self, dateString):
        if dateString == "UTC_TIMESTAMP()":
            return True
        try:
            dtime = dateString.replace('"', "").replace("'", "")
            dtime = fromString(dtime)
            return dtime is not None
        except Exception:
            return False
    def __escapeString(self, myString, connection=None):
        """
        To be used for escaping any MySQL string before passing it to the DB;
        this should prevent passing non-MySQL-accepted characters to the DB.
        It also wraps the given string in quotation marks ".
        """
        if connection is None:
            retDict = self._getConnection()
            if not retDict["OK"]:
                return retDict
            connection = retDict["Value"]

        if isinstance(myString, bytes):
            myString = myString.decode()
        try:
            myString = str(myString)
        except ValueError:
            return S_ERROR(DErrno.EMYSQL, "Cannot escape value!")

        timeUnits = ["MICROSECOND", "SECOND", "MINUTE", "HOUR", "DAY", "WEEK", "MONTH", "QUARTER", "YEAR"]

        try:
            # Check datetime functions first
            if myString.strip() == "UTC_TIMESTAMP()":
                return S_OK(myString)

            for func in ["TIMESTAMPDIFF", "TIMESTAMPADD"]:
                if myString.strip().startswith(f"{func}(") and myString.strip().endswith(")"):
                    args = myString.strip()[:-1].replace(f"{func}(", "").strip().split(",")
                    arg1, arg2, arg3 = (x.strip() for x in args)
                    if arg1 in timeUnits:
                        if self.__isDateTime(arg2) or arg2.isalnum():
                            if self.__isDateTime(arg3) or arg3.isalnum():
                                return S_OK(myString)
                    # It looked like a datetime function but did not validate
                    # self.log.debug('__escape_string: Could not escape string', '"%s"' % myString)
                    return S_ERROR(DErrno.EMYSQL, "__escape_string: Could not escape string")

            escape_string = connection.escape_string(myString.encode()).decode()
            # self.log.debug('__escape_string: returns', '"%s"' % escape_string)
            return S_OK(f'"{escape_string}"')
        except Exception as x:
            return self._except("__escape_string", x, "Could not escape string", myString)
    def __checkTable(self, tableName, force=False):
        """Check if a table exists by issuing 'SHOW TABLES'

        :param str tableName: name of the table to check
        :param bool force: whether to force the re-creation (this drops the previous table)

        :returns: S_OK/S_ERROR
        """
        table = _quotedList([tableName])
        if not table:
            return S_ERROR(DErrno.EMYSQL, "Invalid tableName argument")

        cmd = "SHOW TABLES"
        retDict = self._query(cmd)
        if not retDict["OK"]:
            return retDict
        if (tableName,) in retDict["Value"]:
            if not force:
                # the requested table exists and its creation is not forced, return with error
                return S_ERROR(DErrno.EMYSQL, "The requested table already exists")
            cmd = f"DROP TABLE {table}"
            retDict = self._update(cmd)
            if not retDict["OK"]:
                return retDict
        return S_OK()
    def _escapeString(self, myString, conn=None):
        """
        Wrapper around the internal method __escapeString
        """
        # self.log.debug('_escapeString:', '"%s"' % str(myString))
        return self.__escapeString(myString)

    def _escapeValues(self, inValues=None):
        """
        Escapes all strings in the list of values provided
        """
        # self.log.debug('_escapeValues:', inValues)
        retDict = self._getConnection()
        if not retDict["OK"]:
            return retDict
        connection = retDict["Value"]

        inEscapeValues = []
        if not inValues:
            return S_OK(inEscapeValues)
        for value in inValues:
            if isinstance(value, str):
                retDict = self.__escapeString(value, connection=connection)
                if not retDict["OK"]:
                    return retDict
                inEscapeValues.append(retDict["Value"])
            elif isinstance(value, (tuple, list)):
                tupleValues = []
                for val in value:
                    retDict = self.__escapeString(val, connection=connection)
                    if not retDict["OK"]:
                        return retDict
                    tupleValues.append(retDict["Value"])
                inEscapeValues.append("(" + ", ".join(tupleValues) + ")")
            elif isinstance(value, bool):
                # append rather than overwrite, so previously escaped values are kept
                inEscapeValues.append(str(value))
            else:
                if isinstance(value, bytes):
                    value = value.decode()
                retDict = self.__escapeString(str(value), connection=connection)
                if not retDict["OK"]:
                    return retDict
                inEscapeValues.append(retDict["Value"])
        return S_OK(inEscapeValues)
    def _safeCmd(self, command):
        """Just replaces the password, if visible, with **********"""
        return command.replace(self.__passwd, "**********")

    def _connect(self):
        """
        Open a connection to the MySQL DB and put the Connection into the Queue.
        Set the connected flag to True and return S_OK;
        return S_ERROR upon failure.
        """
        if not self.__initialized:
            error = "DB not properly initialized"
            gLogger.error(error)
            return S_ERROR(DErrno.EMYSQL, error)
        # self.log.debug('_connect:', self._connected)
        if self._connected:
            return S_OK()
        # Test the connection to the DB
        retDict = self._getConnection()
        if not retDict["OK"]:
            return retDict
        self._connected = True
        return S_OK()
    @captureOptimizerTraces
    def _query(self, cmd, *, conn=None, debug=True):
        """
        Execute a MySQL query command.

        :param debug: print the errors or not

        Return an S_OK structure with the fetchall result as a tuple;
        an empty tuple is returned if no matching rows are found.
        Return S_ERROR upon error.
        """
        self.log.debug(f"_query: {self._safeCmd(cmd)}")

        if conn:
            connection = conn
        else:
            retDict = self._getConnection()
            if not retDict["OK"]:
                return retDict
            connection = retDict["Value"]

        try:
            cursor = connection.cursor()
            if cursor.execute(cmd):
                res = cursor.fetchall()
            else:
                res = ()
            # Log the result limiting it to just 10 records
            # if len(res) <= 10:
            #     self.log.debug('_query: returns', res)
            # else:
            #     self.log.debug('_query: Total %d records returned' % len(res))
            #     self.log.debug('_query: %s ...' % str(res[:10]))
            retDict = S_OK(res)
        except Exception as x:
            # self.log.debug('_query: %s' % self._safeCmd(cmd))
            retDict = self._except("_query", x, "Execution failed.", cmd, debug)

        try:
            cursor.close()
        except Exception:
            pass

        return retDict
    @captureOptimizerTraces
    def _update(self, cmd, *, args=None, conn=None, debug=True):
        """Execute a MySQL update command.

        :param args: parameters passed to the cursor.execute(..., args=args) method.
        :param conn: connection object.
        :param debug: print the errors or not
        :return: S_OK with the number of updated rows upon success.
                 S_ERROR upon error.
                 lastRowId: if set, added to the returned dictionary
        """
        self.log.debug(f"_update: {self._safeCmd(cmd)}")

        if conn:
            connection = conn
        else:
            retDict = self._getConnection()
            if not retDict["OK"]:
                return retDict
            connection = retDict["Value"]

        try:
            cursor = connection.cursor()
            res = cursor.execute(cmd, args=args)
            retDict = S_OK(res)
            if cursor.lastrowid:
                retDict["lastRowId"] = cursor.lastrowid
        except Exception as x:
            retDict = self._except("_update", x, "Execution failed.", cmd, debug)

        try:
            cursor.close()
        except Exception:
            pass

        return retDict

    @captureOptimizerTraces
    def _updatemany(self, cmd, data, *, conn=None, debug=True):
        """Execute a MySQL executemany command.

        :param debug: print the errors or not
        :return: S_OK with the number of updated rows upon success.
                 S_ERROR upon error.
        """
        self.log.debug(f"_updatemany: {self._safeCmd(cmd)}")

        if conn:
            connection = conn
        else:
            retDict = self._getConnection()
            if not retDict["OK"]:
                return retDict
            connection = retDict["Value"]

        try:
            cursor = connection.cursor()
            res = cursor.executemany(cmd, data)
            retDict = S_OK(res)
            if cursor.lastrowid:
                retDict["lastRowId"] = cursor.lastrowid
        except Exception as x:
            retDict = self._except("_updatemany", x, "Execution failed.", cmd, debug)

        try:
            cursor.close()
        except Exception:
            pass

        return retDict
    def _transaction(self, cmdList, conn=None):
        """Dummy transaction support

        :param self: self reference
        :param list cmdList: list of queries to be executed within the transaction
        :param MySQLdb.Connection conn: connection
        :return: S_OK( [ ( cmd1, ret1 ), ... ] ) or S_ERROR
        """
        if not isinstance(cmdList, list):
            return S_ERROR(DErrno.EMYSQL, f"_transaction: wrong type ({type(cmdList)}) for cmdList")

        # get connection
        connection = conn
        if not connection:
            retDict = self._getConnection()
            if not retDict["OK"]:
                return retDict
            connection = retDict["Value"]

        # list with cmds and their results
        cmdRet = []
        try:
            cursor = connection.cursor()
            for cmd in cmdList:
                cmdRet.append((cmd, cursor.execute(cmd)))
            connection.commit()
        except Exception as error:
            self.logger.exception(error)
            # rollback, put back connection to the pool
            connection.rollback()
            return S_ERROR(DErrno.EMYSQL, error)
        # close cursor, put back connection to the pool
        cursor.close()
        return S_OK(cmdRet)
    def _createViews(self, viewsDict, force=False):
        """Create views based on queries.

        :param dict viewsDict: { 'ViewName': { "Fields": { "`a`": "`tblA.a`", "`sumB`": "SUM(`tblB.b`)" },
                                               "SelectFrom": "tblA join tblB on tblA.id = tblB.id",
                                               "Clauses": [ "`tblA.a` > 10", "`tblB.Status` = 'foo'" ],  # joined with AND
                                               "GroupBy": [ "`a`" ],
                                               "OrderBy": [ "`b` DESC" ] } }
        """
        if force:
            # gLogger.debug(viewsDict)
            for viewName, viewDict in viewsDict.items():
                viewQuery = [f"CREATE OR REPLACE VIEW `{self.__dbName}`.`{viewName}` AS"]
                columns = ",".join([f"{colDef} AS {colName}" for colName, colDef in viewDict.get("Fields", {}).items()])
                tables = viewDict.get("SelectFrom", "")
                if columns and tables:
                    viewQuery.append(f"SELECT {columns} FROM {tables}")
                where = " AND ".join(viewDict.get("Clauses", []))
                if where:
                    viewQuery.append(f"WHERE {where}")
                groupBy = ",".join(viewDict.get("GroupBy", []))
                if groupBy:
                    viewQuery.append(f"GROUP BY {groupBy}")
                orderBy = ",".join(viewDict.get("OrderBy", []))
                if orderBy:
                    viewQuery.append(f"ORDER BY {orderBy}")
                viewQuery.append(";")
                viewQuery = " ".join(viewQuery)
                # self.log.debug("`%s` VIEW QUERY IS: %s" % (viewName, viewQuery))
                createView = self._query(viewQuery)
                if not createView["OK"]:
                    self.log.error("Cannot create view", createView["Message"])
                    return createView
        return S_OK()
    def _parseForeignKeyReference(self, auxTable, defaultKey):
        """
        Parse a foreign key reference in the format 'Table' or 'Table.key'.

        :param str auxTable: foreign key reference (e.g., 'MyTable' or 'MyTable.id')
        :param str defaultKey: default key name if not specified in auxTable
        :return: tuple (table_name, key_name)
        """
        if "." in auxTable:
            # split(".", 1) always yields exactly two parts here
            parts = auxTable.split(".", 1)
            return parts[0], parts[1]
        return auxTable, defaultKey
    def _createTables(self, tableDict, force=False):
        """
        tableDict:
          tableName: { 'Fields' : { 'Field': 'Description' },
                       'ForeignKeys': { 'Field': 'Table.key' },
                       'PrimaryKey': 'Id',
                       'Indexes': { 'Index': [] },
                       'UniqueIndexes': { 'Index': [] },
                       'Engine': 'InnoDB' }
          only 'Fields' is a mandatory key.

        Creates a new Table for each key in tableDict, "tableName", in the DB with
        the provided description.
        It allows creating:
          - flat tables, if no "ForeignKeys" key is defined.
          - tables with foreign keys to auxiliary tables holding the values
            of some of the fields.

        Arguments:
          tableDict: dictionary of dictionaries with the description of the tables to be created.
            Only "Fields" is a mandatory key in the table description.
            "Fields": dictionary with the Field names and the description of the fields.
            "ForeignKeys": dictionary with the Field names and the names of the auxiliary tables.
              The auxiliary tables must be defined in tableDict.
            "PrimaryKey": name of the PRIMARY KEY for the table (if it exists).
            "Indexes": dictionary with the definition of indexes; the value for each
              index is the list of fields to be indexed.
            "UniqueIndexes": dictionary with the definition of indexes; the value for each
              index is the list of fields to be indexed. These indexes will be declared
              unique.
            "Engine": use the given DB engine; InnoDB is the default if not present.
            "Charset": use the given character set. Default is utf8mb4.

          force:
            if True, the requested tables are dropped if they already exist.
            if False, S_ERROR is returned if the table exists.
        """
        # First check the consistency of the request
        if not isinstance(tableDict, dict):
            return S_ERROR(DErrno.EMYSQL, f"Argument is not a dictionary: {type(tableDict)}( {tableDict} )")

        tableList = list(tableDict)
        if len(tableList) == 0:
            return S_OK(0)
        for table in tableList:
            thisTable = tableDict[table]
            # Check if the Table is properly described with a dictionary
            if not isinstance(thisTable, dict):
                return S_ERROR(
                    DErrno.EMYSQL, f"Table description is not a dictionary: {type(thisTable)}( {thisTable} )"
                )
            if "Fields" not in thisTable:
                return S_ERROR(DErrno.EMYSQL, f"Missing `Fields` key in `{table}` table dictionary")

        # Build a dependency-ordered list of tables to create:
        # tables with foreign keys must be created after their referenced tables
        tableCreationList = []
        auxiliaryTableList = []

        # Get the list of existing tables in the database to handle migrations
        existingTablesResult = self._query("SHOW TABLES")
        if not existingTablesResult["OK"]:
            return existingTablesResult
        existingTables = [t[0] for t in existingTablesResult["Value"]]

        extracted = True
        while tableList and extracted:
            # iterate, extracting tables from the list if they only depend on
            # already extracted tables
            extracted = False
            currentLevelTables = []
            for table in list(tableList):
                toBeExtracted = True
                thisTable = tableDict[table]
                if "ForeignKeys" in thisTable:
                    thisKeys = thisTable["ForeignKeys"]
                    for key, auxTable in thisKeys.items():
                        try:
                            forTable, forKey = self._parseForeignKeyReference(auxTable, key)