Logo Search packages:      
Sourcecode: python-biopython version File versions

BioSeq.py

00001 """Implementations of Biopython-like Seq objects on top of BioSQL.

This allows retrival of items stored in a BioSQL database using
a biopython-like Seq interface.
"""

from Bio import SeqFeature

class DBSeq:  # This implements the biopython Seq interface
    def __init__(self, primary_id, adaptor, alphabet, start, length):
        self.primary_id = primary_id
        self.adaptor = adaptor
        self.alphabet = alphabet
        self._length = length
        self.start = start

    def __len__(self):
        return self._length
    
    def __getitem__(self, i):
        if i < 0:
            if -i >= self._length:
                raise IndexError(i)
            i = i + self._length
        elif i >= self._length:
            raise IndexError(i)

        return self.adaptor.get_subseq_as_string(self.primary_id,
                                                 self.start + i,
                                                 self.start + i + 1)
    def __getslice__(self, i, j):
        i = max(i, 0)
        i = min(i, self._length)
        j = max(j, 0)
        j = min(j, self._length)
        if i >= j:
            return self.__class__(self.primary_id, self.adaptor, self.alphabet,
                                  self.start, 0)
        return self.__class__(self.primary_id, self.adaptor, self.alphabet,
                              self.start + i, j - i)
    def tostring(self):
        return self.adaptor.get_subseq_as_string(self.primary_id,
                                                 self.start,
                                                 self.start + self._length)

    data = property(tostring)

def _retrieve_seq(adaptor, primary_id):
    seqs = adaptor.execute_and_fetchall(
        "SELECT alphabet, length(seq) FROM biosequence" \
        " WHERE bioentry_id = %s", (primary_id,))
    if seqs:
        moltype, length = seqs[0]
        from Bio.Alphabet import IUPAC
        if moltype == "dna":
            alphabet = IUPAC.unambiguous_dna
        elif moltype == "rna":
            alphabet = IUPAC.unambiguous_rna
        elif moltype == "protein":
            alphabet = IUPAC.protein
        else:
            raise AssertionError("Unknown moltype: %s" % moltype)
        seq = DBSeq(primary_id, adaptor, alphabet, 0, int(length))
        return seq
    else:
        return None

def _retrieve_dbxrefs(adaptor, primary_id):
    _dbxrefs = []
    dbxrefs = adaptor.execute_and_fetchall(
        "SELECT dbname, accession, version" \
        " FROM bioentry_dbxref join dbxref using (dbxref_id)" \
        " WHERE bioentry_id = %s" \
        " ORDER BY rank", (primary_id,))
    for dbname, accession, version in dbxrefs:
        if version and version != "0":
            v = "%s.%s" % (accession, version)
        else:
            v = accession
        _dbxrefs.append((dbname, v))
    return _dbxrefs

def _retrieve_features(adaptor, primary_id):
    sql = "SELECT seqfeature_id, type.name, rank" \
          " FROM seqfeature join term type on (type_term_id = type.term_id)" \
          " WHERE bioentry_id = %s" \
          " ORDER BY rank"
    results = adaptor.execute_and_fetchall(sql, (primary_id,))
    seq_feature_list = []
    for seqfeature_id, seqfeature_type, seqfeature_rank in results:
        # Get qualifiers
        qvs = adaptor.execute_and_fetchall(
            "SELECT name, value" \
            " FROM seqfeature_qualifier_value  join term using (term_id)" \
            " WHERE seqfeature_id = %s", (seqfeature_id,))
        qualifiers = {}
        for qv_name, qv_value in qvs:
            qualifiers.setdefault(qv_name, []).append(qv_value)
        # Get locations
        results = adaptor.execute_and_fetchall(
            "SELECT location_id, start_pos, end_pos, strand" \
            " FROM location" \
            " WHERE seqfeature_id = %s" \
            " ORDER BY rank", (seqfeature_id,))
        locations = []
        # convert to Python standard form
        for location_id, start, end, strand in results:
            if start:
                start -= 1
            locations.append( (location_id, start, end, strand) )
        # Get possible remote reference information
        remote_results = adaptor.execute_and_fetchall(
            "SELECT location_id, dbname, accession, version" \
            " FROM location join dbxref using (dbxref_id)" \
            " WHERE seqfeature_id = %s", (seqfeature_id,))
        lookup = {}
        for location_id, dbname, accession, version in remote_results:
            if version and version != "0":
                v = "%s.%s" % (accession, version)
            else:
                v = accession
            lookup[location_id] = (dbname, v)
        
        feature = SeqFeature.SeqFeature(type = seqfeature_type)
        feature.qualifiers = qualifiers
        if len(locations) == 0:
            pass
        elif len(locations) == 1:
            location_id, start, end, strand = locations[0]
            dbname, version = lookup.get(location_id, (None, None))

            feature.location = SeqFeature.FeatureLocation(start, end)
            feature.strand = strand
            feature.ref_db = dbname
            feature.ref = version
        else:
            min_start = locations[0][1]
            max_end = locations[0][2]
            sub_feature_list = []       # (start, sub feature) for sorting
            for location in locations:
                location_id, start, end, strand = location
                dbname, version = lookup.get(location_id, (None, None))
                min_start = min(min_start, start)
                max_end = max(max_end, end)

                subfeature = SeqFeature.SeqFeature()
                subfeature.type = seqfeature_type 
                subfeature.location_operator = "join"
                subfeature.location = SeqFeature.FeatureLocation(start, end)
                subfeature.strand = strand
                subfeature.ref_db = dbname
                subfeature.ref = version
                sub_feature_list.append((start, subfeature))
            sub_feature_list.sort()
            feature.sub_features = [sub_feature[1]
                                    for sub_feature in sub_feature_list]
            feature.location = SeqFeature.FeatureLocation(min_start, max_end)
            feature.strand = feature.sub_features[0].strand

        seq_feature_list.append(
            (seqfeature_rank, feature.location.start.position, feature) )

    # Primary sort is on the feature's rank
    #  .. then on the start position
    #  .. then arbitrary on the feature's id (SeqFeature has no __cmp__)
    seq_feature_list.sort()

    # Get just the SeqFeature
    return [x[2] for x in seq_feature_list]

def _retrieve_annotations(adaptor, primary_id, taxon_id):
    annotations = {}
    annotations.update(_retrieve_qualifier_value(adaptor, primary_id))
    annotations.update(_retrieve_reference(adaptor, primary_id))
    annotations.update(_retrieve_dbxref(adaptor, primary_id))
    annotations.update(_retrieve_taxon(adaptor, primary_id, taxon_id))
    return annotations

def _retrieve_qualifier_value(adaptor, primary_id):
    qvs = adaptor.execute_and_fetchall(
        "SELECT name, value" \
        " FROM bioentry_qualifier_value JOIN term USING (term_id)" \
        " WHERE bioentry_id = %s" \
        " ORDER BY rank", (primary_id,))
    qualifiers = {}
    for name, value in qvs:
        if name == "keyword": name = "keywords"
        elif name == "date_changed": name = "dates"
        elif name == "secondary_accession": name = "accessions"
        qualifiers.setdefault(name, []).append(value)
    return qualifiers

def _retrieve_reference(adaptor, primary_id):
    # XXX dbxref_qualifier_value
 
    refs = adaptor.execute_and_fetchall(
        "SELECT start_pos, end_pos, " \
        " location, title, authors," \
        " dbname, accession" \
        " FROM bioentry_reference" \
        " JOIN reference USING (reference_id)" \
        " LEFT JOIN dbxref USING (dbxref_id)" \
        " WHERE bioentry_id = %s" \
        " ORDER BY rank", (primary_id,))
    references = []
    for start, end, location, title, authors, dbname, accession in refs:
        reference = SeqFeature.Reference()
        if start: start -= 1
        reference.location = [SeqFeature.FeatureLocation(start, end)]
        reference.authors = authors
        reference.title = title
        reference.journal = location
        if dbname == 'PUBMED':
            reference.pubmed_id = accession
        elif dbname == 'MEDLINE':
            reference.medline_id = accession
        references.append(reference)
    return {'references': references}

def _retrieve_dbxref(adaptor, primary_id):
    # XXX dbxref_qualifier_value

    refs = adaptor.execute_and_fetchall(
        "SELECT dbname, accession, version" \
        " FROM bioentry_dbxref JOIN dbxref USING (dbxref_id)" \
        " WHERE bioentry_id = %s" \
        " ORDER BY rank", (primary_id,))
    dbxrefs = []
    for dbname, accession, version in refs:
        if version and version != "0":
            v = "%s.%s" % (accession, version)
        else:
            v = accession
        dbxrefs.append((dbname, v))
    return {'cross_references': dbxrefs}

def _retrieve_taxon(adaptor, primary_id, taxon_id):
    a = {}
    common_names = adaptor.execute_and_fetch_col0(
        "SELECT name FROM taxon_name WHERE taxon_id = %s" \
        " AND name_class = 'genbank common name'", (taxon_id,))
    if common_names:
        a['source'] = common_names[0]
    scientific_names = adaptor.execute_and_fetch_col0(
        "SELECT name FROM taxon_name WHERE taxon_id = %s" \
        " AND name_class = 'scientific name'", (taxon_id,))
    if scientific_names:
        a['organism'] = scientific_names[0]
    ncbi_taxids = adaptor.execute_and_fetch_col0(
        "SELECT ncbi_taxon_id FROM taxon WHERE taxon_id = %s", (taxon_id,))
    if ncbi_taxids and ncbi_taxids[0] and ncbi_taxids[0] != "0":
        a['ncbi_taxid'] = ncbi_taxids[0]

    # XXX 'no rank' or not?
    # If we keep them, perhaps add " AND parent.parent_taxon_id <> 1"
    taxonomy = adaptor.execute_and_fetch_col0(
        "SELECT taxon_name.name" \
        " FROM taxon t, taxon parent, taxon_name" \
        " WHERE parent.taxon_id=taxon_name.taxon_id" \
        " AND t.left_value BETWEEN parent.left_value AND parent.right_value" \
        " AND name_class='scientific name'" \
        " AND parent.node_rank <> 'no rank'" \
        " AND t.taxon_id = %s" \
        " ORDER BY parent.left_value", (taxon_id,))
    if taxonomy:
        a['taxonomy'] = taxonomy
    return a

00269 class DBSeqRecord(object):
    """BioSQL equivalent of the biopython SeqRecord object.
    """

    def __init__(self, adaptor, primary_id):
        self._adaptor = adaptor
        self._primary_id = primary_id

        (self._biodatabase_id, self._taxon_id, self.name,
         accession, version, self._identifier,
         self._division, self.description) = self._adaptor.execute_one(
            "SELECT biodatabase_id, taxon_id, name, accession, version," \
            " identifier, division, description" \
            " FROM bioentry" \
            " WHERE bioentry_id = %s", (self._primary_id,))
        if version and version != "0":
            self.id = "%s.%s" % (accession, version)
        else:
            self.id = accession

    def __get_seq(self):
        if not hasattr(self, "_seq"):
            self._seq = _retrieve_seq(self._adaptor, self._primary_id)
        return self._seq
    def __set_seq(self, seq): self._seq = seq
    def __del_seq(self):      del self._seq
    seq = property(__get_seq, __set_seq, __del_seq, "Seq object")

    def __get_dbxrefs(self):
        if not hasattr(self,"_dbxrefs"):
            self._dbxrefs = _retrieve_dbxrefs(self._adaptor, self._primary_id)
        return self._dbxrefs
    def __set_dbxrefs(self, dbxrefs): self._dbxrefs = dbxrefs
    def __del_dbxrefs(self):      del self._dbxrefs
    dbxrefs = property(__get_dbxrefs, __set_dbxrefs, __del_dbxrefs, "Dbxrefs")

    def __get_features(self):
        if not hasattr(self, "_features"):
            self._features = _retrieve_features(self._adaptor,
                                                self._primary_id)
        return self._features
    def __set_features(self, features): self._features = features
    def __del_features(self):      del self._features
    features = property(__get_features, __set_features, __del_features,
                        "Features")

    def __get_annotations(self):
        if not hasattr(self, "_annotations"):
            self._annotations = _retrieve_annotations(self._adaptor,
                                                      self._primary_id,
                                                      self._taxon_id)
            if self._identifier:
                self._annotations["gi"] = self._identifier
            if self._division:
                self._annotations["data_file_division"] = self._division
        return self._annotations
    def __set_annotations(self, annotations): self._annotations = annotations
    def __del_annotations(self): del self._annotations
    annotations = property(__get_annotations, __set_annotations,
                           __del_annotations, "Annotations")

Generated by  Doxygen 1.6.0   Back to index