diff --git a/generate-code.py b/generate-code.py index dfb386708..b68fd89ed 100644 --- a/generate-code.py +++ b/generate-code.py @@ -19,6 +19,82 @@ def run_command(command): return proc.stdout.strip() +def add_stateless_channel_token_wrappers(): + for fname in ['channel_access_token.py', 'async_channel_access_token.py']: + filepath = f'linebot/v3/oauth/api/{fname}' + + # Inject deprecation notes into original methods' docstrings + with open(filepath, 'r') as fp: + lines = fp.readlines() + + new_lines = [] + i = 0 + while i < len(lines): + new_lines.append(lines[i]) + for base_method in ['issue_stateless_channel_token_with_http_info', + 'issue_stateless_channel_token']: + if f'def {base_method}(self' in lines[i] and i + 1 < len(lines) and '"""' in lines[i + 1]: + # Next line: docstring title + i += 1 + new_lines.append(lines[i]) + # Next line: blank line + i += 1 + new_lines.append(lines[i]) + # Insert deprecation note + new_lines.append(f' .. deprecated::\n') + new_lines.append(f' Use :func:`{base_method}_by_jwt_assertion` or\n') + new_lines.append(f' :func:`{base_method}_by_client_secret` instead.\n') + new_lines.append('\n') + break + i += 1 + + with open(filepath, 'w') as fp: + fp.writelines(new_lines) + + # Append wrapper methods with docstrings + with open(filepath, 'a') as fp: + for base_method in ['issue_stateless_channel_token', + 'issue_stateless_channel_token_with_http_info']: + if base_method == 'issue_stateless_channel_token': + rtype = 'IssueStatelessChannelAccessTokenResponse' + else: + rtype = 'ApiResponse' + + fp.write("\n") + fp.write(f" def {base_method}_by_jwt_assertion(self, client_assertion, **kwargs):\n") + fp.write(f' """Issue a stateless channel access token using a JSON Web Token (JWT).\n') + fp.write(f'\n') + fp.write(f' :param str client_assertion: A JSON Web Token the client needs to create and sign with the private key of the Assertion Signing Key.\n') + fp.write(f' :return: Returns the result object.\n') + fp.write(f' :rtype: {rtype}\n') + fp.write(f' """\n') + fp.write(f" return self.{base_method}(\n") + fp.write(" grant_type='client_credentials',\n") + fp.write(" client_assertion_type='urn:ietf:params:oauth:client-assertion-type:jwt-bearer',\n") + fp.write(" client_assertion=client_assertion,\n") + fp.write(" client_id='',\n") + fp.write(" client_secret='',\n") + fp.write(" **kwargs,\n") + fp.write(" )\n") + fp.write("\n") + fp.write(f" def {base_method}_by_client_secret(self, client_id, client_secret, **kwargs):\n") + fp.write(f' """Issue a stateless channel access token using client ID and client secret.\n') + fp.write(f'\n') + fp.write(f' :param str client_id: Channel ID.\n') + fp.write(f' :param str client_secret: Channel secret.\n') + fp.write(f' :return: Returns the result object.\n') + fp.write(f' :rtype: {rtype}\n') + fp.write(f' """\n') + fp.write(f" return self.{base_method}(\n") + fp.write(" grant_type='client_credentials',\n") + fp.write(" client_assertion_type='',\n") + fp.write(" client_assertion='',\n") + fp.write(" client_id=client_id,\n") + fp.write(" client_secret=client_secret,\n") + fp.write(" **kwargs,\n") + fp.write(" )\n") + + def rewrite_liff_function_name_backward_compats(): for fname in ['liff.py', 'async_liff.py']: with open(f'linebot/v3/liff/api/{fname}', 'a') as fp: @@ -98,6 +174,8 @@ def main(): run_command(command) + add_stateless_channel_token_wrappers() + ## TODO(v4): Delete this workaround in v4. This workaround keeps backward compatibility. rewrite_liff_function_name_backward_compats() diff --git a/linebot/v3/oauth/api/async_channel_access_token.py b/linebot/v3/oauth/api/async_channel_access_token.py index 2149ff77c..dd1c2b60f 100644 --- a/linebot/v3/oauth/api/async_channel_access_token.py +++ b/linebot/v3/oauth/api/async_channel_access_token.py @@ -570,6 +570,10 @@ def issue_stateless_channel_token(self, grant_type : Annotated[StrictStr, Field( def issue_stateless_channel_token(self, grant_type : Annotated[StrictStr, Field(..., description="`client_credentials`")], client_assertion_type : Annotated[StrictStr, Field(..., description="URL-encoded value of `urn:ietf:params:oauth:client-assertion-type:jwt-bearer`")], client_assertion : Annotated[StrictStr, Field(..., description="A JSON Web Token the client needs to create and sign with the private key of the Assertion Signing Key.")], client_id : Annotated[StrictStr, Field(..., description="Channel ID.")], client_secret : Annotated[StrictStr, Field(..., description="Channel secret.")], async_req: Optional[bool]=None, **kwargs) -> Union[IssueStatelessChannelAccessTokenResponse, Awaitable[IssueStatelessChannelAccessTokenResponse]]: # noqa: E501 """issue_stateless_channel_token # noqa: E501 + .. deprecated:: + Use :func:`issue_stateless_channel_token_by_jwt_assertion` or + :func:`issue_stateless_channel_token_by_client_secret` instead. + Issues a new stateless channel access token, which doesn't have max active token limit unlike the other token types. The newly issued token is only valid for 15 minutes but can not be revoked until it naturally expires. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True @@ -609,6 +613,10 @@ def issue_stateless_channel_token(self, grant_type : Annotated[StrictStr, Field( def issue_stateless_channel_token_with_http_info(self, grant_type : Annotated[StrictStr, Field(..., description="`client_credentials`")], client_assertion_type : Annotated[StrictStr, Field(..., description="URL-encoded value of `urn:ietf:params:oauth:client-assertion-type:jwt-bearer`")], client_assertion : Annotated[StrictStr, Field(..., description="A JSON Web Token the client needs to create and sign with the private key of the Assertion Signing Key.")], client_id : Annotated[StrictStr, Field(..., description="Channel ID.")], client_secret : Annotated[StrictStr, Field(..., description="Channel secret.")], **kwargs) -> ApiResponse: # noqa: E501 """issue_stateless_channel_token # noqa: E501 + .. deprecated:: + Use :func:`issue_stateless_channel_token_with_http_info_by_jwt_assertion` or + :func:`issue_stateless_channel_token_with_http_info_by_client_secret` instead. + Issues a new stateless channel access token, which doesn't have max active token limit unlike the other token types. The newly issued token is only valid for 15 minutes but can not be revoked until it naturally expires. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True @@ -1376,3 +1384,69 @@ def verify_channel_token_by_jwt_with_http_info(self, access_token : Annotated[St _host=_host, collection_formats=_collection_formats, _request_auth=_params.get('_request_auth')) + + def issue_stateless_channel_token_by_jwt_assertion(self, client_assertion, **kwargs): + """Issue a stateless channel access token using a JSON Web Token (JWT). + + :param str client_assertion: A JSON Web Token the client needs to create and sign with the private key of the Assertion Signing Key. + :return: Returns the result object. + :rtype: IssueStatelessChannelAccessTokenResponse + """ + return self.issue_stateless_channel_token( + grant_type='client_credentials', + client_assertion_type='urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + client_assertion=client_assertion, + client_id='', + client_secret='', + **kwargs, + ) + + def issue_stateless_channel_token_by_client_secret(self, client_id, client_secret, **kwargs): + """Issue a stateless channel access token using client ID and client secret. + + :param str client_id: Channel ID. + :param str client_secret: Channel secret. + :return: Returns the result object. + :rtype: IssueStatelessChannelAccessTokenResponse + """ + return self.issue_stateless_channel_token( + grant_type='client_credentials', + client_assertion_type='', + client_assertion='', + client_id=client_id, + client_secret=client_secret, + **kwargs, + ) + + def issue_stateless_channel_token_with_http_info_by_jwt_assertion(self, client_assertion, **kwargs): + """Issue a stateless channel access token using a JSON Web Token (JWT). + + :param str client_assertion: A JSON Web Token the client needs to create and sign with the private key of the Assertion Signing Key. + :return: Returns the result object. + :rtype: ApiResponse + """ + return self.issue_stateless_channel_token_with_http_info( + grant_type='client_credentials', + client_assertion_type='urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + client_assertion=client_assertion, + client_id='', + client_secret='', + **kwargs, + ) + + def issue_stateless_channel_token_with_http_info_by_client_secret(self, client_id, client_secret, **kwargs): + """Issue a stateless channel access token using client ID and client secret. + + :param str client_id: Channel ID. + :param str client_secret: Channel secret. + :return: Returns the result object. + :rtype: ApiResponse + """ + return self.issue_stateless_channel_token_with_http_info( + grant_type='client_credentials', + client_assertion_type='', + client_assertion='', + client_id=client_id, + client_secret=client_secret, + **kwargs, + ) diff --git a/linebot/v3/oauth/api/channel_access_token.py b/linebot/v3/oauth/api/channel_access_token.py index 15634e631..e1c1af7c5 100644 --- a/linebot/v3/oauth/api/channel_access_token.py +++ b/linebot/v3/oauth/api/channel_access_token.py @@ -530,6 +530,10 @@ def issue_channel_token_by_jwt_with_http_info(self, grant_type : Annotated[Stric def issue_stateless_channel_token(self, grant_type : Annotated[StrictStr, Field(..., description="`client_credentials`")], client_assertion_type : Annotated[StrictStr, Field(..., description="URL-encoded value of `urn:ietf:params:oauth:client-assertion-type:jwt-bearer`")], client_assertion : Annotated[StrictStr, Field(..., description="A JSON Web Token the client needs to create and sign with the private key of the Assertion Signing Key.")], client_id : Annotated[StrictStr, Field(..., description="Channel ID.")], client_secret : Annotated[StrictStr, Field(..., description="Channel secret.")], **kwargs) -> IssueStatelessChannelAccessTokenResponse: # noqa: E501 """issue_stateless_channel_token # noqa: E501 + .. deprecated:: + Use :func:`issue_stateless_channel_token_by_jwt_assertion` or + :func:`issue_stateless_channel_token_by_client_secret` instead. + Issues a new stateless channel access token, which doesn't have max active token limit unlike the other token types. The newly issued token is only valid for 15 minutes but can not be revoked until it naturally expires. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True @@ -567,6 +571,10 @@ def issue_stateless_channel_token(self, grant_type : Annotated[StrictStr, Field( def issue_stateless_channel_token_with_http_info(self, grant_type : Annotated[StrictStr, Field(..., description="`client_credentials`")], client_assertion_type : Annotated[StrictStr, Field(..., description="URL-encoded value of `urn:ietf:params:oauth:client-assertion-type:jwt-bearer`")], client_assertion : Annotated[StrictStr, Field(..., description="A JSON Web Token the client needs to create and sign with the private key of the Assertion Signing Key.")], client_id : Annotated[StrictStr, Field(..., description="Channel ID.")], client_secret : Annotated[StrictStr, Field(..., description="Channel secret.")], **kwargs) -> ApiResponse: # noqa: E501 """issue_stateless_channel_token # noqa: E501 + .. deprecated:: + Use :func:`issue_stateless_channel_token_with_http_info_by_jwt_assertion` or + :func:`issue_stateless_channel_token_with_http_info_by_client_secret` instead. + Issues a new stateless channel access token, which doesn't have max active token limit unlike the other token types. The newly issued token is only valid for 15 minutes but can not be revoked until it naturally expires. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True @@ -1294,3 +1302,69 @@ def verify_channel_token_by_jwt_with_http_info(self, access_token : Annotated[St _host=_host, collection_formats=_collection_formats, _request_auth=_params.get('_request_auth')) + + def issue_stateless_channel_token_by_jwt_assertion(self, client_assertion, **kwargs): + """Issue a stateless channel access token using a JSON Web Token (JWT). + + :param str client_assertion: A JSON Web Token the client needs to create and sign with the private key of the Assertion Signing Key. + :return: Returns the result object. + :rtype: IssueStatelessChannelAccessTokenResponse + """ + return self.issue_stateless_channel_token( + grant_type='client_credentials', + client_assertion_type='urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + client_assertion=client_assertion, + client_id='', + client_secret='', + **kwargs, + ) + + def issue_stateless_channel_token_by_client_secret(self, client_id, client_secret, **kwargs): + """Issue a stateless channel access token using client ID and client secret. + + :param str client_id: Channel ID. + :param str client_secret: Channel secret. + :return: Returns the result object. + :rtype: IssueStatelessChannelAccessTokenResponse + """ + return self.issue_stateless_channel_token( + grant_type='client_credentials', + client_assertion_type='', + client_assertion='', + client_id=client_id, + client_secret=client_secret, + **kwargs, + ) + + def issue_stateless_channel_token_with_http_info_by_jwt_assertion(self, client_assertion, **kwargs): + """Issue a stateless channel access token using a JSON Web Token (JWT). + + :param str client_assertion: A JSON Web Token the client needs to create and sign with the private key of the Assertion Signing Key. + :return: Returns the result object. + :rtype: ApiResponse + """ + return self.issue_stateless_channel_token_with_http_info( + grant_type='client_credentials', + client_assertion_type='urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + client_assertion=client_assertion, + client_id='', + client_secret='', + **kwargs, + ) + + def issue_stateless_channel_token_with_http_info_by_client_secret(self, client_id, client_secret, **kwargs): + """Issue a stateless channel access token using client ID and client secret. + + :param str client_id: Channel ID. + :param str client_secret: Channel secret. + :return: Returns the result object. + :rtype: ApiResponse + """ + return self.issue_stateless_channel_token_with_http_info( + grant_type='client_credentials', + client_assertion_type='', + client_assertion='', + client_id=client_id, + client_secret=client_secret, + **kwargs, + ) diff --git a/tests/api/test_issue_stateless_channel_token.py b/tests/api/test_issue_stateless_channel_token.py new file mode 100644 index 000000000..aeabfc8c7 --- /dev/null +++ b/tests/api/test_issue_stateless_channel_token.py @@ -0,0 +1,192 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest +from urllib.parse import parse_qs + +from pytest_httpserver import HTTPServer +from linebot.v3.oauth import ( + Configuration, + ApiClient, + ChannelAccessToken, +) + + +class TestIssueStatelessChannelToken(unittest.TestCase): + + def test_issue_stateless_channel_token_by_jwt_assertion(self): + client_assertion = 'eyJhbGciOiJSUzI.q....' + client_assertion_type = 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer' + + with HTTPServer() as httpserver: + httpserver.expect_request( + uri="/oauth2/v3/token", + method="POST", + ).respond_with_json( + { + 'access_token': 'test_access_token', + 'expires_in': 900, + 'token_type': 'Bearer', + }, + status=200, + ) + + configuration = Configuration(host=httpserver.url_for("/")) + with ApiClient(configuration) as api_client: + api = ChannelAccessToken(api_client) + api.line_base_path = httpserver.url_for("/") + + response = api.issue_stateless_channel_token_by_jwt_assertion( + client_assertion, + ) + + self.assertEqual(response.access_token, 'test_access_token') + self.assertEqual(response.expires_in, 900) + self.assertEqual(response.token_type, 'Bearer') + self.assertEqual(len(httpserver.log), 1) + + request, _ = httpserver.log[0] + encoded_body = parse_qs(request.data.decode('utf-8')) + self.assertEqual(encoded_body['grant_type'], ['client_credentials']) + self.assertEqual(encoded_body['client_assertion_type'], [client_assertion_type]) + self.assertEqual(encoded_body['client_assertion'], [client_assertion]) + self.assertNotIn('client_id', encoded_body) + self.assertNotIn('client_secret', encoded_body) + + def test_issue_stateless_channel_token_by_client_secret(self): + client_id = 'test_client_id' + client_secret = 'test_client_secret' + + with HTTPServer() as httpserver: + httpserver.expect_request( + uri="/oauth2/v3/token", + method="POST", + ).respond_with_json( + { + 'access_token': 'test_access_token', + 'expires_in': 900, + 'token_type': 'Bearer', + }, + status=200, + ) + + configuration = Configuration(host=httpserver.url_for("/")) + with ApiClient(configuration) as api_client: + api = ChannelAccessToken(api_client) + api.line_base_path = httpserver.url_for("/") + + response = api.issue_stateless_channel_token_by_client_secret( + client_id, + client_secret, + ) + + self.assertEqual(response.access_token, 'test_access_token') + self.assertEqual(response.expires_in, 900) + self.assertEqual(response.token_type, 'Bearer') + self.assertEqual(len(httpserver.log), 1) + + request, _ = httpserver.log[0] + encoded_body = parse_qs(request.data.decode('utf-8')) + self.assertEqual(encoded_body['grant_type'], ['client_credentials']) + self.assertEqual(encoded_body['client_id'], [client_id]) + self.assertEqual(encoded_body['client_secret'], [client_secret]) + self.assertNotIn('client_assertion_type', encoded_body) + self.assertNotIn('client_assertion', encoded_body) + + def test_issue_stateless_channel_token_with_http_info_by_jwt_assertion(self): + client_assertion = 'eyJhbGciOiJSUzI.q....' + client_assertion_type = 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer' + + with HTTPServer() as httpserver: + httpserver.expect_request( + uri="/oauth2/v3/token", + method="POST", + ).respond_with_json( + { + 'access_token': 'test_access_token', + 'expires_in': 900, + 'token_type': 'Bearer', + }, + status=200, + ) + + configuration = Configuration(host=httpserver.url_for("/")) + with ApiClient(configuration) as api_client: + api = ChannelAccessToken(api_client) + api.line_base_path = httpserver.url_for("/") + + api_response = api.issue_stateless_channel_token_with_http_info_by_jwt_assertion( + client_assertion, + ) + + self.assertEqual(api_response.status_code, 200) + response = api_response.data + self.assertEqual(response.access_token, 'test_access_token') + self.assertEqual(response.expires_in, 900) + self.assertEqual(response.token_type, 'Bearer') + self.assertEqual(len(httpserver.log), 1) + + request, _ = httpserver.log[0] + encoded_body = parse_qs(request.data.decode('utf-8')) + self.assertEqual(encoded_body['grant_type'], ['client_credentials']) + self.assertEqual(encoded_body['client_assertion_type'], [client_assertion_type]) + self.assertEqual(encoded_body['client_assertion'], [client_assertion]) + self.assertNotIn('client_id', encoded_body) + self.assertNotIn('client_secret', encoded_body) + + def test_issue_stateless_channel_token_with_http_info_by_client_secret(self): + client_id = 'test_client_id' + client_secret = 'test_client_secret' + + with HTTPServer() as httpserver: + httpserver.expect_request( + uri="/oauth2/v3/token", + method="POST", + ).respond_with_json( + { + 'access_token': 'test_access_token', + 'expires_in': 900, + 'token_type': 'Bearer', + }, + status=200, + ) + + configuration = Configuration(host=httpserver.url_for("/")) + with ApiClient(configuration) as api_client: + api = ChannelAccessToken(api_client) + api.line_base_path = httpserver.url_for("/") + + api_response = api.issue_stateless_channel_token_with_http_info_by_client_secret( + client_id, + client_secret, + ) + + self.assertEqual(api_response.status_code, 200) + response = api_response.data + self.assertEqual(response.access_token, 'test_access_token') + self.assertEqual(response.expires_in, 900) + self.assertEqual(response.token_type, 'Bearer') + self.assertEqual(len(httpserver.log), 1) + + request, _ = httpserver.log[0] + encoded_body = parse_qs(request.data.decode('utf-8')) + self.assertEqual(encoded_body['grant_type'], ['client_credentials']) + self.assertEqual(encoded_body['client_id'], [client_id]) + self.assertEqual(encoded_body['client_secret'], [client_secret]) + self.assertNotIn('client_assertion_type', encoded_body) + self.assertNotIn('client_assertion', encoded_body) + + +if __name__ == '__main__': + unittest.main()