This repository was archived by the owner on Sep 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathutils.py
More file actions
284 lines (211 loc) · 7.64 KB
/
utils.py
File metadata and controls
284 lines (211 loc) · 7.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# -*- coding: utf-8 -*-
from google.appengine.api import memcache, app_identity, mail
from google.appengine.ext import deferred
import new, os
from server import db
import string, random, base64
from server import conf
import logging
from itertools import izip
def generateRandomString(length=13):
    """
    Return a string containing random characters of given *length*.

    The characters are drawn from ASCII letters (both cases) and digits only,
    so the result needs no escaping when embedded in URLs or HTML.

    The operating system's entropy source (``random.SystemRandom``) is used
    instead of the module-level Mersenne Twister, so the output cannot be
    reproduced or predicted via ``random.seed()``.

    :type length: int
    :param length: The desired length of the generated string.

    :returns: A string with random characters of the given length.
    :rtype: str
    """
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    # SystemRandom draws from os.urandom() and ignores random.seed().
    rng = random.SystemRandom()
    return "".join(rng.choice(alphabet) for _ in range(length))
def sendEMail(dests, name, skel, extraFiles=None, cc=None, bcc=None, replyTo=None, *args, **kwargs):
    """
    General purpose function for sending e-mail.

    The recipients are first rewritten according to
    ``conf["viur.emailRecipientOverride"]``; afterwards the call is delegated
    to the handler configured in ``conf["viur.emailHandler"]`` (falling back
    to :func:`_GAE_sendEMail` if none is configured).

    :type dests: str | list of str
    :param dests: Full-qualified recipient email addresses; These can be assigned as list, for multiple targets.

    :type name: str
    :param name: The name of a template from the appengine/emails directory, or the template string itself.

    :type skel: server.skeleton.Skeleton | dict | None
    :param skel: The data made available to the template. It is passed on unchanged to the handler.

    :type extraFiles: list of fileobjects
    :param extraFiles: List of **open** fileobjects to be sent within the mail as attachments.
        Defaults to an empty list.

    :type cc: str | list of str
    :param cc: Carbon-copy recipients

    :type bcc: str | list of str
    :param bcc: Blind carbon-copy recipients

    :type replyTo: str
    :param replyTo: A reply-to email address

    :returns: The handler's return value; ``False`` if the configured handler is not callable;
        ``None`` if sending is disabled by configuration.
    """
    if extraFiles is None:
        # A fresh list per call; a literal ``[]`` default would be shared between calls.
        extraFiles = []

    override = conf["viur.emailRecipientOverride"]
    if override:
        logging.warning("Overriding destination %s with %s", dests, override)
        oldDests = [dests] if isinstance(dests, basestring) else dests
        newDests = [override] if isinstance(override, basestring) else override
        dests = []
        for newDest in newDests:
            if newDest.startswith("@"):
                # Domain-only override: fold every original address into the
                # local part, e.g. "a.b@c.de" + "@x.org" -> "a_dot_b_at_c_dot_de@x.org".
                for oldDest in oldDests:
                    dests.append(oldDest.replace(".", "_dot_").replace("@", "_at_") + newDest)
            else:
                dests.append(newDest)
        # NOTE(review): only ``dests`` is rewritten here; ``cc`` and ``bcc`` are
        # forwarded untouched - confirm that this is intended.
    elif override is False:
        # An explicit False (as opposed to None / empty) switches sending off entirely.
        logging.warning("Sending emails disabled by config[viur.emailRecipientOverride]")
        return

    handler = conf.get("viur.emailHandler")
    if handler is None:
        handler = _GAE_sendEMail
    if not callable(handler):
        logging.warning("Invalid emailHandler configured, no email will be sent.")
        return False

    logging.debug("CALLING %s", handler)
    # NOTE(review): extra positional ``*args`` are appended after the three
    # leading positionals, so a handler with this function's signature would
    # receive them for parameters that are also passed by keyword.
    return handler(dests, name, skel, extraFiles=extraFiles, cc=cc, bcc=bcc, replyTo=replyTo, *args, **kwargs)
def _GAE_sendEMail(dests, name, skel, extraFiles=None, cc=None, bcc=None, replyTo=None, *args, **kwargs):
    """
    Internal function for using Google App Engine Email processing API.

    Renders the mail through ``conf["viur.emailRenderer"]``, which is called
    with ``(skel, name, dests, **kwargs)`` and must return a
    ``(headers, data)`` pair, then builds and sends a ``mail.EmailMessage``.

    The parameters match those of :func:`sendEMail`. Extra positional
    ``*args`` are accepted but not used.

    :returns: True once the message has been handed over to the mail API.
    :rtype: bool
    """

    def joinAddresses(addresses):
        # Lists are collapsed into one comma separated string; anything else is passed through.
        if isinstance(addresses, list):
            return ", ".join(addresses)
        return addresses

    headers, data = conf["viur.emailRenderer"](skel, name, dests, **kwargs)

    # Only the threading related headers are forwarded as extra headers.
    xheader = {}
    if "references" in headers:
        xheader["References"] = headers["references"]
    if "in-reply-to" in headers:
        xheader["In-Reply-To"] = headers["in-reply-to"]

    if xheader:
        message = mail.EmailMessage(headers=xheader)
    else:
        message = mail.EmailMessage()

    # Sender precedence (lowest to highest): app default, rendered "from" header, config override.
    mailfrom = "viur@%s.appspotmail.com" % app_identity.get_application_id()
    if "from" in headers:
        mailfrom = headers["from"]
    if conf["viur.emailSenderOverride"]:
        mailfrom = conf["viur.emailSenderOverride"]

    if "subject" in headers:
        # The subject is wrapped as a base64 "encoded-word" so non-ASCII characters are carried.
        message.subject = "=?utf-8?B?%s?=" % base64.b64encode(headers["subject"].encode("UTF-8"))
    else:
        message.subject = "No Subject"

    message.to = joinAddresses(dests)
    if cc:
        message.cc = joinAddresses(cc)
    if bcc:
        message.bcc = joinAddresses(bcc)
    if replyTo:
        message.reply_to = replyTo

    message.sender = mailfrom
    # NUL characters are dropped; everything outside ASCII becomes a numeric character reference.
    message.html = data.replace("\x00", "").encode('ascii', 'xmlcharrefreplace')

    # Truthiness check also covers the None default (len(None) would raise).
    if extraFiles:
        message.attachments = extraFiles

    message.send()
    return True
def sendEMailToAdmins(subject, body, sender=None):
    """
    Sends an e-mail to the administrators of the current app by delegating
    to ``mail.send_mail_to_admins``.

    The subject is sent as a base64 "encoded-word" (UTF-8); in the body every
    non-ASCII character is replaced by a numeric character reference.

    :param subject: Defines the subject of the message.
    :type subject: str

    :param body: Defines the message body.
    :type body: str

    :param sender: (optional) specify a different sender. If empty, an
        address derived from the application id is used.
    :type sender: str
    """
    sender = sender or "viur@%s.appspotmail.com" % app_identity.get_application_id()
    encodedSubject = "=?utf-8?B?%s?=" % base64.b64encode(subject.encode("UTF-8"))
    asciiBody = body.encode('ascii', 'xmlcharrefreplace')
    mail.send_mail_to_admins(sender, encodedSubject, asciiBody)
def getCurrentUser():
    """
    Retrieve current user, if logged in.

    The lookup is delegated to the ``user`` module of the main application
    (``conf["viur.mainApp"]``). If the application has no such module,
    None is returned.

    :returns: Whatever the user module's ``getCurrentUser()`` returns
        (presumably the user's data, or None if nobody is logged in -
        verify against the user module); None if there is no user module.
    """
    mainApp = conf["viur.mainApp"]
    if "user" not in dir(mainApp):  # No custom user-api available
        return None
    return mainApp.user.getCurrentUser()
def markFileForDeletion(dlkey):
    """
    Adds a marker to the data store that the file specified as *dlkey* can be deleted.

    The marker is an entity of kind ``viur-deleted-files`` holding the
    download-key and an ``itercount`` starting at 0. If a marker for this
    *dlkey* already exists, nothing is written.

    The actual (delayed) deletion is not performed here.
    NOTE(review): per the previous documentation the marker is re-checked
    several times before the file is removed - confirm against the task
    that consumes these markers.

    :type dlkey: str
    :param dlkey: Unique download-key of the file that shall be marked for deletion.
    """
    alreadyMarked = db.Query("viur-deleted-files").filter("dlkey", dlkey).get()
    if not alreadyMarked:
        marker = db.Entity("viur-deleted-files")
        marker["itercount"] = 0
        marker["dlkey"] = str(dlkey)
        db.Put(marker)
def escapeString(val, maxLength=254):
    """
    Quotes several characters and removes newline and NUL characters.
    Intended to reduce the risk of XSS injection when the value is
    embedded into HTML.

    The value is converted to text and stripped of surrounding whitespace;
    ``<``, ``>``, ``"`` and ``'`` are replaced by HTML entities. Note that
    ``&`` itself is left untouched, and that the cut-off is applied *after*
    escaping, so it may truncate in the middle of an entity.

    :param val: The value to be escaped.
    :type val: str

    :param maxLength: Cut-off after maxLength characters. A value of 0 means "unlimited".
    :type maxLength: int

    :returns: The quoted string.
    :rtype: str
    """
    try:
        toText = unicode  # Python 2
    except NameError:
        toText = str  # Python 3

    val = toText(val).strip() \
        .replace("<", "&lt;") \
        .replace(">", "&gt;") \
        .replace("\"", "&quot;") \
        .replace("'", "&#39;") \
        .replace("\n", "") \
        .replace("\0", "")

    if maxLength:
        return val[0:maxLength]

    return val
def safeStringComparison(s1, s2):
    """
    Compares two strings without bailing out at the first difference.

    Type, length and every character pair are always checked, whatever the
    earlier checks found. This is meant to hamper side-channel (timing)
    attacks on passwords etc.

    :param s1: First string to compare
    :type s1: str | unicode
    :param s2: Second string to compare
    :type s2: str | unicode
    :return: True if both strings are equal (and of the same type), False otherwise
    :return type: bool
    """
    sameType = type(s1) == type(s2)  # Guards against a unicode/str mixup
    sameLength = len(s1) == len(s2)
    mismatches = 0
    for left, right in izip(s1, s2):
        if left != right:
            mismatches += 1
    return sameType and sameLength and mismatches == 0
def normalizeKey(key):
    """
    Normalizes a datastore key by rebuilding it (and, recursively, its
    chain of parents) from kind and id/name via ``db.Key.from_path``.
    The purpose is to replace the application stored in the key with the
    current one.

    :param key: Key to be normalized (anything whose str() is an encoded key), or None.
    :return: Normalized key in string representation; None if *key* is None.
    """
    if key is None:
        return None
    decoded = db.Key(encoded=str(key))
    rawParent = decoded.parent()
    # Parents are normalized first so the rebuilt key hangs off a rebuilt ancestor path.
    parent = db.Key(encoded=normalizeKey(rawParent)) if rawParent else None
    return str(db.Key.from_path(decoded.kind(), decoded.id_or_name(), parent=parent))