forked from xiaokun567/office365
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuser_creator.py
More file actions
341 lines (292 loc) · 14.6 KB
/
user_creator.py
File metadata and controls
341 lines (292 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
import requests
import json
from typing import Dict, Optional
class UserCreator:
"""Office 365 用户创建器"""
def __init__(self, config_manager):
self.config_manager = config_manager
def _assign_license(self, base_url: str, headers: dict, cookies: dict,
object_id: str, subscription: dict) -> Dict:
"""为用户分配许可证"""
try:
# 从订阅数据中获取可用的许可证
subscription_data = subscription.get('subscription_data', {})
# 查找有可用许可证的产品
available_licenses = []
for product in subscription_data.get('Skus', []):
sku_id = product.get('SkuId')
available = product.get('Available', 0)
if sku_id and available > 0:
available_licenses.append({
'SkuId': sku_id,
'SkuPartNumber': product.get('SkuPartNumber', ''),
'Available': available
})
if not available_licenses:
return {
'success': False,
'message': '没有可用的许可证'
}
# 使用第一个可用的许可证
license_to_assign = available_licenses[0]
print(f"[分配许可证] 使用许可证: {license_to_assign['SkuPartNumber']} (SKU: {license_to_assign['SkuId']})")
# 构建分配许可证的 API URL
assign_license_url = f"{base_url}/admin/api/users/{object_id}/assignlicense"
# 构建请求体
payload = {
"AddLicenses": [{
"SkuId": license_to_assign['SkuId'],
"DisabledServicePlans": []
}],
"RemoveLicenses": []
}
print(f"[分配许可证] API URL: {assign_license_url}")
print(f"[分配许可证] Payload: {json.dumps(payload, ensure_ascii=False)}")
# 发送请求
response = requests.post(
assign_license_url,
headers=headers,
cookies=cookies,
json=payload,
timeout=30
)
print(f"[分配许可证] 响应状态码: {response.status_code}")
print(f"[分配许可证] 响应内容: {response.text[:500]}")
if response.status_code in [200, 201, 204]:
return {
'success': True,
'message': f'已分配许可证: {license_to_assign["SkuPartNumber"]}'
}
else:
return {
'success': False,
'message': f'分配许可证失败 (状态码: {response.status_code})'
}
except Exception as e:
print(f"[分配许可证] 错误: {str(e)}")
return {
'success': False,
'message': f'分配许可证时出错: {str(e)}'
}
def create_user(self, subscription_id: str, username: str, password: str) -> Dict:
"""创建 Office 365 用户"""
subscription = self.config_manager.get_subscription(subscription_id)
if not subscription:
return {
'success': False,
'error': '订阅不存在'
}
# 检查是否配置了用户创建配置
user_create_config = subscription.get('user_create_config')
if not user_create_config:
return {
'success': False,
'error': '该订阅未配置用户创建功能,请在设置中添加用户创建配置'
}
try:
# 使用用户创建配置中的信息
headers = user_create_config['headers'].copy()
headers['content-type'] = 'application/json'
cookies_str = user_create_config['cookies']
# 如果用户创建配置的 cookie 为空,尝试使用订阅配置的 cookie
if not cookies_str or cookies_str.strip() == '':
cookies_str = subscription.get('cookies', '')
if cookies_str:
print(f"[创建用户] 用户创建配置的 cookie 为空,使用订阅配置的 cookie")
else:
return {
'success': False,
'error': '未找到有效的 Cookie 配置,请检查订阅配置或用户创建配置'
}
# 将 cookies 字符串转换为字典
cookies = {}
if cookies_str:
for cookie in cookies_str.split('; '):
if '=' in cookie:
key, value = cookie.split('=', 1)
cookies[key] = value
# 使用用户创建配置中的 API URL
create_user_url = user_create_config['api_url']
# 提取基础 URL
parts = create_user_url.split('/')
base_url = f"{parts[0]}//{parts[2]}"
print(f"[创建用户] API URL: {create_user_url}")
# 从用户创建配置的原始 curl 命令中提取请求体
user_create_curl = subscription.get('user_create_curl', '')
# 提取 --data-raw 后的 JSON 数据
import re
# 尝试多种模式匹配
data_match = re.search(r'--data-raw\s+\$?\'(.+?)\'(?:\s|$)', user_create_curl, re.DOTALL)
if not data_match:
data_match = re.search(r'--data-raw\s+\$?"(.+?)"(?:\s|$)', user_create_curl, re.DOTALL)
if not data_match:
data_match = re.search(r'--data-raw\s+(.+?)(?:\s+-H|\s+$)', user_create_curl, re.DOTALL)
if not data_match:
print(f"[创建用户] 错误: 无法从 curl 命令中提取请求体")
print(f"[创建用户] curl 命令长度: {len(user_create_curl)}")
return {
'success': False,
'error': '无法从用户创建配置中提取请求体数据'
}
# 解析原始请求体
raw_data = data_match.group(1).strip()
print(f"[创建用户] 提取到的数据长度: {len(raw_data)}")
# 处理转义字符 - 将 \r\n 等转义序列替换为实际字符
# 但保留 JSON 中的合法转义(如 \")
try:
# 先尝试直接解析
template_data = json.loads(raw_data)
print(f"[创建用户] 成功解析模板数据")
except json.JSONDecodeError as e:
# 如果失败,尝试处理转义字符
print(f"[创建用户] 首次解析失败,尝试处理转义字符")
try:
# 使用 Python 的字符串字面量解析来处理转义
# 将字符串用引号包裹后用 ast.literal_eval 解析
import codecs
decoded_data = codecs.decode(raw_data, 'unicode_escape')
template_data = json.loads(decoded_data)
print(f"[创建用户] 处理转义后成功解析")
except Exception as e2:
print(f"[创建用户] 错误: 解析 JSON 失败 - {str(e)}")
print(f"[创建用户] 原始数据前200字符: {raw_data[:200]}")
print(f"[创建用户] 错误位置附近: {raw_data[max(0, e.pos-50):min(len(raw_data), e.pos+50)]}")
return {
'success': False,
'error': f'用户创建配置中的请求体格式不正确: {str(e)}'
}
# 从模板中提取域名
template_upn = template_data.get('UserPrincipalName', '')
if '@' in template_upn:
domain = template_upn.split('@')[1]
else:
domain = "dfem.net" # 默认域名
user_principal_name = f"{username}@{domain}"
print(f"[创建用户] 用户邮箱: {user_principal_name}")
print(f"[创建用户] 使用域名: {domain}")
# 使用模板中的 Products 和 AdminRoles
products = template_data.get('Products', [])
admin_roles = template_data.get('AdminRoles', [])
# 构建请求体 - 使用模板结构
payload = {
"FirstName": template_data.get('FirstName', ''),
"JobTitle": template_data.get('JobTitle', ''),
"LastName": template_data.get('LastName', ''),
"DisplayName": username,
"UserPrincipalName": user_principal_name,
"Office": template_data.get('Office', ''),
"OfficePhone": template_data.get('OfficePhone', ''),
"MobilePhone": template_data.get('MobilePhone', ''),
"FaxNumber": template_data.get('FaxNumber', ''),
"City": template_data.get('City', ''),
"CountryRegion": template_data.get('CountryRegion', ''),
"StateProvince": template_data.get('StateProvince', ''),
"Department": template_data.get('Department', ''),
"StreetAddress": template_data.get('StreetAddress', ''),
"ZipOrPostalCode": template_data.get('ZipOrPostalCode', ''),
"ForceChangePassword": True,
"SendPasswordEmail": False,
"Password": password,
"AdminRoles": admin_roles,
"UsageLocation": template_data.get('UsageLocation', 'CN'),
"Products": products,
"CreateUserWithNoLicense": template_data.get('CreateUserWithNoLicense', False)
}
if products:
product_names = [p.get('SkuPartNumber', p.get('ProductSkuId', '')) for p in products if isinstance(p, dict)]
print(f"[创建用户] 将分配许可证: {product_names}")
print(f"[创建用户] 发送请求...")
print(f"[创建用户] Payload: {json.dumps(payload, ensure_ascii=False)[:200]}...")
# 发送请求
response = requests.post(
create_user_url,
headers=headers,
cookies=cookies,
json=payload,
timeout=30
)
print(f"[创建用户] 响应状态码: {response.status_code}")
print(f"[创建用户] 响应内容: {response.text[:500]}")
# 检查响应状态
if response.status_code == 401 or response.status_code == 403:
return {
'success': False,
'error': 'auth_failure',
'message': '认证失败,Cookie 可能已过期'
}
# 200 和 201 都是成功状态码
if response.status_code not in [200, 201]:
try:
error_data = response.json()
error_message = error_data.get('Message', '') or f"Code: {error_data.get('Code', 'unknown')}"
return {
'success': False,
'error': 'api_error',
'message': f'API 返回错误状态码: {response.status_code}',
'details': error_message,
'response_data': error_data
}
except:
return {
'success': False,
'error': 'api_error',
'message': f'API 返回错误状态码: {response.status_code}',
'details': response.text[:500]
}
# 解析响应
data = response.json()
print(f"[创建用户] 响应数据: {json.dumps(data, ensure_ascii=False)}")
# 检查是否创建成功
if data.get('Status') == 0: # 0 表示成功
user_info = data.get('UserInfo', {})
object_id = user_info.get('ObjectId', '')
# 提取许可证名称
license_names = []
if products:
for p in products:
if isinstance(p, dict):
license_names.append(p.get('SkuPartNumber', p.get('ProductSkuId', '')))
return {
'success': True,
'data': {
'username': username,
'user_principal_name': user_principal_name,
'display_name': user_info.get('DisplayName', username),
'object_id': object_id,
'password': password,
'licenses': license_names,
'licenses_info': user_info.get('Licenses', '')
}
}
else:
# Status != 0 表示失败
error_code = data.get('Code', 'unknown')
error_message = data.get('Message', '') or f'错误代码: {error_code}'
# 406 通常表示请求参数问题
if error_code == '406':
error_message = '请求参数不正确或缺少必要信息(错误代码 406)'
return {
'success': False,
'error': 'creation_failed',
'message': error_message,
'code': error_code,
'status': data.get('Status')
}
except requests.exceptions.Timeout:
return {
'success': False,
'error': 'timeout',
'message': '请求超时'
}
except requests.exceptions.RequestException as e:
return {
'success': False,
'error': 'network_error',
'message': f'网络错误: {str(e)}'
}
except Exception as e:
return {
'success': False,
'error': 'unknown_error',
'message': f'未知错误: {str(e)}'
}