| # Tests StringIO and cStringIO |
| |
| import unittest |
| import StringIO |
| import cStringIO |
| import types |
| import array |
| import sys |
| from test import test_support |
| |
| |
| class TestGenericStringIO(unittest.TestCase): |
| # use a class variable MODULE to define which module is being tested |
| |
| # Line of data to test as string |
| _line = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!' |
| |
| # Constructor to use for the test data (._line is passed to this |
| # constructor) |
| constructor = str |
| |
| def setUp(self): |
| self._lines = self.constructor((self._line + '\n') * 5) |
| self._fp = self.MODULE.StringIO(self._lines) |
| |
| def test_reads(self): |
| eq = self.assertEqual |
| self.assertRaises(TypeError, self._fp.seek) |
| eq(self._fp.read(10), self._line[:10]) |
| eq(self._fp.read(0), '') |
| eq(self._fp.readline(0), '') |
| eq(self._fp.readline(), self._line[10:] + '\n') |
| eq(len(self._fp.readlines(60)), 2) |
| self._fp.seek(0) |
| eq(self._fp.readline(-1), self._line + '\n') |
| |
| def test_writes(self): |
| f = self.MODULE.StringIO() |
| self.assertRaises(TypeError, f.seek) |
| f.write(self._line[:6]) |
| f.seek(3) |
| f.write(self._line[20:26]) |
| f.write(self._line[52]) |
| self.assertEqual(f.getvalue(), 'abcuvwxyz!') |
| |
| def test_writelines(self): |
| f = self.MODULE.StringIO() |
| f.writelines([self._line[0], self._line[1], self._line[2]]) |
| f.seek(0) |
| self.assertEqual(f.getvalue(), 'abc') |
| |
| def test_writelines_error(self): |
| def errorGen(): |
| yield 'a' |
| raise KeyboardInterrupt() |
| f = self.MODULE.StringIO() |
| self.assertRaises(KeyboardInterrupt, f.writelines, errorGen()) |
| |
| def test_truncate(self): |
| eq = self.assertEqual |
| f = self.MODULE.StringIO() |
| f.write(self._lines) |
| f.seek(10) |
| f.truncate() |
| eq(f.getvalue(), 'abcdefghij') |
| f.truncate(5) |
| eq(f.getvalue(), 'abcde') |
| f.write('xyz') |
| eq(f.getvalue(), 'abcdexyz') |
| self.assertRaises(IOError, f.truncate, -1) |
| f.close() |
| self.assertRaises(ValueError, f.write, 'frobnitz') |
| |
| def test_closed_flag(self): |
| f = self.MODULE.StringIO() |
| self.assertEqual(f.closed, False) |
| f.close() |
| self.assertEqual(f.closed, True) |
| f = self.MODULE.StringIO("abc") |
| self.assertEqual(f.closed, False) |
| f.close() |
| self.assertEqual(f.closed, True) |
| |
| def test_isatty(self): |
| f = self.MODULE.StringIO() |
| self.assertRaises(TypeError, f.isatty, None) |
| self.assertEqual(f.isatty(), False) |
| f.close() |
| self.assertRaises(ValueError, f.isatty) |
| |
| def test_iterator(self): |
| eq = self.assertEqual |
| unless = self.assertTrue |
| eq(iter(self._fp), self._fp) |
| # Does this object support the iteration protocol? |
| unless(hasattr(self._fp, '__iter__')) |
| unless(hasattr(self._fp, 'next')) |
| i = 0 |
| for line in self._fp: |
| eq(line, self._line + '\n') |
| i += 1 |
| eq(i, 5) |
| self._fp.close() |
| self.assertRaises(ValueError, self._fp.next) |
| |
| def test_getvalue(self): |
| self._fp.close() |
| self.assertRaises(ValueError, self._fp.getvalue) |
| |
| @test_support.bigmemtest(test_support._2G + 2**26, memuse=2.001) |
| def test_reads_from_large_stream(self, size): |
| linesize = 2**26 # 64 MiB |
| lines = ['x' * (linesize - 1) + '\n'] * (size // linesize) + \ |
| ['y' * (size % linesize)] |
| f = self.MODULE.StringIO(''.join(lines)) |
| for i, expected in enumerate(lines): |
| line = f.read(len(expected)) |
| self.assertEqual(len(line), len(expected)) |
| self.assertEqual(line, expected) |
| self.assertEqual(f.read(), '') |
| f.seek(0) |
| for i, expected in enumerate(lines): |
| line = f.readline() |
| self.assertEqual(len(line), len(expected)) |
| self.assertEqual(line, expected) |
| self.assertEqual(f.readline(), '') |
| f.seek(0) |
| self.assertEqual(f.readlines(), lines) |
| self.assertEqual(f.readlines(), []) |
| f.seek(0) |
| self.assertEqual(f.readlines(size), lines) |
| self.assertEqual(f.readlines(), []) |
| |
| # In worst case cStringIO requires 2 + 1 + 1/2 + 1/2**2 + ... = 4 |
| # bytes per input character. |
| @test_support.bigmemtest(test_support._2G, memuse=4) |
| def test_writes_to_large_stream(self, size): |
| s = 'x' * 2**26 # 64 MiB |
| f = self.MODULE.StringIO() |
| n = size |
| while n > len(s): |
| f.write(s) |
| n -= len(s) |
| s = None |
| f.write('x' * n) |
| self.assertEqual(len(f.getvalue()), size) |
| |
| |
| class TestStringIO(TestGenericStringIO): |
| MODULE = StringIO |
| |
| def test_unicode(self): |
| |
| if not test_support.have_unicode: return |
| |
| # The StringIO module also supports concatenating Unicode |
| # snippets to larger Unicode strings. This is tested by this |
| # method. Note that cStringIO does not support this extension. |
| |
| f = self.MODULE.StringIO() |
| f.write(self._line[:6]) |
| f.seek(3) |
| f.write(unicode(self._line[20:26])) |
| f.write(unicode(self._line[52])) |
| s = f.getvalue() |
| self.assertEqual(s, unicode('abcuvwxyz!')) |
| self.assertEqual(type(s), types.UnicodeType) |
| |
| class TestcStringIO(TestGenericStringIO): |
| MODULE = cStringIO |
| |
| def test_array_support(self): |
| # Issue #1730114: cStringIO should accept array objects |
| a = array.array('B', [0,1,2]) |
| f = self.MODULE.StringIO(a) |
| self.assertEqual(f.getvalue(), '\x00\x01\x02') |
| |
| def test_unicode(self): |
| |
| if not test_support.have_unicode: return |
| |
| # The cStringIO module converts Unicode strings to character |
| # strings when writing them to cStringIO objects. |
| # Check that this works. |
| |
| f = self.MODULE.StringIO() |
| f.write(u'abcde') |
| s = f.getvalue() |
| self.assertEqual(s, 'abcde') |
| self.assertEqual(type(s), str) |
| |
| f = self.MODULE.StringIO(u'abcde') |
| s = f.getvalue() |
| self.assertEqual(s, 'abcde') |
| self.assertEqual(type(s), str) |
| |
| self.assertRaises(UnicodeEncodeError, self.MODULE.StringIO, u'\xf4') |
| |
| |
| import sys |
| if sys.platform.startswith('java'): |
| # Jython doesn't have a buffer object, so we just do a useless |
| # fake of the buffer tests. |
| buffer = str |
| |
| class TestBufferStringIO(TestStringIO): |
| constructor = buffer |
| |
| class TestBuffercStringIO(TestcStringIO): |
| constructor = buffer |
| |
| class TestMemoryviewcStringIO(TestcStringIO): |
| constructor = memoryview |
| |
| |
| def test_main(): |
| test_support.run_unittest(TestStringIO, TestcStringIO) |
| with test_support.check_py3k_warnings(("buffer.. not supported", |
| DeprecationWarning)): |
| test_support.run_unittest(TestBufferStringIO, TestBuffercStringIO) |
| test_support.run_unittest(TestMemoryviewcStringIO) |
| |
| if __name__ == '__main__': |
| test_main() |