Logo Search packages:      
Sourcecode: python-biopython version File versions  Download package

BioSeqDatabase.py

00001 """Connect with a BioSQL database and load Biopython like objects from it.

This provides interfaces for loading biological objects from a relational
database, and is compatible with the BioSQL standards.
"""
import BioSeq
import Loader
import DBUtils

00010 def open_database(driver = "MySQLdb", **kwargs):
    """Main interface for loading a existing BioSQL-style database.

    This function is the easiest way to retrieve a connection to a
    database, doing something like:
        
        >>> from BioSeq import BioSeqDatabase
        >>> server = BioSeqDatabase.open_database(user = "root", db="minidb")

    the various options are:
    driver -> The name of the database driver to use for connecting. The
    driver should implement the python DB API. By default, the MySQLdb
    driver is used.
    user -> the username to connect to the database with.
    password, passwd -> the password to connect with
    host -> the hostname of the database
    database or db -> the name of the database
    """
    module = __import__(driver)
    connect = getattr(module, "connect")

    # Different drivers use different keywords...
    kw = kwargs.copy()
    if driver == "MySQLdb":
        if kw.has_key("database"):
            kw["db"] = kw["database"]
            del kw["database"]
        if kw.has_key("password"):
            kw["passwd"] = kw["password"]
            del kw["password"]
    else:
        # DB-API recommendations
        if kw.has_key("db"):
            kw["database"] = kw["db"]
            del kw["db"]
        if kw.has_key("passwd"):
            kw["password"] = kw["passwd"]
            del kw["passwd"]
    if driver == "psycopg" and not kw.get("database"):
        kw["database"] = "template1"
    try:
        conn = connect(**kw)
    except module.InterfaceError:
        # Ok, so let's try building a DSN
        # (older releases of psycopg need this)
        if kw.has_key("database"):
            kw["dbname"] = kw["database"]
            del kw["database"]
        elif kw.has_key("db"):
            kw["dbname"] = kw["db"]
            del kw["db"]
        
        dsn = ' '.join(['='.join(i) for i in kw.items()])
        conn = connect(dsn)
    
    return DBServer(conn, module)

class DBServer:
    def __init__(self, conn, module, module_name=None):
        self.module = module
        if module_name is None:
            module_name = module.__name__
        self.adaptor = Adaptor(conn, DBUtils.get_dbutils(module_name))
        self.module_name = module_name
        
    def __repr__(self):
        return self.__class__.__name__ + "(%r)" % self.adaptor.conn
    def __getitem__(self, name):
        return BioSeqDatabase(self.adaptor, name)
    def keys(self):
        return self.adaptor.list_biodatabase_names()
    def values(self):
        return [self[key] for key in self.keys()]
    def items(self):
        return [(key, self[key]) for key in self.keys()]

    def remove_database(self, db_name):
        """Try to remove all references to items in a database.
        """
        db_id = self.adaptor.fetch_dbid_by_dbname(db_name)
        remover = Loader.DatabaseRemover(self.adaptor, db_id)
        remover.remove()

    def new_database(self, db_name, authority=None, description=None):
        """Add a new database to the server and return it.
        """
        # make the database
        sql = r"INSERT INTO biodatabase (name, authority, description)" \
              r" VALUES (%s, %s, %s)" 
        self.adaptor.execute(sql, (db_name,authority, description))
        return BioSeqDatabase(self.adaptor, db_name)

    def load_database_sql(self, sql_file):
        """Load a database schema into the given database.

        This is used to create tables, etc when a database is first created.
        sql_file should specify the complete path to a file containing
        SQL entries for building the tables.
        """
        # Not sophisticated enough for PG schema. Is it needed by MySQL?
        # Looks like we need this more complicated way for both. Leaving it
        # the default and removing the simple-minded approach.

        # read the file with all comment lines removed
        sql_handle = open(sql_file, "rb")
        sql = r""
        for line in sql_handle.xreadlines():
            if line.find("--") == 0: # don't include comment lines
                pass
            elif line.find("#") == 0: # ditto for MySQL comments
                pass
            elif line.strip(): # only include non-blank lines
                sql += line.strip()
                sql += ' '
        
        # two ways to load the SQL
        # 1. PostgreSQL can load it all at once and actually needs to
        # due to FUNCTION defines at the end of the SQL which mess up
        # the splitting by semicolons
        if self.module_name in ["psycopg"]:
            self.adaptor.cursor.execute(sql)
        # 2. MySQL needs the database loading split up into single lines of
        # SQL executed one at a time
        elif self.module_name in ["MySQLdb"]:
            sql_parts = sql.split(";") # one line per sql command
            for sql_line in sql_parts[:-1]: # don't use the last item, it's blank
                self.adaptor.cursor.execute(sql_line)
        else:
            raise ValueError("Module %s not supported by the loader." %
                    (self.module_name))

class Adaptor:
    def __init__(self, conn, dbutils):
        self.conn = conn
        self.cursor = conn.cursor()
        self.dbutils = dbutils

    def last_id(self, table):
        return self.dbutils.last_id(self.cursor, table)

    def autocommit(self, y=True):
        return self.dbutils.autocommit(self.conn, y)

    def commit(self):
        return self.conn.commit()

    def rollback(self):
        return self.conn.rollback()

    def close(self):
        return self.conn.close()

    def fetch_dbid_by_dbname(self, dbname):
        self.cursor.execute(
            r"select biodatabase_id from biodatabase where name = %s",
            (dbname,))
        rv = self.cursor.fetchall()
        if not rv:
            raise KeyError("Cannot find biodatabase with name %r" % dbname)
        # Cannot happen (UK)
##        assert len(rv) == 1, "More than one biodatabase with name %r" % dbname
        return rv[0][0]

00173     def fetch_seqid_by_display_id(self, dbid, name):
        sql = r"select bioentry_id from bioentry where name = %s"
        fields = [name]
        if dbid:
            sql += " and biodatabase_id = %s"
            fields.append(dbid)
        self.cursor.execute(sql, fields)
        rv = self.cursor.fetchall()
        if not rv:
            raise IndexError("Cannot find display id %r" % name)
        if len(rv) > 1:
            raise IndexError("More than one entry with display id %r" % name)
        return rv[0][0]

    def fetch_seqid_by_accession(self, dbid, name):
        sql = r"select bioentry_id from bioentry where accession = %s"
        fields = [name]
        if dbid:
            sql += " and biodatabase_id = %s"
            fields.append(dbid)
        self.cursor.execute(sql, fields)
        rv = self.cursor.fetchall()
        if not rv:
            raise IndexError("Cannot find accession %r" % name)
        if len(rv) > 1:
            raise IndexError("More than one entry with accession %r" % name)
        return rv[0][0]

    def fetch_seqids_by_accession(self, dbid, name):
        sql = r"select bioentry_id from bioentry where accession = %s"
        fields = [name]
        if dbid:
            sql += " and biodatabase_id = %s"
            fields.append(dbid)
        return self.execute_and_fetch_col0(sql, fields)

    def fetch_seqid_by_version(self, dbid, name):
        acc_version = name.split(".")
        if len(acc_version) > 2:
            raise IndexError("Bad version %r" % name)
        acc = acc_version[0]
        if len(acc_version) == 2:
            version = acc_version[1]
        else:
            version = "0"
        sql = r"SELECT bioentry_id FROM bioentry WHERE accession = %s" \
              r" AND version = %s"
        fields = [acc, version]
        if dbid:
            sql += " and biodatabase_id = %s"
            fields.append(dbid)
        self.cursor.execute(sql, fields)
        rv = self.cursor.fetchall()
        if not rv:
            raise IndexError("Cannot find version %r" % name)
        if len(rv) > 1:
            raise IndexError("More than one entry with version %r" % name)
        return rv[0][0]

    def fetch_seqid_by_identifier(self, dbid, identifier):
        # YB: was fetch_seqid_by_seqid
        sql = "SELECT bioentry_id FROM bioentry WHERE identifier = %s"
        fields = [identifier]
        if dbid:
            sql += " and biodatabase_id = %s"
            fields.append(dbid)
        self.cursor.execute(sql, fields)
        rv = self.cursor.fetchall()
        if not rv:
            raise IndexError("Cannot find display id %r" % identifier)
        return rv[0][0]

    def list_biodatabase_names(self):
        return self.execute_and_fetch_col0(
            "SELECT name FROM biodatabase")

    def list_bioentry_ids(self, dbid):
        return self.execute_and_fetch_col0(
            "SELECT bioentry_id FROM bioentry WHERE biodatabase_id = %s",
            (dbid,))

    def list_bioentry_display_ids(self, dbid):
        return self.execute_and_fetch_col0(
            "SELECT name FROM bioentry WHERE biodatabase_id = %s",
            (dbid,))

00259     def list_any_ids(self, sql, args):
        """Return ids given a SQL statement to select for them.
        
        This assumes that the given SQL does a SELECT statement that
        returns a list of items. This parses them out of the 2D list
        they come as and just returns them in a list.
        """
        return self.cursor.execute_and_fetch_col0(sql, args)

    def execute_one(self, sql, args=None):
        self.cursor.execute(sql, args or ())
        rv = self.cursor.fetchall()
        assert len(rv) == 1, "Expected 1 response, got %d" % len(rv)
        return rv[0]

00274     def execute(self, sql, args=None):
        """Just execute an sql command.
        """
        self.cursor.execute(sql, args or ())

    def get_subseq_as_string(self, seqid, start, end):
        length = end - start
        return self.execute_one(
            """select SUBSTRING(seq FROM %s FOR %s)
                     from biosequence where bioentry_id = %s""",
            (start+1, length, seqid))[0]

    def execute_and_fetch_col0(self, sql, args=None):
        self.cursor.execute(sql, args or ())
        return [field[0] for field in self.cursor.fetchall()]

    def execute_and_fetchall(self, sql, args=None):
        self.cursor.execute(sql, args or ())
        return self.cursor.fetchall()

_allowed_lookups = {
    # Lookup name / function name to get id, function to list all ids
    'primary_id': "fetch_seqid_by_identifier",
    'gi':         "fetch_seqid_by_identifier",
    'display_id': "fetch_seqid_by_display_id",
    'name':       "fetch_seqid_by_display_id",
    'accession':  "fetch_seqid_by_accession",
    'version':    "fetch_seqid_by_version",
    }

class BioSeqDatabase:
    def __init__(self, adaptor, name):
        self.adaptor = adaptor
        self.name = name
        self.dbid = self.adaptor.fetch_dbid_by_dbname(name)
    def __repr__(self):
        return "BioSeqDatabase(%r, %r)" % (self.adaptor, self.name)
        
    def get_Seq_by_id(self, name):
        """Gets a Bio::Seq object by its name

        Example: seq = db.get_Seq_by_id('ROA1_HUMAN')
        
        """
        seqid = self.adaptor.fetch_seqid_by_display_id(self.dbid, name)
        return BioSeq.DBSeqRecord(self.adaptor, seqid)

    def get_Seq_by_acc(self, name):
        """Gets a Bio::Seq object by accession number

        Example: seq = db.get_Seq_by_acc('X77802')

        """
        seqid = self.adaptor.fetch_seqid_by_accession(self.dbid, name)
        return BioSeq.DBSeqRecord(self.adaptor, seqid)

    def get_Seq_by_ver(self, name):
        """Gets a Bio::Seq object by version number

        Example: seq = db.get_Seq_by_ver('X77802.1')

        """
        seqid = self.adaptor.fetch_seqid_by_version(self.dbid, name)
        return BioSeq.DBSeqRecord(self.adaptor, seqid)

    def get_Seqs_by_acc(self, name):
        """Gets a *list* of Bio::Seq objects by accession number

        Example: seqs = db.get_Seq_by_acc('X77802')

        """
        seqids = self.adaptor.fetch_seqids_by_accession(self.dbid, name)
        return [BioSeq.DBSeqRecord(self.adaptor, seqid) for seqid in seqids]

    def get_PrimarySeq_stream(self):
        # my @array = $self->get_all_primary_ids;
        # my $stream = Bio::DB::BioDatabasePSeqStream->new(
        #         -adaptor => $self->_adaptor->db->get_PrimarySeqAdaptor,
        #         -idlist => \@array);
        raise NotImplementedError("waiting for Python 2.2's iter")

    def get_all_primary_ids(self):
        """Array of all the primary_ids of the sequences in the database.

        These maybe ids (display style) or accession numbers or
        something else completely different - they *are not*
        meaningful outside of this database implementation.
        """
        return self.adaptor.list_bioentry_ids(self.dbid)

    def __getitem__(self, key):
        return BioSeq.DBSeqRecord(self.adaptor, key)
    def keys(self):
        return self.get_all_primary_ids()
    def values(self):
        return [self[key] for key in self.keys()]
    def items(self):
        return [(key, self[key]) for key in self.keys()]

    def lookup(self, **kwargs):
        if len(kwargs) != 1:
            raise TypeError("single key/value parameter expected")
        k, v = kwargs.items()[0]
        if not _allowed_lookups.has_key(k):
            raise TypeError("lookup() expects one of %s, not %r" % \
                            (repr(_allowed_lookups.keys())[1:-1], repr(k)))
        lookup_name = _allowed_lookups[k]
        lookup_func = getattr(self.adaptor, lookup_name)
        seqid = lookup_func(self.dbid, v)
        return BioSeq.DBSeqRecord(self.adaptor, seqid)
        
    def get_Seq_by_primary_id(self, seqid):
        """Gets a Bio::Seq object by the primary (internal) id.

        The primary id in these cases has to come from
        $db->get_all_primary_ids.  There is no other way to get (or
        guess) the primary_ids in a database.
        """
        return self[seqid]

    def load(self, record_iterator):
        """Load a set of SeqRecords into the BioSQL database.

        record_iterator is an Iterator object that returns SeqRecord objects
        which will be used to populate the database. The Iterator should
        implement next() and either return None or raise StopIteration
        when it is out of objects.

        Returns the number of records loaded.
        """
        db_loader = Loader.DatabaseLoader(self.adaptor, self.dbid)
        num_records = 0
        while 1:
            try:
                cur_record = record_iterator.next()
            except StopIteration:
                break
            if cur_record is None:
                break
            num_records += 1
            db_loader.load_seqrecord(cur_record)

        return num_records
        

Generated by  Doxygen 1.6.0   Back to index