Logo Search packages:      
Sourcecode: python-biopython version File versions

ClustalIO.py

# Copyright 2006-2008 by Peter Cock.  All rights reserved.
#
# This code is part of the Biopython distribution and governed by its
# license.  Please see the LICENSE file that should have been included
# as part of this package.

from Bio.Align.Generic import Alignment
from Interfaces import AlignmentIterator, SequentialAlignmentWriter

00010 class ClustalWriter(SequentialAlignmentWriter) :
    """Clustalw alignment writer."""
00012     def write_alignment(self, alignment) :
        """Use this to write (another) single alignment to an open file."""

        if len(alignment.get_all_seqs()) == 0 :
            raise ValueError("Must have at least one sequence")

        #Old versions of the parser in Bio.Clustalw used a ._version property,
        try :
            version = self._version
        except AttributeError :
            version = '1.81'
        output = "CLUSTAL X (%s) multiple sequence alignment\n\n\n" % version
        
        cur_char = 0
        max_length = len(alignment._records[0].seq)

        if max_length <= 0 :
            raise ValueError("Non-empty sequences are required")

        # keep displaying sequences until we reach the end
        while cur_char != max_length:
            # calculate the number of sequences to show, which will
            # be less if we are at the end of the sequence
            if (cur_char + 50) > max_length:
                show_num = max_length - cur_char
            else:
                show_num = 50

            # go through all of the records and print out the sequences
            # when we output, we do a nice 80 column output, although this
            # may result in truncation of the ids.
            for record in alignment._records:
                #Make sure we don't get any spaces in the record
                #identifier when output in the file by replacing
                #them with underscores:
                line = record.id[0:30].replace(" ","_").ljust(36)
                line += record.seq.data[cur_char:(cur_char + show_num)]
                output += line + "\n"

            # now we need to print out the star info, if we've got it
            # This was stored by Bio.Clustalw using a ._star_info property.
            if hasattr(alignment, "_star_info") and alignment._star_info != '':
                output += (" " * 36) + \
                     alignment._star_info[cur_char:(cur_char + show_num)] + "\n"

            output += "\n"
            cur_char += show_num

        # Want a trailing blank new line in case the output is concatenated
        self.handle.write(output + "\n")

00063 class ClustalIterator(AlignmentIterator) :
    """Clustalw alignment iterator."""
    
    def next(self) :

        handle = self.handle

        try :
            #Header we saved from when we were parsing
            #the previous alignment.
            line = self._header
            del self._header
        except AttributeError:      
            line = handle.readline()
        if not line:
            return None
        if line[:7] <> 'CLUSTAL':
            raise ValueError("Did not find CLUSTAL header")


        # find the clustal version in the header line
        version = None
        for word in line.split() :
            if word[0]=='(' and word[-1]==')':
                word = word[1:-1]
            if word[0] in '0123456789':
                version = word
        

        #There should be two blank lines after the header line
        line = handle.readline()
        while line.strip() == "" :
            line = handle.readline()

        #If the alignment contains entries with the same sequence
        #identifier (not a good idea - but seems possible), then this
        #dictionary based parser will merge their sequences.  Fix this?
        ids = []
        seqs = []
        consensus = ""
        seq_cols = None #: Used to extract the consensus

        #Use the first block to get the sequence identifiers
        while line.strip() <> "" :
            if line[0] <> " " :
                #Sequences identifier...
                fields = line.rstrip().split()

                #We expect there to be two fields, there can be an optional
                #"sequence number" field containing the letter count.
                if len(fields) < 2 or len(fields) > 3:
                    raise ValueError("Could not parse line:\n%s" % line)

                ids.append(fields[0])
                seqs.append(fields[1])

                #Record the sequence position to get the consensus
                if seq_cols is None :
                    start = len(fields[0]) + line[len(fields[0]):].find(fields[1])
                    end = start + len(fields[1])
                    seq_cols = slice(start, end)
                    del start, end
                assert fields[1] == line[seq_cols]

                if len(fields) == 3 :
                    #This MAY be an old style file with a letter count...
                    try :
                        letters = int(fields[2])
                    except ValueError :
                        raise ValueError("Could not parse line, bad sequence number:\n%s" % line)
                    if len(fields[1].replace("-","")) <> letters :
                        raise ValueError("Could not parse line, invalid sequence number:\n%s" % line)
            else :
                #Sequence consensus line...
                assert seq_cols is not None
                consensus = line[seq_cols]
                assert not line[seq_cols.stop:].strip()
            line = handle.readline()
            if not line : break #end of file

        assert line.strip() == ""
        assert seq_cols is not None

        #Loop over any remaining blocks...
        done = False
        while not done :
            #There should be a blank line between each block.
            #Also want to ignore any consensus line from the
            #previous block.
            while (not line) or line.strip() == "" :
                line = handle.readline()
                if not line : break # end of file
            if not line : break # end of file

            if line[:7] == 'CLUSTAL':
                #Found concatenated alignment.
                done = True
                self._header = line
                break

            for i in range(len(ids)) :
                fields = line.rstrip().split()
                
                #We expect there to be two fields, there can be an optional
                #"sequence number" field containing the letter count.
                if len(fields) < 2 or len(fields) > 3:
                    raise ValueError("Could not parse line:\n%s" % line)

                if fields[0] <> ids[i] :
                    raise ValueError("Identifiers out of order? Got '%s' but expected '%s'" \
                                      % (fields[0], ids[i]))

                if fields[1] <> line[seq_cols] :
                    start = len(fields[0]) + line[len(fields[0]):].find(fields[1])
                    assert start == seq_cols.start, 'Old location %s -> %i:XX' % (seq_cols, start)
                    end = start + len(fields[1])
                    seq_cols = slice(start, end)
                    del start, end

                #Append the sequence
                seqs[i] += fields[1]

                if len(fields) == 3 :
                    #This MAY be an old style file with a letter count...
                    try :
                        letters = int(fields[2])
                    except ValueError :
                        raise ValueError("Could not parse line, bad sequence number:\n%s" % line)
                    if len(seqs[i].replace("-","")) <> letters :
                        raise ValueError("Could not parse line, invalid sequence number:\n%s" % line)

                #Read in the next line
                line = handle.readline()
            #There should now be a consensus line
            if consensus :
                assert seq_cols is not None
                consensus += line[seq_cols]
                assert not line[seq_cols.stop:].strip()
                #Read in the next line
                line = handle.readline()


        assert len(ids) == len(seqs)
        if len(seqs) == 0 or len(seqs[0]) == 0 :
            return None

        if self.records_per_alignment is not None \
        and self.records_per_alignment <> len(ids) :
            raise ValueError("Found %i records in this alignment, told to expect %i" \
                             % (len(ids), self.records_per_alignment))

        alignment = Alignment(self.alphabet)
        alignment_length = len(seqs[0])
        for i in range(len(ids)) :
            if len(seqs[i]) <> alignment_length:
                raise ValueError("Error parsing alignment - sequences of different length?")
            alignment.add_sequence(ids[i], seqs[i])
        #TODO - Handle alignment annotation better, for now
        #mimic the old parser in Bio.Clustalw
        if version :
            alignment._version = version
        if consensus :
            assert len(consensus) == alignment_length, \
                   "Alignment length is %i, consensus length is %i, '%s'" \
                   % (alignment_length, len(consensus), consensus)
            alignment._star_info = consensus
        return alignment
    
if __name__ == "__main__" :
    print "Running a quick self-test"

    #This is a truncated version of the example in Tests/cw02.aln
    #Notice the inclusion of sequence numbers (right hand side)
    aln_example1 = \
"""CLUSTAL W (1.81) multiple sequence alignment


gi|4959044|gb|AAD34209.1|AF069      MENSDSNDKGSDQSAAQRRSQMDRLDREEAFYQFVNNLSEEDYRLMRDNN 50
gi|671626|emb|CAA85685.1|           ---------MSPQTETKASVGFKAGVKEYKLTYYTPEYETKDTDILAAFR 41
                                              * *: ::    :.   :*  :  :. : . :*  ::   .

gi|4959044|gb|AAD34209.1|AF069      LLGTPGESTEEELLRRLQQIKEGPPPQSPDENRAGESSDDVTNSDSIIDW 100
gi|671626|emb|CAA85685.1|           VTPQPG-----------------VPPEEAGAAVAAESSTGT--------- 65
                                    :   **                  **:...   *.*** ..         

gi|4959044|gb|AAD34209.1|AF069      LNSVRQTGNTTRSRQRGNQSWRAVSRTNPNSGDFRFSLEINVNRNNGSQT 150
gi|671626|emb|CAA85685.1|           WTTVWTDGLTSLDRYKG-----RCYHIEPVPG------------------ 92
                                     .:*   * *: .* :*        : :* .*                  

gi|4959044|gb|AAD34209.1|AF069      SENESEPSTRRLSVENMESSSQRQMENSASESASARPSRAERNSTEAVTE 200
gi|671626|emb|CAA85685.1|           -EKDQCICYVAYPLDLFEEGSVTNMFTSIVGNVFGFKALRALRLEDLRIP 141
                                     *::.  .    .:: :*..*  :* .*   .. .  :    .  :    

gi|4959044|gb|AAD34209.1|AF069      VPTTRAQRRA 210
gi|671626|emb|CAA85685.1|           VAYVKTFQGP 151
                                    *. .:: : .
                                     
"""                 

    #This example is a truncated version of the dataset used here:
    #http://virgil.ruc.dk/kurser/Sekvens/Treedraw.htm
    #with the last record repeated twice (deliberate toture test)
    aln_example2 = \
"""CLUSTAL X (1.83) multiple sequence alignment


V_Harveyi_PATH                 --MKNWIKVAVAAIA--LSAA------------------TVQAATEVKVG
B_subtilis_YXEM                MKMKKWTVLVVAALLAVLSACG------------NGNSSSKEDDNVLHVG
B_subtilis_GlnH_homo_YCKK      MKKALLALFMVVSIAALAACGAGNDNQSKDNAKDGDLWASIKKKGVLTVG
YA80_HAEIN                     MKKLLFTTALLTGAIAFSTF-----------SHAGEIADRVEKTKTLLVG
FLIY_ECOLI                     MKLAHLGRQALMGVMAVALVAG---MSVKSFADEG-LLNKVKERGTLLVG
E_coli_GlnH                    --MKSVLKVSLAALTLAFAVS------------------SHAADKKLVVA
Deinococcus_radiodurans        -MKKSLLSLKLSGLLVPSVLALS--------LSACSSPSSTLNQGTLKIA
HISJ_E_COLI                    MKKLVLSLSLVLAFSSATAAF-------------------AAIPQNIRIG
HISJ_E_COLI                    MKKLVLSLSLVLAFSSATAAF-------------------AAIPQNIRIG
                                         : .                                 : :.

V_Harveyi_PATH                 MSGRYFPFTFVKQ--DKLQGFEVDMWDEIGKRNDYKIEYVTANFSGLFGL
B_subtilis_YXEM                ATGQSYPFAYKEN--GKLTGFDVEVMEAVAKKIDMKLDWKLLEFSGLMGE
B_subtilis_GlnH_homo_YCKK      TEGTYEPFTYHDKDTDKLTGYDVEVITEVAKRLGLKVDFKETQWGSMFAG
YA80_HAEIN                     TEGTYAPFTFHDK-SGKLTGFDVEVIRKVAEKLGLKVEFKETQWDAMYAG
FLIY_ECOLI                     LEGTYPPFSFQGD-DGKLTGFEVEFAQQLAKHLGVEASLKPTKWDGMLAS
E_coli_GlnH                    TDTAFVPFEFKQG--DKYVGFDVDLWAAIAKELKLDYELKPMDFSGIIPA
Deinococcus_radiodurans        MEGTYPPFTSKNE-QGELVGFDVDIAKAVAQKLNLKPEFVLTEWSGILAG
HISJ_E_COLI                    TDPTYAPFESKNS-QGELVGFDIDLAKELCKRINTQCTFVENPLDALIPS
HISJ_E_COLI                    TDPTYAPFESKNS-QGELVGFDIDLAKELCKRINTQCTFVENPLDALIPS
                                     **       .:  *::::.   : :.   .        ..:   

V_Harveyi_PATH                 LETGRIDTISNQITMTDARKAKYLFADPYVVDG-AQI
B_subtilis_YXEM                LQTGKLDTISNQVAVTDERKETYNFTKPYAYAG-TQI
B_subtilis_GlnH_homo_YCKK      LNSKRFDVVANQVG-KTDREDKYDFSDKYTTSR-AVV
YA80_HAEIN                     LNAKRFDVIANQTNPSPERLKKYSFTTPYNYSG-GVI
FLIY_ECOLI                     LDSKRIDVVINQVTISDERKKKYDFSTPYTISGIQAL
E_coli_GlnH                    LQTKNVDLALAGITITDERKKAIDFSDGYYKSG-LLV
Deinococcus_radiodurans        LQANKYDVIVNQVGITPERQNSIGFSQPYAYSRPEII
HISJ_E_COLI                    LKAKKIDAIMSSLSITEKRQQEIAFTDKLYAADSRLV
HISJ_E_COLI                    LKAKKIDAIMSSLSITEKRQQEIAFTDKLYAADSRLV
                               *.: . *        .  *     *:          :

"""

    from StringIO import StringIO

    alignments = list(ClustalIterator(StringIO(aln_example1)))
    assert 1 == len(alignments)
    assert alignments[0]._version == "1.81"
    records = alignments[0].get_all_seqs()
    assert 2 == len(records)
    assert records[0].id == "gi|4959044|gb|AAD34209.1|AF069"
    assert records[1].id == "gi|671626|emb|CAA85685.1|"
    assert records[0].seq.tostring() == \
          "MENSDSNDKGSDQSAAQRRSQMDRLDREEAFYQFVNNLSEEDYRLMRDNN" + \
          "LLGTPGESTEEELLRRLQQIKEGPPPQSPDENRAGESSDDVTNSDSIIDW" + \
          "LNSVRQTGNTTRSRQRGNQSWRAVSRTNPNSGDFRFSLEINVNRNNGSQT" + \
          "SENESEPSTRRLSVENMESSSQRQMENSASESASARPSRAERNSTEAVTE" + \
          "VPTTRAQRRA"

    alignments = list(ClustalIterator(StringIO(aln_example2)))
    assert 1 == len(alignments)
    assert alignments[0]._version == "1.83"
    records = alignments[0].get_all_seqs()
    assert 9 == len(records)
    assert records[-1].id == "HISJ_E_COLI"
    assert records[-1].seq.tostring() == \
          "MKKLVLSLSLVLAFSSATAAF-------------------AAIPQNIRIG" + \
          "TDPTYAPFESKNS-QGELVGFDIDLAKELCKRINTQCTFVENPLDALIPS" + \
          "LKAKKIDAIMSSLSITEKRQQEIAFTDKLYAADSRLV"

    for alignment in ClustalIterator(StringIO(aln_example2 + aln_example1)) :
        print "Alignment with %i records of length %i" \
              % (len(alignment.get_all_seqs()),
                 alignment.get_alignment_length())

    print "Checking empty file..."
    assert 0 == len(list(ClustalIterator(StringIO(""))))

    print "Checking write/read..."
    alignments = list(ClustalIterator(StringIO(aln_example1))) \
               + list(ClustalIterator(StringIO(aln_example2)))*2
    handle = StringIO()
    ClustalWriter(handle).write_file(alignments)
    handle.seek(0)
    for i,a in enumerate(ClustalIterator(handle)) :
        assert a.get_alignment_length() == alignments[i].get_alignment_length()
    handle.seek(0)

    print "Testing write/read when there is only one sequence..."
    alignment._records = alignment._records[0:1]
    handle = StringIO()
    ClustalWriter(handle).write_file([alignment])
    handle.seek(0)
    for i,a in enumerate(ClustalIterator(handle)) :
        assert a.get_alignment_length() == alignment.get_alignment_length()
        assert len(a.get_all_seqs()) == 1
        

    print "The End"

Generated by  Doxygen 1.6.0   Back to index