Logo Search packages:      
Sourcecode: python-biopython version File versions  Download package

RecordReader.py

# Copyright 2000-2001, Dalke Scientific Software, LLC
# Distributed under the Biopython License Agreement (see the LICENSE file).

# The existing parsers are in-memory.  For large data files, like
# swissprot, that requires too much memory.

# On the other hand, the records aren't all that large (there are just
# a lot of them).  This module has readers which scan the input to get
# a record as a string.

import string
from mx import TextTools as TT

# Default value of the `sizehint` argument of the readers below, which
# pass it to infile.read() as the amount of data to request per read.
SIZEHINT = 100000

class ReaderError(TypeError):
    """Raised by the readers in this module when the input text cannot
    be split into records (bad leading line, unterminated record, ...).
    """

class RecordReader:
    """Common interface of the record readers.

    A reader wraps an input file object; subclasses must provide
    next() and remainder().
    """

    def __init__(self, infile):
        # The file-like object the records are read from.
        self.infile = infile

    def next(self):
        """Return the next record; must be provided by a subclass."""
        raise NotImplementedError

    def remainder(self):
        """Return what has not been consumed yet; must be provided by
        a subclass."""
        raise NotImplementedError
    
def _startswith_tagtable_rest_of_line(text):
    """Build the tag table used to find the lines which start with `text`.

    The scanned text must itself begin with `text`, otherwise the very
    first entry fails the whole match.  A "begin" tag is recorded for
    that first line and for every later line which starts with `text`;
    whatever follows `text` on such a line is accepted.  A line may be
    terminated by a linefeed, a carriage return, or a carriage return
    followed by a linefeed, and the scan ends successfully at the end
    of the text.

    Each entry is a 5-tuple; following the mxTextTools tag table
    layout these are (tag name or None, command, command argument,
    jump when the command does not match, jump when it matches), where
    the jumps are relative to the entry's own position in the table.
    """
    return (
        # Ensure the text starts with the given word
        ("begin", TT.Word, text, TT.MatchFail, +1),

        # Read to the end of line
        (None, TT.AllInSet, TT.invset('\r\n'), +1, +1),

        # Read the end of line
        (None, TT.Is, '\n', +1, +4),  # matches '\n' or
        (None, TT.Is, '\r', +2, +1),  # '\r' followed by
        (None, TT.Is, '\n', +2, +2),  # optional '\n'

        # Check if EOF (allow EOF if no EOL found)
        (None, TT.EOF, TT.Here, +1, TT.MatchOk),

        # Not EOF, so look for the next line starting with text.
        # On a match, jump back (-5) to read the rest of that line.
        ("begin", TT.Word, text, +1, -5),

        # Not what I am looking for, so read to the end of line
        (None, TT.AllInSet, TT.invset('\r\n'), +1, +1),

        # Read the end of line then test the next line
        (None, TT.Is, '\n', +1, -2),  # '\n'
        (None, TT.Is, '\r', +2, +1),  # '\r' followed by
        (None, TT.Is, '\n', -4, -4),  # optional '\n'
        # Allow termination at EOF; anything else here is a failure
        (None, TT.EOF, TT.Here, TT.MatchFail, TT.MatchOk),    
        )

def _startswith_tagtable_newline(text):
    """Build the tag table used to find the lines which consist of `text`.

    Variant of the "rest of line" table for the case where the marker
    must be immediately followed by the end of the line.  A "begin"
    tag is recorded for each line on which `text` is matched.

    NOTE: this table has a known bug (see the XXX comment below): a
    line which merely starts with `text` makes the whole match fail
    instead of being treated as an ordinary line.

    Each entry is a 5-tuple; following the mxTextTools tag table
    layout these are (tag name or None, command, command argument,
    jump when the command does not match, jump when it matches), where
    the jumps are relative to the entry's own position in the table.
    """
    return (
        # Ensure the text starts with the given word ...
        ("begin", TT.Word, text, TT.MatchFail, +1),

        # ... followed by the end of line
        (None, TT.Is, '\n', +1, +4),  # matches '\n' or
        (None, TT.Is, '\r', +2, +1),  # '\r' followed by
        (None, TT.Is, '\n', +2, +2),  # optional '\n'

        # Check if EOF instead of a newline (allow EOF if found)
        # Otherwise, this means the line starts with the text but
        # doesn't have a successive newline.
        # XXX BUG! When looking for "A\n" should not fail on "AA\n"!
        (None, TT.EOF, TT.Here, TT.MatchFail, TT.MatchOk),

        # Look for the next line starting with text.
        # On a match, jump back (-4) to check for the end of line.
        ("begin", TT.Word, text, +1, -4),

        # Not what I am looking for, so read to the end of line
        (None, TT.AllInSet, TT.invset('\r\n'), +1, +1),

        # Read the end of line then test the next line
        (None, TT.Is, '\n', +1, -2),  # '\n'
        (None, TT.Is, '\r', +2, +1),  # '\r' followed by
        (None, TT.Is, '\n', -4, -4),  # optional '\n'
        # Allow termination at EOF; anything else here is a failure
        (None, TT.EOF, TT.Here, TT.MatchFail, TT.MatchOk),    
        )
        

def _find_begin_positions(text, tagtable):
    """Return the start offsets of the records found in `text`.

    `text` is scanned with TT.tag() using `tagtable` (one of the
    _startswith_tagtable_* tables).  The result is a list holding
    element 1 of every tag reported by the scan, which in the
    mxTextTools taglist layout is the offset where the tagged span
    begins.

    Raises ReaderError if the scan fails or stops before the end of
    `text`.
    """
    success, tags, pos = TT.tag(text, tagtable)
    if not success:
        raise ReaderError("invalid format starting with %s" % repr(text[:50]))
    if pos != len(text):
        # Same call-style raise as above (the "raise X, msg" statement
        # form is not accepted by newer Python versions).
        raise ReaderError(
            "could not parse to end of text (ended at %d of %d)" %
            (pos, len(text)))
    return [tag[1] for tag in tags]
                        

class StartsWith(RecordReader):
    """Read records which each begin with a line starting with `text`.

    infile    -- file-like object to read from
    text      -- marker found at the start of the first line of every
                 record.  A trailing '\\n' (marker must be the whole
                 line) is recognised but currently rejected with
                 NotImplementedError; a '\\n' anywhere else raises
                 AssertionError.
    sizehint  -- amount of data requested from infile.read() at a time
    lookahead -- text which logically precedes what is left in infile

    The constructor already scans the first chunk of input, so a
    ReaderError is raised right away if the input does not begin with
    `text`.
    """
    def __init__(self, infile, text, sizehint = SIZEHINT, lookahead = ""):
        RecordReader.__init__(self, infile)
        self.text = text
        self.sizehint = sizehint

        # Split off the optional trailing newline of the marker.
        pos = text.find("\n")
        if pos != -1:
            if pos != len(text)-1:
                raise AssertionError(
                    "'\\n' can only exist at the end of the string")
            text = text[:-1]
            has_newline = 1
        else:
            has_newline = 0
        assert len(text), "StartsWith text size is too short"
        assert len(text) < sizehint - 2, \
               "StartsWith text size larger than sizehint allows"

        if has_newline:
            # The table built by _startswith_tagtable_newline has a
            # known bug, so this option stays disabled.
            raise NotImplementedError("there's a bug in the '\\n' option")
        self.tagtable = _startswith_tagtable_rest_of_line(text)

        self.lookahead = lookahead

        # Start parsing here.  This guarantees the first line is in
        # the right format.
        if len(self.lookahead) < len(text) + 2:
            self.lookahead += infile.read(sizehint)
        if self.lookahead:
            self.positions = _find_begin_positions(self.lookahead,
                                                   self.tagtable)
        else:
            self.positions = [0]
        # Index into self.positions of the start of the next record.
        self.index = 0

    def next(self):
        """Return the next record as a string, or None when there is
        no more data."""
        # Are any precomputed positions remaining?
        if self.index + 1 < len(self.positions):
            # Yes, so return the text in the range
            s = self.lookahead[self.positions[self.index]:
                               self.positions[self.index+1]]
            self.index += 1
            return s

        # The last record may be incomplete, so reset the
        # lookahead to be just that text
        self.lookahead = self.lookahead[self.positions[-1]:]

        # Read past at least the start of the second record or to the
        # end of file.
        positions = [self.positions[-1]]
        while 1:
            data = self.infile.read(self.sizehint)
            if not data:
                break
            self.lookahead = self.lookahead + data
            positions = _find_begin_positions(self.lookahead, self.tagtable)
            if len(positions) > 1:
                break

        if len(positions) > 1:
            self.positions = positions
            self.index = 1
            return self.lookahead[positions[0]:positions[1]]

        # Either way the bookkeeping restarts from an empty state.
        self.positions = [0]
        self.index = 0
        if not self.lookahead:
            # No data (either empty file or at EOF)
            return None

        # Read to the end of file and it's the last record
        assert len(positions) == 1
        s = self.lookahead
        self.lookahead = ""
        return s

    def remainder(self):
        """Return (infile, text read from infile but not yet returned)."""
        return self.infile, self.lookahead[self.positions[self.index]:]

def _endswith_tagtable_newline(text):
    """Build the tag table used to find lines consisting of `text`.

    An "end" tag is recorded at the end-of-line sequence (linefeed,
    carriage return + linefeed, or a lone carriage return) which
    directly follows `text` at the start of a line.  Other lines are
    skipped, and the scan ends successfully at the end of the text.

    Each entry is a 5-tuple; following the mxTextTools tag table
    layout these are (tag name or None, command, command argument,
    jump when the command does not match, jump when it matches), where
    the jumps are relative to the entry's own position in the table.
    """
    return (
        # Is the current line the end of record marker?
        (None, TT.Word, text, +6, +1),
 
        # Make sure it ends the line
        ("end", TT.Is, '\n', +1, -1),  # matches '\n'
        (None, TT.Is, '\r', +4, +1),
        ("end", TT.Is, '\n', +1, -3),
        # Only a '\r': step back over it, then tag it as the "end"
        (None, TT.Skip, -1, +1, +1),
        ("end", TT.Skip, +1, -5, -5),
 
        # Not the end of record marker, so read to the end of line
        (None, TT.AllInSet, TT.invset('\r\n'), +1, +1),
 
        # Check if EOF
        (None, TT.EOF, TT.Here, +1, TT.MatchOk),
 
        # Not EOF, so scarf any newlines
        (None, TT.AllInSet, TT.set('\r\n'), TT.MatchFail, -8),
        )

def _endswith_tagtable_rest_of_line(text):
    """Build the tag table used to find lines which start with `text`.

    For every line which starts with `text` the remainder of the line
    is read and an "end" tag is recorded at its end-of-line sequence
    (linefeed, carriage return + linefeed, or a lone carriage return).
    Other lines are skipped, and the scan ends successfully at the end
    of the text.

    Each entry is a 5-tuple; following the mxTextTools tag table
    layout these are (tag name or None, command, command argument,
    jump when the command does not match, jump when it matches), where
    the jumps are relative to the entry's own position in the table.
    """
    return (
        # Is the current line the end of record marker?
        (None, TT.Word, text, +8, +1),

        # Read whatever else is on that line (could be nothing)
        (None, TT.AllInSet, TT.invset('\r\n'), +1, +1),
 
        # Get the end of line
        ("end", TT.Is, '\n', +1, -2),  # matches '\n'
        (None, TT.Is, '\r', +4, +1),
        ("end", TT.Is, '\n', +1, -4),
        # Only a '\r': step back over it, then tag it as the "end"
        (None, TT.Skip, -1, +1, +1),
        ("end", TT.Skip, +1, -6, -6),
 
        # Check if EOF (only tests when the end of record line has no \n)
        # Only time this should fail is with a bug in TT.
        ("end", TT.EOF, TT.Here, TT.MatchFail, TT.MatchOk),
        
        # Not the end of record marker, so read to the end of line
        (None, TT.AllInSet, TT.invset('\r\n'), +1, +1),
 
        # Check if EOF
        (None, TT.EOF, TT.Here, +1, TT.MatchOk),
 
        # Not EOF, so scarf any newlines and try again
        (None, TT.AllInSet, TT.set('\r\n'), TT.MatchFail, -10),
        )
        

def _find_end_positions(text, tagtable):
    """Return the end offsets of the spans tagged in `text`.

    `text` is scanned with TT.tag() using `tagtable`.  The result is a
    list holding element 2 of every tag reported by the scan, which in
    the mxTextTools taglist layout is the offset just past the tagged
    span.

    Raises ReaderError if the scan fails or stops before the end of
    `text`.
    """
    success, tags, pos = TT.tag(text, tagtable)
    if not success:
        raise ReaderError("invalid format starting with %s" % repr(text[:50]))
    if pos != len(text):
        # Same call-style raise as above (the "raise X, msg" statement
        # form is not accepted by newer Python versions).
        raise ReaderError(
            "could not parse to end of text (ended at %d of %d)" %
            (pos, len(text)))
    return [tag[2] for tag in tags]

class EndsWith(RecordReader):
    """Read records which each end with a line starting with `text`.

    infile    -- file-like object to read from
    text      -- marker found at the start of the last line of every
                 record.  With a trailing '\\n' the marker must be
                 directly followed by the end of the line; without
                 it, anything may follow the marker on that line.
                 A '\\n' anywhere else raises AssertionError.
    sizehint  -- amount of data requested from infile.read() at a time
    lookahead -- text which logically precedes what is left in infile
    """
    def __init__(self, infile, text, sizehint = SIZEHINT, lookahead = ""):
        RecordReader.__init__(self, infile)
        self.text = text
        self.sizehint = sizehint

        # Split off the optional trailing newline of the marker.
        pos = text.find("\n")
        if pos != -1:
            if pos != len(text)-1:
                raise AssertionError(
                    "'\\n' can only exist at the end of the string")
            text = text[:-1]
            has_newline = 1
        else:
            has_newline = 0
        assert len(text) < sizehint - 2, \
               "EndsWith text size larger than sizehint allows"

        if has_newline:
            self.tagtable = _endswith_tagtable_newline(text)
        else:
            self.tagtable = _endswith_tagtable_rest_of_line(text)

        self.lookahead = lookahead
        # End offsets (into self.lookahead) of the records found so far
        self.positions = []
        # Index into self.positions of the next record to return
        self.index = 0
        # Offset in self.lookahead where the next record starts
        self.pos = 0

    def next(self):
        """Return the next record as a string, or None when there is
        no more data.

        Raises ReaderError if the remaining text does not end with a
        record terminator.
        """
        # Are any precomputed positions remaining?
        if self.index < len(self.positions):
            # Yes, so return that text
            newpos = self.positions[self.index]
            s = self.lookahead[self.pos:newpos]
            self.pos = newpos
            self.index = self.index + 1
            return s

        # No available information, so use what remains to seed the
        # next level.
        lookahead = self.lookahead[self.pos:]

        positions = []
        # Add text until I've found a record or there is no more data
        while 1:
            data = self.infile.read(self.sizehint)
            if not data:
                if not positions:
                    positions = _find_end_positions(lookahead, self.tagtable)
                break
            lookahead = lookahead + data
            positions = _find_end_positions(lookahead, self.tagtable)
            if len(positions) > 1:
                # More data may follow, so the last terminator found is
                # not trusted yet; it is found again on a later scan.
                del positions[-1]
                break

        self.lookahead = lookahead
        self.positions = positions

        if positions:
            self.index = 1
            self.pos = positions[0]
            return lookahead[:positions[0]]
        elif not lookahead:
            # No data (either empty file or at EOF)
            self.pos = 0
            self.index = 0
            return None

        # This is likely an unterminated record.  However, it could be
        # that there is no terminal end-of-line character so check for
        # that case.
        if lookahead[-1:] not in "\r\n":
            special_case = lookahead + "\n"
            positions = _find_end_positions(special_case, self.tagtable)
            if positions:
                assert len(positions) == 1, "this case should not occur"
                pos = positions[0]
                assert pos == len(special_case), "wrong sizes: %d and %d" % \
                       (pos, len(special_case))
                self.lookahead = ""
                self.positions = []
                self.pos = 0
                self.index = 0
                return lookahead

        # Really could not find a terminator
        self.index = 0
        self.pos = 0
        raise ReaderError("Last record not terminated: at %s ..." %
                          repr(self.lookahead[:50]))

    def remainder(self):
        """Return (infile, text read from infile but not yet returned)."""
        return self.infile, self.lookahead[self.pos:]



class Until(RecordReader):
    """Read one record: everything before the first line starting
    with `text`.

    infile    -- file-like object to read from
    text      -- marker which starts the line where reading stops.
                 It may not contain '\\n'; a trailing '\\n' raises
                 NotImplementedError and a '\\n' elsewhere raises
                 AssertionError.  (An empty string raises IndexError
                 from the text[-1] check.)
    sizehint  -- amount of data requested from infile.read() at a time
    lookahead -- text which logically precedes what is left in infile

    Only the first call to next() returns a record; later calls
    return None.
    """
    def __init__(self, infile, text, sizehint = SIZEHINT, lookahead = ""):
        RecordReader.__init__(self, infile)
        self.text = text
        self.lookahead = lookahead
        self.sizehint = sizehint
        # Set to 1 once the single record has been returned
        self.found = 0

        if text[-1] == "\n":
            raise NotImplementedError("Until reader does not support '\\n'")
        if "\n" in text:
            raise AssertionError(
                "'\\n' can only exist at the end of the string")

    def next(self):
        """Return the text up to the marker line, then None on any
        later call."""
        if self.found:
            return None

        # Use the StartsWith reader to get to the end of this record.
        # Need to fake the first line, since StartsWith requires its
        # input to begin with the marker.
        fake = self.text + "\n"
        reader = StartsWith(self.infile, self.text, self.sizehint,
                            fake + self.lookahead)
        rec = reader.next()
        rec = rec[len(fake):]  # remove the fake data
        self.infile, self.lookahead = reader.remainder()
        self.found = 1
        return rec

    def remainder(self):
        """Return (infile, text read from infile but not yet returned)."""
        return self.infile, self.lookahead

# Tag the last byte of every newline
#
# A "newline" tag is recorded for each end-of-line sequence: '\n',
# '\r\n' (the tag is on the '\n'), or a lone '\r' (an empty tag placed
# right after it).  The scan ends successfully at the end of the text.
#
# Each entry is a 5-tuple; following the mxTextTools tag table layout
# these are (tag name or None, command, command argument, jump when
# the command does not match, jump when it matches), where the jumps
# are relative to the entry's own position in the table.
_tag_lines_tagtable = (
    # Skip non-newline characters
    (None, TT.AllInSet, TT.invset('\r\n'), +1, +1),

    # Check if newline
    ("newline", TT.Is, '\n', +1, -1),  # can be '\n'
    (None, TT.Is, '\r', +3, +1),       # or start a '\r' followed by ..
    ("newline", TT.Is, '\n', +1, -3),  #  .. an optional '\n'
    ("newline", TT.Skip, 0, -4, -4),   # get here with just an '\r'
    (None, TT.EOF, TT.Here, -5, TT.MatchOk),  # stop at end of text
    )


class CountLines(RecordReader):
    """Read a specified (fixed) number of lines

    infile    -- file-like object to read from
    count     -- number of lines making up one record (must be > 0)
    sizehint  -- amount of data requested from infile.read() at a
                 time (must be > 0)
    lookahead -- text which logically precedes what is left in infile
    """
    def __init__(self, infile, count, sizehint = SIZEHINT, lookahead = ""):
        assert count > 0, "CountLines reader must read at least one line"
        # The original also asserted "lookahead > 0", comparing the
        # lookahead *string* with an integer; the read size that
        # assertion's message refers to is the sizehint checked here.
        assert sizehint > 0, "sizehint must be positive"
        RecordReader.__init__(self, infile)
        self.sizehint = sizehint
        self.lookahead = lookahead
        self.count = count
        # Offset in self.lookahead where the next record starts
        self.pos = 0
        # End offsets (into self.lookahead) of the lines found so far
        self.positions = []
        # Number of entries of self.positions already consumed
        self.index = 0

    def next(self):
        """Return the next `count` lines as one string, or None when
        there is no more data.

        Raises ReaderError if data remains but holds fewer than
        `count` complete lines.
        """
        # Enough precomputed line ends left for another record?
        if self.index + self.count < len(self.positions):
            self.index = self.index + self.count
            endpos = self.positions[self.index-1]
            s = self.lookahead[self.pos:endpos]
            self.pos = endpos
            return s

        # Rescan the unconsumed text, reading more until it holds more
        # than `count` line ends or the input is exhausted.
        lookahead = self.lookahead[self.pos:]
        while 1:
            positions = _find_end_positions(lookahead, _tag_lines_tagtable)
            if len(positions) > self.count:
                # Last line may be incomplete, as with "\r" of "\r\n"
                # Hmm, is this *really* needed?  Doesn't hurt. XXX
                del positions[-1]
                break
            data = self.infile.read(self.sizehint)
            if not data:
                break
            lookahead = lookahead + data

        self.lookahead = lookahead
        self.pos = 0
        self.positions = positions

        if not lookahead:
            return None

        if len(positions) >= self.count:
            self.index = self.count
            endpos = self.positions[self.count-1]
            s = lookahead[0:endpos]
            self.pos = endpos
            return s

        # A final record without its trailing newline is deliberately
        # not accepted: it is more frequent that the line count is
        # wrong.  (Could change in future releases.)
        raise ReaderError(
            "Only found %d lines, expecting %d (starting with %s ...)" %
            (len(positions), self.count, repr(lookahead[:20])))

    def remainder(self):
        """Return (infile, text read from infile but not yet returned)."""
        return self.infile, self.lookahead[self.pos:]

class Nothing(RecordReader):
    """Reads nothing"""

    def __init__(self, infile, sizehint = SIZEHINT, lookahead = ""):
        # sizehint is accepted but not used by this reader.
        self.lookahead = lookahead
        RecordReader.__init__(self, infile)

    def next(self):
        """There is never a record to return."""
        return None

    def remainder(self):
        """Return (infile, lookahead) exactly as they were given."""
        return (self.infile, self.lookahead)

class Everything(RecordReader):
    """Reads everything"""

    def __init__(self, infile, sizehint = SIZEHINT, lookahead = ""):
        # sizehint is accepted but not used by this reader.
        RecordReader.__init__(self, infile)
        # Set to 1 once the single record has been returned
        self.found = 0
        self.lookahead = lookahead

    def next(self):
        """Return the lookahead plus all remaining input on the first
        call, and None on every later call."""
        if not self.found:
            everything = self.lookahead + self.infile.read()
            self.lookahead = ""
            self.found = 1
            return everything
        return None

    def remainder(self):
        """Return (infile, lookahead); the lookahead is empty once the
        record has been read."""
        return (self.infile, self.lookahead)

Generated by  Doxygen 1.6.0   Back to index