Merge enhancements from pysqlite3, errorcode/name and blob i/o.

This commit is contained in:
Charles Leifer 2019-08-09 11:00:40 -05:00
parent 9109f9faf1
commit a524fd33bc
10 changed files with 1168 additions and 24 deletions

View file

@ -136,8 +136,8 @@ class BackupTests(unittest.TestCase):
def test_database_source_name(self):
with sqlite.connect(':memory:') as bck:
self.cx.backup(bck, name='main')
with sqlite.connect(':memory:') as bck:
self.cx.backup(bck, name='temp')
#with sqlite.connect(':memory:') as bck:
# self.cx.backup(bck, name='temp')
with self.assertRaises(sqlite.OperationalError) as cm:
with sqlite.connect(':memory:') as bck:
self.cx.backup(bck, name='non-existing')

View file

@ -23,10 +23,10 @@
import threading
import unittest
from sqlcipher3 import dbapi2 as sqlite
from pysqlite3 import dbapi2 as sqlite
#from test.support import TESTFN, unlink
TESTFN = '/tmp/sqlcipher3_test'
TESTFN = '/tmp/pysqlite3_test'
from os import unlink
@ -85,6 +85,14 @@ class ModuleTests(unittest.TestCase):
sqlite.DatabaseError),
"NotSupportedError is not a subclass of DatabaseError")
def CheckErrorCodeOnException(self):
with self.assertRaises(sqlite.Error) as cm:
db = sqlite.connect('/no/such/file/exists')
e = cm.exception
self.assertEqual(e.sqlite_errorcode, sqlite.SQLITE_CANTOPEN)
self.assertEqual(e.sqlite_errorname, "SQLITE_CANTOPEN")
self.assertEqual(str(e), "unable to open database file")
class ConnectionTests(unittest.TestCase):
def setUp(self):
@ -512,7 +520,177 @@ class CursorTests(unittest.TestCase):
]
self.assertEqual(results, expected)
class BlobTests(unittest.TestCase):
def setUp(self):
self.cx = sqlite.connect(":memory:")
self.cx.execute("create table test(id integer primary key, blob_col blob)")
self.blob_data = b"a" * 100
self.cx.execute("insert into test(blob_col) values (?)", (self.blob_data, ))
self.blob = self.cx.open_blob("test", "blob_col", 1)
self.second_data = b"b" * 100
def tearDown(self):
self.blob.close()
self.cx.close()
def CheckLength(self):
self.assertEqual(len(self.blob), 100)
def CheckTell(self):
self.assertEqual(self.blob.tell(), 0)
def CheckSeekFromBlobStart(self):
self.blob.seek(10)
self.assertEqual(self.blob.tell(), 10)
self.blob.seek(10, 0)
self.assertEqual(self.blob.tell(), 10)
def CheckSeekFromCurrentPosition(self):
self.blob.seek(10, 1)
self.blob.seek(10, 1)
self.assertEqual(self.blob.tell(), 20)
def CheckSeekFromBlobEnd(self):
self.blob.seek(-10, 2)
self.assertEqual(self.blob.tell(), 90)
def CheckBlobSeekOverBlobSize(self):
with self.assertRaises(ValueError):
self.blob.seek(1000)
def CheckBlobSeekUnderBlobSize(self):
with self.assertRaises(ValueError):
self.blob.seek(-10)
def CheckBlobRead(self):
self.assertEqual(self.blob.read(), self.blob_data)
def CheckBlobReadSize(self):
self.assertEqual(len(self.blob.read(10)), 10)
def CheckBlobReadAdvanceOffset(self):
self.blob.read(10)
self.assertEqual(self.blob.tell(), 10)
def CheckBlobReadStartAtOffset(self):
self.blob.seek(10)
self.blob.write(self.second_data[:10])
self.blob.seek(10)
self.assertEqual(self.blob.read(10), self.second_data[:10])
def CheckBlobWrite(self):
self.blob.write(self.second_data)
self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], self.second_data)
def CheckBlobWriteAtOffset(self):
self.blob.seek(50)
self.blob.write(self.second_data[:50])
self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0],
self.blob_data[:50] + self.second_data[:50])
def CheckBlobWriteAdvanceOffset(self):
self.blob.write(self.second_data[:50])
self.assertEqual(self.blob.tell(), 50)
def CheckBlobWriteMoreThenBlobSize(self):
with self.assertRaises(sqlite.OperationalError):
self.blob.write(b"a" * 1000)
def CheckBlobReadAfterRowChange(self):
self.cx.execute("UPDATE test SET blob_col='aaaa' where id=1")
with self.assertRaises(sqlite.OperationalError):
self.blob.read()
def CheckBlobWriteAfterRowChange(self):
self.cx.execute("UPDATE test SET blob_col='aaaa' where id=1")
with self.assertRaises(sqlite.OperationalError):
self.blob.write(b"aaa")
def CheckBlobWriteWhenReadOnly(self):
read_only_blob = \
self.cx.open_blob("test", "blob_col", 1, readonly=True)
with self.assertRaises(sqlite.OperationalError):
read_only_blob.write(b"aaa")
read_only_blob.close()
def CheckBlobOpenWithBadDb(self):
with self.assertRaises(sqlite.OperationalError):
self.cx.open_blob("test", "blob_col", 1, dbname="notexisintg")
def CheckBlobOpenWithBadTable(self):
with self.assertRaises(sqlite.OperationalError):
self.cx.open_blob("notexisintg", "blob_col", 1)
def CheckBlobOpenWithBadColumn(self):
with self.assertRaises(sqlite.OperationalError):
self.cx.open_blob("test", "notexisting", 1)
def CheckBlobOpenWithBadRow(self):
with self.assertRaises(sqlite.OperationalError):
self.cx.open_blob("test", "blob_col", 2)
def CheckBlobGetItem(self):
self.assertEqual(self.blob[5], b"a")
def CheckBlobGetItemIndexOutOfRange(self):
with self.assertRaises(IndexError):
self.blob[105]
with self.assertRaises(IndexError):
self.blob[-105]
def CheckBlobGetItemNegativeIndex(self):
self.assertEqual(self.blob[-5], b"a")
def CheckBlobGetItemInvalidIndex(self):
with self.assertRaises(TypeError):
self.blob[b"a"]
def CheckBlobGetSlice(self):
self.assertEqual(self.blob[5:10], b"aaaaa")
def CheckBlobGetSliceNegativeIndex(self):
self.assertEqual(self.blob[5:-5], self.blob_data[5:-5])
def CheckBlobGetSliceInvalidIndex(self):
with self.assertRaises(TypeError):
self.blob[5:b"a"]
def CheckBlobGetSliceWithSkip(self):
self.blob.write(b"abcdefghij")
self.assertEqual(self.blob[0:10:2], b"acegi")
def CheckBlobSetItem(self):
self.blob[0] = b"b"
self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], b"b" + self.blob_data[1:])
def CheckBlobSetSlice(self):
self.blob[0:5] = b"bbbbb"
self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], b"bbbbb" + self.blob_data[5:])
def CheckBlobSetSliceWithSkip(self):
self.blob[0:10:2] = b"bbbbb"
self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], b"bababababa" + self.blob_data[10:])
def CheckBlobGetEmptySlice(self):
self.assertEqual(self.blob[5:5], b"")
def CheckBlobSetSliceWrongLength(self):
with self.assertRaises(IndexError):
self.blob[5:10] = b"a"
def CheckBlobConcatNotSupported(self):
with self.assertRaises(SystemError):
self.blob + self.blob
def CheckBlobRepeateNotSupported(self):
with self.assertRaises(SystemError):
self.blob * 5
def CheckBlobContainsNotSupported(self):
with self.assertRaises(SystemError):
b"aaaaa" in self.blob
@unittest.skipUnless(threading, 'This test requires threading.')
class ThreadTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
@ -770,6 +948,15 @@ class ClosedConTests(unittest.TestCase):
with self.assertRaises(sqlite.ProgrammingError):
cur.execute("select 4")
def CheckClosedBlobRead(self):
con = sqlite.connect(":memory:")
con.execute("create table test(id integer primary key, blob_col blob)")
con.execute("insert into test(blob_col) values (zeroblob(100))")
blob = con.open_blob("test", "blob_col", 1)
con.close()
with self.assertRaises(sqlite.ProgrammingError):
blob.read()
def CheckClosedCreateFunction(self):
con = sqlite.connect(":memory:")
con.close()
@ -923,6 +1110,68 @@ class SqliteOnConflictTests(unittest.TestCase):
self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])
class ClosedBlobTests(unittest.TestCase):
def setUp(self):
self.cx = sqlite.connect(":memory:")
self.cx.execute("create table test(id integer primary key, blob_col blob)")
self.cx.execute("insert into test(blob_col) values (zeroblob(100))")
def tearDown(self):
self.cx.close()
def CheckClosedRead(self):
self.blob = self.cx.open_blob("test", "blob_col", 1)
self.blob.close()
with self.assertRaises(sqlite.ProgrammingError):
self.blob.read()
def CheckClosedWrite(self):
self.blob = self.cx.open_blob("test", "blob_col", 1)
self.blob.close()
with self.assertRaises(sqlite.ProgrammingError):
self.blob.write(b"aaaaaaaaa")
def CheckClosedSeek(self):
self.blob = self.cx.open_blob("test", "blob_col", 1)
self.blob.close()
with self.assertRaises(sqlite.ProgrammingError):
self.blob.seek(10)
def CheckClosedTell(self):
self.blob = self.cx.open_blob("test", "blob_col", 1)
self.blob.close()
with self.assertRaises(sqlite.ProgrammingError):
self.blob.tell()
def CheckClosedClose(self):
self.blob = self.cx.open_blob("test", "blob_col", 1)
self.blob.close()
with self.assertRaises(sqlite.ProgrammingError):
self.blob.close()
class BlobContextManagerTests(unittest.TestCase):
def setUp(self):
self.cx = sqlite.connect(":memory:")
self.cx.execute("create table test(id integer primary key, blob_col blob)")
self.cx.execute("insert into test(blob_col) values (zeroblob(100))")
def tearDown(self):
self.cx.close()
def CheckContextExecute(self):
data = b"a" * 100
with self.cx.open_blob("test", "blob_col", 1) as blob:
blob.write(data)
self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], data)
def CheckContextCloseBlob(self):
with self.cx.open_blob("test", "blob_col", 1) as blob:
blob.seek(10)
with self.assertRaises(sqlite.ProgrammingError):
blob.close()
def suite():
module_suite = unittest.makeSuite(ModuleTests, "Check")
connection_suite = unittest.makeSuite(ConnectionTests, "Check")
@ -933,10 +1182,14 @@ def suite():
closed_con_suite = unittest.makeSuite(ClosedConTests, "Check")
closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check")
on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check")
blob_suite = unittest.makeSuite(BlobTests, "Check")
closed_blob_suite = unittest.makeSuite(ClosedBlobTests, "Check")
blob_context_manager_suite = unittest.makeSuite(BlobContextManagerTests, "Check")
return unittest.TestSuite((
module_suite, connection_suite, cursor_suite, thread_suite,
constructor_suite, ext_suite, closed_con_suite, closed_cur_suite,
on_conflict_suite,
on_conflict_suite, blob_suite, closed_blob_suite,
blob_context_manager_suite,
))
def test():