diff options
Diffstat (limited to 'lib/python2.7/test/test_fileio.py')
-rw-r--r-- | lib/python2.7/test/test_fileio.py | 479 |
1 files changed, 479 insertions, 0 deletions
diff --git a/lib/python2.7/test/test_fileio.py b/lib/python2.7/test/test_fileio.py new file mode 100644 index 0000000..b74cec2 --- /dev/null +++ b/lib/python2.7/test/test_fileio.py @@ -0,0 +1,479 @@ +# Adapted from test_file.py by Daniel Stutzbach + +from __future__ import unicode_literals + +import sys +import os +import errno +import unittest +from array import array +from weakref import proxy +from functools import wraps +from UserList import UserList +import _testcapi + +from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd +from test.test_support import py3k_bytes as bytes +from test.script_helper import run_python + +from _io import FileIO as _FileIO + +class AutoFileTests(unittest.TestCase): + # file tests for which a test file is automatically set up + + def setUp(self): + self.f = _FileIO(TESTFN, 'w') + + def tearDown(self): + if self.f: + self.f.close() + os.remove(TESTFN) + + def testWeakRefs(self): + # verify weak references + p = proxy(self.f) + p.write(bytes(range(10))) + self.assertEqual(self.f.tell(), p.tell()) + self.f.close() + self.f = None + self.assertRaises(ReferenceError, getattr, p, 'tell') + + def testSeekTell(self): + self.f.write(bytes(range(20))) + self.assertEqual(self.f.tell(), 20) + self.f.seek(0) + self.assertEqual(self.f.tell(), 0) + self.f.seek(10) + self.assertEqual(self.f.tell(), 10) + self.f.seek(5, 1) + self.assertEqual(self.f.tell(), 15) + self.f.seek(-5, 1) + self.assertEqual(self.f.tell(), 10) + self.f.seek(-5, 2) + self.assertEqual(self.f.tell(), 15) + + def testAttributes(self): + # verify expected attributes exist + f = self.f + + self.assertEqual(f.mode, "wb") + self.assertEqual(f.closed, False) + + # verify the attributes are readonly + for attr in 'mode', 'closed': + self.assertRaises((AttributeError, TypeError), + setattr, f, attr, 'oops') + + def testReadinto(self): + # verify readinto + self.f.write(b"\x01\x02") + self.f.close() + a = array(b'b', b'x'*10) + self.f = _FileIO(TESTFN, 'r') + n = self.f.readinto(a) + self.assertEqual(array(b'b', [1, 2]), a[:n]) + + def testWritelinesList(self): + l = [b'123', b'456'] + self.f.writelines(l) + self.f.close() + self.f = _FileIO(TESTFN, 'rb') + buf = self.f.read() + self.assertEqual(buf, b'123456') + + def testWritelinesUserList(self): + l = UserList([b'123', b'456']) + self.f.writelines(l) + self.f.close() + self.f = _FileIO(TESTFN, 'rb') + buf = self.f.read() + self.assertEqual(buf, b'123456') + + def testWritelinesError(self): + self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) + self.assertRaises(TypeError, self.f.writelines, None) + + def test_none_args(self): + self.f.write(b"hi\nbye\nabc") + self.f.close() + self.f = _FileIO(TESTFN, 'r') + self.assertEqual(self.f.read(None), b"hi\nbye\nabc") + self.f.seek(0) + self.assertEqual(self.f.readline(None), b"hi\n") + self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"]) + + def testRepr(self): + self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>" + % (self.f.name, self.f.mode)) + del self.f.name + self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>" + % (self.f.fileno(), self.f.mode)) + self.f.close() + self.assertEqual(repr(self.f), "<_io.FileIO [closed]>") + + def testErrors(self): + f = self.f + self.assertTrue(not f.isatty()) + self.assertTrue(not f.closed) + #self.assertEqual(f.name, TESTFN) + self.assertRaises(ValueError, f.read, 10) # Open for reading + f.close() + self.assertTrue(f.closed) + f = _FileIO(TESTFN, 'r') + self.assertRaises(TypeError, f.readinto, "") + self.assertTrue(not f.closed) + f.close() + self.assertTrue(f.closed) + + def testMethods(self): + methods = ['fileno', 'isatty', 'read', 'readinto', + 'seek', 'tell', 'truncate', 'write', 'seekable', + 'readable', 'writable'] + if sys.platform.startswith('atheos'): + methods.remove('truncate') + + self.f.close() + self.assertTrue(self.f.closed) + + for methodname in methods: + method = getattr(self.f, methodname) + # should raise on closed file + self.assertRaises(ValueError, method) + + def testOpendir(self): + # Issue 3703: opening a directory should fill the errno + # Windows always returns "[Errno 13]: Permission denied + # Unix calls dircheck() and returns "[Errno 21]: Is a directory" + try: + _FileIO('.', 'r') + except IOError as e: + self.assertNotEqual(e.errno, 0) + self.assertEqual(e.filename, ".") + else: + self.fail("Should have raised IOError") + + @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system") + def testOpenDirFD(self): + fd = os.open('.', os.O_RDONLY) + with self.assertRaises(IOError) as cm: + _FileIO(fd, 'r') + os.close(fd) + self.assertEqual(cm.exception.errno, errno.EISDIR) + + #A set of functions testing that we get expected behaviour if someone has + #manually closed the internal file descriptor. First, a decorator: + def ClosedFD(func): + @wraps(func) + def wrapper(self): + #forcibly close the fd before invoking the problem function + f = self.f + os.close(f.fileno()) + try: + func(self, f) + finally: + try: + self.f.close() + except IOError: + pass + return wrapper + + def ClosedFDRaises(func): + @wraps(func) + def wrapper(self): + #forcibly close the fd before invoking the problem function + f = self.f + os.close(f.fileno()) + try: + func(self, f) + except IOError as e: + self.assertEqual(e.errno, errno.EBADF) + else: + self.fail("Should have raised IOError") + finally: + try: + self.f.close() + except IOError: + pass + return wrapper + + @ClosedFDRaises + def testErrnoOnClose(self, f): + f.close() + + @ClosedFDRaises + def testErrnoOnClosedWrite(self, f): + f.write('a') + + @ClosedFDRaises + def testErrnoOnClosedSeek(self, f): + f.seek(0) + + @ClosedFDRaises + def testErrnoOnClosedTell(self, f): + f.tell() + + @ClosedFDRaises + def testErrnoOnClosedTruncate(self, f): + f.truncate(0) + + @ClosedFD + def testErrnoOnClosedSeekable(self, f): + f.seekable() + + @ClosedFD + def testErrnoOnClosedReadable(self, f): + f.readable() + + @ClosedFD + def testErrnoOnClosedWritable(self, f): + f.writable() + + @ClosedFD + def testErrnoOnClosedFileno(self, f): + f.fileno() + + @ClosedFD + def testErrnoOnClosedIsatty(self, f): + self.assertEqual(f.isatty(), False) + + def ReopenForRead(self): + try: + self.f.close() + except IOError: + pass + self.f = _FileIO(TESTFN, 'r') + os.close(self.f.fileno()) + return self.f + + @ClosedFDRaises + def testErrnoOnClosedRead(self, f): + f = self.ReopenForRead() + f.read(1) + + @ClosedFDRaises + def testErrnoOnClosedReadall(self, f): + f = self.ReopenForRead() + f.readall() + + @ClosedFDRaises + def testErrnoOnClosedReadinto(self, f): + f = self.ReopenForRead() + a = array(b'b', b'x'*10) + f.readinto(a) + +class OtherFileTests(unittest.TestCase): + + def testAbles(self): + try: + f = _FileIO(TESTFN, "w") + self.assertEqual(f.readable(), False) + self.assertEqual(f.writable(), True) + self.assertEqual(f.seekable(), True) + f.close() + + f = _FileIO(TESTFN, "r") + self.assertEqual(f.readable(), True) + self.assertEqual(f.writable(), False) + self.assertEqual(f.seekable(), True) + f.close() + + f = _FileIO(TESTFN, "a+") + self.assertEqual(f.readable(), True) + self.assertEqual(f.writable(), True) + self.assertEqual(f.seekable(), True) + self.assertEqual(f.isatty(), False) + f.close() + + if sys.platform != "win32": + try: + f = _FileIO("/dev/tty", "a") + except EnvironmentError: + # When run in a cron job there just aren't any + # ttys, so skip the test. This also handles other + # OS'es that don't support /dev/tty. + pass + else: + self.assertEqual(f.readable(), False) + self.assertEqual(f.writable(), True) + if sys.platform != "darwin" and \ + 'bsd' not in sys.platform and \ + not sys.platform.startswith('sunos'): + # Somehow /dev/tty appears seekable on some BSDs + self.assertEqual(f.seekable(), False) + self.assertEqual(f.isatty(), True) + f.close() + finally: + os.unlink(TESTFN) + + def testModeStrings(self): + # check invalid mode strings + for mode in ("", "aU", "wU+", "rw", "rt"): + try: + f = _FileIO(TESTFN, mode) + except ValueError: + pass + else: + f.close() + self.fail('%r is an invalid file mode' % mode) + + def testUnicodeOpen(self): + # verify repr works for unicode too + f = _FileIO(str(TESTFN), "w") + f.close() + os.unlink(TESTFN) + + def testBytesOpen(self): + # Opening a bytes filename + try: + fn = TESTFN.encode("ascii") + except UnicodeEncodeError: + # Skip test + return + f = _FileIO(fn, "w") + try: + f.write(b"abc") + f.close() + with open(TESTFN, "rb") as f: + self.assertEqual(f.read(), b"abc") + finally: + os.unlink(TESTFN) + + def testInvalidFd(self): + self.assertRaises(ValueError, _FileIO, -10) + self.assertRaises(OSError, _FileIO, make_bad_fd()) + if sys.platform == 'win32': + import msvcrt + self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd()) + # Issue 15989 + self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1) + self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1) + + def testBadModeArgument(self): + # verify that we get a sensible error message for bad mode argument + bad_mode = "qwerty" + try: + f = _FileIO(TESTFN, bad_mode) + except ValueError as msg: + if msg.args[0] != 0: + s = str(msg) + if TESTFN in s or bad_mode not in s: + self.fail("bad error message for invalid mode: %s" % s) + # if msg.args[0] == 0, we're probably on Windows where there may be + # no obvious way to discover why open() failed. + else: + f.close() + self.fail("no error for invalid mode: %s" % bad_mode) + + def testTruncate(self): + f = _FileIO(TESTFN, 'w') + f.write(bytes(bytearray(range(10)))) + self.assertEqual(f.tell(), 10) + f.truncate(5) + self.assertEqual(f.tell(), 10) + self.assertEqual(f.seek(0, os.SEEK_END), 5) + f.truncate(15) + self.assertEqual(f.tell(), 5) + self.assertEqual(f.seek(0, os.SEEK_END), 15) + f.close() + + def testTruncateOnWindows(self): + def bug801631(): + # SF bug <http://www.python.org/sf/801631> + # "file.truncate fault on windows" + f = _FileIO(TESTFN, 'w') + f.write(bytes(range(11))) + f.close() + + f = _FileIO(TESTFN,'r+') + data = f.read(5) + if data != bytes(range(5)): + self.fail("Read on file opened for update failed %r" % data) + if f.tell() != 5: + self.fail("File pos after read wrong %d" % f.tell()) + + f.truncate() + if f.tell() != 5: + self.fail("File pos after ftruncate wrong %d" % f.tell()) + + f.close() + size = os.path.getsize(TESTFN) + if size != 5: + self.fail("File size after ftruncate wrong %d" % size) + + try: + bug801631() + finally: + os.unlink(TESTFN) + + def testAppend(self): + try: + f = open(TESTFN, 'wb') + f.write(b'spam') + f.close() + f = open(TESTFN, 'ab') + f.write(b'eggs') + f.close() + f = open(TESTFN, 'rb') + d = f.read() + f.close() + self.assertEqual(d, b'spameggs') + finally: + try: + os.unlink(TESTFN) + except: + pass + + def testInvalidInit(self): + self.assertRaises(TypeError, _FileIO, "1", 0, 0) + + def testWarnings(self): + with check_warnings(quiet=True) as w: + self.assertEqual(w.warnings, []) + self.assertRaises(TypeError, _FileIO, []) + self.assertEqual(w.warnings, []) + self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt") + self.assertEqual(w.warnings, []) + + def test_surrogates(self): + # Issue #8438: try to open a filename containing surrogates. + # It should either fail because the file doesn't exist or the filename + # can't be represented using the filesystem encoding, but not because + # of a LookupError for the error handler "surrogateescape". + filename = u'\udc80.txt' + try: + with _FileIO(filename): + pass + except (UnicodeEncodeError, IOError): + pass + # Spawn a separate Python process with a different "file system + # default encoding", to exercise this further. + env = dict(os.environ) + env[b'LC_CTYPE'] = b'C' + _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env) + if ('UnicodeEncodeError' not in out and not + ( ('IOError: [Errno 2] No such file or directory' in out) or + ('IOError: [Errno 22] Invalid argument' in out) ) ): + self.fail('Bad output: %r' % out) + + def testUnclosedFDOnException(self): + class MyException(Exception): pass + class MyFileIO(_FileIO): + def __setattr__(self, name, value): + if name == "name": + raise MyException("blocked setting name") + return super(MyFileIO, self).__setattr__(name, value) + fd = os.open(__file__, os.O_RDONLY) + self.assertRaises(MyException, MyFileIO, fd) + os.close(fd) # should not raise OSError(EBADF) + +def test_main(): + # Historically, these tests have been sloppy about removing TESTFN. + # So get rid of it no matter what. + try: + run_unittest(AutoFileTests, OtherFileTests) + finally: + if os.path.exists(TESTFN): + os.unlink(TESTFN) + +if __name__ == '__main__': + test_main() |