diff --git a/CHANGELOG.md b/CHANGELOG.md index 8848d22fd9..4a2e7300f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - Added support for `FileOperation.remove` to remove files in a stage. - Added a new function `snowflake.snowpark.functions.vectorized` that allows users to mark a function as vectorized UDF. - Added support for parameter `use_vectorized_scanner` in function `Session.write_pandas()`. +- Added support for parameter `session_init_statement` in udtf ingestion of `DataFrameReader.jdbc`(PrPr). - Added support for the following scalar functions in `functions.py`: - `getdate` - `getvariable` @@ -27,6 +28,8 @@ #### Bug Fixes +- Fixed a bug that `query_timeout` does not work in udtf ingestion of `DataFrameReader.jdbc`(PrPr). + #### Deprecations #### Dependency Updates diff --git a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py index 507141b9fa..cb2ea1e01d 100644 --- a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py +++ b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py @@ -60,7 +60,7 @@ def read(self, partition: str) -> Iterator[List[Any]]: cursor.execute(statement) except BaseException as exc: raise SnowparkDataframeReaderException( - f"Failed to execute session init statement: '{statement}' due to exception '{exc!r}'" + f"Failed to execute session init statement: '{statement}' due to exception '{exc}'" ) # use server side cursor to fetch data if supported by the driver # some drivers do not support execute twice on server side cursor (e.g. psycopg2) diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py index a9de3af7b7..28b45b5770 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py @@ -139,6 +139,8 @@ def udtf_ingestion( fetch_size: int = 1000, imports: Optional[List[str]] = None, packages: Optional[List[str]] = None, + session_init_statement: Optional[List[str]] = None, + query_timeout: Optional[int] = 0, _emit_ast: bool = True, ) -> "snowflake.snowpark.DataFrame": from snowflake.snowpark._internal.data_source.utils import UDTF_PACKAGE_MAP @@ -146,7 +148,12 @@ def udtf_ingestion( udtf_name = random_name_for_temp_object(TempObjectType.FUNCTION) with measure_time() as udtf_register_time: session.udtf.register( - self.udtf_class_builder(fetch_size=fetch_size, schema=schema), + self.udtf_class_builder( + fetch_size=fetch_size, + schema=schema, + session_init_statement=session_init_statement, + query_timeout=query_timeout, + ), name=udtf_name, output_schema=StructType( [ @@ -166,14 +173,22 @@ def udtf_ingestion( return self.to_result_snowpark_df_udtf(res, schema, _emit_ast=_emit_ast) def udtf_class_builder( - self, fetch_size: int = 1000, schema: StructType = None + self, + fetch_size: int = 1000, + schema: StructType = None, + session_init_statement: List[str] = None, + query_timeout: int = 0, ) -> type: create_connection = self.create_connection + prepare_connection = self.prepare_connection class UDTFIngestion: def process(self, query: str): - conn = create_connection() + conn = prepare_connection(create_connection(), query_timeout) cursor = conn.cursor() + if session_init_statement is not None: + for statement in session_init_statement: + cursor.execute(statement) cursor.execute(query) while True: rows = cursor.fetchmany(fetch_size) diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/databricks_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/databricks_driver.py index 226b45236d..62d1e2bcce 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/databricks_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/databricks_driver.py @@ -69,7 +69,11 @@ def to_snow_type(self, schema: List[Any]) -> StructType: return StructType(all_columns) def udtf_class_builder( - self, fetch_size: int = 1000, schema: StructType = None + self, + fetch_size: int = 1000, + schema: StructType = None, + session_init_statement: List[str] = None, + query_timeout: int = 0, ) -> type: create_connection = self.create_connection @@ -77,6 +81,9 @@ class UDTFIngestion: def process(self, query: str): conn = create_connection() cursor = conn.cursor() + if session_init_statement is not None: + for statement in session_init_statement: + cursor.execute(statement) # First get schema information describe_query = f"DESCRIBE QUERY SELECT * FROM ({query})" diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/oracledb_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/oracledb_driver.py index 11d7b9ec07..745adec64f 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/oracledb_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/oracledb_driver.py @@ -105,13 +105,18 @@ def prepare_connection( conn: "Connection", query_timeout: int = 0, ) -> "Connection": - conn.call_timeout = query_timeout * 1000 + if query_timeout > 0: + conn.call_timeout = query_timeout * 1000 if conn.outputtypehandler is None: conn.outputtypehandler = output_type_handler return conn def udtf_class_builder( - self, fetch_size: int = 1000, schema: StructType = None + self, + fetch_size: int = 1000, + schema: StructType = None, + session_init_statement: List[str] = None, + query_timeout: int = 0, ) -> type: create_connection = self.create_connection @@ -138,9 +143,14 @@ def convert_to_hex(value): class UDTFIngestion: def process(self, query: str): conn = create_connection() + if query_timeout > 0: + conn.call_timeout = query_timeout * 1000 if conn.outputtypehandler is None: conn.outputtypehandler = oracledb_output_type_handler cursor = conn.cursor() + if session_init_statement is not None: + for statement in session_init_statement: + cursor.execute(statement) cursor.execute(query) while True: rows = cursor.fetchmany(fetch_size) diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/psycopg2_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/psycopg2_driver.py index 8bfa734f92..ee9575f090 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/psycopg2_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/psycopg2_driver.py @@ -253,7 +253,11 @@ def prepare_connection( return conn def udtf_class_builder( - self, fetch_size: int = 1000, schema: StructType = None + self, + fetch_size: int = 1000, + schema: StructType = None, + session_init_statement: List[str] = None, + query_timeout: int = 0, ) -> type: create_connection = self.create_connection @@ -275,10 +279,15 @@ def prepare_connection_in_udtf( class UDTFIngestion: def process(self, query: str): - conn = prepare_connection_in_udtf(create_connection()) + conn = prepare_connection_in_udtf(create_connection(), query_timeout) cursor = conn.cursor( f"SNOWPARK_CURSOR_{generate_random_alphanumeric(5)}" ) + if session_init_statement is not None: + session_init_cur = conn.cursor() + for statement in session_init_statement: + session_init_cur.execute(statement) + session_init_cur.fetchall() cursor.execute(query) while True: rows = cursor.fetchmany(fetch_size) diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/pymsql_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/pymsql_driver.py index 268a9145ae..7428bce4af 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/pymsql_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/pymsql_driver.py @@ -184,7 +184,11 @@ def to_snow_type(self, schema: List[Any]) -> StructType: return StructType(fields) def udtf_class_builder( - self, fetch_size: int = 1000, schema: StructType = None + self, + fetch_size: int = 1000, + schema: StructType = None, + session_init_statement: List[str] = None, + query_timeout: int = 0, ) -> type: create_connection = self.create_connection @@ -194,6 +198,9 @@ def process(self, query: str): conn = create_connection() cursor = pymysql.cursors.SSCursor(conn) + if session_init_statement is not None: + for statement in session_init_statement: + cursor.execute(statement) cursor.execute(query) while True: rows = cursor.fetchmany(fetch_size) @@ -203,14 +210,6 @@ def process(self, query: str): return UDTFIngestion - def prepare_connection( - self, - conn: "Connection", - query_timeout: int = 0, - ) -> "Connection": - conn.read_timeout = query_timeout if query_timeout != 0 else None - return conn - @staticmethod def infer_type_from_data(data: List[tuple], number_of_columns: int) -> List[Type]: # TODO: SNOW-2112938 investigate whether different types can be fit into one column diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/pyodbc_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/pyodbc_driver.py index 9ecbac9ea9..cbdedc3ccb 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/pyodbc_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/pyodbc_driver.py @@ -78,9 +78,14 @@ def to_snow_type(self, schema: List[Any]) -> StructType: return StructType(fields) def udtf_class_builder( - self, fetch_size: int = 1000, schema: StructType = None + self, + fetch_size: int = 1000, + schema: StructType = None, + session_init_statement: List[str] = None, + query_timeout: int = 0, ) -> type: create_connection = self.create_connection + prepare_connection = self.prepare_connection def binary_converter(value): return value.hex() if value is not None else None @@ -89,7 +94,7 @@ class UDTFIngestion: def process(self, query: str): import pyodbc - conn = create_connection() + conn = prepare_connection(create_connection(), query_timeout) if ( conn.get_output_converter(pyodbc.SQL_BINARY) is None and conn.get_output_converter(pyodbc.SQL_VARBINARY) is None @@ -101,6 +106,9 @@ def process(self, query: str): pyodbc.SQL_LONGVARBINARY, binary_converter ) cursor = conn.cursor() + if session_init_statement is not None: + for statement in session_init_statement: + cursor.execute(statement) cursor.execute(query) while True: rows = cursor.fetchmany(fetch_size) diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index 5533ebb246..d376751d8b 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -1859,6 +1859,8 @@ def create_oracledb_connection(): fetch_size=fetch_size, imports=udtf_configs.get("imports", None), packages=udtf_configs.get("packages", None), + session_init_statement=session_init_statement, + query_timeout=query_timeout, _emit_ast=_emit_ast, ) end_time = time.perf_counter() diff --git a/tests/integ/datasource/test_databricks.py b/tests/integ/datasource/test_databricks.py index 954dec122d..63c496f1c5 100644 --- a/tests/integ/datasource/test_databricks.py +++ b/tests/integ/datasource/test_databricks.py @@ -17,7 +17,10 @@ random_name_for_temp_object, TempObjectType, ) -from snowflake.snowpark.exceptions import SnowparkDataframeReaderException +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkSQLException, +) from snowflake.snowpark.types import ( StructType, StructField, @@ -205,7 +208,9 @@ def local_create_databricks_connection(): def test_unit_udtf_ingestion(): dbx_driver = DatabricksDriver(create_databricks_connection, DBMS_TYPE.DATABRICKS_DB) - udtf_ingestion_class = dbx_driver.udtf_class_builder() + udtf_ingestion_class = dbx_driver.udtf_class_builder( + session_init_statement=["select 1"] + ) udtf_ingestion_instance = udtf_ingestion_class() dsp = DataSourcePartitioner( @@ -258,3 +263,37 @@ def test_unsupported_type(): create_databricks_connection, DBMS_TYPE.DATABRICKS_DB ).to_snow_type([("test_col", "unsupported_type", True)]) assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)]) + + +def test_session_init(session): + with pytest.raises( + SnowparkDataframeReaderException, + match="syntax error command", + ): + session.read.dbapi( + create_databricks_connection, + table=TEST_TABLE_NAME, + session_init_statement=["syntax error command"], + ) + + +def test_session_init_udtf(session): + udtf_configs = { + "external_access_integration": DATABRICKS_TEST_EXTERNAL_ACCESS_INTEGRATION + } + + def create_databricks_udtf_connection(): + import databricks.sql + + return databricks.sql.connect(**DATABRICKS_CONNECTION_PARAMETERS) + + with pytest.raises( + SnowparkSQLException, + match="syntax error command", + ): + session.read.dbapi( + create_databricks_udtf_connection, + table=TEST_TABLE_NAME, + session_init_statement=["syntax error command"], + udtf_configs=udtf_configs, + ).collect() diff --git a/tests/integ/datasource/test_mysql.py b/tests/integ/datasource/test_mysql.py index 6878437a68..842f386dcf 100644 --- a/tests/integ/datasource/test_mysql.py +++ b/tests/integ/datasource/test_mysql.py @@ -15,6 +15,10 @@ ) from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE from snowflake.snowpark.types import StructType, StructField, StringType +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkSQLException, +) from tests.resources.test_data_source_dir.test_mysql_data import ( mysql_real_data, MysqlType, @@ -261,7 +265,9 @@ def test_pymysql_driver_udtf_class_builder(): driver = PymysqlDriver(create_connection_mysql, DBMS_TYPE.MYSQL_DB) # Get the UDTF class with a small fetch size to test batching - UDTFClass = driver.udtf_class_builder(fetch_size=2) + UDTFClass = driver.udtf_class_builder( + fetch_size=2, session_init_statement=["select 1"] + ) # Instantiate the UDTF class udtf_instance = UDTFClass() @@ -297,3 +303,47 @@ def test_unsupported_type(): [("test_col", "unsupported_type", None, None, 0, 0, True)] ) assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)]) + + +def test_session_init(session): + with pytest.raises( + SnowparkDataframeReaderException, + match="Mock error to test init_statement", + ): + session.read.dbapi( + create_connection_mysql, + table=TEST_TABLE_NAME, + session_init_statement=[ + "SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Mock error to test init_statement'" + ], + ) + + +def test_session_init_udtf(session): + udtf_configs = { + "external_access_integration": MYSQL_TEST_EXTERNAL_ACCESS_INTEGRATION + } + + def create_connection_udtf_mysql(): + import pymysql # noqa: F811 + + conn = pymysql.connect( + user=MYSQL_CONNECTION_PARAMETERS["username"], + password=MYSQL_CONNECTION_PARAMETERS["password"], + host=MYSQL_CONNECTION_PARAMETERS["host"], + database=MYSQL_CONNECTION_PARAMETERS["database"], + ) + return conn + + with pytest.raises( + SnowparkSQLException, + match="Mock error to test init_statement", + ): + session.read.dbapi( + create_connection_udtf_mysql, + table=TEST_TABLE_NAME, + session_init_statement=[ + "SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Mock error to test init_statement'" + ], + udtf_configs=udtf_configs, + ).collect() diff --git a/tests/integ/datasource/test_oracledb.py b/tests/integ/datasource/test_oracledb.py index 7360f8a74c..6b5178590c 100644 --- a/tests/integ/datasource/test_oracledb.py +++ b/tests/integ/datasource/test_oracledb.py @@ -20,6 +20,10 @@ DBMS_TYPE, ) from snowflake.snowpark.types import StructType, StructField, StringType +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkSQLException, +) from tests.parameters import ORACLEDB_CONNECTION_PARAMETERS from tests.resources.test_data_source_dir.test_data_source_data import ( OracleDBType, @@ -248,3 +252,85 @@ def test_unsupported_type(): create_connection_oracledb, DBMS_TYPE.ORACLE_DB ).to_snow_type([MockDescription("test_col", invalid_type, 0, 0, True)]) assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)]) + + +def test_query_timeout_and_session_init(session): + statement = """ + BEGIN + DBMS_LOCK.SLEEP(5); + END; +""" + with pytest.raises(SnowparkDataframeReaderException) as error: + session.read.dbapi( + create_connection_oracledb, + table=ORACLEDB_TABLE_NAME, + query_timeout=1, + session_init_statement=[statement], + ) + assert "socket timed out while recovering from previous socket timeout" in str( + error.value + ) or "call timeout of 1000 ms exceeded" in str(error.value) + + +def test_query_timeout_and_session_init_udtf(session): + udtf_configs = { + "external_access_integration": ORACLEDB_TEST_EXTERNAL_ACCESS_INTEGRATION + } + statement = """ + BEGIN + DBMS_LOCK.SLEEP(5); + END; + """ + + def create_connection_udtf_oracledb(): + import oracledb + + host = ORACLEDB_CONNECTION_PARAMETERS["host"] + port = ORACLEDB_CONNECTION_PARAMETERS["port"] + service_name = ORACLEDB_CONNECTION_PARAMETERS["service_name"] + username = ORACLEDB_CONNECTION_PARAMETERS["username"] + password = ORACLEDB_CONNECTION_PARAMETERS["password"] + dsn = f"{host}:{port}/{service_name}" + connection = oracledb.connect(user=username, password=password, dsn=dsn) + return connection + + with pytest.raises( + SnowparkSQLException, + match="call timeout of 1000 ms exceeded", + ): + session.read.dbapi( + create_connection_udtf_oracledb, + table=ORACLEDB_TABLE_NAME, + query_timeout=1, + session_init_statement=[statement], + udtf_configs=udtf_configs, + ).collect() + + +def test_oracledb_driver_udtf_class_builder(): + """Test the UDTF class builder in OracledbDriver using a real Oracledb connection""" + # Create the driver with the real connection function + driver = OracledbDriver(create_connection_oracledb, DBMS_TYPE.ORACLE_DB) + + # Get the UDTF class with a small fetch size to test batching + UDTFClass = driver.udtf_class_builder( + fetch_size=2, session_init_statement=["select 1 from dual"], query_timeout=1 + ) + + # Instantiate the UDTF class + udtf_instance = UDTFClass() + + # Test with a simple query that should return a few rows + test_query = f"SELECT * FROM {ORACLEDB_TABLE_NAME}" + result_rows = list(udtf_instance.process(test_query)) + + # Verify we got some data back (we know the test table has data from other tests) + assert len(result_rows) > 0 + + # Test with a query that returns specific columns + test_columns_query = f"SELECT ID, NUMBER_COL FROM {ORACLEDB_TABLE_NAME}" + column_result_rows = list(udtf_instance.process(test_columns_query)) + + # Verify we got data with the right structure (2 columns) + assert len(column_result_rows) > 0 + assert len(column_result_rows[0]) == 2 # Two columns diff --git a/tests/integ/datasource/test_postgres.py b/tests/integ/datasource/test_postgres.py index 82e1b13a8f..99088268e2 100644 --- a/tests/integ/datasource/test_postgres.py +++ b/tests/integ/datasource/test_postgres.py @@ -11,7 +11,10 @@ Psycopg2TypeCode, ) from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE -from snowflake.snowpark.exceptions import SnowparkDataframeReaderException +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkSQLException, +) from snowflake.snowpark.types import ( DecimalType, BinaryType, @@ -102,10 +105,10 @@ def test_error_case(session, input_type, input_value, error_message): session.read.dbapi(create_postgres_connection, **input_dict) -def test_query_timeout(session): +def test_query_timeout_and_session_init(session): with pytest.raises( SnowparkDataframeReaderException, - match=r"due to exception 'QueryCanceled\('canceling statement due to statement timeout", + match="canceling statement due to statement timeout", ): session.read.dbapi( create_postgres_connection, @@ -115,6 +118,27 @@ def test_query_timeout(session): ) +def test_query_timeout_and_session_init_udtf(session): + udtf_configs = { + "external_access_integration": POSTGRES_TEST_EXTERNAL_ACCESS_INTEGRATION + } + + def create_postgres_udtf_connection(): + return psycopg2.connect(**POSTGRES_CONNECTION_PARAMETERS) + + with pytest.raises( + SnowparkSQLException, + match="canceling statement due to statement timeout", + ): + session.read.dbapi( + create_postgres_udtf_connection, + table=POSTGRES_TABLE_NAME, + query_timeout=1, + session_init_statement=["SELECT pg_sleep(5)"], + udtf_configs=udtf_configs, + ).collect() + + def test_external_access_integration_not_set(session): with pytest.raises( ValueError, @@ -186,7 +210,9 @@ def test_psycopg2_driver_udtf_class_builder(): driver = Psycopg2Driver(create_postgres_connection, DBMS_TYPE.POSTGRES_DB) # Get the UDTF class with a small fetch size to test batching - UDTFClass = driver.udtf_class_builder(fetch_size=2) + UDTFClass = driver.udtf_class_builder( + fetch_size=2, session_init_statement=["SELECT pg_sleep(1)"] + ) # Instantiate the UDTF class udtf_instance = UDTFClass() diff --git a/tests/integ/test_data_source_api.py b/tests/integ/test_data_source_api.py index 7de529432e..5d7d10de84 100644 --- a/tests/integ/test_data_source_api.py +++ b/tests/integ/test_data_source_api.py @@ -558,7 +558,7 @@ def test_session_init_statement(session, fetch_with_process): with pytest.raises( SnowparkDataframeReaderException, - match=r'Failed to execute session init statement: \'SELECT FROM NOTHING;\' due to exception \'OperationalError\(\'near "FROM": syntax error\'\)\'', + match="Failed to execute session init statement:", ): session.read.dbapi( functools.partial(create_connection_to_sqlite3_db, dbpath), @@ -1018,7 +1018,7 @@ def fetchmany(self, row_count: int): driver.to_snow_type(raw_schema), partitions_table, "", - packages=["pyodbc"], + packages=["pyodbc", "snowflake-snowpark-python"], ) Utils.check_answer(df, sql_server_udtf_ingestion_data) @@ -1697,3 +1697,33 @@ def test_error_in_upload_is_raised(session): create_connection=sql_server_create_connection, table=SQL_SERVER_TABLE_NAME, ) + + +@pytest.mark.skipif( + IS_WINDOWS, + reason="sqlite3 file can not be shared across processes on windows", +) +def test_base_driver_udtf_class_builder(): + with tempfile.TemporaryDirectory() as temp_dir: + dbpath = os.path.join(temp_dir, "sqlite3udtf.db") + table_name, columns, example_data, _ = sqlite3_db(dbpath) + # Create the driver with the real connection function + driver = BaseDriver( + functools.partial(create_connection_to_sqlite3_db, dbpath), + DBMS_TYPE.UNKNOWN, + ) + + # Get the UDTF class with a small fetch size to test batching + UDTFClass = driver.udtf_class_builder( + fetch_size=2, session_init_statement=["select 1"] + ) + + # Instantiate the UDTF class + udtf_instance = UDTFClass() + + # Test with a simple query that should return a few rows + test_query = f"SELECT * FROM {table_name}" + result_rows = list(udtf_instance.process(test_query)) + + # Verify we got some data back (we know the test table has data from other tests) + assert len(result_rows) > 0 diff --git a/tests/resources/test_data_source_dir/test_data_source_data.py b/tests/resources/test_data_source_dir/test_data_source_data.py index 2d7e7a3949..6d42ad08ee 100644 --- a/tests/resources/test_data_source_dir/test_data_source_data.py +++ b/tests/resources/test_data_source_dir/test_data_source_data.py @@ -49,6 +49,12 @@ def cursor(self): def close(self): pass + def get_output_converter(self, type): + pass + + def add_output_converter(self, type1, type2): + pass + @property def description(self): return self.schema