Skip to content

Commit c717e89

Browse files
committed
Add support for SQLTablePrivileges
We're limited in what we can test, as we don't know that we'll be running with GRANT permissions in the pipeline test harness, but we can at least make sure the function returns a results set with the right shape and column names. No point in testing on SQLite, which doesn't implement a PRIVILEGES model. I also suppressed .DS_Store in .gitignore. Closes #1340
1 parent cfe0575 commit c717e89

5 files changed

Lines changed: 187 additions & 0 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pyodbc.conf
5858
tmp
5959
tags
6060
xxx_*
61+
.DS_Store
6162

6263
# The Access unit tests copy empty.accdb and empty.mdb to these names and use them.
6364
test.accdb

src/cursor.cpp

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,6 +1411,151 @@ static PyObject* Cursor_tables(PyObject* self, PyObject* args, PyObject* kwargs)
14111411
}
14121412

14131413

1414+
static char tablePrivileges_doc[] =
1415+
"C.tablePrivileges(table=None, catalog=None, schema=None) --> self\n"
1416+
"\n"
1417+
"Executes SQLTablePrivileges and creates a results set of tables and the\n"
1418+
"privileges associated with those tables.\n"
1419+
"\n"
1420+
"Pattern strings can be provided to filter the results set. Some data\n"
1421+
"sources add additional filtering to suppress rows based on the rights\n"
1422+
"of the current user.\n"
1423+
"\n"
1424+
"For the table and schema values the '_' and '%' characters are interpreted\n"
1425+
"as wildcards. The escape character is driver specific, so you should use\n"
1426+
"the Connection.searchescape property if you need to include one of the\n"
1427+
"wildcard characters as part of the name being searched.\n"
1428+
"\n"
1429+
"If the SQL_ATTR_METADATA_ID statement attribute is set to SQL_TRUE, the\n"
1430+
"arguments are treated as identifiers and their case is not significant.\n"
1431+
"Otherwise they are treated literally, and case is significant.\n"
1432+
"\n"
1433+
"table\n"
1434+
" Optional string search pattern for table names.\n\n"
1435+
"catalog\n"
1436+
" Table catalog name. If a driver supports catalogs for some tables but\n"
1437+
" not for others, such as when the driver retrieves data from different\n"
1438+
" DBMSs, an empty string ('') denotes those tables that do not have\n"
1439+
" catalogs. This argument cannot contain a search string pattern.\n\n"
1440+
"schema\n"
1441+
" Search string pattern for schema names. If a driver supports schemas\n"
1442+
" for some tables but not others, an empty string ('') is used to restrict\n"
1443+
" the results to tables that do not have schemas.\n\n"
1444+
"Each row fetched has the following columns:\n"
1445+
" 0) table_cat\n"
1446+
" 1) table_schem\n"
1447+
" 2) table_name\n"
1448+
" 3) grantor\n"
1449+
" 4) grantee\n"
1450+
" 5) privilege\n"
1451+
" 6) is_grantable";
1452+
1453+
1454+
static PyObject* Cursor_tablePrivileges(PyObject* self, PyObject* args, PyObject* kwargs)
1455+
{
1456+
PyObject* pCatalog = 0;
1457+
PyObject* pSchema = 0;
1458+
PyObject* pTable = 0;
1459+
1460+
char* kwnames[] = { "table", "catalog", "schema", 0 };
1461+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kwnames, &pTable, &pCatalog, &pSchema))
1462+
return 0;
1463+
1464+
Cursor* cur = Cursor_Validate(self, CURSOR_REQUIRE_OPEN);
1465+
1466+
if (!free_results(cur, FREE_STATEMENT | FREE_PREPARED))
1467+
return 0;
1468+
1469+
SQLRETURN ret = 0;
1470+
1471+
// Use the cursor's encoding.
1472+
const TextEnc* penc = &cur->cnxn->unicode_enc;
1473+
bool isWide = penc->ctype == SQL_C_WCHAR;
1474+
Object oTable;
1475+
Object oCatalog;
1476+
Object oSchema;
1477+
if (pTable && pTable != Py_None)
1478+
{
1479+
oTable = penc->Encode(pTable);
1480+
if (!oTable)
1481+
return 0;
1482+
}
1483+
if (pCatalog && pCatalog != Py_None)
1484+
{
1485+
oCatalog = penc->Encode(pCatalog);
1486+
if (!oCatalog)
1487+
return 0;
1488+
}
1489+
if (pSchema && pSchema != Py_None)
1490+
{
1491+
oSchema = penc->Encode(pSchema);
1492+
if (!oSchema)
1493+
return 0;
1494+
}
1495+
char* szTable = 0;
1496+
SQLSMALLINT cchTable = SQL_NTS;
1497+
if (oTable)
1498+
{
1499+
szTable = PyBytes_AS_STRING(oTable.Get());
1500+
if (isWide)
1501+
cchTable = (SQLSMALLINT)(PyBytes_GET_SIZE(oTable.Get()) / sizeof(uint16_t));
1502+
}
1503+
char* szCatalog = 0;
1504+
SQLSMALLINT cchCatalog = SQL_NTS;
1505+
if (oCatalog)
1506+
{
1507+
szCatalog = PyBytes_AS_STRING(oCatalog.Get());
1508+
if (isWide)
1509+
cchCatalog = (SQLSMALLINT)(PyBytes_GET_SIZE(oCatalog.Get()) / sizeof(uint16_t));
1510+
}
1511+
char* szSchema = 0;
1512+
SQLSMALLINT cchSchema = SQL_NTS;
1513+
if (oSchema)
1514+
{
1515+
szSchema = PyBytes_AS_STRING(oSchema.Get());
1516+
if (isWide)
1517+
cchSchema = (SQLSMALLINT)(PyBytes_GET_SIZE(oSchema.Get()) / sizeof(uint16_t));
1518+
}
1519+
1520+
Py_BEGIN_ALLOW_THREADS
1521+
if (isWide)
1522+
ret = SQLTablePrivilegesW(
1523+
cur->hstmt,
1524+
(SQLWCHAR*)szCatalog, cchCatalog,
1525+
(SQLWCHAR*)szSchema, cchSchema,
1526+
(SQLWCHAR*)szTable, cchTable
1527+
);
1528+
else
1529+
ret = SQLTablePrivileges(
1530+
cur->hstmt,
1531+
(SQLCHAR*)szCatalog, cchCatalog,
1532+
(SQLCHAR*)szSchema, cchSchema,
1533+
(SQLCHAR*)szTable, cchTable
1534+
);
1535+
Py_END_ALLOW_THREADS
1536+
1537+
if (!SQL_SUCCEEDED(ret))
1538+
return RaiseErrorFromHandle(cur->cnxn, "SQLTablePrivileges", cur->cnxn->hdbc, cur->hstmt);
1539+
1540+
SQLSMALLINT cCols;
1541+
Py_BEGIN_ALLOW_THREADS
1542+
ret = SQLNumResultCols(cur->hstmt, &cCols);
1543+
Py_END_ALLOW_THREADS
1544+
if (!SQL_SUCCEEDED(ret))
1545+
return RaiseErrorFromHandle(cur->cnxn, "SQLNumResultCols", cur->cnxn->hdbc, cur->hstmt);
1546+
1547+
if (!PrepareResults(cur, cCols))
1548+
return 0;
1549+
1550+
if (!create_name_map(cur, cCols, true))
1551+
return 0;
1552+
1553+
// Return the cursor so the results can be iterated over directly.
1554+
Py_INCREF(cur);
1555+
return (PyObject*)cur;
1556+
}
1557+
1558+
14141559
static char columns_doc[] =
14151560
"C.columns(table=None, catalog=None, schema=None, column=None)\n\n"
14161561
"Creates a results set of column names in specified tables by executing the ODBC SQLColumns function.\n"
@@ -2396,6 +2541,7 @@ static PyMethodDef Cursor_methods[] =
23962541
{ "fetchmany", (PyCFunction)Cursor_fetchmany, METH_VARARGS, fetchmany_doc },
23972542
{ "nextset", (PyCFunction)Cursor_nextset, METH_NOARGS, nextset_doc },
23982543
{ "tables", (PyCFunction)Cursor_tables, METH_VARARGS|METH_KEYWORDS, tables_doc },
2544+
{ "tablePrivileges", (PyCFunction)Cursor_tablePrivileges, METH_VARARGS|METH_KEYWORDS, tablePrivileges_doc },
23992545
{ "columns", (PyCFunction)Cursor_columns, METH_VARARGS|METH_KEYWORDS, columns_doc },
24002546
{ "statistics", (PyCFunction)Cursor_statistics, METH_VARARGS|METH_KEYWORDS, statistics_doc },
24012547
{ "rowIdColumns", (PyCFunction)Cursor_rowIdColumns, METH_VARARGS|METH_KEYWORDS, rowIdColumns_doc },

tests/mysql_test.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,19 @@ def test_row_description(cursor: pyodbc.Cursor):
353353
assert cursor.description == row.cursor_description
354354

355355

356+
def test_table_privileges(cursor: pyodbc.Cursor):
357+
# Confirm exposure of SQLTablePrivileges. We're limited in what we can test, as
358+
# we can't control whether we're running with permission to create users or grant
359+
# permissions. We can at least verify that the method generates a results set
360+
# with the right columns.
361+
cols = ["table_cat", "table_schem", "table_name", "grantor",
362+
"grantee", "privilege", "is_grantable"]
363+
cursor.tablePrivileges()
364+
names = [col[0] for col in cursor.description]
365+
assert len(cols) == len(names), "privileges results set has the wrong shape"
366+
assert cols == names, "unexpected column names for privileges results set"
367+
368+
356369
def test_executemany(cursor: pyodbc.Cursor):
357370
cursor.execute("create table t1(a int, b varchar(10))")
358371

tests/postgresql_test.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,20 @@ def _get_column_size(row):
490490
assert row.type_name == 'varchar'
491491
assert _get_column_size(row) == 3
492492

493+
494+
def test_table_privileges(cursor: pyodbc.Cursor):
495+
# Confirm exposure of SQLTablePrivileges. We're limited in what we can test, as
496+
# we can't control whether we're running with permission to create users or grant
497+
# permissions. We can at least verify that the method generates a results set
498+
# with the right columns.
499+
cols = ["table_cat", "table_schem", "table_name", "grantor",
500+
"grantee", "privilege", "is_grantable"]
501+
cursor.tablePrivileges()
502+
names = [col[0] for col in cursor.description]
503+
assert len(cols) == len(names), "privileges results set has the wrong shape"
504+
assert cols == names, "unexpected column names for privileges results set"
505+
506+
493507
def test_cancel(cursor: pyodbc.Cursor):
494508
# I'm not sure how to reliably cause a hang to cancel, so for now we'll settle with
495509
# making sure SQLCancel is called correctly.

tests/sqlserver_test.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,6 +1426,19 @@ def test_columns(cursor: pyodbc.Cursor):
14261426
cursor.execute(f"drop table {table_name}")
14271427

14281428

1429+
def test_table_privileges(cursor: pyodbc.Cursor):
1430+
# Confirm exposure of SQLTablePrivileges. We're limited in what we can test, as
1431+
# we can't control whether we're running with permission to create users or grant
1432+
# permissions. We can at least verify that the method generates a results set
1433+
# with the right columns.
1434+
cols = ["table_cat", "table_schem", "table_name", "grantor",
1435+
"grantee", "privilege", "is_grantable"]
1436+
cursor.tablePrivileges()
1437+
names = [col[0] for col in cursor.description]
1438+
assert len(cols) == len(names), "privileges results set has the wrong shape"
1439+
assert cols == names, "unexpected column names for privileges results set"
1440+
1441+
14291442
def test_cancel(cursor: pyodbc.Cursor):
14301443
# I'm not sure how to reliably cause a hang to cancel, so for now we'll settle with
14311444
# making sure SQLCancel is called correctly.

0 commit comments

Comments
 (0)