| """ |
| TestCases for testing the locking sub-system. |
| """ |
| |
| import time |
| |
| import unittest |
| from test_all import db, test_support, verbose, have_threads, \ |
| get_new_environment_path, get_new_database_path |
| |
| if have_threads : |
| from threading import Thread |
| import sys |
| if sys.version_info[0] < 3 : |
| from threading import currentThread |
| else : |
| from threading import current_thread as currentThread |
| |
| #---------------------------------------------------------------------- |
| |
| class LockingTestCase(unittest.TestCase): |
| def setUp(self): |
| self.homeDir = get_new_environment_path() |
| self.env = db.DBEnv() |
| self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL | |
| db.DB_INIT_LOCK | db.DB_CREATE) |
| |
| |
| def tearDown(self): |
| self.env.close() |
| test_support.rmtree(self.homeDir) |
| |
| |
| def test01_simple(self): |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test01_simple..." % self.__class__.__name__ |
| |
| anID = self.env.lock_id() |
| if verbose: |
| print "locker ID: %s" % anID |
| lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE) |
| if verbose: |
| print "Aquired lock: %s" % lock |
| self.env.lock_put(lock) |
| if verbose: |
| print "Released lock: %s" % lock |
| self.env.lock_id_free(anID) |
| |
| |
| def test02_threaded(self): |
| if verbose: |
| print '\n', '-=' * 30 |
| print "Running %s.test02_threaded..." % self.__class__.__name__ |
| |
| threads = [] |
| threads.append(Thread(target = self.theThread, |
| args=(db.DB_LOCK_WRITE,))) |
| threads.append(Thread(target = self.theThread, |
| args=(db.DB_LOCK_READ,))) |
| threads.append(Thread(target = self.theThread, |
| args=(db.DB_LOCK_READ,))) |
| threads.append(Thread(target = self.theThread, |
| args=(db.DB_LOCK_WRITE,))) |
| threads.append(Thread(target = self.theThread, |
| args=(db.DB_LOCK_READ,))) |
| threads.append(Thread(target = self.theThread, |
| args=(db.DB_LOCK_READ,))) |
| threads.append(Thread(target = self.theThread, |
| args=(db.DB_LOCK_WRITE,))) |
| threads.append(Thread(target = self.theThread, |
| args=(db.DB_LOCK_WRITE,))) |
| threads.append(Thread(target = self.theThread, |
| args=(db.DB_LOCK_WRITE,))) |
| |
| for t in threads: |
| import sys |
| if sys.version_info[0] < 3 : |
| t.setDaemon(True) |
| else : |
| t.daemon = True |
| t.start() |
| for t in threads: |
| t.join() |
| |
| def test03_lock_timeout(self): |
| self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT) |
| self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0) |
| self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT) |
| self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0) |
| self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT) |
| self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456) |
| self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT) |
| self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123) |
| |
| def test04_lock_timeout2(self): |
| self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT) |
| self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT) |
| self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT) |
| self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT) |
| |
| def deadlock_detection() : |
| while not deadlock_detection.end : |
| deadlock_detection.count = \ |
| self.env.lock_detect(db.DB_LOCK_EXPIRE) |
| if deadlock_detection.count : |
| while not deadlock_detection.end : |
| pass |
| break |
| time.sleep(0.01) |
| |
| deadlock_detection.end=False |
| deadlock_detection.count=0 |
| t=Thread(target=deadlock_detection) |
| import sys |
| if sys.version_info[0] < 3 : |
| t.setDaemon(True) |
| else : |
| t.daemon = True |
| t.start() |
| self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT) |
| anID = self.env.lock_id() |
| anID2 = self.env.lock_id() |
| self.assertNotEqual(anID, anID2) |
| lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE) |
| start_time=time.time() |
| self.assertRaises(db.DBLockNotGrantedError, |
| self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ) |
| end_time=time.time() |
| deadlock_detection.end=True |
| # Floating point rounding |
| self.assertTrue((end_time-start_time) >= 0.0999) |
| self.env.lock_put(lock) |
| t.join() |
| |
| self.env.lock_id_free(anID) |
| self.env.lock_id_free(anID2) |
| |
| if db.version() >= (4,6): |
| self.assertTrue(deadlock_detection.count>0) |
| |
| def theThread(self, lockType): |
| import sys |
| if sys.version_info[0] < 3 : |
| name = currentThread().getName() |
| else : |
| name = currentThread().name |
| |
| if lockType == db.DB_LOCK_WRITE: |
| lt = "write" |
| else: |
| lt = "read" |
| |
| anID = self.env.lock_id() |
| if verbose: |
| print "%s: locker ID: %s" % (name, anID) |
| |
| for i in xrange(1000) : |
| lock = self.env.lock_get(anID, "some locked thing", lockType) |
| if verbose: |
| print "%s: Aquired %s lock: %s" % (name, lt, lock) |
| |
| self.env.lock_put(lock) |
| if verbose: |
| print "%s: Released %s lock: %s" % (name, lt, lock) |
| |
| self.env.lock_id_free(anID) |
| |
| |
| #---------------------------------------------------------------------- |
| |
def test_suite():
    """Return the suite of ``LockingTestCase`` tests to run.

    When ``have_threads`` is true every test method is included.
    Otherwise only methods whose names start with ``test01`` are loaded.

    Uses ``unittest.TestLoader`` rather than ``unittest.makeSuite``,
    which is deprecated and no longer exists in Python 3.13.
    """
    loader = unittest.TestLoader()
    if not have_threads:
        # Same selection ``makeSuite(LockingTestCase, 'test01')`` made.
        loader.testMethodPrefix = 'test01'

    suite = unittest.TestSuite()
    suite.addTest(loader.loadTestsFromTestCase(LockingTestCase))
    return suite
| |
| |
if __name__ == "__main__":
    # Executed as a script: run the suite built by test_suite().
    unittest.main(defaultTest="test_suite")