-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathweixin_mp.py
More file actions
139 lines (116 loc) · 4.76 KB
/
weixin_mp.py
File metadata and controls
139 lines (116 loc) · 4.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from typing import Any
from urllib.parse import urlencode
import httpx
from fastapi_oauth20.errors import AccessTokenError, GetUserInfoError, RefreshTokenError
from fastapi_oauth20.oauth20 import OAuth20Base
class WeChatMpOAuth20(OAuth20Base):
"""WeChat public platform OAuth2 client implementation."""
def __init__(self, client_id: str, client_secret: str):
"""
Initialize WeChat public platform OAuth2 client.
:param client_id: AppID from the WeChat public platform developer console.
:param client_secret: AppSecret from the WeChat public platform developer console.
:return:
"""
super().__init__(
client_id=client_id,
client_secret=client_secret,
authorize_endpoint='https://open.weixin.qq.com/connect/oauth2/authorize',
access_token_endpoint='https://api.weixin.qq.com/sns/oauth2/access_token',
refresh_token_endpoint='https://api.weixin.qq.com/sns/oauth2/refresh_token',
userinfo_endpoint='https://api.weixin.qq.com/sns/userinfo',
default_scopes=['snsapi_userinfo'],
)
async def get_authorization_url(
self,
redirect_uri: str,
state: str | None = None,
scope: list[str] | None = None,
**kwargs,
) -> str: # ty:ignore[invalid-method-override]
"""
Generate WeChat OAuth2 authorization URL.
:param redirect_uri: The URL where WeChat will redirect after authorization.
:param state: An opaque value used to maintain state between request and callback.
:param scope: The list of OAuth scopes to request. Default is ['snsapi_userinfo'].
:param kwargs: Additional query parameters.
:return:
"""
params = {
'appid': self.client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
}
if state is not None:
params['state'] = state
_scope = scope or self.default_scopes
if _scope is not None:
params['scope'] = ','.join(_scope)
if kwargs:
params.update(kwargs)
return f'{self.authorize_endpoint}?{urlencode(params)}#wechat_redirect'
async def get_access_token(self, code: str) -> dict[str, Any]: # ty:ignore[invalid-method-override]
"""
Exchange authorization code for access token using WeChat's GET method.
:param code: The authorization code received from WeChat callback.
:return:
"""
params = {
'appid': self.client_id,
'secret': self.client_secret,
'code': code,
'grant_type': 'authorization_code',
}
async with httpx.AsyncClient() as client:
response = await client.get(
self.access_token_endpoint,
params=params,
headers=self.request_headers,
)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=AccessTokenError)
return result
async def refresh_token(self, refresh_token: str) -> dict[str, Any]:
"""
Refresh access token using WeChat's GET method.
:param refresh_token: The refresh token received from initial token exchange.
:return:
"""
if self.refresh_token_endpoint is None:
raise RefreshTokenError('The refresh token address is missing')
params = {
'appid': self.client_id,
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
}
async with httpx.AsyncClient() as client:
response = await client.get(
self.refresh_token_endpoint,
params=params,
headers=self.request_headers,
)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=RefreshTokenError)
return result
async def get_userinfo(self, access_token: str, openid: str | None = None) -> dict[str, Any]:
"""
Retrieve user information from WeChat API.
:param access_token: Valid WeChat access token.
:param openid: User's OpenID.
:return:
"""
if openid is None:
raise GetUserInfoError('openid is required')
params = {
'access_token': access_token,
'openid': openid,
'lang': 'zh_CN',
}
async with httpx.AsyncClient() as client:
response = await client.get(
self.userinfo_endpoint,
params=params,
)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=GetUserInfoError)
return result