Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pyodbc.conf
tmp
tags
xxx_*
.DS_Store

# The Access unit tests copy empty.accdb and empty.mdb to these names and use them.
test.accdb
Expand Down
146 changes: 146 additions & 0 deletions src/cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,151 @@ static PyObject* Cursor_tables(PyObject* self, PyObject* args, PyObject* kwargs)
}


static char tablePrivileges_doc[] =
"C.tablePrivileges(table=None, catalog=None, schema=None) --> self\n"
"\n"
"Executes SQLTablePrivileges and creates a results set of tables and the\n"
"privileges associated with those tables.\n"
"\n"
"Pattern strings can be provided to filter the results set. Some data\n"
"sources add additional filtering to suppress rows based on the rights\n"
"of the current user.\n"
"\n"
"For the table and schema values the '_' and '%' characters are interpreted\n"
"as wildcards. The escape character is driver specific, so you should use\n"
"the Connection.searchescape property if you need to include one of the\n"
"wildcard characters as part of the name being searched.\n"
"\n"
"If the SQL_ATTR_METADATA_ID statement attribute is set to SQL_TRUE, the\n"
"arguments are treated as identifiers and their case is not significant.\n"
"Otherwise they are treated literally, and case is significant.\n"
"\n"
"table\n"
" Optional string search pattern for table names.\n\n"
"catalog\n"
" Table catalog name. If a driver supports catalogs for some tables but\n"
" not for others, such as when the driver retrieves data from different\n"
" DBMSs, an empty string ('') denotes those tables that do not have\n"
" catalogs. This argument cannot contain a search string pattern.\n\n"
"schema\n"
" Search string pattern for schema names. If a driver supports schemas\n"
" for some tables but not others, an empty string ('') is used to restrict\n"
" the results to tables that do not have schemas.\n\n"
"Each row fetched has the following columns:\n"
" 0) table_cat\n"
" 1) table_schem\n"
" 2) table_name\n"
" 3) grantor\n"
" 4) grantee\n"
" 5) privilege\n"
" 6) is_grantable";


static PyObject* Cursor_tablePrivileges(PyObject* self, PyObject* args, PyObject* kwargs)
{
PyObject* pCatalog = 0;
PyObject* pSchema = 0;
PyObject* pTable = 0;

char* kwnames[] = { "table", "catalog", "schema", 0 };
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kwnames, &pTable, &pCatalog, &pSchema))
return 0;

Cursor* cur = Cursor_Validate(self, CURSOR_REQUIRE_OPEN);

if (!free_results(cur, FREE_STATEMENT | FREE_PREPARED))
return 0;

SQLRETURN ret = 0;

// Use the cursor's encoding.
const TextEnc* penc = &cur->cnxn->unicode_enc;
bool isWide = penc->ctype == SQL_C_WCHAR;
Object oTable;
Object oCatalog;
Object oSchema;
if (pTable && pTable != Py_None)
{
oTable = penc->Encode(pTable);
if (!oTable)
return 0;
}
if (pCatalog && pCatalog != Py_None)
{
oCatalog = penc->Encode(pCatalog);
if (!oCatalog)
return 0;
}
if (pSchema && pSchema != Py_None)
{
oSchema = penc->Encode(pSchema);
if (!oSchema)
return 0;
}
char* szTable = 0;
SQLSMALLINT cchTable = SQL_NTS;
if (oTable)
{
szTable = PyBytes_AS_STRING(oTable.Get());
if (isWide)
cchTable = (SQLSMALLINT)(PyBytes_GET_SIZE(oTable.Get()) / sizeof(uint16_t));
}
char* szCatalog = 0;
SQLSMALLINT cchCatalog = SQL_NTS;
if (oCatalog)
{
szCatalog = PyBytes_AS_STRING(oCatalog.Get());
if (isWide)
cchCatalog = (SQLSMALLINT)(PyBytes_GET_SIZE(oCatalog.Get()) / sizeof(uint16_t));
}
char* szSchema = 0;
SQLSMALLINT cchSchema = SQL_NTS;
if (oSchema)
{
szSchema = PyBytes_AS_STRING(oSchema.Get());
if (isWide)
cchSchema = (SQLSMALLINT)(PyBytes_GET_SIZE(oSchema.Get()) / sizeof(uint16_t));
}

Py_BEGIN_ALLOW_THREADS
if (isWide)
ret = SQLTablePrivilegesW(
cur->hstmt,
(SQLWCHAR*)szCatalog, cchCatalog,
(SQLWCHAR*)szSchema, cchSchema,
(SQLWCHAR*)szTable, cchTable
);
else
ret = SQLTablePrivileges(
cur->hstmt,
(SQLCHAR*)szCatalog, cchCatalog,
(SQLCHAR*)szSchema, cchSchema,
(SQLCHAR*)szTable, cchTable
);
Py_END_ALLOW_THREADS

if (!SQL_SUCCEEDED(ret))
return RaiseErrorFromHandle(cur->cnxn, "SQLTablePrivileges", cur->cnxn->hdbc, cur->hstmt);

SQLSMALLINT cCols;
Py_BEGIN_ALLOW_THREADS
ret = SQLNumResultCols(cur->hstmt, &cCols);
Py_END_ALLOW_THREADS
if (!SQL_SUCCEEDED(ret))
return RaiseErrorFromHandle(cur->cnxn, "SQLNumResultCols", cur->cnxn->hdbc, cur->hstmt);

if (!PrepareResults(cur, cCols))
return 0;

if (!create_name_map(cur, cCols, true))
return 0;

// Return the cursor so the results can be iterated over directly.
Py_INCREF(cur);
return (PyObject*)cur;
}


static char columns_doc[] =
"C.columns(table=None, catalog=None, schema=None, column=None)\n\n"
"Creates a results set of column names in specified tables by executing the ODBC SQLColumns function.\n"
Expand Down Expand Up @@ -2396,6 +2541,7 @@ static PyMethodDef Cursor_methods[] =
{ "fetchmany", (PyCFunction)Cursor_fetchmany, METH_VARARGS, fetchmany_doc },
{ "nextset", (PyCFunction)Cursor_nextset, METH_NOARGS, nextset_doc },
{ "tables", (PyCFunction)Cursor_tables, METH_VARARGS|METH_KEYWORDS, tables_doc },
{ "tablePrivileges", (PyCFunction)Cursor_tablePrivileges, METH_VARARGS|METH_KEYWORDS, tablePrivileges_doc },
{ "columns", (PyCFunction)Cursor_columns, METH_VARARGS|METH_KEYWORDS, columns_doc },
{ "statistics", (PyCFunction)Cursor_statistics, METH_VARARGS|METH_KEYWORDS, statistics_doc },
{ "rowIdColumns", (PyCFunction)Cursor_rowIdColumns, METH_VARARGS|METH_KEYWORDS, rowIdColumns_doc },
Expand Down
17 changes: 17 additions & 0 deletions src/pyodbc.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,23 @@ class Cursor:
"""
...

def tablesPrivileges(self,
table: str | None = None,
catalog: str | None = None,
schema: str | None = None) -> Cursor:
"""Return information about privileges granted for the tables in the
database.

Args:
table: Name of the database table.
catalog: Name of the catalog (database).
schema: Name of the table schema.

Returns:
The cursor object, containing table privilege info in the result set.
"""
...

def columns(self,
table: str | None = None,
catalog: str | None = None,
Expand Down
13 changes: 13 additions & 0 deletions tests/mysql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,19 @@ def test_row_description(cursor: pyodbc.Cursor):
assert cursor.description == row.cursor_description


def test_table_privileges(cursor: pyodbc.Cursor):
# Confirm exposure of SQLTablePrivileges. We're limited in what we can test, as
# we can't control whether we're running with permission to create users or grant
# permissions. We can at least verify that the method generates a results set
# with the right columns.
cols = ["table_cat", "table_schem", "table_name", "grantor",
"grantee", "privilege", "is_grantable"]
cursor.tablePrivileges()
names = [col[0] for col in cursor.description]
assert len(cols) == len(names), "privileges results set has the wrong shape"
assert cols == names, "unexpected column names for privileges results set"


def test_executemany(cursor: pyodbc.Cursor):
cursor.execute("create table t1(a int, b varchar(10))")

Expand Down
14 changes: 14 additions & 0 deletions tests/postgresql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,20 @@ def _get_column_size(row):
assert row.type_name == 'varchar'
assert _get_column_size(row) == 3


def test_table_privileges(cursor: pyodbc.Cursor):
# Confirm exposure of SQLTablePrivileges. We're limited in what we can test, as
# we can't control whether we're running with permission to create users or grant
# permissions. We can at least verify that the method generates a results set
# with the right columns.
cols = ["table_cat", "table_schem", "table_name", "grantor",
"grantee", "privilege", "is_grantable"]
cursor.tablePrivileges()
names = [col[0] for col in cursor.description]
assert len(cols) == len(names), "privileges results set has the wrong shape"
assert cols == names, "unexpected column names for privileges results set"


def test_cancel(cursor: pyodbc.Cursor):
# I'm not sure how to reliably cause a hang to cancel, so for now we'll settle with
# making sure SQLCancel is called correctly.
Expand Down
13 changes: 13 additions & 0 deletions tests/sqlserver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,19 @@ def test_columns(cursor: pyodbc.Cursor):
cursor.execute(f"drop table {table_name}")


def test_table_privileges(cursor: pyodbc.Cursor):
# Confirm exposure of SQLTablePrivileges. We're limited in what we can test, as
# we can't control whether we're running with permission to create users or grant
# permissions. We can at least verify that the method generates a results set
# with the right columns.
cols = ["table_cat", "table_schem", "table_name", "grantor",
"grantee", "privilege", "is_grantable"]
cursor.tablePrivileges()
names = [col[0] for col in cursor.description]
assert len(cols) == len(names), "privileges results set has the wrong shape"
assert cols == names, "unexpected column names for privileges results set"


def test_cancel(cursor: pyodbc.Cursor):
# I'm not sure how to reliably cause a hang to cancel, so for now we'll settle with
# making sure SQLCancel is called correctly.
Expand Down
Loading