| """TestCases for distributed transactions. |
| """ |
| |
| import os |
| import unittest |
| |
| from test_all import db, test_support, get_new_environment_path, \ |
| get_new_database_path |
| |
| from test_all import verbose |
| |
| #---------------------------------------------------------------------- |
| |
| class DBTxn_distributed(unittest.TestCase): |
| num_txns=1234 |
| nosync=True |
| must_open_db=False |
| def _create_env(self, must_open_db) : |
| self.dbenv = db.DBEnv() |
| self.dbenv.set_tx_max(self.num_txns) |
| self.dbenv.set_lk_max_lockers(self.num_txns*2) |
| self.dbenv.set_lk_max_locks(self.num_txns*2) |
| self.dbenv.set_lk_max_objects(self.num_txns*2) |
| if self.nosync : |
| self.dbenv.set_flags(db.DB_TXN_NOSYNC,True) |
| self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_THREAD | |
| db.DB_RECOVER | |
| db.DB_INIT_TXN | db.DB_INIT_LOG | db.DB_INIT_MPOOL | |
| db.DB_INIT_LOCK, 0666) |
| self.db = db.DB(self.dbenv) |
| self.db.set_re_len(db.DB_GID_SIZE) |
| if must_open_db : |
| txn=self.dbenv.txn_begin() |
| self.db.open(self.filename, |
| db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666, |
| txn=txn) |
| txn.commit() |
| |
| def setUp(self) : |
| self.homeDir = get_new_environment_path() |
| self.filename = "test" |
| return self._create_env(must_open_db=True) |
| |
| def _destroy_env(self): |
| if self.nosync or (db.version()[:2] == (4,6)): # Known bug |
| self.dbenv.log_flush() |
| self.db.close() |
| self.dbenv.close() |
| |
| def tearDown(self): |
| self._destroy_env() |
| test_support.rmtree(self.homeDir) |
| |
| def _recreate_env(self,must_open_db) : |
| self._destroy_env() |
| self._create_env(must_open_db) |
| |
| def test01_distributed_transactions(self) : |
| txns=set() |
| adapt = lambda x : x |
| import sys |
| if sys.version_info[0] >= 3 : |
| adapt = lambda x : bytes(x, "ascii") |
| # Create transactions, "prepare" them, and |
| # let them be garbage collected. |
| for i in xrange(self.num_txns) : |
| txn = self.dbenv.txn_begin() |
| gid = "%%%dd" %db.DB_GID_SIZE |
| gid = adapt(gid %i) |
| self.db.put(i, gid, txn=txn, flags=db.DB_APPEND) |
| txns.add(gid) |
| txn.prepare(gid) |
| del txn |
| |
| self._recreate_env(self.must_open_db) |
| |
| # Get "to be recovered" transactions but |
| # let them be garbage collected. |
| recovered_txns=self.dbenv.txn_recover() |
| self.assertEqual(self.num_txns,len(recovered_txns)) |
| for gid,txn in recovered_txns : |
| self.assertTrue(gid in txns) |
| del txn |
| del recovered_txns |
| |
| self._recreate_env(self.must_open_db) |
| |
| # Get "to be recovered" transactions. Commit, abort and |
| # discard them. |
| recovered_txns=self.dbenv.txn_recover() |
| self.assertEqual(self.num_txns,len(recovered_txns)) |
| discard_txns=set() |
| committed_txns=set() |
| state=0 |
| for gid,txn in recovered_txns : |
| if state==0 or state==1: |
| committed_txns.add(gid) |
| txn.commit() |
| elif state==2 : |
| txn.abort() |
| elif state==3 : |
| txn.discard() |
| discard_txns.add(gid) |
| state=-1 |
| state+=1 |
| del txn |
| del recovered_txns |
| |
| self._recreate_env(self.must_open_db) |
| |
| # Verify the discarded transactions are still |
| # around, and dispose them. |
| recovered_txns=self.dbenv.txn_recover() |
| self.assertEqual(len(discard_txns),len(recovered_txns)) |
| for gid,txn in recovered_txns : |
| txn.abort() |
| del txn |
| del recovered_txns |
| |
| self._recreate_env(must_open_db=True) |
| |
| # Be sure there are not pending transactions. |
| # Check also database size. |
| recovered_txns=self.dbenv.txn_recover() |
| self.assertTrue(len(recovered_txns)==0) |
| self.assertEqual(len(committed_txns),self.db.stat()["nkeys"]) |
| |
| class DBTxn_distributedSYNC(DBTxn_distributed): |
| nosync=False |
| |
| class DBTxn_distributed_must_open_db(DBTxn_distributed): |
| must_open_db=True |
| |
| class DBTxn_distributedSYNC_must_open_db(DBTxn_distributed): |
| nosync=False |
| must_open_db=True |
| |
| #---------------------------------------------------------------------- |
| |
| def test_suite(): |
| suite = unittest.TestSuite() |
| if db.version() >= (4,5) : |
| suite.addTest(unittest.makeSuite(DBTxn_distributed)) |
| suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC)) |
| if db.version() >= (4,6) : |
| suite.addTest(unittest.makeSuite(DBTxn_distributed_must_open_db)) |
| suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC_must_open_db)) |
| return suite |
| |
| |
| if __name__ == '__main__': |
| unittest.main(defaultTest='test_suite') |