Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ pip install fastapi_oauth20

[fastapi oauth20](https://fastapi-practices.github.io/fastapi-oauth20/)

## Demo

查看完整的示例项目:[fastapi-oauth20-demo](https://github.com/fastapi-practices/fastapi-oauth20-demo)

该示例项目展示了如何在实际应用中使用 fastapi-oauth20,包括:

- 多个 OAuth2 提供商的集成示例
- 完整的授权流程实现
- 用户信息获取和处理
- 错误处理最佳实践

## Sponsor

如果这个项目对你有帮助,欢迎[请作者喝杯咖啡](https://wu-clan.github.io/sponsor/) ☕
12 changes: 5 additions & 7 deletions docs/status.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
下面展示了我们的计划,如果你有更多需求,请在仓库内创建 Issues,我们将尽力完成所有目标
如果你有更多需求,请在仓库内创建 [Issues](https://github.com/fastapi-practices/fastapi-oauth20/issues)

## FINISHED

- [x] [LinuxDo](clients/linuxdo.md)
- [x] [GitHub](clients/github.md)
- [x] [Google](clients/google.md)
- [x] [LinuxDo](clients/linuxdo.md)
- [x] [Gitee](clients/gitee.md)
- [x] [开源中国](clients/oschina.md)
- [x] [飞书](clients/feishu.md)
- [x] [Google](clients/google.md)
- [x] [微信小程序](clients/wechat_open.md)
- [x] [微信开放平台](clients/wechat_mp.md)

## TODO

- [ ] [微信小程序](clients/wechat_open.md)
- [ ] [微信开放平台](clients/wechat_mp.md)
- [ ] [企业微信二维码登录](clients/wechat_work.md)
- [ ] [钉钉](clients/dingtalk.md)
- [ ] [QQ](clients/qq.md)
16 changes: 16 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,22 @@ from fastapi_oauth20 import FastAPIOAuth20

本指南介绍如何将 FastAPI OAuth2.0 库与各种 OAuth2 提供程序一起使用。

## 演示项目

在开始之前,强烈推荐查看我们的完整演示项目

**[fastapi-oauth20-demo](https://github.com/fastapi-practices/fastapi-oauth20-demo)**

该演示项目包含:

- 多个 OAuth2 提供商的集成示例
- 详细的代码注释和实现说明
- 生产环境的最佳实践
- 错误处理示例
- 可直接运行的完整应用

通过演示项目,你可以快速了解如何在真实应用中使用 fastapi-oauth20

## 基本用法

### 1. 选择 OAuth2 提供商并初始化客户端
Expand Down
2 changes: 2 additions & 0 deletions fastapi_oauth20/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
from .clients.google import GoogleOAuth20 as GoogleOAuth20
from .clients.linuxdo import LinuxDoOAuth20 as LinuxDoOAuth20
from .clients.oschina import OSChinaOAuth20 as OSChinaOAuth20
from .clients.weixin_mp import WeChatMpOAuth20 as WeChatMpOAuth20
from .clients.weixin_open import WeChatOpenOAuth20 as WeChatOpenOAuth20

__version__ = '0.0.2'
22 changes: 17 additions & 5 deletions fastapi_oauth20/callback.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import inspect

from typing import Annotated, Any

import httpx
Expand Down Expand Up @@ -25,6 +27,7 @@ def __init__(
:param detail: Error detail message describing what went wrong.
:param headers: Additional HTTP headers to include in the error response.
:param response: The original HTTP response that caused the error (if any).
:return:
"""
self.response = response
super().__init__(status_code=status_code, detail=detail, headers=headers)
Expand All @@ -44,6 +47,7 @@ def __init__(

:param client: An OAuth2 client instance that inherits from OAuth20Base.
:param redirect_uri: The full callback URL where the OAuth2 provider redirects after authorization. Must match the URL registered with the OAuth2 provider.
:return:
"""
self.client = client
self.redirect_uri = redirect_uri
Expand All @@ -64,19 +68,27 @@ async def __call__(
:param state: The state parameter for CSRF protection (extracted from query parameters).
:param code_verifier: PKCE code verifier if PKCE was used in the authorization request.
:param error: Error parameter from OAuth2 provider if authorization was denied or failed.
:return:
"""
if code is None or error is not None:
raise OAuth20AuthorizeCallbackError(
status_code=400,
detail=error if error is not None else None,
)

kwargs = {'code': code}

try:
access_token = await self.client.get_access_token(
code=code,
redirect_uri=self.redirect_uri,
code_verifier=code_verifier,
)
sig = inspect.signature(self.client.get_access_token)
params = sig.parameters

if 'redirect_uri' in params:
kwargs['redirect_uri'] = self.redirect_uri

if 'code_verifier' in params:
kwargs['code_verifier'] = code_verifier

access_token = await self.client.get_access_token(**kwargs)
except OAuth20RequestError as e:
raise OAuth20AuthorizeCallbackError(
status_code=500,
Expand Down
1 change: 1 addition & 0 deletions fastapi_oauth20/clients/feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def __init__(self, client_id: str, client_secret: str):

:param client_id: FeiShu app client ID from the FeiShu developer console.
:param client_secret: FeiShu app client secret from the FeiShu developer console.
:return:
"""
super().__init__(
client_id=client_id,
Expand Down
1 change: 1 addition & 0 deletions fastapi_oauth20/clients/gitee.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def __init__(self, client_id: str, client_secret: str):

:param client_id: Gitee OAuth application client ID.
:param client_secret: Gitee OAuth application client secret.
:return:
"""
super().__init__(
client_id=client_id,
Expand Down
1 change: 1 addition & 0 deletions fastapi_oauth20/clients/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self, client_id: str, client_secret: str):

:param client_id: GitHub OAuth App client ID.
:param client_secret: GitHub OAuth App client secret.
:return:
"""
super().__init__(
client_id=client_id,
Expand Down
1 change: 1 addition & 0 deletions fastapi_oauth20/clients/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def __init__(self, client_id: str, client_secret: str):

:param client_id: Google OAuth 2.0 client ID from Google Cloud Console.
:param client_secret: Google OAuth 2.0 client secret from Google Cloud Console.
:return:
"""
super().__init__(
client_id=client_id,
Expand Down
1 change: 1 addition & 0 deletions fastapi_oauth20/clients/linuxdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def __init__(self, client_id: str, client_secret: str):

:param client_id: Linux.do OAuth application client ID.
:param client_secret: Linux.do OAuth application client secret.
:return:
"""
super().__init__(
client_id=client_id,
Expand Down
1 change: 1 addition & 0 deletions fastapi_oauth20/clients/oschina.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def __init__(self, client_id: str, client_secret: str):

:param client_id: OSChina OAuth application client ID.
:param client_secret: OSChina OAuth application client secret.
:return:
"""
super().__init__(
client_id=client_id,
Expand Down
139 changes: 139 additions & 0 deletions fastapi_oauth20/clients/weixin_mp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from typing import Any
from urllib.parse import urlencode

import httpx

from fastapi_oauth20.errors import AccessTokenError, GetUserInfoError, RefreshTokenError
from fastapi_oauth20.oauth20 import OAuth20Base


class WeChatMpOAuth20(OAuth20Base):
"""WeChat public platform OAuth2 client implementation."""

def __init__(self, client_id: str, client_secret: str):
"""
Initialize WeChat public platform OAuth2 client.

:param client_id: AppID from the WeChat public platform developer console.
:param client_secret: AppSecret from the WeChat public platform developer console.
:return:
"""
super().__init__(
client_id=client_id,
client_secret=client_secret,
authorize_endpoint='https://open.weixin.qq.com/connect/oauth2/authorize',
access_token_endpoint='https://api.weixin.qq.com/sns/oauth2/access_token',
refresh_token_endpoint='https://api.weixin.qq.com/sns/oauth2/refresh_token',
userinfo_endpoint='https://api.weixin.qq.com/sns/userinfo',
default_scopes=['snsapi_userinfo'],
)

async def get_authorization_url(
self,
redirect_uri: str,
state: str | None = None,
scope: list[str] | None = None,
**kwargs,
) -> str:
"""
Generate WeChat OAuth2 authorization URL.

:param redirect_uri: The URL where WeChat will redirect after authorization.
:param state: An opaque value used to maintain state between request and callback.
:param scope: The list of OAuth scopes to request. Default is ['snsapi_userinfo'].
:param kwargs: Additional query parameters.
:return:
"""
params = {
'appid': self.client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
}

if state is not None:
params['state'] = state

_scope = scope or self.default_scopes
if _scope is not None:
params['scope'] = ','.join(_scope)

if kwargs:
params.update(kwargs)

return f'{self.authorize_endpoint}?{urlencode(params)}#wechat_redirect'

async def get_access_token(self, code: str) -> dict[str, Any]:
"""
Exchange authorization code for access token using WeChat's GET method.

:param code: The authorization code received from WeChat callback.
:return:
"""
params = {
'appid': self.client_id,
'secret': self.client_secret,
'code': code,
'grant_type': 'authorization_code',
}

async with httpx.AsyncClient() as client:
response = await client.get(
self.access_token_endpoint,
params=params,
headers=self.request_headers,
)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=AccessTokenError)
return result

async def refresh_token(self, refresh_token: str) -> dict[str, Any]:
"""
Refresh access token using WeChat's GET method.

:param refresh_token: The refresh token received from initial token exchange.
:return:
"""
if self.refresh_token_endpoint is None:
raise RefreshTokenError('The refresh token address is missing')

params = {
'appid': self.client_id,
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
}

async with httpx.AsyncClient() as client:
response = await client.get(
self.refresh_token_endpoint,
params=params,
headers=self.request_headers,
)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=RefreshTokenError)
return result

async def get_userinfo(self, access_token: str, openid: str | None = None) -> dict[str, Any]:
"""
Retrieve user information from WeChat API.

:param access_token: Valid WeChat access token.
:param openid: User's OpenID.
:return:
"""
if openid is None:
raise GetUserInfoError('openid is required')

params = {
'access_token': access_token,
'openid': openid,
'lang': 'zh_CN',
}

async with httpx.AsyncClient() as client:
response = await client.get(
self.userinfo_endpoint,
params=params,
)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=GetUserInfoError)
return result
Loading