| """ |
| Basic TestCases for BTree and hash DBs, with and without a DBEnv, with |
| various DB flags, etc. |
| """ |
| |
| import os |
| import errno |
| import string |
| from pprint import pprint |
| import unittest |
| import time |
| import sys |
| |
| from test_all import db, test_support, verbose, get_new_environment_path, \ |
| get_new_database_path |
| |
| DASH = '-' |
| |
| |
| #---------------------------------------------------------------------- |
| |
| class VersionTestCase(unittest.TestCase): |
| def test00_version(self): |
| info = db.version() |
| if verbose: |
| print '\n', '-=' * 20 |
| print 'bsddb.db.version(): %s' % (info, ) |
| print db.DB_VERSION_STRING |
| print '-=' * 20 |
| self.assertEqual(info, (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR, |
| db.DB_VERSION_PATCH)) |
| |
| #---------------------------------------------------------------------- |
| |
| class BasicTestCase(unittest.TestCase): |
| dbtype = db.DB_UNKNOWN # must be set in derived class |
| cachesize = (0, 1024*1024, 1) |
| dbopenflags = 0 |
| dbsetflags = 0 |
| dbmode = 0660 |
| dbname = None |
| useEnv = 0 |
| envflags = 0 |
| envsetflags = 0 |
| |
| _numKeys = 1002 # PRIVATE. NOTE: must be an even value |
| |
| def setUp(self): |
| if self.useEnv: |
| self.homeDir=get_new_environment_path() |
| try: |
| self.env = db.DBEnv() |
| self.env.set_lg_max(1024*1024) |
| self.env.set_tx_max(30) |
| self._t = int(time.time()) |
| self.env.set_tx_timestamp(self._t) |
| self.env.set_flags(self.envsetflags, 1) |
| self.env.open(self.homeDir, self.envflags | db.DB_CREATE) |
| self.filename = "test" |
| # Yes, a bare except is intended, since we're re-raising the exc. |
| except: |
| test_support.rmtree(self.homeDir) |
| raise |
| else: |
| self.env = None |
| self.filename = get_new_database_path() |
| |
| # create and open the DB |
| self.d = db.DB(self.env) |
| if not self.useEnv : |
| self.d.set_cachesize(*self.cachesize) |
| cachesize = self.d.get_cachesize() |
| self.assertEqual(cachesize[0], self.cachesize[0]) |
| self.assertEqual(cachesize[2], self.cachesize[2]) |
| # Berkeley DB expands the cache 25% accounting overhead, |
| # if the cache is small. |
| self.assertEqual(125, int(100.0*cachesize[1]/self.cachesize[1])) |
| self.d.set_flags(self.dbsetflags) |
| if self.dbname: |
| self.d.open(self.filename, self.dbname, self.dbtype, |
| self.dbopenflags|db.DB_CREATE, self.dbmode) |
| else: |
| self.d.open(self.filename, # try out keyword args |
| mode = self.dbmode, |
| dbtype = self.dbtype, |
| flags = self.dbopenflags|db.DB_CREATE) |
| |
| if not self.useEnv: |
| self.assertRaises(db.DBInvalidArgError, |
| self.d.set_cachesize, *self.cachesize) |
| |
| self.populateDB() |
| |
| |
| def tearDown(self): |
| self.d.close() |
| if self.env is not None: |
| self.env.close() |
| test_support.rmtree(self.homeDir) |
| else: |
| os.remove(self.filename) |
| |
| |
| |
| def populateDB(self, _txn=None): |
| d = self.d |
| |
| for x in range(self._numKeys//2): |
| key = '%04d' % (self._numKeys - x) # insert keys in reverse order |
| data = self.makeData(key) |
| d.put(key, data, _txn) |
| |
| d.put('empty value', '', _txn) |
| |
| for x in range(self._numKeys//2-1): |
| key = '%04d' % x # and now some in forward order |
| data = self.makeData(key) |
| d.put(key, data, _txn) |
| |
| if _txn: |
| _txn.commit() |
| |
| num = len(d) |
| if verbose: |
| print "created %d records" % num |
| |
| |
| def makeData(self, key): |
| return DASH.join([key] * 5) |
| |
| |
| |
| #---------------------------------------- |
| |
| def test01_GetsAndPuts(self): |
| d = self.d |
| |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test01_GetsAndPuts..." % self.__class__.__name__ |
| |
| for key in ['0001', '0100', '0400', '0700', '0999']: |
| data = d.get(key) |
| if verbose: |
| print data |
| |
| self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') |
| |
| # By default non-existent keys return None... |
| self.assertEqual(d.get('abcd'), None) |
| |
| # ...but they raise exceptions in other situations. Call |
| # set_get_returns_none() to change it. |
| try: |
| d.delete('abcd') |
| except db.DBNotFoundError, val: |
| if sys.version_info < (2, 6) : |
| self.assertEqual(val[0], db.DB_NOTFOUND) |
| else : |
| self.assertEqual(val.args[0], db.DB_NOTFOUND) |
| if verbose: print val |
| else: |
| self.fail("expected exception") |
| |
| |
| d.put('abcd', 'a new record') |
| self.assertEqual(d.get('abcd'), 'a new record') |
| |
| d.put('abcd', 'same key') |
| if self.dbsetflags & db.DB_DUP: |
| self.assertEqual(d.get('abcd'), 'a new record') |
| else: |
| self.assertEqual(d.get('abcd'), 'same key') |
| |
| |
| try: |
| d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE) |
| except db.DBKeyExistError, val: |
| if sys.version_info < (2, 6) : |
| self.assertEqual(val[0], db.DB_KEYEXIST) |
| else : |
| self.assertEqual(val.args[0], db.DB_KEYEXIST) |
| if verbose: print val |
| else: |
| self.fail("expected exception") |
| |
| if self.dbsetflags & db.DB_DUP: |
| self.assertEqual(d.get('abcd'), 'a new record') |
| else: |
| self.assertEqual(d.get('abcd'), 'same key') |
| |
| |
| d.sync() |
| d.close() |
| del d |
| |
| self.d = db.DB(self.env) |
| if self.dbname: |
| self.d.open(self.filename, self.dbname) |
| else: |
| self.d.open(self.filename) |
| d = self.d |
| |
| self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') |
| if self.dbsetflags & db.DB_DUP: |
| self.assertEqual(d.get('abcd'), 'a new record') |
| else: |
| self.assertEqual(d.get('abcd'), 'same key') |
| |
| rec = d.get_both('0555', '0555-0555-0555-0555-0555') |
| if verbose: |
| print rec |
| |
| self.assertEqual(d.get_both('0555', 'bad data'), None) |
| |
| # test default value |
| data = d.get('bad key', 'bad data') |
| self.assertEqual(data, 'bad data') |
| |
| # any object can pass through |
| data = d.get('bad key', self) |
| self.assertEqual(data, self) |
| |
| s = d.stat() |
| self.assertEqual(type(s), type({})) |
| if verbose: |
| print 'd.stat() returned this dictionary:' |
| pprint(s) |
| |
| |
| #---------------------------------------- |
| |
| def test02_DictionaryMethods(self): |
| d = self.d |
| |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test02_DictionaryMethods..." % \ |
| self.__class__.__name__ |
| |
| for key in ['0002', '0101', '0401', '0701', '0998']: |
| data = d[key] |
| self.assertEqual(data, self.makeData(key)) |
| if verbose: |
| print data |
| |
| self.assertEqual(len(d), self._numKeys) |
| keys = d.keys() |
| self.assertEqual(len(keys), self._numKeys) |
| self.assertEqual(type(keys), type([])) |
| |
| d['new record'] = 'a new record' |
| self.assertEqual(len(d), self._numKeys+1) |
| keys = d.keys() |
| self.assertEqual(len(keys), self._numKeys+1) |
| |
| d['new record'] = 'a replacement record' |
| self.assertEqual(len(d), self._numKeys+1) |
| keys = d.keys() |
| self.assertEqual(len(keys), self._numKeys+1) |
| |
| if verbose: |
| print "the first 10 keys are:" |
| pprint(keys[:10]) |
| |
| self.assertEqual(d['new record'], 'a replacement record') |
| |
| # We check also the positional parameter |
| self.assertEqual(d.has_key('0001', None), 1) |
| # We check also the keyword parameter |
| self.assertEqual(d.has_key('spam', txn=None), 0) |
| |
| items = d.items() |
| self.assertEqual(len(items), self._numKeys+1) |
| self.assertEqual(type(items), type([])) |
| self.assertEqual(type(items[0]), type(())) |
| self.assertEqual(len(items[0]), 2) |
| |
| if verbose: |
| print "the first 10 items are:" |
| pprint(items[:10]) |
| |
| values = d.values() |
| self.assertEqual(len(values), self._numKeys+1) |
| self.assertEqual(type(values), type([])) |
| |
| if verbose: |
| print "the first 10 values are:" |
| pprint(values[:10]) |
| |
| |
| #---------------------------------------- |
| |
| def test02b_SequenceMethods(self): |
| d = self.d |
| |
| for key in ['0002', '0101', '0401', '0701', '0998']: |
| data = d[key] |
| self.assertEqual(data, self.makeData(key)) |
| if verbose: |
| print data |
| |
| self.assertTrue(hasattr(d, "__contains__")) |
| self.assertTrue("0401" in d) |
| self.assertFalse("1234" in d) |
| |
| |
| #---------------------------------------- |
| |
| def test03_SimpleCursorStuff(self, get_raises_error=0, set_raises_error=0): |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test03_SimpleCursorStuff (get_error %s, set_error %s)..." % \ |
| (self.__class__.__name__, get_raises_error, set_raises_error) |
| |
| if self.env and self.dbopenflags & db.DB_AUTO_COMMIT: |
| txn = self.env.txn_begin() |
| else: |
| txn = None |
| c = self.d.cursor(txn=txn) |
| |
| rec = c.first() |
| count = 0 |
| while rec is not None: |
| count = count + 1 |
| if verbose and count % 100 == 0: |
| print rec |
| try: |
| rec = c.next() |
| except db.DBNotFoundError, val: |
| if get_raises_error: |
| if sys.version_info < (2, 6) : |
| self.assertEqual(val[0], db.DB_NOTFOUND) |
| else : |
| self.assertEqual(val.args[0], db.DB_NOTFOUND) |
| if verbose: print val |
| rec = None |
| else: |
| self.fail("unexpected DBNotFoundError") |
| self.assertEqual(c.get_current_size(), len(c.current()[1]), |
| "%s != len(%r)" % (c.get_current_size(), c.current()[1])) |
| |
| self.assertEqual(count, self._numKeys) |
| |
| |
| rec = c.last() |
| count = 0 |
| while rec is not None: |
| count = count + 1 |
| if verbose and count % 100 == 0: |
| print rec |
| try: |
| rec = c.prev() |
| except db.DBNotFoundError, val: |
| if get_raises_error: |
| if sys.version_info < (2, 6) : |
| self.assertEqual(val[0], db.DB_NOTFOUND) |
| else : |
| self.assertEqual(val.args[0], db.DB_NOTFOUND) |
| if verbose: print val |
| rec = None |
| else: |
| self.fail("unexpected DBNotFoundError") |
| |
| self.assertEqual(count, self._numKeys) |
| |
| rec = c.set('0505') |
| rec2 = c.current() |
| self.assertEqual(rec, rec2) |
| self.assertEqual(rec[0], '0505') |
| self.assertEqual(rec[1], self.makeData('0505')) |
| self.assertEqual(c.get_current_size(), len(rec[1])) |
| |
| # make sure we get empty values properly |
| rec = c.set('empty value') |
| self.assertEqual(rec[1], '') |
| self.assertEqual(c.get_current_size(), 0) |
| |
| try: |
| n = c.set('bad key') |
| except db.DBNotFoundError, val: |
| if sys.version_info < (2, 6) : |
| self.assertEqual(val[0], db.DB_NOTFOUND) |
| else : |
| self.assertEqual(val.args[0], db.DB_NOTFOUND) |
| if verbose: print val |
| else: |
| if set_raises_error: |
| self.fail("expected exception") |
| if n is not None: |
| self.fail("expected None: %r" % (n,)) |
| |
| rec = c.get_both('0404', self.makeData('0404')) |
| self.assertEqual(rec, ('0404', self.makeData('0404'))) |
| |
| try: |
| n = c.get_both('0404', 'bad data') |
| except db.DBNotFoundError, val: |
| if sys.version_info < (2, 6) : |
| self.assertEqual(val[0], db.DB_NOTFOUND) |
| else : |
| self.assertEqual(val.args[0], db.DB_NOTFOUND) |
| if verbose: print val |
| else: |
| if get_raises_error: |
| self.fail("expected exception") |
| if n is not None: |
| self.fail("expected None: %r" % (n,)) |
| |
| if self.d.get_type() == db.DB_BTREE: |
| rec = c.set_range('011') |
| if verbose: |
| print "searched for '011', found: ", rec |
| |
| rec = c.set_range('011',dlen=0,doff=0) |
| if verbose: |
| print "searched (partial) for '011', found: ", rec |
| if rec[1] != '': self.fail('expected empty data portion') |
| |
| ev = c.set_range('empty value') |
| if verbose: |
| print "search for 'empty value' returned", ev |
| if ev[1] != '': self.fail('empty value lookup failed') |
| |
| c.set('0499') |
| c.delete() |
| try: |
| rec = c.current() |
| except db.DBKeyEmptyError, val: |
| if get_raises_error: |
| if sys.version_info < (2, 6) : |
| self.assertEqual(val[0], db.DB_KEYEMPTY) |
| else : |
| self.assertEqual(val.args[0], db.DB_KEYEMPTY) |
| if verbose: print val |
| else: |
| self.fail("unexpected DBKeyEmptyError") |
| else: |
| if get_raises_error: |
| self.fail('DBKeyEmptyError exception expected') |
| |
| c.next() |
| c2 = c.dup(db.DB_POSITION) |
| self.assertEqual(c.current(), c2.current()) |
| |
| c2.put('', 'a new value', db.DB_CURRENT) |
| self.assertEqual(c.current(), c2.current()) |
| self.assertEqual(c.current()[1], 'a new value') |
| |
| c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5) |
| self.assertEqual(c2.current()[1], 'a newer value') |
| |
| c.close() |
| c2.close() |
| if txn: |
| txn.commit() |
| |
| # time to abuse the closed cursors and hope we don't crash |
| methods_to_test = { |
| 'current': (), |
| 'delete': (), |
| 'dup': (db.DB_POSITION,), |
| 'first': (), |
| 'get': (0,), |
| 'next': (), |
| 'prev': (), |
| 'last': (), |
| 'put':('', 'spam', db.DB_CURRENT), |
| 'set': ("0505",), |
| } |
| for method, args in methods_to_test.items(): |
| try: |
| if verbose: |
| print "attempting to use a closed cursor's %s method" % \ |
| method |
| # a bug may cause a NULL pointer dereference... |
| getattr(c, method)(*args) |
| except db.DBError, val: |
| if sys.version_info < (2, 6) : |
| self.assertEqual(val[0], 0) |
| else : |
| self.assertEqual(val.args[0], 0) |
| if verbose: print val |
| else: |
| self.fail("no exception raised when using a buggy cursor's" |
| "%s method" % method) |
| |
| # |
| # free cursor referencing a closed database, it should not barf: |
| # |
| oldcursor = self.d.cursor(txn=txn) |
| self.d.close() |
| |
| # this would originally cause a segfault when the cursor for a |
| # closed database was cleaned up. it should not anymore. |
| # SF pybsddb bug id 667343 |
| del oldcursor |
| |
| def test03b_SimpleCursorWithoutGetReturnsNone0(self): |
| # same test but raise exceptions instead of returning None |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \ |
| self.__class__.__name__ |
| |
| old = self.d.set_get_returns_none(0) |
| self.assertEqual(old, 2) |
| self.test03_SimpleCursorStuff(get_raises_error=1, set_raises_error=1) |
| |
| def test03b_SimpleCursorWithGetReturnsNone1(self): |
| # same test but raise exceptions instead of returning None |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \ |
| self.__class__.__name__ |
| |
| old = self.d.set_get_returns_none(1) |
| self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=1) |
| |
| |
| def test03c_SimpleCursorGetReturnsNone2(self): |
| # same test but raise exceptions instead of returning None |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test03c_SimpleCursorStuffWithoutSetReturnsNone..." % \ |
| self.__class__.__name__ |
| |
| old = self.d.set_get_returns_none(1) |
| self.assertEqual(old, 2) |
| old = self.d.set_get_returns_none(2) |
| self.assertEqual(old, 1) |
| self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=0) |
| |
| if db.version() >= (4, 6): |
| def test03d_SimpleCursorPriority(self) : |
| c = self.d.cursor() |
| c.set_priority(db.DB_PRIORITY_VERY_LOW) # Positional |
| self.assertEqual(db.DB_PRIORITY_VERY_LOW, c.get_priority()) |
| c.set_priority(priority=db.DB_PRIORITY_HIGH) # Keyword |
| self.assertEqual(db.DB_PRIORITY_HIGH, c.get_priority()) |
| c.close() |
| |
| #---------------------------------------- |
| |
| def test04_PartialGetAndPut(self): |
| d = self.d |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test04_PartialGetAndPut..." % \ |
| self.__class__.__name__ |
| |
| key = "partialTest" |
| data = "1" * 1000 + "2" * 1000 |
| d.put(key, data) |
| self.assertEqual(d.get(key), data) |
| self.assertEqual(d.get(key, dlen=20, doff=990), |
| ("1" * 10) + ("2" * 10)) |
| |
| d.put("partialtest2", ("1" * 30000) + "robin" ) |
| self.assertEqual(d.get("partialtest2", dlen=5, doff=30000), "robin") |
| |
| # There seems to be a bug in DB here... Commented out the test for |
| # now. |
| ##self.assertEqual(d.get("partialtest2", dlen=5, doff=30010), "") |
| |
| if self.dbsetflags != db.DB_DUP: |
| # Partial put with duplicate records requires a cursor |
| d.put(key, "0000", dlen=2000, doff=0) |
| self.assertEqual(d.get(key), "0000") |
| |
| d.put(key, "1111", dlen=1, doff=2) |
| self.assertEqual(d.get(key), "0011110") |
| |
| #---------------------------------------- |
| |
| def test05_GetSize(self): |
| d = self.d |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test05_GetSize..." % self.__class__.__name__ |
| |
| for i in range(1, 50000, 500): |
| key = "size%s" % i |
| #print "before ", i, |
| d.put(key, "1" * i) |
| #print "after", |
| self.assertEqual(d.get_size(key), i) |
| #print "done" |
| |
| #---------------------------------------- |
| |
| def test06_Truncate(self): |
| d = self.d |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test06_Truncate..." % self.__class__.__name__ |
| |
| d.put("abcde", "ABCDE"); |
| num = d.truncate() |
| self.assertTrue(num >= 1, "truncate returned <= 0 on non-empty database") |
| num = d.truncate() |
| self.assertEqual(num, 0, |
| "truncate on empty DB returned nonzero (%r)" % (num,)) |
| |
| #---------------------------------------- |
| |
| def test07_verify(self): |
| # Verify bug solved in 4.7.3pre8 |
| self.d.close() |
| d = db.DB(self.env) |
| d.verify(self.filename) |
| |
| |
| #---------------------------------------- |
| |
| if db.version() >= (4, 6): |
| def test08_exists(self) : |
| self.d.put("abcde", "ABCDE") |
| self.assertTrue(self.d.exists("abcde") == True, |
| "DB->exists() returns wrong value") |
| self.assertTrue(self.d.exists("x") == False, |
| "DB->exists() returns wrong value") |
| |
| #---------------------------------------- |
| |
| if db.version() >= (4, 7): |
| def test_compact(self) : |
| d = self.d |
| self.assertEqual(0, d.compact(flags=db.DB_FREELIST_ONLY)) |
| self.assertEqual(0, d.compact(flags=db.DB_FREELIST_ONLY)) |
| d.put("abcde", "ABCDE"); |
| d.put("bcde", "BCDE"); |
| d.put("abc", "ABC"); |
| d.put("monty", "python"); |
| d.delete("abc") |
| d.delete("bcde") |
| d.compact(start='abcde', stop='monty', txn=None, |
| compact_fillpercent=42, compact_pages=1, |
| compact_timeout=50000000, |
| flags=db.DB_FREELIST_ONLY|db.DB_FREE_SPACE) |
| |
| #---------------------------------------- |
| |
| #---------------------------------------------------------------------- |
| |
| |
| class BasicBTreeTestCase(BasicTestCase): |
| dbtype = db.DB_BTREE |
| |
| |
| class BasicHashTestCase(BasicTestCase): |
| dbtype = db.DB_HASH |
| |
| |
| class BasicBTreeWithThreadFlagTestCase(BasicTestCase): |
| dbtype = db.DB_BTREE |
| dbopenflags = db.DB_THREAD |
| |
| |
| class BasicHashWithThreadFlagTestCase(BasicTestCase): |
| dbtype = db.DB_HASH |
| dbopenflags = db.DB_THREAD |
| |
| |
| class BasicWithEnvTestCase(BasicTestCase): |
| dbopenflags = db.DB_THREAD |
| useEnv = 1 |
| envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
| |
| #---------------------------------------- |
| |
| def test09_EnvRemoveAndRename(self): |
| if not self.env: |
| return |
| |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test09_EnvRemoveAndRename..." % self.__class__.__name__ |
| |
| # can't rename or remove an open DB |
| self.d.close() |
| |
| newname = self.filename + '.renamed' |
| self.env.dbrename(self.filename, None, newname) |
| self.env.dbremove(newname) |
| |
| #---------------------------------------- |
| |
| class BasicBTreeWithEnvTestCase(BasicWithEnvTestCase): |
| dbtype = db.DB_BTREE |
| |
| |
| class BasicHashWithEnvTestCase(BasicWithEnvTestCase): |
| dbtype = db.DB_HASH |
| |
| |
| #---------------------------------------------------------------------- |
| |
| class BasicTransactionTestCase(BasicTestCase): |
| if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and |
| (sys.version_info < (3, 2))) : |
| def assertIn(self, a, b, msg=None) : |
| return self.assertTrue(a in b, msg=msg) |
| |
| dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT |
| useEnv = 1 |
| envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | |
| db.DB_INIT_TXN) |
| envsetflags = db.DB_AUTO_COMMIT |
| |
| |
| def tearDown(self): |
| self.txn.commit() |
| BasicTestCase.tearDown(self) |
| |
| |
| def populateDB(self): |
| txn = self.env.txn_begin() |
| BasicTestCase.populateDB(self, _txn=txn) |
| |
| self.txn = self.env.txn_begin() |
| |
| |
| def test06_Transactions(self): |
| d = self.d |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test06_Transactions..." % self.__class__.__name__ |
| |
| self.assertEqual(d.get('new rec', txn=self.txn), None) |
| d.put('new rec', 'this is a new record', self.txn) |
| self.assertEqual(d.get('new rec', txn=self.txn), |
| 'this is a new record') |
| self.txn.abort() |
| self.assertEqual(d.get('new rec'), None) |
| |
| self.txn = self.env.txn_begin() |
| |
| self.assertEqual(d.get('new rec', txn=self.txn), None) |
| d.put('new rec', 'this is a new record', self.txn) |
| self.assertEqual(d.get('new rec', txn=self.txn), |
| 'this is a new record') |
| self.txn.commit() |
| self.assertEqual(d.get('new rec'), 'this is a new record') |
| |
| self.txn = self.env.txn_begin() |
| c = d.cursor(self.txn) |
| rec = c.first() |
| count = 0 |
| while rec is not None: |
| count = count + 1 |
| if verbose and count % 100 == 0: |
| print rec |
| rec = c.next() |
| self.assertEqual(count, self._numKeys+1) |
| |
| c.close() # Cursors *MUST* be closed before commit! |
| self.txn.commit() |
| |
| # flush pending updates |
| self.env.txn_checkpoint (0, 0, 0) |
| |
| statDict = self.env.log_stat(0); |
| self.assertIn('magic', statDict) |
| self.assertIn('version', statDict) |
| self.assertIn('cur_file', statDict) |
| self.assertIn('region_nowait', statDict) |
| |
| # must have at least one log file present: |
| logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG) |
| self.assertNotEqual(logs, None) |
| for log in logs: |
| if verbose: |
| print 'log file: ' + log |
| logs = self.env.log_archive(db.DB_ARCH_REMOVE) |
| self.assertTrue(not logs) |
| |
| self.txn = self.env.txn_begin() |
| |
| #---------------------------------------- |
| |
| if db.version() >= (4, 6): |
| def test08_exists(self) : |
| txn = self.env.txn_begin() |
| self.d.put("abcde", "ABCDE", txn=txn) |
| txn.commit() |
| txn = self.env.txn_begin() |
| self.assertTrue(self.d.exists("abcde", txn=txn) == True, |
| "DB->exists() returns wrong value") |
| self.assertTrue(self.d.exists("x", txn=txn) == False, |
| "DB->exists() returns wrong value") |
| txn.abort() |
| |
| #---------------------------------------- |
| |
| def test09_TxnTruncate(self): |
| d = self.d |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test09_TxnTruncate..." % self.__class__.__name__ |
| |
| d.put("abcde", "ABCDE"); |
| txn = self.env.txn_begin() |
| num = d.truncate(txn) |
| self.assertTrue(num >= 1, "truncate returned <= 0 on non-empty database") |
| num = d.truncate(txn) |
| self.assertEqual(num, 0, |
| "truncate on empty DB returned nonzero (%r)" % (num,)) |
| txn.commit() |
| |
| #---------------------------------------- |
| |
| def test10_TxnLateUse(self): |
| txn = self.env.txn_begin() |
| txn.abort() |
| try: |
| txn.abort() |
| except db.DBError, e: |
| pass |
| else: |
| raise RuntimeError, "DBTxn.abort() called after DB_TXN no longer valid w/o an exception" |
| |
| txn = self.env.txn_begin() |
| txn.commit() |
| try: |
| txn.commit() |
| except db.DBError, e: |
| pass |
| else: |
| raise RuntimeError, "DBTxn.commit() called after DB_TXN no longer valid w/o an exception" |
| |
| |
| #---------------------------------------- |
| |
| |
| if db.version() >= (4, 4): |
| def test_txn_name(self) : |
| txn=self.env.txn_begin() |
| self.assertEqual(txn.get_name(), "") |
| txn.set_name("XXYY") |
| self.assertEqual(txn.get_name(), "XXYY") |
| txn.set_name("") |
| self.assertEqual(txn.get_name(), "") |
| txn.abort() |
| |
| #---------------------------------------- |
| |
| |
| def test_txn_set_timeout(self) : |
| txn=self.env.txn_begin() |
| txn.set_timeout(1234567, db.DB_SET_LOCK_TIMEOUT) |
| txn.set_timeout(2345678, flags=db.DB_SET_TXN_TIMEOUT) |
| txn.abort() |
| |
| #---------------------------------------- |
| |
| def test_get_tx_max(self) : |
| self.assertEqual(self.env.get_tx_max(), 30) |
| |
| def test_get_tx_timestamp(self) : |
| self.assertEqual(self.env.get_tx_timestamp(), self._t) |
| |
| |
| |
| class BTreeTransactionTestCase(BasicTransactionTestCase): |
| dbtype = db.DB_BTREE |
| |
| class HashTransactionTestCase(BasicTransactionTestCase): |
| dbtype = db.DB_HASH |
| |
| |
| |
| #---------------------------------------------------------------------- |
| |
| class BTreeRecnoTestCase(BasicTestCase): |
| dbtype = db.DB_BTREE |
| dbsetflags = db.DB_RECNUM |
| |
| def test09_RecnoInBTree(self): |
| d = self.d |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test09_RecnoInBTree..." % self.__class__.__name__ |
| |
| rec = d.get(200) |
| self.assertEqual(type(rec), type(())) |
| self.assertEqual(len(rec), 2) |
| if verbose: |
| print "Record #200 is ", rec |
| |
| c = d.cursor() |
| c.set('0200') |
| num = c.get_recno() |
| self.assertEqual(type(num), type(1)) |
| if verbose: |
| print "recno of d['0200'] is ", num |
| |
| rec = c.current() |
| self.assertEqual(c.set_recno(num), rec) |
| |
| c.close() |
| |
| |
| |
| class BTreeRecnoWithThreadFlagTestCase(BTreeRecnoTestCase): |
| dbopenflags = db.DB_THREAD |
| |
| #---------------------------------------------------------------------- |
| |
| class BasicDUPTestCase(BasicTestCase): |
| dbsetflags = db.DB_DUP |
| |
| def test10_DuplicateKeys(self): |
| d = self.d |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test10_DuplicateKeys..." % \ |
| self.__class__.__name__ |
| |
| d.put("dup0", "before") |
| for x in "The quick brown fox jumped over the lazy dog.".split(): |
| d.put("dup1", x) |
| d.put("dup2", "after") |
| |
| data = d.get("dup1") |
| self.assertEqual(data, "The") |
| if verbose: |
| print data |
| |
| c = d.cursor() |
| rec = c.set("dup1") |
| self.assertEqual(rec, ('dup1', 'The')) |
| |
| next_reg = c.next() |
| self.assertEqual(next_reg, ('dup1', 'quick')) |
| |
| rec = c.set("dup1") |
| count = c.count() |
| self.assertEqual(count, 9) |
| |
| next_dup = c.next_dup() |
| self.assertEqual(next_dup, ('dup1', 'quick')) |
| |
| rec = c.set('dup1') |
| while rec is not None: |
| if verbose: |
| print rec |
| rec = c.next_dup() |
| |
| c.set('dup1') |
| rec = c.next_nodup() |
| self.assertNotEqual(rec[0], 'dup1') |
| if verbose: |
| print rec |
| |
| c.close() |
| |
| |
| |
| class BTreeDUPTestCase(BasicDUPTestCase): |
| dbtype = db.DB_BTREE |
| |
| class HashDUPTestCase(BasicDUPTestCase): |
| dbtype = db.DB_HASH |
| |
| class BTreeDUPWithThreadTestCase(BasicDUPTestCase): |
| dbtype = db.DB_BTREE |
| dbopenflags = db.DB_THREAD |
| |
| class HashDUPWithThreadTestCase(BasicDUPTestCase): |
| dbtype = db.DB_HASH |
| dbopenflags = db.DB_THREAD |
| |
| |
| #---------------------------------------------------------------------- |
| |
| class BasicMultiDBTestCase(BasicTestCase): |
| dbname = 'first' |
| |
| def otherType(self): |
| if self.dbtype == db.DB_BTREE: |
| return db.DB_HASH |
| else: |
| return db.DB_BTREE |
| |
| def test11_MultiDB(self): |
| d1 = self.d |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test11_MultiDB..." % self.__class__.__name__ |
| |
| d2 = db.DB(self.env) |
| d2.open(self.filename, "second", self.dbtype, |
| self.dbopenflags|db.DB_CREATE) |
| d3 = db.DB(self.env) |
| d3.open(self.filename, "third", self.otherType(), |
| self.dbopenflags|db.DB_CREATE) |
| |
| for x in "The quick brown fox jumped over the lazy dog".split(): |
| d2.put(x, self.makeData(x)) |
| |
| for x in string.letters: |
| d3.put(x, x*70) |
| |
| d1.sync() |
| d2.sync() |
| d3.sync() |
| d1.close() |
| d2.close() |
| d3.close() |
| |
| self.d = d1 = d2 = d3 = None |
| |
| self.d = d1 = db.DB(self.env) |
| d1.open(self.filename, self.dbname, flags = self.dbopenflags) |
| d2 = db.DB(self.env) |
| d2.open(self.filename, "second", flags = self.dbopenflags) |
| d3 = db.DB(self.env) |
| d3.open(self.filename, "third", flags = self.dbopenflags) |
| |
| c1 = d1.cursor() |
| c2 = d2.cursor() |
| c3 = d3.cursor() |
| |
| count = 0 |
| rec = c1.first() |
| while rec is not None: |
| count = count + 1 |
| if verbose and (count % 50) == 0: |
| print rec |
| rec = c1.next() |
| self.assertEqual(count, self._numKeys) |
| |
| count = 0 |
| rec = c2.first() |
| while rec is not None: |
| count = count + 1 |
| if verbose: |
| print rec |
| rec = c2.next() |
| self.assertEqual(count, 9) |
| |
| count = 0 |
| rec = c3.first() |
| while rec is not None: |
| count = count + 1 |
| if verbose: |
| print rec |
| rec = c3.next() |
| self.assertEqual(count, len(string.letters)) |
| |
| |
| c1.close() |
| c2.close() |
| c3.close() |
| |
| d2.close() |
| d3.close() |
| |
| |
| |
| # Strange things happen if you try to use Multiple DBs per file without a |
| # DBEnv with MPOOL and LOCKing... |
| |
| class BTreeMultiDBTestCase(BasicMultiDBTestCase): |
| dbtype = db.DB_BTREE |
| dbopenflags = db.DB_THREAD |
| useEnv = 1 |
| envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
| |
| class HashMultiDBTestCase(BasicMultiDBTestCase): |
| dbtype = db.DB_HASH |
| dbopenflags = db.DB_THREAD |
| useEnv = 1 |
| envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
| |
| |
| class PrivateObject(unittest.TestCase) : |
| def tearDown(self) : |
| del self.obj |
| |
| def test01_DefaultIsNone(self) : |
| self.assertEqual(self.obj.get_private(), None) |
| |
| def test02_assignment(self) : |
| a = "example of private object" |
| self.obj.set_private(a) |
| b = self.obj.get_private() |
| self.assertTrue(a is b) # Object identity |
| |
| def test03_leak_assignment(self) : |
| a = "example of private object" |
| refcount = sys.getrefcount(a) |
| self.obj.set_private(a) |
| self.assertEqual(refcount+1, sys.getrefcount(a)) |
| self.obj.set_private(None) |
| self.assertEqual(refcount, sys.getrefcount(a)) |
| |
| def test04_leak_GC(self) : |
| a = "example of private object" |
| refcount = sys.getrefcount(a) |
| self.obj.set_private(a) |
| self.obj = None |
| self.assertEqual(refcount, sys.getrefcount(a)) |
| |
| class DBEnvPrivateObject(PrivateObject) : |
| def setUp(self) : |
| self.obj = db.DBEnv() |
| |
| class DBPrivateObject(PrivateObject) : |
| def setUp(self) : |
| self.obj = db.DB() |
| |
| class CrashAndBurn(unittest.TestCase) : |
| #def test01_OpenCrash(self) : |
| # # See http://bugs.python.org/issue3307 |
| # self.assertRaises(db.DBInvalidArgError, db.DB, None, 65535) |
| |
| if db.version() < (4, 8) : |
| def test02_DBEnv_dealloc(self): |
| # http://bugs.python.org/issue3885 |
| import gc |
| self.assertRaises(db.DBInvalidArgError, db.DBEnv, ~db.DB_RPCCLIENT) |
| gc.collect() |
| |
| |
| #---------------------------------------------------------------------- |
| #---------------------------------------------------------------------- |
| |
| def test_suite(): |
| suite = unittest.TestSuite() |
| |
| suite.addTest(unittest.makeSuite(VersionTestCase)) |
| suite.addTest(unittest.makeSuite(BasicBTreeTestCase)) |
| suite.addTest(unittest.makeSuite(BasicHashTestCase)) |
| suite.addTest(unittest.makeSuite(BasicBTreeWithThreadFlagTestCase)) |
| suite.addTest(unittest.makeSuite(BasicHashWithThreadFlagTestCase)) |
| suite.addTest(unittest.makeSuite(BasicBTreeWithEnvTestCase)) |
| suite.addTest(unittest.makeSuite(BasicHashWithEnvTestCase)) |
| suite.addTest(unittest.makeSuite(BTreeTransactionTestCase)) |
| suite.addTest(unittest.makeSuite(HashTransactionTestCase)) |
| suite.addTest(unittest.makeSuite(BTreeRecnoTestCase)) |
| suite.addTest(unittest.makeSuite(BTreeRecnoWithThreadFlagTestCase)) |
| suite.addTest(unittest.makeSuite(BTreeDUPTestCase)) |
| suite.addTest(unittest.makeSuite(HashDUPTestCase)) |
| suite.addTest(unittest.makeSuite(BTreeDUPWithThreadTestCase)) |
| suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase)) |
| suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase)) |
| suite.addTest(unittest.makeSuite(HashMultiDBTestCase)) |
| suite.addTest(unittest.makeSuite(DBEnvPrivateObject)) |
| suite.addTest(unittest.makeSuite(DBPrivateObject)) |
| suite.addTest(unittest.makeSuite(CrashAndBurn)) |
| |
| return suite |
| |
| |
| if __name__ == '__main__': |
| unittest.main(defaultTest='test_suite') |