Logo Search packages:      
Sourcecode: python-biopython version File versions

test_RecordReader2.py

# Still more tests of the RecordReader.  This one stresses the ability
# to find newlines and pass in lookahead text

from cStringIO import StringIO
import string
from Martel import RecordReader
from mx import TextTools as TT

def count_records(reader):
    i = 0
    #print "Testing new reader", reader
    while 1:
        x = reader.next()
        #print "Read", repr(x)
        if x is None:
            break
        i = i + 1
    return i

# Notice how this omits the final newline?
def normalize(s):
    return string.join(TT.splitlines(s), "\n")
    


data1 = """\
ID   Q1234
DE   Some protein
SQ   blah
     ABCDE FGHIJ
//
ID   Q2345
DE   ID
SQ   lahb
 ID  just checking
     BCDEF GHIJA
//
ID   Q3456
DE   me proteinSo
SQ   ahbl
     CDEFG HIJID
//
"""
data1 = normalize(data1)

### StartsWith

def test_startswith_generic():
    to_eol_data = (("A\n", 1),
                   ("AA\n", 1),
                   ("A\nB\n", 1),
                   ("A\nB\nB\n", 1),
                   ("A\nB\nB\nA\n", 2),
                   ("A\nA\nB\nA\n", 3),
                   ("A\n A\nA\n", 2),
                   ("A A A A A A A A A A A A A A\nB\nA\n", 2),
                   )
    for s, expected in to_eol_data:
        reader = RecordReader.StartsWith(StringIO(s), "A")
        count = count_records(reader)
        assert count == expected, (s, expected, count) # skips to EOL
                   

def test_startswith_SP():
    # Check using a SWISS-PROT-like format
    for ending in ("\n", "\r", "\r\n"):
        s = string.replace(data1, "\n", ending)
        for final in ("", ending):
            d = s + final
            
            for i in range(5, 20):
                infile = StringIO(d)
                reader = RecordReader.StartsWith(infile, "ID", i)
                count = count_records(reader)
                assert count == 3, (ending, final, i, count, d)

            for i in range(6, 20):
                infile = StringIO(d)
                reader = RecordReader.StartsWith(infile, "ID ", i)
                count = count_records(reader)
                assert count == 3, (ending, final, i, count, d)

def test_startswith_exhaustive(ending):
    # Exhaustive test of the various combinations.  Should catch most
    # edge conditions.
    for base in ("A" + ending, "A" + ending + "BA" + ending):
        for repeat in range(0, 15):
            s = base * repeat
            infile = StringIO(s)
            #for marker in ("A", "A\n"):  # Don't use; bug when using "\n"
            for marker in ("A",):
                for look in range(5):
                    lookahead = base * look
                    for readhint in range(4, 10):
                            infile.seek(0)
                            reader = RecordReader.StartsWith(\
                                infile, marker, readhint, lookahead)
                            count = count_records(reader)
                            assert count == repeat + look, \
                                   (count, ending, base, repeat, marker,
                                    look, readhint)
                    infile.seek(0)
                    reader = RecordReader.StartsWith(infile, marker,
                                                     lookahead = lookahead)
                    count = count_records(reader)
                    assert count == repeat + look, \
                           (count, ending, repeat, marker, look)

def test_startswith_remainder():
    # Make sure the remainder method works
    for repeat in range(20):
        vals = map(lambda x: "A\n%d\n" % x, range(repeat))
        data = string.join(vals, "")
        infile = StringIO(data)
        for look in range(10) + range(10, len(data), 5):
            infile.seek(look)
            lookahead = data[:look]
            reader = RecordReader.StartsWith(infile, "A",
                                             lookahead = lookahead)
            all = ""
            while 1:
                file, lh = reader.remainder()
                pos = file.tell()
                rest = file.read()
                assert all + lh + rest == data, (all, lh, rest, data)
                file.seek(pos)
                assert data.startswith(all), (data, all)
                record = reader.next()
                if record is None:
                    break
                all = all + record
            assert all == data, (all, data)
            
                
def test_startswith_errors():
    # Check the failure cases.  Actually, there's only one.

    # Doesn't start with A
    for s in ("B", "B\n", " A\n", " A", "B\nA\n"):
        try:
            infile = StringIO(s)
            # The current implementation will fail here, but the
            # interface spec allows the error to be unreported
            # until reading the record.
            reader = RecordReader.StartsWith(infile, "A")
            rec = reader.next()
            raise AssertionError, "should not allow %r" % s
        except RecordReader.ReaderError:
            pass
        else:
            raise AssertionError, "should not get here"

def test_startswith():
    print "Testing StartsWith"

    print " ... generic"
    test_startswith_generic()

    print " ... newline variations"
    test_startswith_SP()

    for ending in ("\n", "\r", "\r\n"):
        print " ... exhaustive testing against %r" % ending
        test_startswith_exhaustive(ending)

    print " ... remainder"
    test_startswith_remainder()

    print " ... format errors"
    test_startswith_errors()

### EndsWith

def test_endswith_generic():
    to_eol_data = (("A\n", 1),
                   ("AA\n", 1),
                   ("B\nA\n", 1),
                   ("B\nB\nA\n", 1),
                   ("A\nA\n", 2),
                   ("A A\nA\n", 2),  # this changes with an "A\n" reader
                   ("A", 1),
                   ("A\nA A\nA\n", 3), # this changes with an "A\n" reader
                   ("A\nA A\nA", 3), # this changes with an "A\n" reader
                   )
    for s, expected in to_eol_data:
        reader = RecordReader.EndsWith(StringIO(s), "A")
        count = count_records(reader)
        assert count == expected, (s, expected, count)  # skips to EOL


    newline_data = (("A\n", 1),
                    #("AA\n", 1),  # not legal
                    ("B\nA\n", 1),
                    ("B\nB\nA\n", 1),
                    ("A\nA\n", 2),
                    ("A A\nA\n", 1),  # this changed with an "A\n" reader
                    ("A", 1),
                    ("A\nA A\nA\n", 2), # this changed with an "A\n" reader
                    ("A\nA A\nA", 2), # this changed with an "A\n" reader
                    )
    for s, expected in newline_data:
        reader = RecordReader.EndsWith(StringIO(s), "A\n")
        count = count_records(reader)
        assert count == expected, (s, expected, count)  # expects newline

        
def test_endswith_SP():
    # Check using a SWISS-PROT-like format
    for ending in ("\n", "\r", "\r\n"):
        s = string.replace(data1, "\n", ending)
        for final in ("", ending):
            d = s + final
            
            loop = 0
            for i in range(5, 20):
                infile = StringIO(d)
                reader = RecordReader.EndsWith(infile, "//", i)
                count = count_records(reader)
                assert count == 3, (ending, final, i, count, d)

            for i in range(5, 20):
                infile = StringIO(d)
                reader = RecordReader.EndsWith(infile, "//\n", i)
                count = count_records(reader)
                assert count == 3, (ending, final, i, count, d)


def test_endswith_exhaustive(ending):
    # Exhaustive test of the various combinations.  Should catch most
    # edge conditions.
    for base in ("A" + ending, "BA" + ending + "A" + ending):
        for repeat in range(0, 15):
            s = base * repeat
            infile = StringIO(s)
            for marker in ("A", "A\n"):
                for look in range(5):
                    lookahead = base * look
                    for readhint in range(4, 10):
                            infile.seek(0)
                            reader = RecordReader.EndsWith(\
                                infile, marker, readhint, lookahead)
                            count = count_records(reader)
                            assert count == repeat + look, \
                                   (count, ending, base, repeat, marker,
                                    look, readhint)
                    infile.seek(0)
                    reader = RecordReader.EndsWith(infile, marker,
                                                   lookahead = lookahead)
                    count = count_records(reader)
                    assert count == repeat + look, \
                           (count, ending, repeat, marker, look)

def test_endswith_remainder():
    # Make sure the remainder method works
    for repeat in range(20):
        vals = map(lambda x: "%d\nA\n" % x, range(repeat))
        data = string.join(vals, "")
        infile = StringIO(data)
        for look in range(10) + range(10, len(data), 5):
            infile.seek(look)
            lookahead = data[:look]
            reader = RecordReader.EndsWith(infile, "A",
                                           lookahead = lookahead)
            all = ""
            while 1:
                file, lh = reader.remainder()
                pos = file.tell()
                rest = file.read()
                assert all + lh + rest == data, (all, lh, rest, data)
                file.seek(pos)
                assert data.startswith(all), (data, all)
                record = reader.next()
                if record is None:
                    break
                all = all + record
            assert all == data, (all, data)

def test_endswith_errors():
    # Check the failure cases.

    # Could no record at all
    # Could be some records followed by an incomplete record
    # Could be a line which partially matches the data
    for s in ("B", "B\n", "A\nB\n", "A\nB\nA\nB\n", "A\nB\nA\n ", "AA",
              "AA\n", "A\nB\nA X\n"):
        has_error = 0
        infile = StringIO(s)
        try:
            reader = RecordReader.EndsWith(infile, "A\n")
        except RecordReader.ReaderError:
            has_error = 1

        if not has_error:
            while not has_error:
                try:
                    rec = reader.next()
                except RecordReader.ReaderError:
                    has_error = 1
                if not has_error and rec is None:
                    break
        if not has_error:
            raise AssertionError, "should not get here with %r" % s

    # Could no record at all
    # Could be some records followed by an incomplete record
    # *Allowed* to read rest of line
    for s in ("B", "B\n", "A\nB\n", "A\nB\nA\nB\n", "A\nB\nA\n "):
        has_error = 0
        infile = StringIO(s)
        try:
            reader = RecordReader.EndsWith(infile, "A")
        except RecordReader.ReaderError:
            has_error = 1

        if not has_error:
            while not has_error:
                try:
                    rec = reader.next()
                except RecordReader.ReaderError:
                    has_error = 1
                if not has_error and rec is None:
                    break
        if not has_error:
            raise AssertionError, "should not get here with %r" % s
    


def test_endswith():
    print "Testing EndsWith"

    print " ... generic"
    test_endswith_generic()

    print " ... newline variations"
    test_endswith_SP()

    for ending in ("\n", "\r", "\r\n"):
        print " ... exhaustive testing against %r" % ending
        test_endswith_exhaustive(ending)
        
    print " ... remainder"
    test_endswith_remainder()

    print " ... format errors"
    test_endswith_errors()

### Until

# Don't need to do that much testing since the code is built on
# top of the StartsWith reader, which has already been tested.
def test_until():
    # Can only read at most one record
    print "Testing Until"
    test_data = ("A",
                 "A\n",
                 "A\nB",
                 "A\nB\n",
                 "A\nBCDE\nQWE\nTRYU\nA\n",
                 "A\nA\nA\nA\nA\nA\nA\n",
                 "AB",
                 "AB\n",
                 "AB\nAC",
                 "AB\nAC\n",
                 )

    for ending in ("\n", "\r", "\r\n"):
        for pre in ("", "B\n", "BA\n", "CA\nBA\n"):
            pre_nl = string.replace(pre, "\n", ending)
            for look in (0, 1, 3):
                for text in test_data:
                    text_nl = string.replace(text, "\n", ending)
                    s = pre_nl + text_nl
                    reader = RecordReader.Until(StringIO(s[look:]),
                                                "A",
                                                lookahead = s[:look],
                                                sizehint = 4)
                    found_record = None
                    while 1:
                        rec = reader.next()
                        if rec is None:
                            break
                        assert not found_record, \
                               "Already found %r but also found %r in %r" % \
                               (found_record, rec, s)
                        found_record = rec

                    assert found_record == pre_nl, \
                           "Expecting record %r, found %r in %r" % \
                           (pre_nl, found_record, s)
                    infile, remainder = reader.remainder()
                    remainder = remainder + infile.read()
                    assert remainder == text_nl, \
                           "Expecting remainder %r, found %r in %r" % \
                           (text_nl, remainder, s)

### CountLines

def test_count_lines():
    # Create a set of 'i' lines and read 'count' lines at a time.
    # Either count divides i or it doesn't.  If it does, the reader
    # should go to completion.  If it does not, the reader should
    # have a remainder whose size can be verified.
    
    print "Testing CountLines"
    for ending in ("\n", "\r", "\r\n"):
        print " ... exhaustive testing against %r" % ending
        s = ""
        for i in range(25):
            for count in range(1,i+1):
                for look in (0, 2, 5):
                    rep, final = divmod(i, count)
                    reader = RecordReader.CountLines(StringIO(s[look:]),
                                                     count,
                                                     lookahead = s[:look],
                                                     sizehint = 1)
                    all = ""
                    while rep > 0:
                        rec = reader.next()
                        lines = string.split(rec, ending)
                        assert len(lines)-1 == count, \
                               "Expecting %d lines, got %d in %r using %r" % \
                               (count, len(lines)-1, rec, ending)
                        all = all + rec
                        rep = rep - 1

                    if final == 0:
                        # Reader should be at the end of input
                        rec = reader.next()
                        assert rec is None, \
                               "Should be at end of reader, got %r" % rec
                        rec = reader.next()
                        assert rec is None, \
                               "data after end of reader, got %r" % rec
                    else:
                        # There should be a remainder of size i % count lines
                        infile, remainder = reader.remainder()
                        text = remainder + infile.read()
                        all = all + text
                        lines = string.split(text, ending)
                        assert len(lines)-1 == final, \
                               "Expecting %d final lines, got %d" % \
                               (final, len(lines)-1)
                        try:
                            rec = reader.next()
                            raise AssertionError, \
                                  "Got unexpected final record, %r" % rec
                        except RecordReader.ReaderError:
                            pass
                    assert all == s, \
                           "record data %r doesn't rebuild input %r" % \
                           (all, s)
            s = s + str(i) + ending
    
### Nothing

def test_nothing():
    print "Testing Nothing"
    
    s = "This is a test.\nThis is only a test.\nHad this been an actual...\n"
    for ending in ("\n", "\r", "\r\n"):
        data = string.replace(s, "\n", ending)
        for look in (0, 1, 2, 5):
            reader = RecordReader.Nothing(StringIO(data[look:]),
                                          sizehint = 1,
                                          lookahead = data[:look])
            rec = reader.next()
            assert rec is None, "should be empty, not %r" % rec
            rec = reader.next()
            assert rec is None, "2nd time should also be empty, not %r" % rec
            
            infile, remainder = reader.remainder()
            remainder = remainder + infile.read()
            assert remainder == data, "Why %r when input was %r?" % \
                   (remainder, data)

### Everything

def test_everything():
    print "Testing Everything"

    s = "This is a test.\nThis is only a test.\nHad this been an actual...\n"
    for ending in ("\n", "\r", "\r\n"):
        data = string.replace(s, "\n", ending)
        for look in (0, 1, 2, 5):
            reader = RecordReader.Everything(StringIO(data[look:]),
                                             sizehint = 1,
                                             lookahead = data[:look])
            rec = reader.next()
            assert rec == data, "Record %r is not same as input %r" % \
                   (rec, data)
            infile, remainder = reader.remainder()
            remainder = remainder + infile.read()
            assert not remainder, "Why is there a remainder of %r?" % \
                   remainder

            rec = reader.next()
            assert rec is None, "Expecting None after final read, got %r" % \
                   rec
            rec = reader.next()
            assert rec is None, "Expecting None (again), got %r" % rec
            


### test driver
    
def test():
    test_startswith()
    test_endswith()
    test_until()
    test_count_lines()
    test_nothing()
    test_everything()
    
if __name__ == "__main__":
    test()

Generated by  Doxygen 1.6.0   Back to index