Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions accel.c
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,6 @@ static PyObject *read_row_from_packet(
case ACCEL_OUT_DICTS:
case ACCEL_OUT_ARROW:
PyDict_SetItem(py_result, py_state->py_names[i], py_item);
Py_INCREF(py_state->py_names[i]);
Py_DECREF(py_item);
break;
default:
Expand Down Expand Up @@ -2678,8 +2677,19 @@ static PyObject *load_rowdat_1_numpy(PyObject *self, PyObject *args, PyObject *k

exit:
if (ctypes) free(ctypes);
if (out_cols) free(out_cols);
if (mask_cols) free(mask_cols);
if (out_cols) {
for (i = 0; i < n_cols; i++) {
if (out_cols[i]) free(out_cols[i]);
}
free(out_cols);
}
if (mask_cols) {
for (i = 0; i < n_cols; i++) {
if (mask_cols[i]) free(mask_cols[i]);
}
free(mask_cols);
}
if (out_row_ids) free(out_row_ids);
if (data_formats) free(data_formats);
if (item_sizes) free(item_sizes);

Expand Down Expand Up @@ -2943,11 +2953,17 @@ static PyObject *dump_rowdat_1_numpy(PyObject *self, PyObject *args, PyObject *k
out_l = 256 * n_cols;
out_idx = 0;
out = malloc(out_l);
if (!out) goto error;
if (!out) {
PyErr_SetString(PyExc_MemoryError, "failed to allocate output buffer");
goto error;
}

// Get return types
returns = malloc(sizeof(int) * n_cols);
if (!returns) goto error;
if (!returns) {
PyErr_SetString(PyExc_MemoryError, "failed to allocate returns array");
goto error;
}

for (i = 0; i < n_cols; i++) {
PyObject *py_item = PySequence_GetItem(py_returns, i);
Expand All @@ -2959,11 +2975,20 @@ static PyObject *dump_rowdat_1_numpy(PyObject *self, PyObject *args, PyObject *k

// Get column array memory
cols = calloc(sizeof(char*), n_cols);
if (!cols) goto error;
if (!cols) {
PyErr_SetString(PyExc_MemoryError, "failed to allocate cols array");
goto error;
}
col_types = calloc(sizeof(NumpyColType), n_cols);
if (!col_types) goto error;
if (!col_types) {
PyErr_SetString(PyExc_MemoryError, "failed to allocate col_types array");
goto error;
}
masks = calloc(sizeof(char*), n_cols);
if (!masks) goto error;
if (!masks) {
PyErr_SetString(PyExc_MemoryError, "failed to allocate masks array");
goto error;
}
for (i = 0; i < n_cols; i++) {
PyObject *py_item = PyList_GetItem(py_cols, i);
if (!py_item) goto error;
Expand Down Expand Up @@ -2996,8 +3021,12 @@ static PyObject *dump_rowdat_1_numpy(PyObject *self, PyObject *args, PyObject *k
#define CHECKMEM(x) \
if ((out_idx + x) > out_l) { \
out_l = out_l * 2 + x; \
out = realloc(out, out_l); \
if (!out) goto error; \
char *new_out = realloc(out, out_l); \
if (!new_out) { \
PyErr_SetString(PyExc_MemoryError, "failed to reallocate output buffer"); \
goto error; \
} \
out = new_out; \
}

for (j = 0; j < n_rows; j++) {
Expand Down Expand Up @@ -4079,10 +4108,10 @@ static PyObject *dump_rowdat_1_numpy(PyObject *self, PyObject *args, PyObject *k
}
}

py_out = PyMemoryView_FromMemory(out, out_idx, PyBUF_WRITE);
if (!py_out) goto error;
py_out = PyBytes_FromStringAndSize(out, out_idx);

exit:
if (out) free(out);
if (returns) free(returns);
if (masks) free(masks);
if (cols) free(cols);
Expand All @@ -4091,7 +4120,6 @@ static PyObject *dump_rowdat_1_numpy(PyObject *self, PyObject *args, PyObject *k
return py_out;

error:
if (!py_out && out) free(out);
Py_XDECREF(py_out);
py_out = NULL;

Expand Down Expand Up @@ -4471,8 +4499,12 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs)
#define CHECKMEM(x) \
if ((out_idx + x) > out_l) { \
out_l = out_l * 2 + x; \
out = realloc(out, out_l); \
if (!out) goto error; \
char *new_out = realloc(out, out_l); \
if (!new_out) { \
PyErr_SetString(PyExc_MemoryError, "failed to reallocate output buffer"); \
goto error; \
} \
out = new_out; \
}

py_rows_iter = PyObject_GetIter(py_rows);
Expand All @@ -4483,12 +4515,20 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs)

while ((py_row = PyIter_Next(py_rows_iter))) {
py_row_iter = PyObject_GetIter(py_row);
if (!py_row_iter) goto error;
if (!py_row_iter) {
Py_DECREF(py_row);
goto error;
}

// First item is always a row ID
py_item = PyIter_Next(py_row_ids_iter);
if (!py_item) goto error;
if (!py_item) {
Py_DECREF(py_row_iter);
Py_DECREF(py_row);
goto error;
}
row_id = (int64_t)PyLong_AsLongLong(py_item);
Py_DECREF(py_item);

CHECKMEM(8);
memcpy(out+out_idx, &row_id, 8);
Expand Down Expand Up @@ -4631,12 +4671,16 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs)
out_idx += 8;
} else {
PyObject *py_bytes = PyUnicode_AsEncodedString(py_item, "utf-8", "strict");
if (!py_bytes) goto error;
if (!py_bytes) {
Py_DECREF(py_item);
goto error;
}

char *str = NULL;
Py_ssize_t str_l = 0;
if (PyBytes_AsStringAndSize(py_bytes, &str, &str_l) < 0) {
Py_DECREF(py_bytes);
Py_DECREF(py_item);
goto error;
}

Expand Down Expand Up @@ -4671,6 +4715,7 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs)
char *str = NULL;
Py_ssize_t str_l = 0;
if (PyBytes_AsStringAndSize(py_item, &str, &str_l) < 0) {
Py_DECREF(py_item);
goto error;
}

Expand All @@ -4684,6 +4729,7 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs)
break;

default:
Py_DECREF(py_item);
goto error;
}

Expand All @@ -4693,14 +4739,17 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs)
i++;
}

Py_DECREF(py_row_iter);
Py_DECREF(py_row);
py_row_iter = NULL;
py_row = NULL;
}

py_out = PyMemoryView_FromMemory(out, out_idx, PyBUF_WRITE);
if (!py_out) goto error;
// Convert the output buffer to a Python bytes object and free the buffer
py_out = PyBytes_FromStringAndSize(out, out_idx);

exit:
if (out) free(out);
if (returns) free(returns);

Py_XDECREF(py_item);
Expand All @@ -4712,7 +4761,6 @@ static PyObject *dump_rowdat_1(PyObject *self, PyObject *args, PyObject *kwargs)
return py_out;

error:
if (!py_out && out) free(out);
Py_XDECREF(py_out);
py_out = NULL;

Expand Down Expand Up @@ -4839,7 +4887,7 @@ PyMODINIT_FUNC PyInit__singlestoredb_accel(void) {

PyObj.create_numpy_array_kwargs = PyDict_New();
if (!PyObj.create_numpy_array_kwargs) goto error;
if (PyDict_SetItemString(PyObj.create_numpy_array_kwargs, "copy", Py_False)) {
if (PyDict_SetItemString(PyObj.create_numpy_array_kwargs, "copy", Py_True)) {
goto error;
}

Expand Down
36 changes: 18 additions & 18 deletions singlestoredb/tests/test_ext_func_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class TestRowdat1(unittest.TestCase):
def test_numpy_accel(self):
dump_res = rowdat_1._dump_numpy_accel(
col_types, numpy_row_ids, numpy_data,
).tobytes()
)
load_res = rowdat_1._load_numpy_accel(col_spec, dump_res)

ids = load_res[0]
Expand All @@ -294,7 +294,7 @@ def test_numpy_accel(self):
def test_numpy(self):
dump_res = rowdat_1._dump_numpy(
col_types, numpy_row_ids, numpy_data,
).tobytes()
)
load_res = rowdat_1._load_numpy(col_spec, dump_res)

ids = load_res[0]
Expand Down Expand Up @@ -387,7 +387,7 @@ def test_numpy_accel_limits(self, name, dtype, data, res):
with self.assertRaises(res, msg=f'Expected {res} for {data} in {dtype}'):
rowdat_1._dump_numpy_accel(
[dtype], numpy_row_ids, [(arr, None)],
).tobytes()
)

# Pure Python
if 'mediumint exceeds' in name:
Expand All @@ -396,21 +396,21 @@ def test_numpy_accel_limits(self, name, dtype, data, res):
with self.assertRaises(res, msg=f'Expected {res} for {data} in {dtype}'):
rowdat_1._dump_numpy(
[dtype], numpy_row_ids, [(arr, None)],
).tobytes()
)

else:
# Accelerated
dump_res = rowdat_1._dump_numpy_accel(
[dtype], numpy_row_ids, [(arr, None)],
).tobytes()
)
load_res = rowdat_1._load_numpy_accel([('x', dtype)], dump_res)
assert load_res[1][0][0] == res, \
f'Expected {res} for {data}, but got {load_res[1][0][0]} in {dtype}'

# Pure Python
dump_res = rowdat_1._dump_numpy(
[dtype], numpy_row_ids, [(arr, None)],
).tobytes()
)
load_res = rowdat_1._load_numpy([('x', dtype)], dump_res)
assert load_res[1][0][0] == res, \
f'Expected {res} for {data}, but got {load_res[1][0][0]} in {dtype}'
Expand Down Expand Up @@ -788,7 +788,7 @@ def test_numpy_accel_casts(self, name, dtype, data, res):
# Accelerated
dump_res = rowdat_1._dump_numpy_accel(
[dtype], numpy_row_ids, [(data, None)],
).tobytes()
)
load_res = rowdat_1._load_numpy_accel([('x', dtype)], dump_res)

if name == 'double from float32':
Expand All @@ -800,7 +800,7 @@ def test_numpy_accel_casts(self, name, dtype, data, res):
# Pure Python
dump_res = rowdat_1._dump_numpy(
[dtype], numpy_row_ids, [(data, None)],
).tobytes()
)
load_res = rowdat_1._load_numpy([('x', dtype)], dump_res)

if name == 'double from float32':
Expand All @@ -812,7 +812,7 @@ def test_numpy_accel_casts(self, name, dtype, data, res):
def test_python(self):
dump_res = rowdat_1._dump(
col_types, py_row_ids, py_col_data,
).tobytes()
)
load_res = rowdat_1._load(col_spec, dump_res)

ids = load_res[0]
Expand All @@ -824,7 +824,7 @@ def test_python(self):
def test_python_accel(self):
dump_res = rowdat_1._dump_accel(
col_types, py_row_ids, py_col_data,
).tobytes()
)
load_res = rowdat_1._load_accel(col_spec, dump_res)

ids = load_res[0]
Expand All @@ -836,7 +836,7 @@ def test_python_accel(self):
def test_polars(self):
dump_res = rowdat_1._dump_polars(
col_types, polars_row_ids, polars_data,
).tobytes()
)
load_res = rowdat_1._load_polars(col_spec, dump_res)

ids = load_res[0]
Expand All @@ -861,7 +861,7 @@ def test_polars(self):
def test_polars_accel(self):
dump_res = rowdat_1._dump_polars_accel(
col_types, polars_row_ids, polars_data,
).tobytes()
)
load_res = rowdat_1._load_polars_accel(col_spec, dump_res)

ids = load_res[0]
Expand All @@ -886,7 +886,7 @@ def test_polars_accel(self):
def test_pandas(self):
dump_res = rowdat_1._dump_pandas(
col_types, pandas_row_ids, pandas_data,
).tobytes()
)
load_res = rowdat_1._load_pandas(col_spec, dump_res)

ids = load_res[0]
Expand All @@ -911,7 +911,7 @@ def test_pandas(self):
def test_pandas_accel(self):
dump_res = rowdat_1._dump_pandas_accel(
col_types, pandas_row_ids, pandas_data,
).tobytes()
)
load_res = rowdat_1._load_pandas_accel(col_spec, dump_res)

ids = load_res[0]
Expand All @@ -936,7 +936,7 @@ def test_pandas_accel(self):
def test_pyarrow(self):
dump_res = rowdat_1._dump_arrow(
col_types, pyarrow_row_ids, pyarrow_data,
).tobytes()
)
load_res = rowdat_1._load_arrow(col_spec, dump_res)

ids = load_res[0]
Expand All @@ -961,7 +961,7 @@ def test_pyarrow(self):
def test_pyarrow_accel(self):
dump_res = rowdat_1._dump_arrow_accel(
col_types, pyarrow_row_ids, pyarrow_data,
).tobytes()
)
load_res = rowdat_1._load_arrow_accel(col_spec, dump_res)

ids = load_res[0]
Expand Down Expand Up @@ -1053,7 +1053,7 @@ def test_polars(self):
def test_pandas(self):
dump_res = rowdat_1._dump_pandas(
col_types, pandas_row_ids, pandas_data,
).tobytes()
)
load_res = rowdat_1._load_pandas(col_spec, dump_res)

ids = load_res[0]
Expand All @@ -1078,7 +1078,7 @@ def test_pandas(self):
def test_pyarrow(self):
dump_res = rowdat_1._dump_arrow(
col_types, pyarrow_row_ids, pyarrow_data,
).tobytes()
)
load_res = rowdat_1._load_arrow(col_spec, dump_res)

ids = load_res[0]
Expand Down
Loading