From 8ea48ee2ad0f1ba71a1cf93f0516b697d523de9b Mon Sep 17 00:00:00 2001 From: laggykiller Date: Sun, 9 Feb 2025 22:52:57 +0800 Subject: [PATCH] Merging upstream changes to allow compiling python3.13 wheels --- pyproject.toml | 2 +- setup.py | 2 +- src/blob.c | 12 +- src/connection.c | 217 +++++++---- src/connection.h | 1 + src/cursor.c | 2 +- src/row.c | 2 - src/util.c | 6 + src/util.h | 42 ++ {test => tests}/__init__.py | 0 {test => tests}/__main__.py | 16 +- test/test_backup.py => tests/backup.py | 25 +- test/test_dbapi.py => tests/dbapi.py | 359 +++++++++--------- test/test_factory.py => tests/factory.py | 60 +-- test/test_hooks.py => tests/hooks.py | 84 ++-- .../test_regression.py => tests/regression.py | 68 ++-- .../transactions.py | 40 +- test/test_ttypes.py => tests/ttypes.py | 88 ++--- .../userfunctions.py | 91 +++-- 19 files changed, 639 insertions(+), 478 deletions(-) rename {test => tests}/__init__.py (100%) rename {test => tests}/__main__.py (67%) rename test/test_backup.py => tests/backup.py (87%) rename test/test_dbapi.py => tests/dbapi.py (84%) rename test/test_factory.py => tests/factory.py (89%) rename test/test_hooks.py => tests/hooks.py (81%) rename test/test_regression.py => tests/regression.py (90%) rename test/test_transactions.py => tests/transactions.py (91%) rename test/test_ttypes.py => tests/ttypes.py (89%) rename test/test_userfunctions.py => tests/userfunctions.py (89%) diff --git a/pyproject.toml b/pyproject.toml index 0ebafb4..a6e5c04 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlcipher3-wheels" -version = "0.5.2.post1" +version = "0.5.4" description = "DB-API 2.0 interface for SQLCipher 3.x" readme = { content-type = "text/markdown", file = "README.md" } authors = [{ name = "Charles Leifer", email = "coleifer@gmail.com" }] diff --git a/setup.py b/setup.py index 666a5ed..e20289d 100644 --- a/setup.py +++ b/setup.py @@ -226,7 +226,7 @@ if __name__ == "__main__": # With pyproject.toml, all are not necessary except ext_modules # However, they are kept for building python 3.6 wheels name="sqlcipher3-wheels", - version="0.5.2.post1", + version="0.5.4", package_dir={"sqlcipher3": "sqlcipher3"}, packages=["sqlcipher3"], ext_modules=[module], diff --git a/src/blob.c b/src/blob.c index 70b327a..045097d 100644 --- a/src/blob.c +++ b/src/blob.c @@ -24,13 +24,15 @@ int pysqlite_blob_init(pysqlite_Blob *self, pysqlite_Connection* connection, static void remove_blob_from_connection_blob_list(pysqlite_Blob *self) { Py_ssize_t i; - PyObject *item; + PyObject *item, *ref; for (i = 0; i < PyList_GET_SIZE(self->connection->blobs); i++) { item = PyList_GET_ITEM(self->connection->blobs, i); - if (PyWeakref_GetObject(item) == (PyObject *)self) { - PyList_SetSlice(self->connection->blobs, i, i+1, NULL); - break; + if (PyWeakref_GetRef(item, &ref) == 1) { + if (ref == (PyObject *)self) { + PyList_SetSlice(self->connection->blobs, i, i+1, NULL); + break; + } } } } @@ -649,4 +651,4 @@ extern int pysqlite_blob_setup_types(void) { pysqlite_BlobType.tp_new = PyType_GenericNew; return PyType_Ready(&pysqlite_BlobType); -} +} \ No newline at end of file diff --git a/src/connection.c b/src/connection.c index e4badeb..62a3773 100644 --- a/src/connection.c +++ b/src/connection.c @@ -56,6 +56,10 @@ #define HAVE_ENCRYPTION #endif +#if PY_VERSION_HEX < 0x030D0000 + #define PyLong_AsInt _PyLong_AsInt +#endif + _Py_IDENTIFIER(cursor); static const char * const begin_statements[] = { @@ -199,6 +203,7 @@ int pysqlite_connection_init(pysqlite_Connection* self, PyObject* args, PyObject self->function_pinboard_trace_callback = NULL; self->function_pinboard_progress_handler = NULL; self->function_pinboard_authorizer_cb = NULL; + self->function_pinboard_busy_handler_cb = NULL; Py_XSETREF(self->collations, PyDict_New()); if (!self->collations) { @@ -229,9 +234,7 @@ void pysqlite_do_all_statements(pysqlite_Connection* self, int action, int reset for (i = 0; i < PyList_Size(self->statements); i++) { weakref = PyList_GetItem(self->statements, i); - statement = PyWeakref_GetObject(weakref); - if (statement != Py_None) { - Py_INCREF(statement); + if (PyWeakref_GetRef(weakref, &statement) == 1) { if (action == ACTION_RESET) { (void)pysqlite_statement_reset((pysqlite_Statement*)statement); } else { @@ -244,9 +247,9 @@ void pysqlite_do_all_statements(pysqlite_Connection* self, int action, int reset if (reset_cursors) { for (i = 0; i < PyList_Size(self->cursors); i++) { weakref = PyList_GetItem(self->cursors, i); - cursor = (pysqlite_Cursor*)PyWeakref_GetObject(weakref); - if ((PyObject*)cursor != Py_None) { + if (PyWeakref_GetRef(weakref, (PyObject**)&cursor) == 1) { cursor->reset = 1; + Py_DECREF(cursor); } } } @@ -265,6 +268,7 @@ void pysqlite_connection_dealloc(pysqlite_Connection* self) Py_XDECREF(self->function_pinboard_trace_callback); Py_XDECREF(self->function_pinboard_progress_handler); Py_XDECREF(self->function_pinboard_authorizer_cb); + Py_XDECREF(self->function_pinboard_busy_handler_cb); Py_XDECREF(self->row_factory); Py_XDECREF(self->text_factory); Py_XDECREF(self->collations); @@ -412,9 +416,9 @@ static void pysqlite_close_all_blobs(pysqlite_Connection *self) for (i = 0; i < PyList_GET_SIZE(self->blobs); i++) { weakref = PyList_GET_ITEM(self->blobs, i); - blob = PyWeakref_GetObject(weakref); - if (blob != Py_None) { + if (PyWeakref_GetRef(weakref, &blob) == 1) { pysqlite_blob_close((pysqlite_Blob*)blob); + Py_DECREF(blob); } } } @@ -936,6 +940,7 @@ static void _pysqlite_drop_unused_statement_references(pysqlite_Connection* self { PyObject* new_list; PyObject* weakref; + PyObject* ref; int i; /* we only need to do this once in a while */ @@ -952,7 +957,8 @@ static void _pysqlite_drop_unused_statement_references(pysqlite_Connection* self for (i = 0; i < PyList_Size(self->statements); i++) { weakref = PyList_GetItem(self->statements, i); - if (PyWeakref_GetObject(weakref) != Py_None) { + if (PyWeakref_GetRef(weakref, &ref) == 1) { + Py_DECREF(ref); if (PyList_Append(new_list, weakref) != 0) { Py_DECREF(new_list); return; @@ -967,6 +973,7 @@ static void _pysqlite_drop_unused_cursor_references(pysqlite_Connection* self) { PyObject* new_list; PyObject* weakref; + PyObject* ref; int i; /* we only need to do this once in a while */ @@ -983,7 +990,8 @@ static void _pysqlite_drop_unused_cursor_references(pysqlite_Connection* self) for (i = 0; i < PyList_Size(self->cursors); i++) { weakref = PyList_GetItem(self->cursors, i); - if (PyWeakref_GetObject(weakref) != Py_None) { + if (PyWeakref_GetRef(weakref, &ref) == 1) { + Py_DECREF(ref); if (PyList_Append(new_list, weakref) != 0) { Py_DECREF(new_list); return; @@ -1154,7 +1162,7 @@ static int _authorizer_callback(void* user_arg, int action, const char* arg1, co } else { if (PyLong_Check(ret)) { - rc = _PyLong_AsInt(ret); + rc = PyLong_AsInt(ret); if (rc == -1 && PyErr_Occurred()) { if (_pysqlite_enable_callback_tracebacks) PyErr_Print(); @@ -1200,6 +1208,36 @@ static int _progress_handler(void* user_arg) return rc; } +static int _busy_handler(void* user_arg, int n) +{ + int rc; + PyObject *ret; + PyGILState_STATE gilstate; + + gilstate = PyGILState_Ensure(); + ret = PyObject_CallFunction((PyObject*)user_arg, "i", n); + + if (ret == NULL) { + if (_pysqlite_enable_callback_tracebacks) + PyErr_Print(); + else + PyErr_Clear(); + + rc = 0; + } + else { + if (PyLong_Check(ret)) + rc = PyLong_AsInt(ret); + else + rc = 0; + + Py_DECREF(ret); + } + + PyGILState_Release(gilstate); + return rc; +} + #ifdef HAVE_TRACE_V2 static int _trace_callback(unsigned int type, void *ctx, void *stmt, void *sql) { @@ -1336,6 +1374,68 @@ static PyObject* pysqlite_connection_set_progress_handler(pysqlite_Connection* s Py_RETURN_NONE; } +static PyObject* pysqlite_connection_set_busy_handler(pysqlite_Connection* self, PyObject* args, PyObject* kwargs) +{ + PyObject* busy_handler; + + static char *kwlist[] = { "busy_handler", NULL }; + + if (!pysqlite_check_thread(self) || !pysqlite_check_connection(self)) { + return NULL; + } + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:set_busy_handler", + kwlist, &busy_handler)) { + return NULL; + } + + int rc; + if (busy_handler == Py_None) { + rc = sqlite3_busy_handler(self->db, NULL, NULL); + Py_XSETREF(self->function_pinboard_busy_handler_cb, NULL); + } + else { + Py_INCREF(busy_handler); + Py_XSETREF(self->function_pinboard_busy_handler_cb, busy_handler); + rc = sqlite3_busy_handler(self->db, _busy_handler, (void*)busy_handler); + } + + if (rc != SQLITE_OK) { + PyErr_SetString(pysqlite_OperationalError, "Error setting busy handler"); + Py_XSETREF(self->function_pinboard_busy_handler_cb, NULL); + return NULL; + } + Py_RETURN_NONE; +} + +static PyObject* pysqlite_connection_set_busy_timeout(pysqlite_Connection* self, PyObject* args, PyObject* kwargs) +{ + double busy_timeout; + + static char *kwlist[] = { "timeout", NULL }; + + if (!pysqlite_check_thread(self) || !pysqlite_check_connection(self)) { + return NULL; + } + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d:set_busy_timeout", + kwlist, &busy_timeout)) { + return NULL; + } + + int rc; + rc = sqlite3_busy_timeout(self->db, (int)busy_timeout * 1000); + if (rc != SQLITE_OK) { + PyErr_SetString(pysqlite_OperationalError, "Error setting busy timeout"); + return NULL; + } + else { + Py_XDECREF(self->function_pinboard_busy_handler_cb); + } + + Py_RETURN_NONE; +} + static PyObject* pysqlite_connection_set_trace_callback(pysqlite_Connection* self, PyObject* args, PyObject* kwargs) { PyObject* trace_callback; @@ -1478,8 +1578,6 @@ pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* iso self->begin_statement = NULL; } else { const char * const *candidate; - PyObject *uppercase_level; - _Py_IDENTIFIER(upper); if (!PyUnicode_Check(isolation_level)) { PyErr_Format(PyExc_TypeError, @@ -1488,17 +1586,14 @@ pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* iso return -1; } - uppercase_level = _PyObject_CallMethodIdObjArgs( - (PyObject *)&PyUnicode_Type, &PyId_upper, - isolation_level, NULL); - if (!uppercase_level) { + const char *level = PyUnicode_AsUTF8(isolation_level); + if (level == NULL) { return -1; } for (candidate = begin_statements; *candidate; candidate++) { - if (_PyUnicode_EqualToASCIIString(uppercase_level, *candidate + 6)) + if (sqlite3_stricmp(level, *candidate + 6) == 0) break; } - Py_DECREF(uppercase_level); if (!*candidate) { PyErr_SetString(PyExc_ValueError, "invalid value for isolation_level"); @@ -1523,9 +1618,6 @@ PyObject* pysqlite_connection_call(pysqlite_Connection* self, PyObject* args, Py return NULL; } - if (!_PyArg_NoKeywords(MODULE_NAME ".Connection", kwargs)) - return NULL; - if (!PyArg_ParseTuple(args, "U", &sql)) return NULL; @@ -1742,35 +1834,18 @@ pysqlite_connection_backup(pysqlite_Connection *self, PyObject *args, PyObject * const char *name = "main"; int rc; int callback_error = 0; - PyObject *sleep_obj = NULL; - int sleep_ms = 250; + double sleep_s = 0.25; + int sleep_ms = 0; sqlite3 *bck_conn; sqlite3_backup *bck_handle; static char *keywords[] = {"target", "pages", "progress", "name", "sleep", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!|$iOsO:backup", keywords, + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!|$iOsd:backup", keywords, &pysqlite_ConnectionType, &target, - &pages, &progress, &name, &sleep_obj)) { + &pages, &progress, &name, &sleep_s)) { return NULL; } - // XXX: We use _PyTime_ROUND_CEILING to support 3.6.x, but it should - // use _PyTime_ROUND_TIMEOUT instead. - if (sleep_obj != NULL) { - _PyTime_t sleep_secs; - if (_PyTime_FromSecondsObject(&sleep_secs, sleep_obj, - _PyTime_ROUND_CEILING)) { - return NULL; - } - _PyTime_t ms = _PyTime_AsMilliseconds(sleep_secs, - _PyTime_ROUND_CEILING); - if (ms < INT_MIN || ms > INT_MAX) { - PyErr_SetString(PyExc_OverflowError, "sleep is too large"); - return NULL; - } - sleep_ms = (int)ms; - } - if (!pysqlite_check_connection((pysqlite_Connection *)target)) { return NULL; } @@ -1779,6 +1854,11 @@ pysqlite_connection_backup(pysqlite_Connection *self, PyObject *args, PyObject * PyErr_SetString(PyExc_ValueError, "target cannot be the same connection instance"); return NULL; } + if (sleep_s < 0) { + PyErr_SetString(PyExc_ValueError, "sleep must be greater-than or equal to zero"); + return NULL; + } + sleep_ms = (int)(sleep_s * 1000.0); #if SQLITE_VERSION_NUMBER < 3008008 /* Since 3.8.8 this is already done, per commit @@ -1865,15 +1945,10 @@ static PyObject * pysqlite_connection_create_collation(pysqlite_Connection* self, PyObject* args) { PyObject* callable; - PyObject* uppercase_name = 0; - PyObject* name; + PyObject* name = NULL; PyObject* retval; - Py_ssize_t i, len; - _Py_IDENTIFIER(upper); - const char *uppercase_name_str; + const char *name_str; int rc; - unsigned int kind; - const void *data; if (!pysqlite_check_thread(self) || !pysqlite_check_connection(self)) { goto finally; @@ -1884,32 +1959,8 @@ pysqlite_connection_create_collation(pysqlite_Connection* self, PyObject* args) goto finally; } - uppercase_name = _PyObject_CallMethodIdObjArgs((PyObject *)&PyUnicode_Type, - &PyId_upper, name, NULL); - if (!uppercase_name) { - goto finally; - } - - if (PyUnicode_READY(uppercase_name)) - goto finally; - len = PyUnicode_GET_LENGTH(uppercase_name); - kind = PyUnicode_KIND(uppercase_name); - data = PyUnicode_DATA(uppercase_name); - for (i=0; i= '0' && ch <= '9') - || (ch >= 'A' && ch <= 'Z') - || (ch == '_')) - { - continue; - } else { - PyErr_SetString(pysqlite_ProgrammingError, "invalid character in collation name"); - goto finally; - } - } - - uppercase_name_str = PyUnicode_AsUTF8(uppercase_name); - if (!uppercase_name_str) + name_str = PyUnicode_AsUTF8(name); + if (!name_str) goto finally; if (callable != Py_None && !PyCallable_Check(callable)) { @@ -1918,27 +1969,25 @@ pysqlite_connection_create_collation(pysqlite_Connection* self, PyObject* args) } if (callable != Py_None) { - if (PyDict_SetItem(self->collations, uppercase_name, callable) == -1) + if (PyDict_SetItem(self->collations, name, callable) == -1) goto finally; } else { - if (PyDict_DelItem(self->collations, uppercase_name) == -1) + if (PyDict_DelItem(self->collations, name) == -1) goto finally; } rc = sqlite3_create_collation(self->db, - uppercase_name_str, + name_str, SQLITE_UTF8, (callable != Py_None) ? callable : NULL, (callable != Py_None) ? pysqlite_collation_callback : NULL); if (rc != SQLITE_OK) { - PyDict_DelItem(self->collations, uppercase_name); + PyDict_DelItem(self->collations, name); _pysqlite_seterror(self->db); goto finally; } finally: - Py_XDECREF(uppercase_name); - if (PyErr_Occurred()) { retval = NULL; } else { @@ -2063,6 +2112,10 @@ static PyMethodDef connection_methods[] = { #endif {"set_authorizer", (PyCFunction)(void(*)(void))pysqlite_connection_set_authorizer, METH_VARARGS|METH_KEYWORDS, PyDoc_STR("Sets authorizer callback. Non-standard.")}, + {"set_busy_handler", (PyCFunction)(void(*)(void))pysqlite_connection_set_busy_handler, METH_VARARGS|METH_KEYWORDS, + PyDoc_STR("Sets busy handler. Non-standard.")}, + {"set_busy_timeout", (PyCFunction)(void(*)(void))pysqlite_connection_set_busy_timeout, METH_VARARGS|METH_KEYWORDS, + PyDoc_STR("Sets busy timeout. Non-standard.")}, #ifdef HAVE_LOAD_EXTENSION {"enable_load_extension", (PyCFunction)pysqlite_enable_load_extension, METH_VARARGS, PyDoc_STR("Enable dynamic loading of SQLite extension modules. Non-standard.")}, @@ -2163,4 +2216,4 @@ extern int pysqlite_connection_setup_types(void) { pysqlite_ConnectionType.tp_new = PyType_GenericNew; return PyType_Ready(&pysqlite_ConnectionType); -} +} \ No newline at end of file diff --git a/src/connection.h b/src/connection.h index 37c83b2..8da1a22 100644 --- a/src/connection.h +++ b/src/connection.h @@ -90,6 +90,7 @@ typedef struct PyObject* function_pinboard_trace_callback; PyObject* function_pinboard_progress_handler; PyObject* function_pinboard_authorizer_cb; + PyObject* function_pinboard_busy_handler_cb; /* a dictionary of registered collation name => collation callable mappings */ PyObject* collations; diff --git a/src/cursor.c b/src/cursor.c index 2ede578..7cdbff7 100644 --- a/src/cursor.c +++ b/src/cursor.c @@ -536,7 +536,7 @@ _pysqlite_query_execute(pysqlite_Cursor* self, int multiple, PyObject* args) } if (pysqlite_build_row_cast_map(self) != 0) { - _PyErr_FormatFromCause(pysqlite_OperationalError, "Error while building row_cast_map"); + PyErr_Format(pysqlite_OperationalError, "Error while building row_cast_map"); goto error; } diff --git a/src/row.c b/src/row.c index 6ab092b..bd86fa9 100644 --- a/src/row.c +++ b/src/row.c @@ -41,8 +41,6 @@ pysqlite_row_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) assert(type != NULL && type->tp_alloc != NULL); - if (!_PyArg_NoKeywords("Row", kwargs)) - return NULL; if (!PyArg_ParseTuple(args, "OO", &cursor, &data)) return NULL; diff --git a/src/util.c b/src/util.c index ebf0cb5..143bc12 100644 --- a/src/util.c +++ b/src/util.c @@ -157,9 +157,15 @@ _pysqlite_long_as_int64(PyObject * py_val) } else if (sizeof(value) < sizeof(sqlite_int64)) { sqlite_int64 int64val; +#if PY_VERSION_HEX < 0x030D0000 if (_PyLong_AsByteArray((PyLongObject *)py_val, (unsigned char *)&int64val, sizeof(int64val), IS_LITTLE_ENDIAN, 1 /* signed */) >= 0) { +#else + if (_PyLong_AsByteArray((PyLongObject *)py_val, + (unsigned char *)&int64val, sizeof(int64val), + IS_LITTLE_ENDIAN, 1 /* signed */, 1) >= 0) { +#endif return int64val; } } diff --git a/src/util.h b/src/util.h index f6eb713..5e6f92f 100644 --- a/src/util.h +++ b/src/util.h @@ -39,4 +39,46 @@ int _pysqlite_seterror(sqlite3* db); sqlite_int64 _pysqlite_long_as_int64(PyObject * value); +#ifndef _Py_CAST +# define _Py_CAST(type, expr) ((type)(expr)) +#endif + +// Cast argument to PyObject* type. +#ifndef _PyObject_CAST +# define _PyObject_CAST(op) _Py_CAST(PyObject*, op) +#endif + +#if PY_VERSION_HEX < 0x030A00A3 && !defined(Py_NewRef) +static inline PyObject* _Py_NewRef(PyObject *obj) +{ + Py_INCREF(obj); + return obj; +} +#define Py_NewRef(obj) _Py_NewRef(_PyObject_CAST(obj)) +#endif + +#if PY_VERSION_HEX < 0x030D0000 +static inline int PyWeakref_GetRef(PyObject *ref, PyObject **pobj) +{ + PyObject *obj; + if (ref != NULL && !PyWeakref_Check(ref)) { + *pobj = NULL; + PyErr_SetString(PyExc_TypeError, "expected a weakref"); + return -1; + } + obj = PyWeakref_GetObject(ref); + if (obj == NULL) { + // SystemError if ref is NULL + *pobj = NULL; + return -1; + } + if (obj == Py_None) { + *pobj = NULL; + return 0; + } + *pobj = Py_NewRef(obj); + return (*pobj != NULL); +} +#endif + #endif diff --git a/test/__init__.py b/tests/__init__.py similarity index 100% rename from test/__init__.py rename to tests/__init__.py diff --git a/test/__main__.py b/tests/__main__.py similarity index 67% rename from test/__main__.py rename to tests/__main__.py index 86a16d4..1a43df9 100644 --- a/test/__main__.py +++ b/tests/__main__.py @@ -2,14 +2,14 @@ import optparse import sys import unittest -from test.test_backup import suite as backup_suite -from test.test_dbapi import suite as dbapi_suite -from test.test_factory import suite as factory_suite -from test.test_hooks import suite as hooks_suite -from test.test_regression import suite as regression_suite -from test.test_transactions import suite as transactions_suite -from test.test_ttypes import suite as types_suite -from test.test_userfunctions import suite as userfunctions_suite +from tests.backup import suite as backup_suite +from tests.dbapi import suite as dbapi_suite +from tests.factory import suite as factory_suite +from tests.hooks import suite as hooks_suite +from tests.regression import suite as regression_suite +from tests.transactions import suite as transactions_suite +from tests.ttypes import suite as types_suite +from tests.userfunctions import suite as userfunctions_suite def test(verbosity=1, failfast=False): diff --git a/test/test_backup.py b/tests/backup.py similarity index 87% rename from test/test_backup.py rename to tests/backup.py index 29c6916..b149879 100644 --- a/test/test_backup.py +++ b/tests/backup.py @@ -95,6 +95,20 @@ class BackupTests(unittest.TestCase): self.assertEqual(len(journal), 1) self.assertEqual(journal[0], 0) + def test_sleep(self): + with self.assertRaises(ValueError) as bm: + with sqlite.connect(':memory:') as bck: + self.cx.backup(bck, sleep=-1) + self.assertEqual(str(bm.exception), 'sleep must be greater-than or equal to zero') + + with self.assertRaises(TypeError): + with sqlite.connect(':memory:') as bck: + self.cx.backup(bck, sleep=None) + + with sqlite.connect(':memory:') as bck: + self.cx.backup(bck, sleep=10) + self.verify_backup(bck) + def test_non_callable_progress(self): with self.assertRaises(TypeError) as cm: with sqlite.connect(':memory:') as bck: @@ -156,7 +170,14 @@ class BackupTests(unittest.TestCase): def suite(): - return unittest.makeSuite(BackupTests) + loader = unittest.TestLoader() + tests = [loader.loadTestsFromTestCase(t) for t in ( + BackupTests,)] + return unittest.TestSuite(tests) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) if __name__ == "__main__": - unittest.main() + test() diff --git a/test/test_dbapi.py b/tests/dbapi.py similarity index 84% rename from test/test_dbapi.py rename to tests/dbapi.py index 855d59f..b31c0fc 100644 --- a/test/test_dbapi.py +++ b/tests/dbapi.py @@ -26,66 +26,66 @@ import unittest from sqlcipher3 import dbapi2 as sqlite #from test.support import TESTFN, unlink -TESTFN = '/tmp/sqlcipher3_test' +TESTFN = '/tmp/pysqlite3_test' from os import unlink class ModuleTests(unittest.TestCase): - def CheckAPILevel(self): + def test_APILevel(self): self.assertEqual(sqlite.apilevel, "2.0", "apilevel is %s, should be 2.0" % sqlite.apilevel) - def CheckThreadSafety(self): + def test_ThreadSafety(self): self.assertEqual(sqlite.threadsafety, 1, "threadsafety is %d, should be 1" % sqlite.threadsafety) - def CheckParamStyle(self): + def test_ParamStyle(self): self.assertEqual(sqlite.paramstyle, "qmark", "paramstyle is '%s', should be 'qmark'" % sqlite.paramstyle) - def CheckWarning(self): + def test_Warning(self): self.assertTrue(issubclass(sqlite.Warning, Exception), "Warning is not a subclass of Exception") - def CheckError(self): + def test_Error(self): self.assertTrue(issubclass(sqlite.Error, Exception), "Error is not a subclass of Exception") - def CheckInterfaceError(self): + def test_InterfaceError(self): self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), "InterfaceError is not a subclass of Error") - def CheckDatabaseError(self): + def test_DatabaseError(self): self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), "DatabaseError is not a subclass of Error") - def CheckDataError(self): + def test_DataError(self): self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), "DataError is not a subclass of DatabaseError") - def CheckOperationalError(self): + def test_OperationalError(self): self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), "OperationalError is not a subclass of DatabaseError") - def CheckIntegrityError(self): + def test_IntegrityError(self): self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), "IntegrityError is not a subclass of DatabaseError") - def CheckInternalError(self): + def test_InternalError(self): self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), "InternalError is not a subclass of DatabaseError") - def CheckProgrammingError(self): + def test_ProgrammingError(self): self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), "ProgrammingError is not a subclass of DatabaseError") - def CheckNotSupportedError(self): + def test_NotSupportedError(self): self.assertTrue(issubclass(sqlite.NotSupportedError, sqlite.DatabaseError), "NotSupportedError is not a subclass of DatabaseError") - def CheckErrorCodeOnException(self): + def test_ErrorCodeOnException(self): with self.assertRaises(sqlite.Error) as cm: db = sqlite.connect('/no/such/file/exists') e = cm.exception @@ -104,38 +104,38 @@ class ConnectionTests(unittest.TestCase): def tearDown(self): self.cx.close() - def CheckCommit(self): + def test_Commit(self): self.cx.commit() - def CheckCommitAfterNoChanges(self): + def test_CommitAfterNoChanges(self): """ A commit should also work when no changes were made to the database. """ self.cx.commit() self.cx.commit() - def CheckRollback(self): + def test_Rollback(self): self.cx.rollback() - def CheckRollbackAfterNoChanges(self): + def test_RollbackAfterNoChanges(self): """ A rollback should also work when no changes were made to the database. """ self.cx.rollback() self.cx.rollback() - def CheckCursor(self): + def test_Cursor(self): cu = self.cx.cursor() - def CheckFailedOpen(self): + def test_FailedOpen(self): YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" with self.assertRaises(sqlite.OperationalError): con = sqlite.connect(YOU_CANNOT_OPEN_THIS) - def CheckClose(self): + def test_Close(self): self.cx.close() - def CheckExceptions(self): + def test_Exceptions(self): # Optional DB-API extension. self.assertEqual(self.cx.Warning, sqlite.Warning) self.assertEqual(self.cx.Error, sqlite.Error) @@ -148,7 +148,7 @@ class ConnectionTests(unittest.TestCase): self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) - def CheckInTransaction(self): + def test_InTransaction(self): # Can't use db from setUp because we want to test initial state. cx = sqlite.connect(":memory:") cu = cx.cursor() @@ -166,11 +166,11 @@ class ConnectionTests(unittest.TestCase): row = cu.fetchone() self.assertEqual(cx.in_transaction, False) - def CheckInTransactionRO(self): + def test_InTransactionRO(self): with self.assertRaises(AttributeError): self.cx.in_transaction = True - def CheckOpenWithPathLikeObject(self): + def test_OpenWithPathLikeObject(self): """ Checks that we can successfully connect to a database using an object that is PathLike, i.e. has __fspath__(). """ self.addCleanup(unlink, TESTFN) @@ -181,7 +181,7 @@ class ConnectionTests(unittest.TestCase): with sqlite.connect(path) as cx: cx.execute('create table test(id integer)') - def CheckOpenUri(self): + def test_OpenUri(self): if sqlite.sqlite_version_info < (3, 7, 7): with self.assertRaises(sqlite.NotSupportedError): sqlite.connect(':memory:', uri=True) @@ -197,7 +197,7 @@ class ConnectionTests(unittest.TestCase): @unittest.skipIf(sqlite.sqlite_version_info >= (3, 3, 1), 'needs sqlite versions older than 3.3.1') - def CheckSameThreadErrorOnOldVersion(self): + def test_SameThreadErrorOnOldVersion(self): with self.assertRaises(sqlite.NotSupportedError) as cm: sqlite.connect(':memory:', check_same_thread=False) self.assertEqual(str(cm.exception), 'shared connections not available') @@ -216,21 +216,21 @@ class CursorTests(unittest.TestCase): self.cu.close() self.cx.close() - def CheckExecuteNoArgs(self): + def test_ExecuteNoArgs(self): self.cu.execute("delete from test") - def CheckExecuteIllegalSql(self): + def test_ExecuteIllegalSql(self): with self.assertRaises(sqlite.OperationalError): self.cu.execute("select asdf") - def CheckExecuteTooMuchSql(self): + def test_ExecuteTooMuchSql(self): with self.assertRaises(sqlite.Warning): self.cu.execute("select 5+4; select 4+5") - def CheckExecuteTooMuchSql2(self): + def test_ExecuteTooMuchSql2(self): self.cu.execute("select 5+4; -- foo bar") - def CheckExecuteTooMuchSql3(self): + def test_ExecuteTooMuchSql3(self): self.cu.execute(""" select 5+4; @@ -239,53 +239,53 @@ class CursorTests(unittest.TestCase): */ """) - def CheckExecuteWrongSqlArg(self): + def test_ExecuteWrongSqlArg(self): with self.assertRaises(TypeError): self.cu.execute(42) - def CheckExecuteArgInt(self): + def test_ExecuteArgInt(self): self.cu.execute("insert into test(id) values (?)", (42,)) - def CheckExecuteArgFloat(self): + def test_ExecuteArgFloat(self): self.cu.execute("insert into test(income) values (?)", (2500.32,)) - def CheckExecuteArgString(self): + def test_ExecuteArgString(self): self.cu.execute("insert into test(name) values (?)", ("Hugo",)) - def CheckExecuteArgStringWithZeroByte(self): + def test_ExecuteArgStringWithZeroByte(self): self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) row = self.cu.fetchone() self.assertEqual(row[0], "Hu\x00go") - def CheckExecuteNonIterable(self): + def test_ExecuteNonIterable(self): with self.assertRaises(ValueError) as cm: self.cu.execute("insert into test(id) values (?)", 42) self.assertEqual(str(cm.exception), 'parameters are of unsupported type') - def CheckExecuteWrongNoOfArgs1(self): + def test_ExecuteWrongNoOfArgs1(self): # too many parameters with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("insert into test(id) values (?)", (17, "Egon")) - def CheckExecuteWrongNoOfArgs2(self): + def test_ExecuteWrongNoOfArgs2(self): # too little parameters with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("insert into test(id) values (?)") - def CheckExecuteWrongNoOfArgs3(self): + def test_ExecuteWrongNoOfArgs3(self): # no parameters, parameters are needed with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("insert into test(id) values (?)") - def CheckExecuteParamList(self): + def test_ExecuteParamList(self): self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("select name from test where name=?", ["foo"]) row = self.cu.fetchone() self.assertEqual(row[0], "foo") - def CheckExecuteParamSequence(self): + def test_ExecuteParamSequence(self): class L(object): def __len__(self): return 1 @@ -298,13 +298,13 @@ class CursorTests(unittest.TestCase): row = self.cu.fetchone() self.assertEqual(row[0], "foo") - def CheckExecuteDictMapping(self): + def test_ExecuteDictMapping(self): self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("select name from test where name=:name", {"name": "foo"}) row = self.cu.fetchone() self.assertEqual(row[0], "foo") - def CheckExecuteDictMapping_Mapping(self): + def test_ExecuteDictMapping_Mapping(self): class D(dict): def __missing__(self, key): return "foo" @@ -314,32 +314,32 @@ class CursorTests(unittest.TestCase): row = self.cu.fetchone() self.assertEqual(row[0], "foo") - def CheckExecuteDictMappingTooLittleArgs(self): + def test_ExecuteDictMappingTooLittleArgs(self): self.cu.execute("insert into test(name) values ('foo')") with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) - def CheckExecuteDictMappingNoArgs(self): + def test_ExecuteDictMappingNoArgs(self): self.cu.execute("insert into test(name) values ('foo')") with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("select name from test where name=:name") - def CheckExecuteDictMappingUnnamed(self): + def test_ExecuteDictMappingUnnamed(self): self.cu.execute("insert into test(name) values ('foo')") with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("select name from test where name=?", {"name": "foo"}) - def CheckClose(self): + def test_Close(self): self.cu.close() - def CheckRowcountExecute(self): + def test_RowcountExecute(self): self.cu.execute("delete from test") self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("update test set name='bar'") self.assertEqual(self.cu.rowcount, 2) - def CheckRowcountSelect(self): + def test_RowcountSelect(self): """ pysqlite does not know the rowcount of SELECT statements, because we don't fetch all rows after executing the select statement. The rowcount @@ -348,12 +348,12 @@ class CursorTests(unittest.TestCase): self.cu.execute("select 5 union select 6") self.assertEqual(self.cu.rowcount, -1) - def CheckRowcountExecutemany(self): + def test_RowcountExecutemany(self): self.cu.execute("delete from test") self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) self.assertEqual(self.cu.rowcount, 3) - def CheckTotalChanges(self): + def test_TotalChanges(self): self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')") self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') @@ -362,10 +362,10 @@ class CursorTests(unittest.TestCase): # Sequences are required by the DB-API, iterators # enhancements in pysqlite. - def CheckExecuteManySequence(self): + def test_ExecuteManySequence(self): self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) - def CheckExecuteManyIterator(self): + def test_ExecuteManyIterator(self): class MyIter: def __init__(self): self.value = 5 @@ -379,26 +379,26 @@ class CursorTests(unittest.TestCase): self.cu.executemany("insert into test(income) values (?)", MyIter()) - def CheckExecuteManyGenerator(self): + def test_ExecuteManyGenerator(self): def mygen(): for i in range(5): yield (i,) self.cu.executemany("insert into test(income) values (?)", mygen()) - def CheckExecuteManyWrongSqlArg(self): + def test_ExecuteManyWrongSqlArg(self): with self.assertRaises(TypeError): self.cu.executemany(42, [(3,)]) - def CheckExecuteManySelect(self): + def test_ExecuteManySelect(self): with self.assertRaises(sqlite.ProgrammingError): self.cu.executemany("select ?", [(3,)]) - def CheckExecuteManyNotIterable(self): + def test_ExecuteManyNotIterable(self): with self.assertRaises(TypeError): self.cu.executemany("insert into test(income) values (?)", 42) - def CheckFetchIter(self): + def test_FetchIter(self): # Optional DB-API extension. self.cu.execute("delete from test") self.cu.execute("insert into test(id) values (?)", (5,)) @@ -410,19 +410,19 @@ class CursorTests(unittest.TestCase): self.assertEqual(lst[0], 5) self.assertEqual(lst[1], 6) - def CheckFetchone(self): + def test_Fetchone(self): self.cu.execute("select name from test") row = self.cu.fetchone() self.assertEqual(row[0], "foo") row = self.cu.fetchone() self.assertEqual(row, None) - def CheckFetchoneNoStatement(self): + def test_FetchoneNoStatement(self): cur = self.cx.cursor() row = cur.fetchone() self.assertEqual(row, None) - def CheckArraySize(self): + def test_ArraySize(self): # must default ot 1 self.assertEqual(self.cu.arraysize, 1) @@ -439,51 +439,51 @@ class CursorTests(unittest.TestCase): self.assertEqual(len(res), 2) - def CheckFetchmany(self): + def test_Fetchmany(self): self.cu.execute("select name from test") res = self.cu.fetchmany(100) self.assertEqual(len(res), 1) res = self.cu.fetchmany(100) self.assertEqual(res, []) - def CheckFetchmanyKwArg(self): + def test_FetchmanyKwArg(self): """Checks if fetchmany works with keyword arguments""" self.cu.execute("select name from test") res = self.cu.fetchmany(size=100) self.assertEqual(len(res), 1) - def CheckFetchall(self): + def test_Fetchall(self): self.cu.execute("select name from test") res = self.cu.fetchall() self.assertEqual(len(res), 1) res = self.cu.fetchall() self.assertEqual(res, []) - def CheckSetinputsizes(self): + def test_Setinputsizes(self): self.cu.setinputsizes([3, 4, 5]) - def CheckSetoutputsize(self): + def test_Setoutputsize(self): self.cu.setoutputsize(5, 0) - def CheckSetoutputsizeNoColumn(self): + def test_SetoutputsizeNoColumn(self): self.cu.setoutputsize(42) - def CheckCursorConnection(self): + def test_CursorConnection(self): # Optional DB-API extension. self.assertEqual(self.cu.connection, self.cx) - def CheckWrongCursorCallable(self): + def test_WrongCursorCallable(self): with self.assertRaises(TypeError): def f(): pass cur = self.cx.cursor(f) - def CheckCursorWrongClass(self): + def test_CursorWrongClass(self): class Foo: pass foo = Foo() with self.assertRaises(TypeError): cur = sqlite.Cursor(foo) - def CheckLastRowIDOnReplace(self): + def test_LastRowIDOnReplace(self): """ INSERT OR REPLACE and REPLACE INTO should produce the same behavior. """ @@ -493,7 +493,7 @@ class CursorTests(unittest.TestCase): self.cu.execute(sql.format(statement), (1, 'foo')) self.assertEqual(self.cu.lastrowid, 1) - def CheckLastRowIDOnIgnore(self): + def test_LastRowIDOnIgnore(self): self.cu.execute( "insert or ignore into test(unique_test) values (?)", ('test',)) @@ -503,7 +503,7 @@ class CursorTests(unittest.TestCase): ('test',)) self.assertEqual(self.cu.lastrowid, 2) - def CheckLastRowIDInsertOR(self): + def test_LastRowIDInsertOR(self): results = [] for statement in ('FAIL', 'ABORT', 'ROLLBACK'): sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)' @@ -533,160 +533,160 @@ class BlobTests(unittest.TestCase): self.blob.close() self.cx.close() - def CheckLength(self): + def test_Length(self): self.assertEqual(len(self.blob), 100) - def CheckTell(self): + def test_Tell(self): self.assertEqual(self.blob.tell(), 0) - def CheckSeekFromBlobStart(self): + def test_SeekFromBlobStart(self): self.blob.seek(10) self.assertEqual(self.blob.tell(), 10) self.blob.seek(10, 0) self.assertEqual(self.blob.tell(), 10) - def CheckSeekFromCurrentPosition(self): + def test_SeekFromCurrentPosition(self): self.blob.seek(10, 1) self.blob.seek(10, 1) self.assertEqual(self.blob.tell(), 20) - def CheckSeekFromBlobEnd(self): + def test_SeekFromBlobEnd(self): self.blob.seek(-10, 2) self.assertEqual(self.blob.tell(), 90) - def CheckBlobSeekOverBlobSize(self): + def test_BlobSeekOverBlobSize(self): with self.assertRaises(ValueError): self.blob.seek(1000) - def CheckBlobSeekUnderBlobSize(self): + def test_BlobSeekUnderBlobSize(self): with self.assertRaises(ValueError): self.blob.seek(-10) - def CheckBlobRead(self): + def test_BlobRead(self): self.assertEqual(self.blob.read(), self.blob_data) - def CheckBlobReadSize(self): + def test_BlobReadSize(self): self.assertEqual(len(self.blob.read(10)), 10) - def CheckBlobReadAdvanceOffset(self): + def test_BlobReadAdvanceOffset(self): self.blob.read(10) self.assertEqual(self.blob.tell(), 10) - def CheckBlobReadStartAtOffset(self): + def test_BlobReadStartAtOffset(self): self.blob.seek(10) self.blob.write(self.second_data[:10]) self.blob.seek(10) self.assertEqual(self.blob.read(10), self.second_data[:10]) - def CheckBlobWrite(self): + def test_BlobWrite(self): self.blob.write(self.second_data) self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], self.second_data) - def CheckBlobWriteAtOffset(self): + def test_BlobWriteAtOffset(self): self.blob.seek(50) self.blob.write(self.second_data[:50]) self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], self.blob_data[:50] + self.second_data[:50]) - def CheckBlobWriteAdvanceOffset(self): + def test_BlobWriteAdvanceOffset(self): self.blob.write(self.second_data[:50]) self.assertEqual(self.blob.tell(), 50) - def CheckBlobWriteMoreThenBlobSize(self): + def test_BlobWriteMoreThenBlobSize(self): with self.assertRaises(ValueError): self.blob.write(b"a" * 1000) - def CheckBlobReadAfterRowChange(self): + def test_BlobReadAfterRowChange(self): self.cx.execute("UPDATE test SET blob_col='aaaa' where id=1") with self.assertRaises(sqlite.OperationalError): self.blob.read() - def CheckBlobWriteAfterRowChange(self): + def test_BlobWriteAfterRowChange(self): self.cx.execute("UPDATE test SET blob_col='aaaa' where id=1") with self.assertRaises(sqlite.OperationalError): self.blob.write(b"aaa") - def CheckBlobWriteWhenReadOnly(self): + def test_BlobWriteWhenReadOnly(self): read_only_blob = \ self.cx.open_blob("test", "blob_col", 1, readonly=True) with self.assertRaises(sqlite.OperationalError): read_only_blob.write(b"aaa") read_only_blob.close() - def CheckBlobOpenWithBadDb(self): + def test_BlobOpenWithBadDb(self): with self.assertRaises(sqlite.OperationalError): self.cx.open_blob("test", "blob_col", 1, dbname="notexisintg") - def CheckBlobOpenWithBadTable(self): + def test_BlobOpenWithBadTable(self): with self.assertRaises(sqlite.OperationalError): self.cx.open_blob("notexisintg", "blob_col", 1) - def CheckBlobOpenWithBadColumn(self): + def test_BlobOpenWithBadColumn(self): with self.assertRaises(sqlite.OperationalError): self.cx.open_blob("test", "notexisting", 1) - def CheckBlobOpenWithBadRow(self): + def test_BlobOpenWithBadRow(self): with self.assertRaises(sqlite.OperationalError): self.cx.open_blob("test", "blob_col", 2) - def CheckBlobGetItem(self): + def test_BlobGetItem(self): self.assertEqual(self.blob[5], b"a") - def CheckBlobGetItemIndexOutOfRange(self): + def test_BlobGetItemIndexOutOfRange(self): with self.assertRaises(IndexError): self.blob[105] with self.assertRaises(IndexError): self.blob[-105] - def CheckBlobGetItemNegativeIndex(self): + def test_BlobGetItemNegativeIndex(self): self.assertEqual(self.blob[-5], b"a") - def CheckBlobGetItemInvalidIndex(self): + def test_BlobGetItemInvalidIndex(self): with self.assertRaises(TypeError): self.blob[b"a"] - def CheckBlobGetSlice(self): + def test_BlobGetSlice(self): self.assertEqual(self.blob[5:10], b"aaaaa") - def CheckBlobGetSliceNegativeIndex(self): + def test_BlobGetSliceNegativeIndex(self): self.assertEqual(self.blob[5:-5], self.blob_data[5:-5]) - def CheckBlobGetSliceInvalidIndex(self): + def test_BlobGetSliceInvalidIndex(self): with self.assertRaises(TypeError): self.blob[5:b"a"] - def CheckBlobGetSliceWithSkip(self): + def test_BlobGetSliceWithSkip(self): self.blob.write(b"abcdefghij") self.assertEqual(self.blob[0:10:2], b"acegi") - def CheckBlobSetItem(self): + def test_BlobSetItem(self): self.blob[0] = b"b" self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], b"b" + self.blob_data[1:]) - def CheckBlobSetSlice(self): + def test_BlobSetSlice(self): self.blob[0:5] = b"bbbbb" self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], b"bbbbb" + self.blob_data[5:]) - def CheckBlobSetSliceWithSkip(self): + def test_BlobSetSliceWithSkip(self): self.blob[0:10:2] = b"bbbbb" self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], b"bababababa" + self.blob_data[10:]) - def CheckBlobGetEmptySlice(self): + def test_BlobGetEmptySlice(self): self.assertEqual(self.blob[5:5], b"") - def CheckBlobSetSliceWrongLength(self): + def test_BlobSetSliceWrongLength(self): with self.assertRaises(IndexError): self.blob[5:10] = b"a" - def CheckBlobConcatNotSupported(self): + def test_BlobConcatNotSupported(self): with self.assertRaises(SystemError): self.blob + self.blob - def CheckBlobRepeateNotSupported(self): + def test_BlobRepeateNotSupported(self): with self.assertRaises(SystemError): self.blob * 5 - def CheckBlobContainsNotSupported(self): + def test_BlobContainsNotSupported(self): with self.assertRaises(SystemError): b"aaaaa" in self.blob @@ -701,7 +701,7 @@ class ThreadTests(unittest.TestCase): self.cur.close() self.con.close() - def CheckConCursor(self): + def test_ConCursor(self): def run(con, errors): try: cur = con.cursor() @@ -719,7 +719,7 @@ class ThreadTests(unittest.TestCase): if len(errors) > 0: self.fail("\n".join(errors)) - def CheckConCommit(self): + def test_ConCommit(self): def run(con, errors): try: con.commit() @@ -737,7 +737,7 @@ class ThreadTests(unittest.TestCase): if len(errors) > 0: self.fail("\n".join(errors)) - def CheckConRollback(self): + def test_ConRollback(self): def run(con, errors): try: con.rollback() @@ -755,7 +755,7 @@ class ThreadTests(unittest.TestCase): if len(errors) > 0: self.fail("\n".join(errors)) - def CheckConClose(self): + def test_ConClose(self): def run(con, errors): try: con.close() @@ -773,7 +773,7 @@ class ThreadTests(unittest.TestCase): if len(errors) > 0: self.fail("\n".join(errors)) - def CheckCurImplicitBegin(self): + def test_CurImplicitBegin(self): def run(cur, errors): try: cur.execute("insert into test(name) values ('a')") @@ -791,7 +791,7 @@ class ThreadTests(unittest.TestCase): if len(errors) > 0: self.fail("\n".join(errors)) - def CheckCurClose(self): + def test_CurClose(self): def run(cur, errors): try: cur.close() @@ -809,7 +809,7 @@ class ThreadTests(unittest.TestCase): if len(errors) > 0: self.fail("\n".join(errors)) - def CheckCurExecute(self): + def test_CurExecute(self): def run(cur, errors): try: cur.execute("select name from test") @@ -828,7 +828,7 @@ class ThreadTests(unittest.TestCase): if len(errors) > 0: self.fail("\n".join(errors)) - def CheckCurIterNext(self): + def test_CurIterNext(self): def run(cur, errors): try: row = cur.fetchone() @@ -849,29 +849,29 @@ class ThreadTests(unittest.TestCase): self.fail("\n".join(errors)) class ConstructorTests(unittest.TestCase): - def CheckDate(self): + def test_Date(self): d = sqlite.Date(2004, 10, 28) - def CheckTime(self): + def test_Time(self): t = sqlite.Time(12, 39, 35) - def CheckTimestamp(self): + def test_Timestamp(self): ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) - def CheckDateFromTicks(self): + def test_DateFromTicks(self): d = sqlite.DateFromTicks(42) - def CheckTimeFromTicks(self): + def test_TimeFromTicks(self): t = sqlite.TimeFromTicks(42) - def CheckTimestampFromTicks(self): + def test_TimestampFromTicks(self): ts = sqlite.TimestampFromTicks(42) - def CheckBinary(self): + def test_Binary(self): b = sqlite.Binary(b"\0'") class ExtensionTests(unittest.TestCase): - def CheckScriptStringSql(self): + def test_ScriptStringSql(self): con = sqlite.connect(":memory:") cur = con.cursor() cur.executescript(""" @@ -884,31 +884,31 @@ class ExtensionTests(unittest.TestCase): res = cur.fetchone()[0] self.assertEqual(res, 5) - def CheckScriptSyntaxError(self): + def test_ScriptSyntaxError(self): con = sqlite.connect(":memory:") cur = con.cursor() with self.assertRaises(sqlite.OperationalError): cur.executescript("create table test(x); asdf; create table test2(x)") - def CheckScriptErrorNormal(self): + def test_ScriptErrorNormal(self): con = sqlite.connect(":memory:") cur = con.cursor() with self.assertRaises(sqlite.OperationalError): cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") - def CheckCursorExecutescriptAsBytes(self): + def test_CursorExecutescriptAsBytes(self): con = sqlite.connect(":memory:") cur = con.cursor() with self.assertRaises(ValueError) as cm: cur.executescript(b"create table test(foo); insert into test(foo) values (5);") self.assertEqual(str(cm.exception), 'script argument must be unicode.') - def CheckConnectionExecute(self): + def test_ConnectionExecute(self): con = sqlite.connect(":memory:") result = con.execute("select 5").fetchone()[0] self.assertEqual(result, 5, "Basic test of Connection.execute") - def CheckConnectionExecutemany(self): + def test_ConnectionExecutemany(self): con = sqlite.connect(":memory:") con.execute("create table test(foo)") con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) @@ -916,39 +916,39 @@ class ExtensionTests(unittest.TestCase): self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") - def CheckConnectionExecutescript(self): + def test_ConnectionExecutescript(self): con = sqlite.connect(":memory:") con.executescript("create table test(foo); insert into test(foo) values (5);") result = con.execute("select foo from test").fetchone()[0] self.assertEqual(result, 5, "Basic test of Connection.executescript") class ClosedConTests(unittest.TestCase): - def CheckClosedConCursor(self): + def test_ClosedConCursor(self): con = sqlite.connect(":memory:") con.close() with self.assertRaises(sqlite.ProgrammingError): cur = con.cursor() - def CheckClosedConCommit(self): + def test_ClosedConCommit(self): con = sqlite.connect(":memory:") con.close() with self.assertRaises(sqlite.ProgrammingError): con.commit() - def CheckClosedConRollback(self): + def test_ClosedConRollback(self): con = sqlite.connect(":memory:") con.close() with self.assertRaises(sqlite.ProgrammingError): con.rollback() - def CheckClosedCurExecute(self): + def test_ClosedCurExecute(self): con = sqlite.connect(":memory:") cur = con.cursor() con.close() with self.assertRaises(sqlite.ProgrammingError): cur.execute("select 4") - def CheckClosedBlobRead(self): + def test_ClosedBlobRead(self): con = sqlite.connect(":memory:") con.execute("create table test(id integer primary key, blob_col blob)") con.execute("insert into test(blob_col) values (zeroblob(100))") @@ -957,14 +957,14 @@ class ClosedConTests(unittest.TestCase): with self.assertRaises(sqlite.ProgrammingError): blob.read() - def CheckClosedCreateFunction(self): + def test_ClosedCreateFunction(self): con = sqlite.connect(":memory:") con.close() def f(x): return 17 with self.assertRaises(sqlite.ProgrammingError): con.create_function("foo", 1, f) - def CheckClosedCreateAggregate(self): + def test_ClosedCreateAggregate(self): con = sqlite.connect(":memory:") con.close() class Agg: @@ -977,7 +977,7 @@ class ClosedConTests(unittest.TestCase): with self.assertRaises(sqlite.ProgrammingError): con.create_aggregate("foo", 1, Agg) - def CheckClosedSetAuthorizer(self): + def test_ClosedSetAuthorizer(self): con = sqlite.connect(":memory:") con.close() def authorizer(*args): @@ -985,21 +985,21 @@ class ClosedConTests(unittest.TestCase): with self.assertRaises(sqlite.ProgrammingError): con.set_authorizer(authorizer) - def CheckClosedSetProgressCallback(self): + def test_ClosedSetProgressCallback(self): con = sqlite.connect(":memory:") con.close() def progress(): pass with self.assertRaises(sqlite.ProgrammingError): con.set_progress_handler(progress, 100) - def CheckClosedCall(self): + def test_ClosedCall(self): con = sqlite.connect(":memory:") con.close() with self.assertRaises(sqlite.ProgrammingError): con() class ClosedCurTests(unittest.TestCase): - def CheckClosed(self): + def test_Closed(self): con = sqlite.connect(":memory:") cur = con.cursor() cur.close() @@ -1023,7 +1023,7 @@ class SqliteColumnTypeTests(unittest.TestCase): self.cx.execute('create table test(a TEXT, b datetime)') self.cx.execute('create view test_view as select * from test') - def CheckDeclTypes(self): + def test_DeclTypes(self): curs = self.cx.execute('select * from test') self.assertEqual(curs.description, ( ('a', 'TEXT', None, None, None, None, None), @@ -1063,7 +1063,7 @@ class SqliteOnConflictTests(unittest.TestCase): self.cu.close() self.cx.close() - def CheckOnConflictRollbackWithExplicitTransaction(self): + def test_OnConflictRollbackWithExplicitTransaction(self): self.cx.isolation_level = None # autocommit mode self.cu = self.cx.cursor() # Start an explicit transaction. @@ -1078,7 +1078,7 @@ class SqliteOnConflictTests(unittest.TestCase): # Transaction should have rolled back and nothing should be in table. self.assertEqual(self.cu.fetchall(), []) - def CheckOnConflictAbortRaisesWithExplicitTransactions(self): + def test_OnConflictAbortRaisesWithExplicitTransactions(self): # Abort cancels the current sql statement but doesn't change anything # about the current transaction. self.cx.isolation_level = None # autocommit mode @@ -1094,7 +1094,7 @@ class SqliteOnConflictTests(unittest.TestCase): # Expect the first two inserts to work, third to do nothing. self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) - def CheckOnConflictRollbackWithoutTransaction(self): + def test_OnConflictRollbackWithoutTransaction(self): # Start of implicit transaction self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") @@ -1104,7 +1104,7 @@ class SqliteOnConflictTests(unittest.TestCase): # Implicit transaction is rolled back on error. self.assertEqual(self.cu.fetchall(), []) - def CheckOnConflictAbortRaisesWithoutTransactions(self): + def test_OnConflictAbortRaisesWithoutTransactions(self): # Abort cancels the current sql statement but doesn't change anything # about the current transaction. self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") @@ -1115,20 +1115,20 @@ class SqliteOnConflictTests(unittest.TestCase): self.cu.execute("SELECT name, unique_name FROM test") self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) - def CheckOnConflictFail(self): + def test_OnConflictFail(self): self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") with self.assertRaises(sqlite.IntegrityError): self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") self.assertEqual(self.cu.fetchall(), []) - def CheckOnConflictIgnore(self): + def test_OnConflictIgnore(self): self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") # Nothing should happen. self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") self.cu.execute("SELECT unique_name FROM test") self.assertEqual(self.cu.fetchall(), [('foo',)]) - def CheckOnConflictReplace(self): + def test_OnConflictReplace(self): self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')") # There shouldn't be an IntegrityError exception. self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')") @@ -1145,31 +1145,31 @@ class ClosedBlobTests(unittest.TestCase): def tearDown(self): self.cx.close() - def CheckClosedRead(self): + def test_ClosedRead(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): self.blob.read() - def CheckClosedWrite(self): + def test_ClosedWrite(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): self.blob.write(b"aaaaaaaaa") - def CheckClosedSeek(self): + def test_ClosedSeek(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): self.blob.seek(10) - def CheckClosedTell(self): + def test_ClosedTell(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): self.blob.tell() - def CheckClosedClose(self): + def test_ClosedClose(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): @@ -1185,13 +1185,13 @@ class BlobContextManagerTests(unittest.TestCase): def tearDown(self): self.cx.close() - def CheckContextExecute(self): + def test_ContextExecute(self): data = b"a" * 100 with self.cx.open_blob("test", "blob_col", 1) as blob: blob.write(data) self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], data) - def CheckContextCloseBlob(self): + def test_ContextCloseBlob(self): with self.cx.open_blob("test", "blob_col", 1) as blob: blob.seek(10) with self.assertRaises(sqlite.ProgrammingError): @@ -1199,25 +1199,22 @@ class BlobContextManagerTests(unittest.TestCase): def suite(): - module_suite = unittest.makeSuite(ModuleTests, "Check") - connection_suite = unittest.makeSuite(ConnectionTests, "Check") - cursor_suite = unittest.makeSuite(CursorTests, "Check") - thread_suite = unittest.makeSuite(ThreadTests, "Check") - constructor_suite = unittest.makeSuite(ConstructorTests, "Check") - ext_suite = unittest.makeSuite(ExtensionTests, "Check") - closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") - closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") - columntypes_suite = unittest.makeSuite(SqliteColumnTypeTests, "Check") - on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check") - blob_suite = unittest.makeSuite(BlobTests, "Check") - closed_blob_suite = unittest.makeSuite(ClosedBlobTests, "Check") - blob_context_manager_suite = unittest.makeSuite(BlobContextManagerTests, "Check") - return unittest.TestSuite(( - module_suite, connection_suite, cursor_suite, thread_suite, - constructor_suite, ext_suite, closed_con_suite, closed_cur_suite, - columntypes_suite, on_conflict_suite, blob_suite, closed_blob_suite, - blob_context_manager_suite, - )) + loader = unittest.TestLoader() + tests = [loader.loadTestsFromTestCase(t) for t in ( + ModuleTests, + ConnectionTests, + CursorTests, + ThreadTests, + ConstructorTests, + ExtensionTests, + ClosedConTests, + ClosedCurTests, + SqliteColumnTypeTests, + SqliteOnConflictTests, + BlobTests, + ClosedBlobTests, + BlobContextManagerTests)] + return unittest.TestSuite(tests) def test(): runner = unittest.TextTestRunner() diff --git a/test/test_factory.py b/tests/factory.py similarity index 89% rename from test/test_factory.py rename to tests/factory.py index 992aaa0..f98f9dc 100644 --- a/test/test_factory.py +++ b/tests/factory.py @@ -47,7 +47,7 @@ class ConnectionFactoryTests(unittest.TestCase): def tearDown(self): self.con.close() - def CheckIsInstance(self): + def test_IsInstance(self): self.assertIsInstance(self.con, MyConnection) class CursorFactoryTests(unittest.TestCase): @@ -57,7 +57,7 @@ class CursorFactoryTests(unittest.TestCase): def tearDown(self): self.con.close() - def CheckIsInstance(self): + def test_IsInstance(self): cur = self.con.cursor() self.assertIsInstance(cur, sqlite.Cursor) cur = self.con.cursor(MyCursor) @@ -65,7 +65,7 @@ class CursorFactoryTests(unittest.TestCase): cur = self.con.cursor(factory=lambda con: MyCursor(con)) self.assertIsInstance(cur, MyCursor) - def CheckInvalidFactory(self): + def test_InvalidFactory(self): # not a callable at all self.assertRaises(TypeError, self.con.cursor, None) # invalid callable with not exact one argument @@ -77,7 +77,7 @@ class RowFactoryTestsBackwardsCompat(unittest.TestCase): def setUp(self): self.con = sqlite.connect(":memory:") - def CheckIsProducedByFactory(self): + def test_IsProducedByFactory(self): cur = self.con.cursor(factory=MyCursor) cur.execute("select 4+5 as foo") row = cur.fetchone() @@ -91,12 +91,12 @@ class RowFactoryTests(unittest.TestCase): def setUp(self): self.con = sqlite.connect(":memory:") - def CheckCustomFactory(self): + def test_CustomFactory(self): self.con.row_factory = lambda cur, row: list(row) row = self.con.execute("select 1, 2").fetchone() self.assertIsInstance(row, list) - def CheckSqliteRowIndex(self): + def test_SqliteRowIndex(self): self.con.row_factory = sqlite.Row row = self.con.execute("select 1 as a_1, 2 as b").fetchone() self.assertIsInstance(row, sqlite.Row) @@ -125,7 +125,7 @@ class RowFactoryTests(unittest.TestCase): with self.assertRaises(IndexError): row[2**1000] - def CheckSqliteRowIndexUnicode(self): + def test_SqliteRowIndexUnicode(self): self.con.row_factory = sqlite.Row row = self.con.execute("select 1 as \xff").fetchone() self.assertEqual(row["\xff"], 1) @@ -134,7 +134,7 @@ class RowFactoryTests(unittest.TestCase): with self.assertRaises(IndexError): row['\xdf'] - def CheckSqliteRowSlice(self): + def test_SqliteRowSlice(self): # A sqlite.Row can be sliced like a list. self.con.row_factory = sqlite.Row row = self.con.execute("select 1, 2, 3, 4").fetchone() @@ -152,21 +152,21 @@ class RowFactoryTests(unittest.TestCase): self.assertEqual(row[0:4:2], (1, 3)) self.assertEqual(row[3:0:-2], (4, 2)) - def CheckSqliteRowIter(self): + def test_SqliteRowIter(self): """Checks if the row object is iterable""" self.con.row_factory = sqlite.Row row = self.con.execute("select 1 as a, 2 as b").fetchone() for col in row: pass - def CheckSqliteRowAsTuple(self): + def test_SqliteRowAsTuple(self): """Checks if the row object can be converted to a tuple""" self.con.row_factory = sqlite.Row row = self.con.execute("select 1 as a, 2 as b").fetchone() t = tuple(row) self.assertEqual(t, (row['a'], row['b'])) - def CheckSqliteRowAsDict(self): + def test_SqliteRowAsDict(self): """Checks if the row object can be correctly converted to a dictionary""" self.con.row_factory = sqlite.Row row = self.con.execute("select 1 as a, 2 as b").fetchone() @@ -174,7 +174,7 @@ class RowFactoryTests(unittest.TestCase): self.assertEqual(d["a"], row["a"]) self.assertEqual(d["b"], row["b"]) - def CheckSqliteRowHashCmp(self): + def test_SqliteRowHashCmp(self): """Checks if the row object compares and hashes correctly""" self.con.row_factory = sqlite.Row row_1 = self.con.execute("select 1 as a, 2 as b").fetchone() @@ -208,7 +208,7 @@ class RowFactoryTests(unittest.TestCase): self.assertEqual(hash(row_1), hash(row_2)) - def CheckSqliteRowAsSequence(self): + def test_SqliteRowAsSequence(self): """ Checks if the row object can act like a sequence """ self.con.row_factory = sqlite.Row row = self.con.execute("select 1 as a, 2 as b").fetchone() @@ -217,7 +217,7 @@ class RowFactoryTests(unittest.TestCase): self.assertEqual(list(reversed(row)), list(reversed(as_tuple))) self.assertIsInstance(row, Sequence) - def CheckFakeCursorClass(self): + def test_FakeCursorClass(self): # Issue #24257: Incorrect use of PyObject_IsInstance() caused # segmentation fault. # Issue #27861: Also applies for cursor factory. @@ -234,26 +234,26 @@ class TextFactoryTests(unittest.TestCase): def setUp(self): self.con = sqlite.connect(":memory:") - def CheckUnicode(self): + def test_Unicode(self): austria = "Österreich" row = self.con.execute("select ?", (austria,)).fetchone() self.assertEqual(type(row[0]), str, "type of row[0] must be unicode") - def CheckString(self): + def test_String(self): self.con.text_factory = bytes austria = "Österreich" row = self.con.execute("select ?", (austria,)).fetchone() self.assertEqual(type(row[0]), bytes, "type of row[0] must be bytes") self.assertEqual(row[0], austria.encode("utf-8"), "column must equal original data in UTF-8") - def CheckCustom(self): + def test_Custom(self): self.con.text_factory = lambda x: str(x, "utf-8", "ignore") austria = "Österreich" row = self.con.execute("select ?", (austria,)).fetchone() self.assertEqual(type(row[0]), str, "type of row[0] must be unicode") self.assertTrue(row[0].endswith("reich"), "column must contain original data") - def CheckOptimizedUnicode(self): + def test_OptimizedUnicode(self): # In py3k, str objects are always returned when text_factory # is OptimizedUnicode self.con.text_factory = sqlite.OptimizedUnicode @@ -273,25 +273,25 @@ class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase): self.con.execute("create table test (value text)") self.con.execute("insert into test (value) values (?)", ("a\x00b",)) - def CheckString(self): + def test_String(self): # text_factory defaults to str row = self.con.execute("select value from test").fetchone() self.assertIs(type(row[0]), str) self.assertEqual(row[0], "a\x00b") - def CheckBytes(self): + def test_Bytes(self): self.con.text_factory = bytes row = self.con.execute("select value from test").fetchone() self.assertIs(type(row[0]), bytes) self.assertEqual(row[0], b"a\x00b") - def CheckBytearray(self): + def test_Bytearray(self): self.con.text_factory = bytearray row = self.con.execute("select value from test").fetchone() self.assertIs(type(row[0]), bytearray) self.assertEqual(row[0], b"a\x00b") - def CheckCustom(self): + def test_Custom(self): # A custom factory should receive a bytes argument self.con.text_factory = lambda x: x row = self.con.execute("select value from test").fetchone() @@ -302,13 +302,15 @@ class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase): self.con.close() def suite(): - connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check") - cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check") - row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check") - row_suite = unittest.makeSuite(RowFactoryTests, "Check") - text_suite = unittest.makeSuite(TextFactoryTests, "Check") - text_zero_bytes_suite = unittest.makeSuite(TextFactoryTestsWithEmbeddedZeroBytes, "Check") - return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite, text_zero_bytes_suite)) + loader = unittest.TestLoader() + tests = [loader.loadTestsFromTestCase(t) for t in ( + ConnectionFactoryTests, + CursorFactoryTests, + RowFactoryTestsBackwardsCompat, + RowFactoryTests, + TextFactoryTests, + TextFactoryTestsWithEmbeddedZeroBytes)] + return unittest.TestSuite(tests) def test(): runner = unittest.TextTestRunner() diff --git a/test/test_hooks.py b/tests/hooks.py similarity index 81% rename from test/test_hooks.py rename to tests/hooks.py index 154a4b7..d0a1ede 100644 --- a/test/test_hooks.py +++ b/tests/hooks.py @@ -23,27 +23,27 @@ import os import unittest + from sqlcipher3 import dbapi2 as sqlite class CollationTests(unittest.TestCase): - def CheckCreateCollationNotString(self): + def test_CreateCollationNotString(self): con = sqlite.connect(":memory:") with self.assertRaises(TypeError): con.create_collation(None, lambda x, y: (x > y) - (x < y)) - def CheckCreateCollationNotCallable(self): + def test_CreateCollationNotCallable(self): con = sqlite.connect(":memory:") with self.assertRaises(TypeError) as cm: con.create_collation("X", 42) self.assertEqual(str(cm.exception), 'parameter must be callable') - def CheckCreateCollationNotAscii(self): + def test_CreateCollationNotAscii(self): con = sqlite.connect(":memory:") - with self.assertRaises(sqlite.ProgrammingError): - con.create_collation("collä", lambda x, y: (x > y) - (x < y)) + con.create_collation("collä", lambda x, y: (x > y) - (x < y)) - def CheckCreateCollationBadUpper(self): + def test_CreateCollationBadUpper(self): class BadUpperStr(str): def upper(self): return None @@ -62,7 +62,7 @@ class CollationTests(unittest.TestCase): @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 1), 'old SQLite versions crash on this test') - def CheckCollationIsUsed(self): + def test_CollationIsUsed(self): def mycoll(x, y): # reverse order return -((x > y) - (x < y)) @@ -87,7 +87,7 @@ class CollationTests(unittest.TestCase): result = con.execute(sql).fetchall() self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll') - def CheckCollationReturnsLargeInteger(self): + def test_CollationReturnsLargeInteger(self): def mycoll(x, y): # reverse order return -((x > y) - (x < y)) * 2**32 @@ -106,7 +106,7 @@ class CollationTests(unittest.TestCase): self.assertEqual(result, [('c',), ('b',), ('a',)], msg="the expected order was not returned") - def CheckCollationRegisterTwice(self): + def test_CollationRegisterTwice(self): """ Register two different collation functions under the same name. Verify that the last one is actually used. @@ -120,7 +120,7 @@ class CollationTests(unittest.TestCase): self.assertEqual(result[0][0], 'b') self.assertEqual(result[1][0], 'a') - def CheckDeregisterCollation(self): + def test_DeregisterCollation(self): """ Register a collation, then deregister it. Make sure an error is raised if we try to use it. @@ -133,7 +133,7 @@ class CollationTests(unittest.TestCase): self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll') class ProgressTests(unittest.TestCase): - def CheckProgressHandlerUsed(self): + def test_ProgressHandlerUsed(self): """ Test that the progress handler is invoked once it is set. """ @@ -149,7 +149,7 @@ class ProgressTests(unittest.TestCase): self.assertTrue(progress_calls) - def CheckOpcodeCount(self): + def test_OpcodeCount(self): """ Test that the opcode argument is respected. """ @@ -172,7 +172,7 @@ class ProgressTests(unittest.TestCase): second_count = len(progress_calls) self.assertGreaterEqual(first_count, second_count) - def CheckCancelOperation(self): + def test_CancelOperation(self): """ Test that returning a non-zero value stops the operation in progress. """ @@ -186,7 +186,7 @@ class ProgressTests(unittest.TestCase): curs.execute, "create table bar (a, b)") - def CheckClearHandler(self): + def test_ClearHandler(self): """ Test that setting the progress handler to None clears the previously set handler. """ @@ -202,7 +202,7 @@ class ProgressTests(unittest.TestCase): self.assertEqual(action, 0, "progress handler was not cleared") class TraceCallbackTests(unittest.TestCase): - def CheckTraceCallbackUsed(self): + def test_TraceCallbackUsed(self): """ Test that the trace callback is invoked once it is set. """ @@ -215,7 +215,7 @@ class TraceCallbackTests(unittest.TestCase): self.assertTrue(traced_statements) self.assertTrue(any("create table foo" in stmt for stmt in traced_statements)) - def CheckTraceCallbackError(self): + def test_TraceCallbackError(self): """ Test behavior when exception raised in trace callback. """ @@ -226,7 +226,7 @@ class TraceCallbackTests(unittest.TestCase): con.execute("create table foo(a, b)") con.set_trace_callback(None) - def CheckClearTraceCallback(self): + def test_ClearTraceCallback(self): """ Test that setting the trace callback to None clears the previously set callback. """ @@ -239,7 +239,7 @@ class TraceCallbackTests(unittest.TestCase): con.execute("create table foo(a, b)") self.assertFalse(traced_statements, "trace callback was not cleared") - def CheckUnicodeContent(self): + def test_UnicodeContent(self): """ Test that the statement can contain unicode literals. """ @@ -259,7 +259,7 @@ class TraceCallbackTests(unittest.TestCase): "Unicode data %s garbled in trace callback: %s" % (ascii(unicode_value), ', '.join(map(ascii, traced_statements)))) - def CheckTraceCallbackContent(self): + def test_TraceCallbackContent(self): # set_trace_callback() shouldn't produce duplicate content (bpo-26187) traced_statements = [] def trace(statement): @@ -278,11 +278,49 @@ class TraceCallbackTests(unittest.TestCase): self.assertEqual(traced_statements, queries) +class TestBusyHandlerTimeout(unittest.TestCase): + def test_busy_handler(self): + accum = [] + def custom_handler(n): + accum.append(n) + return 0 if n == 3 else 1 + + self.addCleanup(os.unlink, 'busy.db') + conn1 = sqlite.connect('busy.db') + conn2 = sqlite.connect('busy.db') + conn2.set_busy_handler(custom_handler) + + conn1.execute('begin exclusive') + with self.assertRaises(sqlite.OperationalError): + conn2.execute('create table test(id)') + self.assertEqual(accum, [0, 1, 2, 3]) + accum.clear() + + conn2.set_busy_handler(None) + with self.assertRaises(sqlite.OperationalError): + conn2.execute('create table test(id)') + self.assertEqual(accum, []) + + conn2.set_busy_handler(custom_handler) + with self.assertRaises(sqlite.OperationalError): + conn2.execute('create table test(id)') + self.assertEqual(accum, [0, 1, 2, 3]) + accum.clear() + + conn2.set_busy_timeout(0.01) # Clears busy handler. + with self.assertRaises(sqlite.OperationalError): + conn2.execute('create table test(id)') + self.assertEqual(accum, []) + + def suite(): - collation_suite = unittest.makeSuite(CollationTests, "Check") - progress_suite = unittest.makeSuite(ProgressTests, "Check") - trace_suite = unittest.makeSuite(TraceCallbackTests, "Check") - return unittest.TestSuite((collation_suite, progress_suite, trace_suite)) + loader = unittest.TestLoader() + tests = [loader.loadTestsFromTestCase(t) for t in ( + CollationTests, + ProgressTests, + TraceCallbackTests, + TestBusyHandlerTimeout)] + return unittest.TestSuite(tests) def test(): runner = unittest.TextTestRunner() diff --git a/test/test_regression.py b/tests/regression.py similarity index 90% rename from test/test_regression.py rename to tests/regression.py index ca67059..7a4c01e 100644 --- a/test/test_regression.py +++ b/tests/regression.py @@ -35,12 +35,12 @@ class RegressionTests(unittest.TestCase): def tearDown(self): self.con.close() - def CheckPragmaUserVersion(self): + def test_PragmaUserVersion(self): # This used to crash pysqlite because this pragma command returns NULL for the column name cur = self.con.cursor() cur.execute("pragma user_version") - def CheckPragmaSchemaVersion(self): + def test_PragmaSchemaVersion(self): # This still crashed pysqlite <= 2.2.1 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) try: @@ -50,7 +50,7 @@ class RegressionTests(unittest.TestCase): cur.close() con.close() - def CheckStatementReset(self): + def test_StatementReset(self): # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are # reset before a rollback, but only those that are still in the # statement cache. The others are not accessible from the connection object. @@ -65,7 +65,7 @@ class RegressionTests(unittest.TestCase): con.rollback() - def CheckColumnNameWithSpaces(self): + def test_ColumnNameWithSpaces(self): cur = self.con.cursor() cur.execute('select 1 as "foo bar [datetime]"') self.assertEqual(cur.description[0][0], "foo bar [datetime]") @@ -73,7 +73,7 @@ class RegressionTests(unittest.TestCase): cur.execute('select 1 as "foo baz"') self.assertEqual(cur.description[0][0], "foo baz") - def CheckStatementFinalizationOnCloseDb(self): + def test_StatementFinalizationOnCloseDb(self): # pysqlite versions <= 2.3.3 only finalized statements in the statement # cache when closing the database. statements that were still # referenced in cursors weren't closed and could provoke " @@ -88,7 +88,7 @@ class RegressionTests(unittest.TestCase): con.close() @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer') - def CheckOnConflictRollback(self): + def test_OnConflictRollback(self): con = sqlite.connect(":memory:") con.execute("create table foo(x, unique(x) on conflict rollback)") con.execute("insert into foo(x) values (1)") @@ -102,7 +102,7 @@ class RegressionTests(unittest.TestCase): except sqlite.OperationalError: self.fail("pysqlite knew nothing about the implicit ROLLBACK") - def CheckWorkaroundForBuggySqliteTransferBindings(self): + def test_WorkaroundForBuggySqliteTransferBindings(self): """ pysqlite would crash with older SQLite versions unless a workaround is implemented. @@ -111,14 +111,14 @@ class RegressionTests(unittest.TestCase): self.con.execute("drop table foo") self.con.execute("create table foo(bar)") - def CheckEmptyStatement(self): + def test_EmptyStatement(self): """ pysqlite used to segfault with SQLite versions 3.5.x. These return NULL for "no-operation" statements """ self.con.execute("") - def CheckTypeMapUsage(self): + def test_TypeMapUsage(self): """ pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling a statement. This test exhibits the problem. @@ -133,7 +133,7 @@ class RegressionTests(unittest.TestCase): con.execute("insert into foo(bar) values (5)") con.execute(SELECT) - def CheckErrorMsgDecodeError(self): + def test_ErrorMsgDecodeError(self): # When porting the module to Python 3.0, the error message about # decoding errors disappeared. This verifies they're back again. with self.assertRaises(sqlite.OperationalError) as cm: @@ -142,13 +142,13 @@ class RegressionTests(unittest.TestCase): msg = "Could not decode to UTF-8 column 'colname' with text 'xxx" self.assertIn(msg, str(cm.exception)) - def CheckRegisterAdapter(self): + def test_RegisterAdapter(self): """ See issue 3312. """ self.assertRaises(TypeError, sqlite.register_adapter, {}, None) - def CheckSetIsolationLevel(self): + def test_SetIsolationLevel(self): # See issue 27881. class CustomStr(str): def upper(self): @@ -170,7 +170,7 @@ class RegressionTests(unittest.TestCase): con.isolation_level = "DEFERRED" pairs = [ (1, TypeError), (b'', TypeError), ("abc", ValueError), - ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError), + ("\xe9", ValueError), ] for value, exc in pairs: with self.subTest(level=value): @@ -178,7 +178,7 @@ class RegressionTests(unittest.TestCase): con.isolation_level = value self.assertEqual(con.isolation_level, "DEFERRED") - def CheckCursorConstructorCallCheck(self): + def test_CursorConstructorCallCheck(self): """ Verifies that cursor methods check whether base class __init__ was called. @@ -195,14 +195,14 @@ class RegressionTests(unittest.TestCase): r'^Base Cursor\.__init__ not called\.$'): cur.close() - def CheckStrSubclass(self): + def test_StrSubclass(self): """ The Python 3.0 port of the module didn't cope with values of subclasses of str. """ class MyStr(str): pass self.con.execute("select ?", (MyStr("abc"),)) - def CheckConnectionConstructorCallCheck(self): + def test_ConnectionConstructorCallCheck(self): """ Verifies that connection methods check whether base class __init__ was called. @@ -215,7 +215,7 @@ class RegressionTests(unittest.TestCase): with self.assertRaises(sqlite.ProgrammingError): cur = con.cursor() - def CheckCursorRegistration(self): + def test_CursorRegistration(self): """ Verifies that subclassed cursor classes are correctly registered with the connection object, too. (fetch-across-rollback problem) @@ -237,7 +237,7 @@ class RegressionTests(unittest.TestCase): with self.assertRaises(sqlite.InterfaceError): cur.fetchall() - def CheckAutoCommit(self): + def test_AutoCommit(self): """ Verifies that creating a connection in autocommit mode works. 2.5.3 introduced a regression so that these could no longer @@ -245,7 +245,7 @@ class RegressionTests(unittest.TestCase): """ con = sqlite.connect(":memory:", isolation_level=None) - def CheckPragmaAutocommit(self): + def test_PragmaAutocommit(self): """ Verifies that running a PRAGMA statement that does an autocommit does work. This did not work in 2.5.3/2.5.4. @@ -257,21 +257,21 @@ class RegressionTests(unittest.TestCase): cur.execute("pragma page_size") row = cur.fetchone() - def CheckConnectionCall(self): + def test_ConnectionCall(self): """ Call a connection with a non-string SQL request: check error handling of the statement constructor. """ self.assertRaises(TypeError, self.con, 1) - def CheckCollation(self): + def test_Collation(self): def collation_cb(a, b): return 1 - self.assertRaises(sqlite.ProgrammingError, self.con.create_collation, + self.assertRaises(UnicodeEncodeError, self.con.create_collation, # Lone surrogate cannot be encoded to the default encoding (utf8) "\uDC80", collation_cb) - def CheckRecursiveCursorUse(self): + def test_RecursiveCursorUse(self): """ http://bugs.python.org/issue10811 @@ -292,7 +292,7 @@ class RegressionTests(unittest.TestCase): cur.executemany("insert into b (baz) values (?)", ((i,) for i in foo())) - def CheckConvertTimestampMicrosecondPadding(self): + def test_ConvertTimestampMicrosecondPadding(self): """ http://bugs.python.org/issue14720 @@ -318,13 +318,13 @@ class RegressionTests(unittest.TestCase): datetime.datetime(2012, 4, 4, 15, 6, 0, 123456), ]) - def CheckInvalidIsolationLevelType(self): + def test_InvalidIsolationLevelType(self): # isolation level is a string, not an integer self.assertRaises(TypeError, sqlite.connect, ":memory:", isolation_level=123) - def CheckNullCharacter(self): + def test_NullCharacter(self): # Issue #21147 con = sqlite.connect(":memory:") self.assertRaises(ValueError, con, "\0select 1") @@ -333,7 +333,7 @@ class RegressionTests(unittest.TestCase): self.assertRaises(ValueError, cur.execute, " \0select 2") self.assertRaises(ValueError, cur.execute, "select 2\0") - def CheckCommitCursorReset(self): + def test_CommitCursorReset(self): """ Connection.commit() did reset cursors, which made sqlite3 to return rows multiple times when fetched from cursors @@ -364,7 +364,7 @@ class RegressionTests(unittest.TestCase): counter += 1 self.assertEqual(counter, 3, "should have returned exactly three rows") - def CheckBpo31770(self): + def test_Bpo31770(self): """ The interpreter shouldn't crash in case Cursor.__init__() is called more than once. @@ -380,11 +380,11 @@ class RegressionTests(unittest.TestCase): del ref #support.gc_collect() - def CheckDelIsolation_levelSegfault(self): + def test_DelIsolation_levelSegfault(self): with self.assertRaises(AttributeError): del self.con.isolation_level - def CheckBpo37347(self): + def test_Bpo37347(self): class Printer: def log(self, *args): return sqlite.SQLITE_OK @@ -400,10 +400,10 @@ class RegressionTests(unittest.TestCase): def suite(): - regression_suite = unittest.makeSuite(RegressionTests, "Check") - return unittest.TestSuite(( - regression_suite, - )) + loader = unittest.TestLoader() + tests = [loader.loadTestsFromTestCase(t) for t in ( + RegressionTests,)] + return unittest.TestSuite(tests) def test(): runner = unittest.TextTestRunner() diff --git a/test/test_transactions.py b/tests/transactions.py similarity index 91% rename from test/test_transactions.py rename to tests/transactions.py index fff320c..038f83a 100644 --- a/test/test_transactions.py +++ b/tests/transactions.py @@ -52,7 +52,7 @@ class TransactionTests(unittest.TestCase): except OSError: pass - def CheckDMLDoesNotAutoCommitBefore(self): + def test_DMLDoesNotAutoCommitBefore(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.cur1.execute("create table test2(j)") @@ -60,14 +60,14 @@ class TransactionTests(unittest.TestCase): res = self.cur2.fetchall() self.assertEqual(len(res), 0) - def CheckInsertStartsTransaction(self): + def test_InsertStartsTransaction(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.cur2.execute("select i from test") res = self.cur2.fetchall() self.assertEqual(len(res), 0) - def CheckUpdateStartsTransaction(self): + def test_UpdateStartsTransaction(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.con1.commit() @@ -76,7 +76,7 @@ class TransactionTests(unittest.TestCase): res = self.cur2.fetchone()[0] self.assertEqual(res, 5) - def CheckDeleteStartsTransaction(self): + def test_DeleteStartsTransaction(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.con1.commit() @@ -85,7 +85,7 @@ class TransactionTests(unittest.TestCase): res = self.cur2.fetchall() self.assertEqual(len(res), 1) - def CheckReplaceStartsTransaction(self): + def test_ReplaceStartsTransaction(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.con1.commit() @@ -95,7 +95,7 @@ class TransactionTests(unittest.TestCase): self.assertEqual(len(res), 1) self.assertEqual(res[0][0], 5) - def CheckToggleAutoCommit(self): + def test_ToggleAutoCommit(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.con1.isolation_level = None @@ -113,7 +113,7 @@ class TransactionTests(unittest.TestCase): @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'test hangs on sqlite versions older than 3.2.2') - def CheckRaiseTimeout(self): + def test_RaiseTimeout(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") with self.assertRaises(sqlite.OperationalError): @@ -121,7 +121,7 @@ class TransactionTests(unittest.TestCase): @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'test hangs on sqlite versions older than 3.2.2') - def CheckLocking(self): + def test_Locking(self): """ This tests the improved concurrency with pysqlite 2.3.4. You needed to roll back con2 before you could commit con1. @@ -133,7 +133,7 @@ class TransactionTests(unittest.TestCase): # NO self.con2.rollback() HERE!!! self.con1.commit() - def CheckRollbackCursorConsistency(self): + def test_RollbackCursorConsistency(self): """ Checks if cursors on the connection are set into a "reset" state when a rollback is done on the connection. @@ -153,12 +153,12 @@ class SpecialCommandTests(unittest.TestCase): self.con = sqlite.connect(":memory:") self.cur = self.con.cursor() - def CheckDropTable(self): + def test_DropTable(self): self.cur.execute("create table test(i)") self.cur.execute("insert into test(i) values (5)") self.cur.execute("drop table test") - def CheckPragma(self): + def test_Pragma(self): self.cur.execute("create table test(i)") self.cur.execute("insert into test(i) values (5)") self.cur.execute("pragma count_changes=1") @@ -171,7 +171,7 @@ class TransactionalDDL(unittest.TestCase): def setUp(self): self.con = sqlite.connect(":memory:") - def CheckDdlDoesNotAutostartTransaction(self): + def test_DdlDoesNotAutostartTransaction(self): # For backwards compatibility reasons, DDL statements should not # implicitly start a transaction. self.con.execute("create table test(i)") @@ -184,7 +184,7 @@ class TransactionalDDL(unittest.TestCase): result = self.con.execute("select * from test2").fetchall() self.assertEqual(result, []) - def CheckImmediateTransactionalDDL(self): + def test_ImmediateTransactionalDDL(self): # You can achieve transactional DDL by issuing a BEGIN # statement manually. self.con.execute("begin immediate") @@ -193,7 +193,7 @@ class TransactionalDDL(unittest.TestCase): with self.assertRaises(sqlite.OperationalError): self.con.execute("select * from test") - def CheckTransactionalDDL(self): + def test_TransactionalDDL(self): # You can achieve transactional DDL by issuing a BEGIN # statement manually. self.con.execute("begin") @@ -285,11 +285,13 @@ class DMLStatementDetectionTestCase(unittest.TestCase): def suite(): - default_suite = unittest.makeSuite(TransactionTests, "Check") - special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check") - ddl_suite = unittest.makeSuite(TransactionalDDL, "Check") - dml_suite = unittest.makeSuite(DMLStatementDetectionTestCase) - return unittest.TestSuite((default_suite, special_command_suite, ddl_suite, dml_suite)) + loader = unittest.TestLoader() + tests = [loader.loadTestsFromTestCase(t) for t in ( + TransactionTests, + SpecialCommandTests, + TransactionalDDL, + DMLStatementDetectionTestCase)] + return unittest.TestSuite(tests) def test(): runner = unittest.TextTestRunner() diff --git a/test/test_ttypes.py b/tests/ttypes.py similarity index 89% rename from test/test_ttypes.py rename to tests/ttypes.py index 78706b7..69a8ac8 100644 --- a/test/test_ttypes.py +++ b/tests/ttypes.py @@ -40,33 +40,33 @@ class SqliteTypeTests(unittest.TestCase): self.cur.close() self.con.close() - def CheckString(self): + def test_String(self): self.cur.execute("insert into test(s) values (?)", ("Österreich",)) self.cur.execute("select s from test") row = self.cur.fetchone() self.assertEqual(row[0], "Österreich") - def CheckSmallInt(self): + def test_SmallInt(self): self.cur.execute("insert into test(i) values (?)", (42,)) self.cur.execute("select i from test") row = self.cur.fetchone() self.assertEqual(row[0], 42) - def CheckLargeInt(self): + def test_LargeInt(self): num = 2**40 self.cur.execute("insert into test(i) values (?)", (num,)) self.cur.execute("select i from test") row = self.cur.fetchone() self.assertEqual(row[0], num) - def CheckFloat(self): + def test_Float(self): val = 3.14 self.cur.execute("insert into test(f) values (?)", (val,)) self.cur.execute("select f from test") row = self.cur.fetchone() self.assertEqual(row[0], val) - def CheckBlob(self): + def test_Blob(self): sample = b"Guglhupf" val = memoryview(sample) self.cur.execute("insert into test(b) values (?)", (val,)) @@ -74,7 +74,7 @@ class SqliteTypeTests(unittest.TestCase): row = self.cur.fetchone() self.assertEqual(row[0], sample) - def CheckUnicodeExecute(self): + def test_UnicodeExecute(self): self.cur.execute("select 'Österreich'") row = self.cur.fetchone() self.assertEqual(row[0], "Österreich") @@ -133,21 +133,21 @@ class DeclTypesTests(unittest.TestCase): self.cur.close() self.con.close() - def CheckString(self): + def test_String(self): # default self.cur.execute("insert into test(s) values (?)", ("foo",)) self.cur.execute('select s as "s [WRONG]" from test') row = self.cur.fetchone() self.assertEqual(row[0], "foo") - def CheckSmallInt(self): + def test_SmallInt(self): # default self.cur.execute("insert into test(i) values (?)", (42,)) self.cur.execute("select i from test") row = self.cur.fetchone() self.assertEqual(row[0], 42) - def CheckLargeInt(self): + def test_LargeInt(self): # default num = 2**40 self.cur.execute("insert into test(i) values (?)", (num,)) @@ -155,7 +155,7 @@ class DeclTypesTests(unittest.TestCase): row = self.cur.fetchone() self.assertEqual(row[0], num) - def CheckFloat(self): + def test_Float(self): # custom val = 3.14 self.cur.execute("insert into test(f) values (?)", (val,)) @@ -163,7 +163,7 @@ class DeclTypesTests(unittest.TestCase): row = self.cur.fetchone() self.assertEqual(row[0], 47.2) - def CheckBool(self): + def test_Bool(self): # custom self.cur.execute("insert into test(b) values (?)", (False,)) self.cur.execute("select b from test") @@ -176,7 +176,7 @@ class DeclTypesTests(unittest.TestCase): row = self.cur.fetchone() self.assertIs(row[0], True) - def CheckUnicode(self): + def test_Unicode(self): # default val = "\xd6sterreich" self.cur.execute("insert into test(u) values (?)", (val,)) @@ -184,14 +184,14 @@ class DeclTypesTests(unittest.TestCase): row = self.cur.fetchone() self.assertEqual(row[0], val) - def CheckFoo(self): + def test_Foo(self): val = DeclTypesTests.Foo("bla") self.cur.execute("insert into test(foo) values (?)", (val,)) self.cur.execute("select foo from test") row = self.cur.fetchone() self.assertEqual(row[0], val) - def CheckErrorInConform(self): + def test_ErrorInConform(self): val = DeclTypesTests.BadConform(TypeError) with self.assertRaises(sqlite.InterfaceError): self.cur.execute("insert into test(bad) values (?)", (val,)) @@ -204,19 +204,19 @@ class DeclTypesTests(unittest.TestCase): with self.assertRaises(KeyboardInterrupt): self.cur.execute("insert into test(bad) values (:val)", {"val": val}) - def CheckUnsupportedSeq(self): + def test_UnsupportedSeq(self): class Bar: pass val = Bar() with self.assertRaises(sqlite.InterfaceError): self.cur.execute("insert into test(f) values (?)", (val,)) - def CheckUnsupportedDict(self): + def test_UnsupportedDict(self): class Bar: pass val = Bar() with self.assertRaises(sqlite.InterfaceError): self.cur.execute("insert into test(f) values (:val)", {"val": val}) - def CheckBlob(self): + def test_Blob(self): # default sample = b"Guglhupf" val = memoryview(sample) @@ -225,13 +225,13 @@ class DeclTypesTests(unittest.TestCase): row = self.cur.fetchone() self.assertEqual(row[0], sample) - def CheckNumber1(self): + def test_Number1(self): self.cur.execute("insert into test(n1) values (5)") value = self.cur.execute("select n1 from test").fetchone()[0] # if the converter is not used, it's an int instead of a float self.assertEqual(type(value), float) - def CheckNumber2(self): + def test_Number2(self): """Checks whether converter names are cut off at '(' characters""" self.cur.execute("insert into test(n2) values (5)") value = self.cur.execute("select n2 from test").fetchone()[0] @@ -257,7 +257,7 @@ class ColNamesTests(unittest.TestCase): self.cur.close() self.con.close() - def CheckDeclTypeNotUsed(self): + def test_DeclTypeNotUsed(self): """ Assures that the declared type is not used when PARSE_DECLTYPES is not set. @@ -267,13 +267,13 @@ class ColNamesTests(unittest.TestCase): val = self.cur.fetchone()[0] self.assertEqual(val, "xxx") - def CheckNone(self): + def test_None(self): self.cur.execute("insert into test(x) values (?)", (None,)) self.cur.execute("select x from test") val = self.cur.fetchone()[0] self.assertEqual(val, None) - def CheckColName(self): + def test_ColName(self): self.cur.execute("insert into test(x) values (?)", ("xxx",)) self.cur.execute('select x as "x y [bar]" from test') val = self.cur.fetchone()[0] @@ -283,12 +283,12 @@ class ColNamesTests(unittest.TestCase): # whitespace should be stripped. self.assertEqual(self.cur.description[0][0], "x y") - def CheckCaseInConverterName(self): + def test_CaseInConverterName(self): self.cur.execute("select 'other' as \"x [b1b1]\"") val = self.cur.fetchone()[0] self.assertEqual(val, "MARKER") - def CheckCursorDescriptionNoRow(self): + def test_CursorDescriptionNoRow(self): """ cursor.description should at least provide the column name(s), even if no row returned. @@ -296,7 +296,7 @@ class ColNamesTests(unittest.TestCase): self.cur.execute("select * from test where 0 = 1") self.assertEqual(self.cur.description[0][0], "x") - def CheckCursorDescriptionInsert(self): + def test_CursorDescriptionInsert(self): self.cur.execute("insert into test values (1)") self.assertIsNone(self.cur.description) @@ -313,19 +313,19 @@ class CommonTableExpressionTests(unittest.TestCase): self.cur.close() self.con.close() - def CheckCursorDescriptionCTESimple(self): + def test_CursorDescriptionCTESimple(self): self.cur.execute("with one as (select 1) select * from one") self.assertIsNotNone(self.cur.description) self.assertEqual(self.cur.description[0][0], "1") - def CheckCursorDescriptionCTESMultipleColumns(self): + def test_CursorDescriptionCTESMultipleColumns(self): self.cur.execute("insert into test values(1)") self.cur.execute("insert into test values(2)") self.cur.execute("with testCTE as (select * from test) select * from testCTE") self.assertIsNotNone(self.cur.description) self.assertEqual(self.cur.description[0][0], "x") - def CheckCursorDescriptionCTE(self): + def test_CursorDescriptionCTE(self): self.cur.execute("insert into test values (1)") self.cur.execute("with bar as (select * from test) select * from test where x = 1") self.assertIsNotNone(self.cur.description) @@ -354,7 +354,7 @@ class ObjectAdaptationTests(unittest.TestCase): self.cur.close() self.con.close() - def CheckCasterIsUsed(self): + def test_CasterIsUsed(self): self.cur.execute("select ?", (4,)) val = self.cur.fetchone()[0] self.assertEqual(type(val), float) @@ -372,7 +372,7 @@ class BinaryConverterTests(unittest.TestCase): def tearDown(self): self.con.close() - def CheckBinaryInputForConverter(self): + def test_BinaryInputForConverter(self): testdata = b"abcdefg" * 10 result = self.con.execute('select ? as "x [bin]"', (memoryview(zlib.compress(testdata)),)).fetchone()[0] self.assertEqual(testdata, result) @@ -387,14 +387,14 @@ class DateTimeTests(unittest.TestCase): self.cur.close() self.con.close() - def CheckSqliteDate(self): + def test_SqliteDate(self): d = sqlite.Date(2004, 2, 14) self.cur.execute("insert into test(d) values (?)", (d,)) self.cur.execute("select d from test") d2 = self.cur.fetchone()[0] self.assertEqual(d, d2) - def CheckSqliteTimestamp(self): + def test_SqliteTimestamp(self): ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0) self.cur.execute("insert into test(ts) values (?)", (ts,)) self.cur.execute("select ts from test") @@ -403,7 +403,7 @@ class DateTimeTests(unittest.TestCase): @unittest.skipIf(sqlite.sqlite_version_info < (3, 1), 'the date functions are available on 3.1 or later') - def CheckSqlTimestamp(self): + def test_SqlTimestamp(self): now = datetime.datetime.utcnow() self.cur.execute("insert into test(ts) values (current_timestamp)") self.cur.execute("select ts from test") @@ -411,14 +411,14 @@ class DateTimeTests(unittest.TestCase): self.assertEqual(type(ts), datetime.datetime) self.assertEqual(ts.year, now.year) - def CheckDateTimeSubSeconds(self): + def test_DateTimeSubSeconds(self): ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000) self.cur.execute("insert into test(ts) values (?)", (ts,)) self.cur.execute("select ts from test") ts2 = self.cur.fetchone()[0] self.assertEqual(ts, ts2) - def CheckDateTimeSubSecondsFloatingPoint(self): + def test_DateTimeSubSecondsFloatingPoint(self): ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241) self.cur.execute("insert into test(ts) values (?)", (ts,)) self.cur.execute("select ts from test") @@ -426,14 +426,16 @@ class DateTimeTests(unittest.TestCase): self.assertEqual(ts, ts2) def suite(): - sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check") - decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check") - colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check") - adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check") - bin_suite = unittest.makeSuite(BinaryConverterTests, "Check") - date_suite = unittest.makeSuite(DateTimeTests, "Check") - cte_suite = unittest.makeSuite(CommonTableExpressionTests, "Check") - return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite, cte_suite)) + loader = unittest.TestLoader() + tests = [loader.loadTestsFromTestCase(t) for t in ( + SqliteTypeTests, + DeclTypesTests, + ColNamesTests, + ObjectAdaptationTests, + BinaryConverterTests, + DateTimeTests, + CommonTableExpressionTests)] + return unittest.TestSuite(tests) def test(): runner = unittest.TextTestRunner() diff --git a/test/test_userfunctions.py b/tests/userfunctions.py similarity index 89% rename from test/test_userfunctions.py rename to tests/userfunctions.py index c5f53c0..a0d0487 100644 --- a/test/test_userfunctions.py +++ b/tests/userfunctions.py @@ -162,11 +162,11 @@ class FunctionTests(unittest.TestCase): def tearDown(self): self.con.close() - def CheckFuncErrorOnCreate(self): + def test_FuncErrorOnCreate(self): with self.assertRaises(sqlite.OperationalError): self.con.create_function("bla", -100, lambda x: 2*x) - def CheckFuncRefCount(self): + def test_FuncRefCount(self): def getfunc(): def f(): return 1 @@ -178,28 +178,28 @@ class FunctionTests(unittest.TestCase): cur = self.con.cursor() cur.execute("select reftest()") - def CheckFuncReturnText(self): + def test_FuncReturnText(self): cur = self.con.cursor() cur.execute("select returntext()") val = cur.fetchone()[0] self.assertEqual(type(val), str) self.assertEqual(val, "foo") - def CheckFuncReturnUnicode(self): + def test_FuncReturnUnicode(self): cur = self.con.cursor() cur.execute("select returnunicode()") val = cur.fetchone()[0] self.assertEqual(type(val), str) self.assertEqual(val, "bar") - def CheckFuncReturnInt(self): + def test_FuncReturnInt(self): cur = self.con.cursor() cur.execute("select returnint()") val = cur.fetchone()[0] self.assertEqual(type(val), int) self.assertEqual(val, 42) - def CheckFuncReturnFloat(self): + def test_FuncReturnFloat(self): cur = self.con.cursor() cur.execute("select returnfloat()") val = cur.fetchone()[0] @@ -207,90 +207,90 @@ class FunctionTests(unittest.TestCase): if val < 3.139 or val > 3.141: self.fail("wrong value") - def CheckFuncReturnNull(self): + def test_FuncReturnNull(self): cur = self.con.cursor() cur.execute("select returnnull()") val = cur.fetchone()[0] self.assertEqual(type(val), type(None)) self.assertEqual(val, None) - def CheckFuncReturnBlob(self): + def test_FuncReturnBlob(self): cur = self.con.cursor() cur.execute("select returnblob()") val = cur.fetchone()[0] self.assertEqual(type(val), bytes) self.assertEqual(val, b"blob") - def CheckFuncReturnLongLong(self): + def test_FuncReturnLongLong(self): cur = self.con.cursor() cur.execute("select returnlonglong()") val = cur.fetchone()[0] self.assertEqual(val, 1<<31) - def CheckFuncException(self): + def test_FuncException(self): cur = self.con.cursor() with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select raiseexception()") cur.fetchone() self.assertEqual(str(cm.exception), 'user-defined function raised exception') - def CheckParamString(self): + def test_ParamString(self): cur = self.con.cursor() cur.execute("select isstring(?)", ("foo",)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckParamInt(self): + def test_ParamInt(self): cur = self.con.cursor() cur.execute("select isint(?)", (42,)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckParamFloat(self): + def test_ParamFloat(self): cur = self.con.cursor() cur.execute("select isfloat(?)", (3.14,)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckParamNone(self): + def test_ParamNone(self): cur = self.con.cursor() cur.execute("select isnone(?)", (None,)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckParamBlob(self): + def test_ParamBlob(self): cur = self.con.cursor() cur.execute("select isblob(?)", (memoryview(b"blob"),)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckParamLongLong(self): + def test_ParamLongLong(self): cur = self.con.cursor() cur.execute("select islonglong(?)", (1<<42,)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckAnyArguments(self): + def test_AnyArguments(self): cur = self.con.cursor() cur.execute("select spam(?, ?)", (1, 2)) val = cur.fetchone()[0] self.assertEqual(val, 2) - def CheckFuncNonDeterministic(self): + def test_FuncNonDeterministic(self): mock = unittest.mock.Mock(return_value=None) self.con.create_function("deterministic", 0, mock, deterministic=False) self.con.execute("select deterministic() = deterministic()") self.assertEqual(mock.call_count, 2) @unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "deterministic parameter not supported") - def CheckFuncDeterministic(self): + def test_FuncDeterministic(self): mock = unittest.mock.Mock(return_value=None) self.con.create_function("deterministic", 0, mock, True) self.con.execute("select 1 where deterministic() AND deterministic()") self.assertEqual(mock.call_count, 1) @unittest.skipIf(sqlite.sqlite_version_info >= (3, 8, 3), "SQLite < 3.8.3 needed") - def CheckFuncDeterministicNotSupported(self): + def test_FuncDeterministicNotSupported(self): with self.assertRaises(sqlite.NotSupportedError): self.con.create_function("deterministic", 0, int, deterministic=True) @@ -325,81 +325,81 @@ class AggregateTests(unittest.TestCase): #self.con.close() pass - def CheckAggrErrorOnCreate(self): + def test_AggrErrorOnCreate(self): with self.assertRaises(sqlite.OperationalError): self.con.create_function("bla", -100, AggrSum) - def CheckAggrNoStep(self): + def test_AggrNoStep(self): cur = self.con.cursor() with self.assertRaises(AttributeError) as cm: cur.execute("select nostep(t) from test") self.assertEqual(str(cm.exception), "'AggrNoStep' object has no attribute 'step'") - def CheckAggrNoFinalize(self): + def test_AggrNoFinalize(self): cur = self.con.cursor() with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select nofinalize(t) from test") val = cur.fetchone()[0] self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error") - def CheckAggrExceptionInInit(self): + def test_AggrExceptionInInit(self): cur = self.con.cursor() with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select excInit(t) from test") val = cur.fetchone()[0] self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error") - def CheckAggrExceptionInStep(self): + def test_AggrExceptionInStep(self): cur = self.con.cursor() with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select excStep(t) from test") val = cur.fetchone()[0] self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error") - def CheckAggrExceptionInFinalize(self): + def test_AggrExceptionInFinalize(self): cur = self.con.cursor() with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select excFinalize(t) from test") val = cur.fetchone()[0] self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error") - def CheckAggrCheckParamStr(self): + def test_AggrCheckParamStr(self): cur = self.con.cursor() cur.execute("select checkType('str', ?)", ("foo",)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckAggrCheckParamInt(self): + def test_AggrCheckParamInt(self): cur = self.con.cursor() cur.execute("select checkType('int', ?)", (42,)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckAggrCheckParamsInt(self): + def test_AggrCheckParamsInt(self): cur = self.con.cursor() cur.execute("select checkTypes('int', ?, ?)", (42, 24)) val = cur.fetchone()[0] self.assertEqual(val, 2) - def CheckAggrCheckParamFloat(self): + def test_AggrCheckParamFloat(self): cur = self.con.cursor() cur.execute("select checkType('float', ?)", (3.14,)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckAggrCheckParamNone(self): + def test_AggrCheckParamNone(self): cur = self.con.cursor() cur.execute("select checkType('None', ?)", (None,)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckAggrCheckParamBlob(self): + def test_AggrCheckParamBlob(self): cur = self.con.cursor() cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),)) val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckAggrCheckAggrSum(self): + def test_AggrCheckAggrSum(self): cur = self.con.cursor() cur.execute("delete from test") cur.executemany("insert into test(i) values (?)", [(10,), (20,), (30,)]) @@ -407,7 +407,7 @@ class AggregateTests(unittest.TestCase): val = cur.fetchone()[0] self.assertEqual(val, 60) - def CheckAggrNoMatch(self): + def test_AggrNoMatch(self): cur = self.con.execute('select mysum(i) from (select 1 as i) where i == 0') val = cur.fetchone()[0] self.assertIsNone(val) @@ -516,19 +516,16 @@ class AuthorizerLargeIntegerTests(AuthorizerTests): def suite(): - function_suite = unittest.makeSuite(FunctionTests, "Check") - aggregate_suite = unittest.makeSuite(AggregateTests, "Check") - window_suite = unittest.makeSuite(WindowFunctionTests) - authorizer_suite = unittest.makeSuite(AuthorizerTests) - return unittest.TestSuite(( - function_suite, - aggregate_suite, - window_suite, - authorizer_suite, - unittest.makeSuite(AuthorizerRaiseExceptionTests), - unittest.makeSuite(AuthorizerIllegalTypeTests), - unittest.makeSuite(AuthorizerLargeIntegerTests), - )) + loader = unittest.TestLoader() + tests = [loader.loadTestsFromTestCase(t) for t in ( + FunctionTests, + AggregateTests, + WindowFunctionTests, + AuthorizerTests, + AuthorizerRaiseExceptionTests, + AuthorizerIllegalTypeTests, + AuthorizerLargeIntegerTests)] + return unittest.TestSuite(tests) def test(): runner = unittest.TextTestRunner()