#!/usr/bin/env python
"""
The multi_file_object module includes a class you can use to create a custom
file object that opens multiple split files as a single concatenated file.

See http://schof.org/2010/multi_file_object/ for more information.

Copyright (c) 2010 Dakim, Inc. and John Mark Schofield

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

import inspect
import unittest

__version__ = "1.0.0"

DEFAULT_BUFFER = 10240


def ptv(name):
    """
    Print value: evaluate the expression `name` in the caller's scope and
    print it as "name: value".  Intended as a debugging aid only.
    """
    # SECURITY NOTE: eval() executes arbitrary code with the caller's globals
    # and locals.  Only ever pass literal expressions written by the
    # developer, never text that comes from outside the program.
    caller = inspect.currentframe().f_back
    try:
        val = eval(name, caller.f_globals, caller.f_locals)
    finally:
        # Drop the frame reference promptly so we do not keep the caller's
        # locals alive through a reference cycle.
        del caller
    print('{0}: {1}'.format(name, val))


class MultiFileObject(object):
    """
    Creates a file-like object that tarfile can use as a file object.
    Concatenates multiple split files into one file, without actually doing
    that. Reads from each file, until it reaches the end of the last file.

    Init method accepts a list of files to concatenate.

    The files are opened in binary mode, so every read method returns byte
    strings.

    Supports reading only; does not support writing to files.
    """

    def __init__(self, filepaths):
        """ Open every path in `filepaths`, in order, for binary reading. """
        # Copy the caller's sequence so later changes to it cannot get out of
        # step with maxfileindex.
        self.filepaths = list(filepaths)
        self.maxfileindex = len(self.filepaths) - 1
        self.fileobjects = list()
        self.fileindex = 0
        self.closed = False
        self.read_buffer = b''
        self._open_files()

    def _open_files(self):
        """
        Open a fresh handle for every path and install the new handles,
        closing whatever handles were held before.  If any open() fails, the
        handles opened so far are closed and the previous ones are left alone.
        """
        opened = list()
        try:
            for filepath in self.filepaths:
                opened.append(open(filepath, 'rb'))
        except Exception:
            for fileobj in opened:
                fileobj.close()
            raise
        self._close_files()
        self.fileobjects = opened

    def _close_files(self):
        """ Close every handle currently held; closing twice is harmless. """
        for fileobj in self.fileobjects:
            fileobj.close()

    def __iter__(self):
        """
        Rewind to the start of the first file and return self.

        Unlike a built-in file object, asking for an iterator re-opens the
        source files (even after close()) and restarts from the beginning.
        """
        self._open_files()
        self.fileindex = 0
        self.read_buffer = b''
        self.closed = False
        return self

    def __enter__(self, *args):
        """ Context-manager entry; rewinds exactly like __iter__(). """
        return self.__iter__()

    def __exit__(self, *args):
        """ Context-manager exit; closes the files, never hides exceptions. """
        self.close()

    def close(self):
        """
        Close all open files and reset fileindex. Safe to call repeatedly.
        """
        # Close the handles we already hold; the files are not re-opened, so
        # this works even if a source file has been removed in the meantime.
        self._close_files()
        self.fileindex = 0
        self.closed = True

    def flush(self):
        """
        Flush is not implemented because we don't support writing.
        Per Python docs, it should be a no-op.
        """
        pass

    # NOTE: Per Python docs, we do not implement fileno() or isatty().

    def next(self):
        """
        This returns the next input line from the source files, with the
        trailing newline removed.
        Raises StopIteration when there are no more lines to return.
        Raises ValueError if the object has been closed.
        The end of a file (even when there are more files to return) is
        treated as the end of a line, even if it doesn't end in a line
        separator.
        """
        if self.closed:
            raise ValueError("I/O operation on a closed file.")
        while self.fileindex <= self.maxfileindex:
            try:
                line = next(self.fileobjects[self.fileindex])
            except StopIteration:
                # Current file is exhausted; carry on with the next one.
                self.fileindex += 1
            else:
                return line.rstrip(b'\n')
        raise StopIteration

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def bufferfull(self, maxbuf, buflen):
        """
        Accepts the maximum buffer size and the current buffer length.
        If the maximum buffer is None or negative (meaning "unlimited"),
        return False immediately. Otherwise return True once buflen has
        reached maxbuf, and False while it is still smaller.
        """
        if maxbuf is None or maxbuf < 0:
            return False
        return buflen >= maxbuf

    def read(self, maxbuf=-1):
        """
        This read function mimics the read function available in a true file
        object. We return at most maxbuf bytes. If maxbuf is negative or
        None, that's treated as infinite. (Out of memory errors are the
        caller's problem.) If we have less than maxbuf bytes in the buffer,
        and we reach the end of a file, we read the remaining number of bytes
        from the next file. We return bytes as long as there are bytes to
        return; then we return an empty byte string.
        Raises ValueError if the object has been closed.
        """
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        unlimited = maxbuf is None or maxbuf < 0
        # Start from anything left in read_buffer, then gather chunks in a
        # list and join once at the end.
        chunks = [self.read_buffer] if self.read_buffer else []
        gathered = len(self.read_buffer)
        self.read_buffer = b''
        while not self.bufferfull(maxbuf, gathered):
            if self.fileindex > self.maxfileindex:
                break
            # Ask only for what is still missing.  Over-reading would strand
            # bytes in read_buffer where readline() and next() never look.
            wanted = -1 if unlimited else maxbuf - gathered
            chunk = self.fileobjects[self.fileindex].read(wanted)
            if chunk:
                chunks.append(chunk)
                gathered += len(chunk)
            else:
                # Empty read means end of this file; move to the next one.
                self.fileindex += 1
        data = b''.join(chunks)
        if unlimited or len(data) <= maxbuf:
            return data
        # Only reachable when read_buffer already held more than maxbuf.
        self.read_buffer = data[maxbuf:]
        return data[:maxbuf]

    def readline(self, maxbuf=-1):
        """
        This readline function mimics the readline function available in a
        true file object. We return a single line, with a length of at most
        maxbuf bytes. If maxbuf is negative or None, that's treated as
        infinite. We return bytes as long as there are bytes to return; then
        we return an empty byte string.
        Raises ValueError if the object has been closed.
        """
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        if maxbuf is None:
            maxbuf = -1
        if maxbuf == 0:
            # A zero-byte request always yields an empty result.  It must not
            # be mistaken for end-of-file, or every file would be skipped.
            return b''
        while self.fileindex <= self.maxfileindex:
            line = self.fileobjects[self.fileindex].readline(maxbuf)
            if line:
                return line
            self.fileindex += 1
        return b''

    def readlines(self):
        """ Returns a list of the remaining lines in all the files. """
        # iter(callable, sentinel) keeps calling readline() until it returns
        # the empty byte string that marks the end of the last file.
        return list(iter(self.readline, b''))

    # NOTE: Python docs say that not all file objects are seekable, and
    # implementing seek() across several files is awkward. Skipping it for
    # now. Also skipping tell() for the same reason. These can be implemented
    # later, once the offset arithmetic has been worked out.

    # Skipping truncate, write, writelines because we don't support writing.
### TESTING CODE


class TestMultiFileObject(unittest.TestCase):
    """
    We use MultiFileObject to open multiple files and confirm that it
    concatenates the contents of the various files correctly.

    MultiFileObject reads its sources in binary mode, so the fixtures are
    written in binary mode and every expected value is a byte string.
    """

    num_of_split_files = 10
    num_lines_to_write = 200

    def __init__(self, *args, **kwargs):
        """ Initialize the variables we'll need """
        unittest.TestCase.__init__(self, *args, **kwargs)
        self.file_list = list()
        self.all_contents = b""
        self.opened_objects = list()

    def setUp(self):
        """ Set up multiple temp files to use for this test. """
        import tempfile
        import os
        # Start from a clean slate so running the same instance twice does
        # not accumulate file names or expected contents.
        self.file_list = list()
        self.opened_objects = list()
        written = list()
        for filecount in range(self.num_of_split_files):
            file_handle, file_name = tempfile.mkstemp()
            self.file_list.append(file_name)
            # Binary mode: no newline translation, so what we write is
            # byte-for-byte what MultiFileObject reads back.
            file_obj = os.fdopen(file_handle, 'wb')
            try:
                for linecount in range(self.num_lines_to_write):
                    text_line = ('This is line %s of file %s\n' % (
                        linecount + 1, filecount + 1)).encode('ascii')
                    file_obj.write(text_line)
                    written.append(text_line)
            finally:
                file_obj.close()
        self.all_contents = b"".join(written)

    def tearDown(self):
        """ Close anything still open, then clean up the temp files. """
        import os
        # Close before removing: an open handle blocks deletion on Windows.
        for test_obj in self.opened_objects:
            test_obj.close()
        self.opened_objects = list()
        for file_name in self.file_list:
            os.remove(file_name)
        self.file_list = list()

    def make_object(self, filepaths=None):
        """
        Build a MultiFileObject over `filepaths` (default: all the temp
        files) and remember it so tearDown() can close it.
        """
        if filepaths is None:
            filepaths = self.file_list
        test_obj = MultiFileObject(filepaths)
        self.opened_objects.append(test_obj)
        return test_obj

    def test_single_file(self):
        """ MultiFileObject called with just one file

        MultiFileObject is designed to work with multiple files, but
        it should also work just fine with a single file."""
        test_obj = self.make_object([self.file_list[0]])
        stuff = test_obj.read()
        expected_file = open(self.file_list[0], 'rb')
        try:
            expected = expected_file.read()
        finally:
            expected_file.close()
        self.assertEqual(stuff, expected)

    def test_close_read(self):
        """ read() from closed file should fail

        Test the read() method to confirm that it raises an error when you
        attempt to read from a closed file.
        """
        test_obj = self.make_object()
        test_obj.readline()  # Read one line and throw away the value.
        test_obj.close()
        self.assertRaises(ValueError, test_obj.read)

    def test_close_next(self):
        """ next() from closed file should fail

        Test the next() method to confirm that it raises an error when you
        attempt to read from a closed file.
        """
        test_obj = self.make_object()
        test_obj.readline()  # Read one line and throw away the value.
        test_obj.close()
        self.assertRaises(ValueError, test_obj.next)

    def test_close_twice(self):
        """ Closing a closed file should work

        Test the close() method to confirm that you can close an object
        multiple times without raising an error.
        """
        test_obj = self.make_object()
        test_obj.close()
        test_obj.close()

    def test_with(self):
        """"with" should work with our file object

        Verify that we can use "with", as documented in:
        http://docs.python.org/library/stdtypes.html#file.close
        """
        # No interpreter-version guard here: this module already fails to
        # compile on interpreters without the "with" statement, and the old
        # guard compared major and minor numbers separately, which skipped
        # the test on versions such as 3.0-3.5.
        test_obj = self.make_object()
        collected = list()
        with test_obj:
            for line in test_obj:
                # Iteration strips the newline; put it back to compare.
                collected.append(line + b'\n')
        self.assertEqual(b"".join(collected), self.all_contents)
        self.assertRaises(ValueError, test_obj.read)

    def test_flush(self):
        """ flush() should be a no-op """
        test_obj = self.make_object()
        test_obj.flush()
        test_obj.close()

    def test_next(self):
        """ next() should return one line at a time

        Tests that the next() function returns one line at a time and raises
        StopIteration when done. We're not implementing seek(), so we're
        not testing interactions of seek() with next().
        """
        test_obj = self.make_object()
        for sourceline in self.all_contents.splitlines():
            self.assertEqual(test_obj.next(), sourceline)
        self.assertRaises(StopIteration, test_obj.next)

    def test_read_maxbuf_small(self):
        """ read() should work with small maxbuf

        Tests that the read() function reads the right number of bytes
        from the file object. Test requires that the file contents are larger
        than the read buffer. We should get maxbuf bytes from each call to
        read(), except possibly the last call, which may return a smaller
        number of bytes. If we get a smaller number of bytes than maxbuf,
        the next call to read() should return an empty string.
        """
        read_buffer_size = 100
        self.assertTrue(read_buffer_size < len(self.all_contents))
        test_obj = self.make_object()
        pieces = list()
        while True:
            stuff = test_obj.read(read_buffer_size)
            if stuff == b"":
                break
            if len(stuff) > read_buffer_size:
                self.fail('We read more bytes than we\'re supposed to.')
            pieces.append(stuff)
            if len(stuff) < read_buffer_size:
                # A short read means we hit the end of the last file.
                self.assertEqual(test_obj.read(), b'')
                break
        self.assertEqual(b"".join(pieces), self.all_contents)

    def test_read_maxbuf_large(self):
        """ read() should work with large maxbuf

        Tests that the read function operates properly when the buffer size
        is larger than the content (ignoring possible out-of-memory issues,
        which are the caller's responsibility when specifying a huge buffer).
        """
        read_buffer_size = 1024 ** 3
        self.assertTrue(read_buffer_size > len(self.all_contents))
        test_obj = self.make_object()
        stuff = test_obj.read(read_buffer_size)
        self.assertEqual(stuff, self.all_contents)
        self.assertEqual(test_obj.read(), b'')

    def test_read_maxbuf_negative(self):
        """ read() should work with maxbuf negative

        Tests that the read function operates properly when the buffer size
        is negative, which means unlimited.
        Again, ignoring out-of-memory issues.
        """
        read_buffer_size = -10
        test_obj = self.make_object()
        stuff = test_obj.read(read_buffer_size)
        self.assertEqual(stuff, self.all_contents)
        self.assertEqual(test_obj.read(), b'')

    def test_read_maxbuf_empty(self):
        """ read() should work with maxbuf omitted

        Tests that the read function operates properly when the buffer size
        is not given, which means unlimited.
        Again, ignoring out-of-memory issues.
        """
        test_obj = self.make_object()
        stuff = test_obj.read()
        self.assertEqual(stuff, self.all_contents)
        self.assertEqual(test_obj.read(), b'')

    def test_read_maxbuf_zero(self):
        """ read() with zero maxbuf should do nothing

        This is basically a complicated way of doing a no-op.
        """
        read_buffer_size = 0
        self.assertTrue(read_buffer_size < len(self.all_contents))
        test_obj = self.make_object()
        self.assertEqual(len(test_obj.read(read_buffer_size)),
                         read_buffer_size)
        self.assertEqual(test_obj.read(0), b'')

    def test_readline_no_maxbuf(self):
        """ readline() should work with no maxbuf

        Validates the readline function when given no maxbuf (infinite
        buffer, up to memory limits).
        """
        test_obj = self.make_object()
        for sourceline in self.all_contents.splitlines(True):
            self.assertEqual(test_obj.readline(), sourceline)
        self.assertEqual(test_obj.readline(), b'')

    def test_readline_maxbuf_large(self):
        """ readline() should work with large maxbuf """
        read_buffer_size = 1024 ** 3
        test_obj = self.make_object()
        for sourceline in self.all_contents.splitlines(True):
            self.assertEqual(test_obj.readline(read_buffer_size), sourceline)
        self.assertEqual(test_obj.readline(), b'')

    def test_readline_maxbuf_small(self):
        """ readline() should work with really small maxbuf """
        read_buffer_size = 10
        test_obj = self.make_object()
        pieces = list()
        while True:
            stuff = test_obj.readline(read_buffer_size)
            if stuff == b'':
                break
            self.assertTrue(len(stuff) <= read_buffer_size)
            pieces.append(stuff)
        self.assertEqual(b"".join(pieces), self.all_contents)

    def test_readline_maxbuf_negative(self):
        """ readline() should work with negative maxbuf """
        read_buffer_size = -1
        test_obj = self.make_object()
        for sourceline in self.all_contents.splitlines(True):
            self.assertEqual(test_obj.readline(read_buffer_size), sourceline)
        self.assertEqual(test_obj.readline(), b'')

    def test_readlines(self):
        """ readlines() should work """
        test_obj = self.make_object()
        stuff_list = test_obj.readlines()
        stuff_string = b"".join(stuff_list)
        self.assertEqual(stuff_string, self.all_contents)

    def test_file_closed(self):
        """ MultiFileObject.closed boolean should work """
        test_obj = self.make_object()
        self.assertFalse(test_obj.closed)
        test_obj.close()
        self.assertTrue(test_obj.closed)


if __name__ == '__main__':
    import sys
    SUITE = unittest.TestLoader().loadTestsFromTestCase(TestMultiFileObject)
    RESULT = unittest.TextTestRunner(verbosity=2).run(SUITE)
    # Report failure through the exit status so scripts and CI can see it.
    sys.exit(0 if RESULT.wasSuccessful() else 1)