"""
Test cases for the Python DB duplicate and Btree key comparison functions.
"""
| |
| import sys, os, re |
| import test_all |
| from cStringIO import StringIO |
| |
| import unittest |
| |
| from test_all import db, dbshelve, test_support, \ |
| get_new_environment_path, get_new_database_path |
| |
| |
# Python 3 removed the built-in "cmp" (gone as of 3.0.1), so define our own.
def cmp(a, b):
    """Three-way compare: 0 when a == b, -1 when a < b, otherwise 1."""
    if a == b:
        return 0
    return -1 if a < b else 1


# The plain three-way comparison doubles as the lexical comparator.
lexical_cmp = cmp
| |
def lowercase_cmp(left, right):
    """Three-way compare of two strings after lowercasing both of them."""
    folded_left = left.lower()
    folded_right = right.lower()
    return cmp(folded_left, folded_right)
| |
def make_reverse_comparator(cmp):
    """Return a comparator that orders items opposite to the given one.

    The wrapped comparator is bound as the default value of the inner
    function's ``delegate`` parameter; the result of calling it is negated.
    """
    def reverse(left, right, delegate=cmp):
        outcome = delegate(left, right)
        return -outcome
    return reverse
| |
# Keys in the order the plain lexical comparator is expected to produce
# (the uppercase entry sorts ahead of the lowercase ones).
_expected_lexical_test_data = [
    '', 'CCCP', 'a', 'aaa', 'b', 'c', 'cccce', 'ccccf',
]
# Keys in the order the case-insensitive comparator is expected to produce.
_expected_lowercase_test_data = [
    '', 'a', 'aaa', 'b', 'c', 'CC', 'cccce', 'ccccf', 'CCCP',
]
| |
| class ComparatorTests(unittest.TestCase) : |
| def comparator_test_helper(self, comparator, expected_data) : |
| data = expected_data[:] |
| |
| import sys |
| if sys.version_info < (2, 6) : |
| data.sort(cmp=comparator) |
| else : # Insertion Sort. Please, improve |
| data2 = [] |
| for i in data : |
| for j, k in enumerate(data2) : |
| r = comparator(k, i) |
| if r == 1 : |
| data2.insert(j, i) |
| break |
| else : |
| data2.append(i) |
| data = data2 |
| |
| self.assertEqual(data, expected_data, |
| "comparator `%s' is not right: %s vs. %s" |
| % (comparator, expected_data, data)) |
| def test_lexical_comparator(self) : |
| self.comparator_test_helper(lexical_cmp, _expected_lexical_test_data) |
| def test_reverse_lexical_comparator(self) : |
| rev = _expected_lexical_test_data[:] |
| rev.reverse() |
| self.comparator_test_helper(make_reverse_comparator(lexical_cmp), |
| rev) |
| def test_lowercase_comparator(self) : |
| self.comparator_test_helper(lowercase_cmp, |
| _expected_lowercase_test_data) |
| |
class AbstractBtreeKeyCompareTestCase(unittest.TestCase):
    """Shared fixture for tests that install a custom Btree key comparator.

    setUp opens a DBEnv in a new environment directory; subclasses call
    createDB() with the comparator under test; tearDown closes the database
    and the environment and removes the directory.
    """

    env = None
    db = None

    # unittest lacks assertLess before 2.7 and in 3.0/3.1: supply a fallback.
    if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
                                       (sys.version_info < (3, 2))):
        def assertLess(self, a, b, msg=None):
            return self.assertTrue(a < b, msg=msg)

    def setUp(self):
        self.filename = self.__class__.__name__ + '.db'
        self.homeDir = get_new_environment_path()
        env = db.DBEnv()
        env.open(self.homeDir,
                 db.DB_CREATE | db.DB_INIT_MPOOL
                 | db.DB_INIT_LOCK | db.DB_THREAD)
        self.env = env

    def tearDown(self):
        # Nested try/finally: a failure while closing the database must not
        # leave the environment open, and a failure while closing the
        # environment must not leave the home directory behind.
        try:
            self.closeDB()
        finally:
            try:
                if self.env is not None:
                    self.env.close()
                    self.env = None
            finally:
                test_support.rmtree(self.homeDir)

    def addDataToDB(self, data):
        """Store each item of data as a key; the value is its index as str."""
        for index, item in enumerate(data):
            self.db.put(item, str(index))

    def createDB(self, key_comparator):
        """Create a DB in the environment, configure it, and open it."""
        self.db = db.DB(self.env)
        # The comparator is installed before the database is opened.
        self.setupDB(key_comparator)
        self.db.open(self.filename, "test", db.DB_BTREE, db.DB_CREATE)

    def setupDB(self, key_comparator):
        self.db.set_bt_compare(key_comparator)

    def closeDB(self):
        if self.db is not None:
            self.db.close()
            self.db = None

    def startTest(self):
        pass

    def finishTest(self, expected=None):
        """Optionally verify the stored keys, then close the database."""
        if expected is not None:
            self.check_results(expected)
        self.closeDB()

    def check_results(self, expected):
        """Walk a cursor over the database and compare keys with expected.

        Fails if the cursor yields more or fewer records than expected, or
        if any key differs from the expected one at the same position.
        """
        curs = self.db.cursor()
        try:
            index = 0
            rec = curs.first()
            while rec:
                key, ignore = rec
                self.assertLess(index, len(expected),
                                "to many values returned from cursor")
                self.assertEqual(expected[index], key,
                                 "expected value `%s' at %d but got `%s'"
                                 % (expected[index], index, key))
                index = index + 1
                rec = curs.next()
            self.assertEqual(index, len(expected),
                             "not enough values returned from cursor")
        finally:
            curs.close()
| |
class BtreeKeyCompareTestCase(AbstractBtreeKeyCompareTestCase):
    """Checks that keys come back from a cursor in comparator order."""

    def runCompareTest(self, comparator, data):
        """Store data under comparator and expect the keys back as data.

        data is both what gets inserted and the key order the cursor walk
        is checked against.
        """
        self.startTest()
        self.createDB(comparator)
        self.addDataToDB(data)
        self.finishTest(data)

    def test_lexical_ordering(self):
        self.runCompareTest(lexical_cmp, _expected_lexical_test_data)

    def test_reverse_lexical_ordering(self):
        descending = list(reversed(_expected_lexical_test_data))
        reverse_cmp = make_reverse_comparator(lexical_cmp)
        self.runCompareTest(reverse_cmp, descending)

    def test_compare_function_useless(self):
        def socialist_comparator(l, r):
            return 0

        self.startTest()
        self.createDB(socialist_comparator)
        self.addDataToDB(['b', 'a', 'd'])
        # Every key compares equal, so only the first key is expected to
        # remain in the database (holding the last key's value, fwiw).
        self.finishTest(['b'])
| |
| |
class BtreeExceptionsTestCase(AbstractBtreeKeyCompareTestCase):
    """Error-path tests for installing a Btree key comparator."""

    def test_raises_non_callable(self):
        self.startTest()
        self.assertRaises(TypeError, self.createDB, 'abc')
        self.assertRaises(TypeError, self.createDB, None)
        self.finishTest()

    def test_set_bt_compare_with_function(self):
        self.startTest()
        self.createDB(lexical_cmp)
        self.finishTest()

    def check_results(self, results):
        # The tests in this class never inspect the database contents.
        pass

    def test_compare_function_incorrect(self):
        self.startTest()
        def bad_comparator(l, r):
            return 1
        # verify that set_bt_compare checks that comparator('', '') == 0
        self.assertRaises(TypeError, self.createDB, bad_comparator)
        self.finishTest()

    def verifyStderr(self, method, successRe):
        """
        Call method() while capturing sys.stderr output internally and
        call self.fail() if successRe.search() does not match the stderr
        output.  This is used to test for uncatchable exceptions.
        """
        saved_stderr = sys.stderr
        sys.stderr = StringIO()
        try:
            method()
        finally:
            # Always restore the real stderr, even when method() raises.
            captured = sys.stderr
            sys.stderr = saved_stderr
            errorOut = captured.getvalue()
            if sys.version_info < (3, 0):  # XXX: How to do this in Py3k ???
                sys.exc_traceback = sys.last_traceback = None
        # The check runs after the finally block so that a failure here
        # cannot replace an exception raised by method() itself.
        if not successRe.search(errorOut):
            self.fail("unexpected stderr output:\n" + errorOut)

    def _test_compare_function_exception(self):
        self.startTest()
        def bad_comparator(l, r):
            if l == r:
                # pass the set_bt_compare test
                return 0
            # Call form of raise: valid syntax on Python 2 and Python 3.
            raise RuntimeError("i'm a naughty comparison function")
        self.createDB(bad_comparator)
        # The caller expects two RuntimeError reports on stderr.
        self.addDataToDB(['a', 'b', 'c'])  # this should raise, but...
        self.finishTest()

    def test_compare_function_exception(self):
        self.verifyStderr(
            self._test_compare_function_exception,
            re.compile('(^RuntimeError:.* naughty.*){2}', re.M|re.S)
        )

    def _test_compare_function_bad_return(self):
        self.startTest()
        def bad_comparator(l, r):
            if l == r:
                # pass the set_bt_compare test
                return 0
            # Deliberately return a non-int (the left key itself).
            return l
        self.createDB(bad_comparator)
        # The caller expects two "return an int" TypeError reports on stderr.
        self.addDataToDB(['a', 'b', 'c'])  # this should raise, but...
        self.finishTest()

    def test_compare_function_bad_return(self):
        self.verifyStderr(
            self._test_compare_function_bad_return,
            re.compile('(^TypeError:.* return an int.*){2}', re.M|re.S)
        )

    def test_cannot_assign_twice(self):

        def my_compare(a, b):
            return 0

        self.startTest()
        self.createDB(my_compare)
        # A second set_bt_compare on the same handle must be rejected.
        self.assertRaises(RuntimeError, self.db.set_bt_compare, my_compare)
| |
class AbstractDuplicateCompareTestCase(unittest.TestCase):
    """Shared fixture for tests that install a custom duplicate comparator.

    setUp opens a DBEnv in a new environment directory; subclasses call
    createDB() with the comparator under test; tearDown closes the database
    and the environment and removes the directory.
    """

    env = None
    db = None

    # unittest lacks assertLess before 2.7 and in 3.0/3.1: supply a fallback.
    if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
                                       (sys.version_info < (3, 2))):
        def assertLess(self, a, b, msg=None):
            return self.assertTrue(a < b, msg=msg)

    def setUp(self):
        self.filename = self.__class__.__name__ + '.db'
        self.homeDir = get_new_environment_path()
        env = db.DBEnv()
        env.open(self.homeDir,
                 db.DB_CREATE | db.DB_INIT_MPOOL
                 | db.DB_INIT_LOCK | db.DB_THREAD)
        self.env = env

    def tearDown(self):
        # Nested try/finally: a failure while closing the database must not
        # leave the environment open, and a failure while closing the
        # environment must not leave the home directory behind.
        try:
            self.closeDB()
        finally:
            try:
                if self.env is not None:
                    self.env.close()
                    self.env = None
            finally:
                test_support.rmtree(self.homeDir)

    def addDataToDB(self, data):
        """Store every item of data as a value under the single key "key"."""
        for item in data:
            self.db.put("key", item)

    def createDB(self, dup_comparator):
        """Create a DB in the environment, configure it, and open it."""
        self.db = db.DB(self.env)
        # Flags and comparator are set before the database is opened.
        self.setupDB(dup_comparator)
        self.db.open(self.filename, "test", db.DB_BTREE, db.DB_CREATE)

    def setupDB(self, dup_comparator):
        self.db.set_flags(db.DB_DUPSORT)
        self.db.set_dup_compare(dup_comparator)

    def closeDB(self):
        if self.db is not None:
            self.db.close()
            self.db = None

    def startTest(self):
        pass

    def finishTest(self, expected=None):
        """Optionally verify the stored values, then close the database."""
        if expected is not None:
            self.check_results(expected)
        self.closeDB()

    def check_results(self, expected):
        """Walk a cursor over the database and compare values with expected.

        Fails if the cursor yields more or fewer records than expected, or
        if any value differs from the expected one at the same position.
        """
        curs = self.db.cursor()
        try:
            index = 0
            rec = curs.first()
            while rec:
                ignore, data = rec
                self.assertLess(index, len(expected),
                                "to many values returned from cursor")
                self.assertEqual(expected[index], data,
                                 "expected value `%s' at %d but got `%s'"
                                 % (expected[index], index, data))
                index = index + 1
                rec = curs.next()
            self.assertEqual(index, len(expected),
                             "not enough values returned from cursor")
        finally:
            curs.close()
| |
class DuplicateCompareTestCase(AbstractDuplicateCompareTestCase):
    """Checks that duplicate values come back in comparator order."""

    def runCompareTest(self, comparator, data):
        """Store data under comparator and expect the values back as data.

        data is both what gets inserted and the value order the cursor walk
        is checked against.
        """
        self.startTest()
        self.createDB(comparator)
        self.addDataToDB(data)
        self.finishTest(data)

    def test_lexical_ordering(self):
        self.runCompareTest(lexical_cmp, _expected_lexical_test_data)

    def test_reverse_lexical_ordering(self):
        descending = list(reversed(_expected_lexical_test_data))
        reverse_cmp = make_reverse_comparator(lexical_cmp)
        self.runCompareTest(reverse_cmp, descending)
| |
class DuplicateExceptionsTestCase(AbstractDuplicateCompareTestCase):
    """Error-path tests for installing a duplicate-value comparator."""

    def test_raises_non_callable(self):
        self.startTest()
        self.assertRaises(TypeError, self.createDB, 'abc')
        self.assertRaises(TypeError, self.createDB, None)
        self.finishTest()

    def test_set_dup_compare_with_function(self):
        self.startTest()
        self.createDB(lexical_cmp)
        self.finishTest()

    def check_results(self, results):
        # The tests in this class never inspect the database contents.
        pass

    def test_compare_function_incorrect(self):
        self.startTest()
        def bad_comparator(l, r):
            return 1
        # verify that set_dup_compare checks that comparator('', '') == 0
        self.assertRaises(TypeError, self.createDB, bad_comparator)
        self.finishTest()

    def test_compare_function_useless(self):
        self.startTest()
        def socialist_comparator(l, r):
            return 0
        self.createDB(socialist_comparator)
        # DUPSORT does not allow "duplicate duplicates"
        self.assertRaises(db.DBKeyExistError, self.addDataToDB, ['b', 'a', 'd'])
        self.finishTest()

    def verifyStderr(self, method, successRe):
        """
        Call method() while capturing sys.stderr output internally and
        call self.fail() if successRe.search() does not match the stderr
        output.  This is used to test for uncatchable exceptions.
        """
        saved_stderr = sys.stderr
        sys.stderr = StringIO()
        try:
            method()
        finally:
            # Always restore the real stderr, even when method() raises.
            captured = sys.stderr
            sys.stderr = saved_stderr
            errorOut = captured.getvalue()
            if sys.version_info < (3, 0):  # XXX: How to do this in Py3k ???
                sys.exc_traceback = sys.last_traceback = None
        # The check runs after the finally block so that a failure here
        # cannot replace an exception raised by method() itself.
        if not successRe.search(errorOut):
            self.fail("unexpected stderr output:\n" + errorOut)

    def _test_compare_function_exception(self):
        self.startTest()
        def bad_comparator(l, r):
            if l == r:
                # pass the set_dup_compare test
                return 0
            # Call form of raise: valid syntax on Python 2 and Python 3.
            raise RuntimeError("i'm a naughty comparison function")
        self.createDB(bad_comparator)
        # The caller expects two RuntimeError reports on stderr.
        self.addDataToDB(['a', 'b', 'c'])  # this should raise, but...
        self.finishTest()

    def test_compare_function_exception(self):
        self.verifyStderr(
            self._test_compare_function_exception,
            re.compile('(^RuntimeError:.* naughty.*){2}', re.M|re.S)
        )

    def _test_compare_function_bad_return(self):
        self.startTest()
        def bad_comparator(l, r):
            if l == r:
                # pass the set_dup_compare test
                return 0
            # Deliberately return a non-int (the left value itself).
            return l
        self.createDB(bad_comparator)
        # The caller expects two "return an int" TypeError reports on stderr.
        self.addDataToDB(['a', 'b', 'c'])  # this should raise, but...
        self.finishTest()

    def test_compare_function_bad_return(self):
        self.verifyStderr(
            self._test_compare_function_bad_return,
            re.compile('(^TypeError:.* return an int.*){2}', re.M|re.S)
        )

    def test_cannot_assign_twice(self):

        def my_compare(a, b):
            return 0

        self.startTest()
        self.createDB(my_compare)
        # A second set_dup_compare on the same handle must be rejected.
        self.assertRaises(RuntimeError, self.db.set_dup_compare, my_compare)
| |
def test_suite():
    """Return a TestSuite containing every concrete test case class here."""
    # loadTestsFromTestCase replaces unittest.makeSuite, which is deprecated
    # and has been removed from recent Python versions.
    loader = unittest.TestLoader()
    res = unittest.TestSuite()
    for case_class in (ComparatorTests,
                       BtreeExceptionsTestCase,
                       BtreeKeyCompareTestCase,
                       DuplicateExceptionsTestCase,
                       DuplicateCompareTestCase):
        res.addTest(loader.loadTestsFromTestCase(case_class))
    return res
| |
if __name__ == '__main__':
    # unittest.main looks defaultTest up by name in this module; the suite
    # factory defined here is called test_suite (there is no "suite").
    unittest.main(defaultTest='test_suite')