Logo Search packages:      
Sourcecode: python-biopython version File versions  Download package

BaseDB.py

import os
import Bio
import compression

def _int_str(i):
    s = str(i)
    if s[-1:] == "l":
        return s[:-1]
    return s

00011 class WriteDB:
    # Must define 'self.filename_map' mapping from filename -> fileid
    # Must define 'self.fileid_info' mapping from fileid -> (filename,size)

    def add_filename(self, filename, size, fileid_info):
        fileid = self.filename_map.get(filename, None)
        if fileid is not None:
            return fileid
        s = str(len(self.filename_map))
        self.filename_map[filename] = s  # map from filename -> id
        assert s not in fileid_info.keys(), "Duplicate entry! %s" % (s,)
        self.fileid_info[s] = (filename, size)
        return s

    def load(self, filename, builder, fileid_info, record_tag = "record"):
        formatname = self.formatname
        size = os.path.getsize(filename)
        filetag = self.add_filename(filename, size, fileid_info)

        source = compression.open_file(filename, "rb")
        if formatname == "unknown":
            formatname = "sequence"
        
        format = Bio.formats.normalize(formatname).identifyFile(source)
        if format is None:
            raise TypeError("Cannot identify file as a %s format" %
                            (self.formatname,))
        if self.formatname == "unknown":
            expected_names = ["fasta", "embl", "swissprot", "genbank"]
            for node in format._parents:
                if node.name in expected_names:
                    self.formatname = node.name
                    break
            else:
                self.formatname = format.name
        
        iterator = format.make_iterator(
            record_tag,
            select_names = tuple(builder.uses_tags()) + (record_tag,),
            debug_level = 0)

        for record in iterator.iterate(source, cont_handler = builder):
            self.add_record(filetag,
                            iterator.start_position,
                            iterator.end_position - iterator.start_position,
                            record.document)

00058 class DictLookup:
    def __getitem__(self, key):
        raise NotImplementedError("Must be implemented in subclass")
    def keys(self):
        raise NotImplementedError("Must be implemented in subclass")

    def values(self):
        return [self[key] for key in self.keys()]
    def items(self):
        return [(key, self[key]) for key in self.keys()]

    def get(self, key, default = None):
        try:
            return self[key]
        except KeyError:
            return default
    
        
00076 class OpenDB(DictLookup):
    def __init__(self, dbname, index_type):
        self.dbname = dbname

        config = read_config(os.path.join(dbname, "config.dat"))
        if config["index"] != index_type:
            raise TypeError("FlatDB does not support %r index" %
                            (config["index"],))
        self.primary_namespace = config["primary_namespace"]
        self.secondary_namespaces = config["secondary_namespaces"]
        self.formatname = config["format"]

        filename_map = {}
        fileid_info = {}
        for k, v in config.items():
            if not k.startswith("fileid_"):
                continue
            fileid = k[7:]
            filename, size = v
            fileid_info[fileid] = v
            filename_map[filename] = fileid
            if os.path.getsize(filename) != size:
                raise TypeError(
                    "File %s has changed size from %d to %d bytes!" %
                    (size, os.path.getsize(filename)))

        self.filename_map = filename_map
        self.fileid_info = fileid_info
        

    def lookup(self, *args, **kwargs):
        if args:
            if kwargs:
                raise TypeError("Cannot specify both args and kwargs")
            if len(args) != 1:
                raise TypeError("Only one identifier handled")
            namespace, name = self.primary_namespace, args[0]
        
        else:
            if len(kwargs) != 1:
                raise TypeError("lookup takes a single key")
            namespace, name = kwargs.items()[0]
        return self[namespace][name]

00120     def __getitem__(self, namespace):
        """return the database table lookup for the given namespace"""
        raise NotImplementedError("must be implemented in the derived class")

    def keys(self):
        return [self.primary_namespace] + self.secondary_namespaces

# Write the configuration
def write_config(config_filename,
                 index_type,
                 primary_namespace,
                 secondary_namespaces,
                 fileid_info,
                 formatname):
    configfile = open(config_filename, "wb")

    # Write the header
    configfile.write("index\t" + index_type + "\n")

    # Write the namespace information
    configfile.write("primary_namespace\t%s\n" % primary_namespace)
    keys = secondary_namespaces[:]
    keys.sort()
    configfile.write("secondary_namespaces\t")
    configfile.write("\t".join(keys) + "\n")

    # Format name
    configfile.write("format\t" + formatname + "\n")

    # Write the fileid table
    items = fileid_info.items()
    items.sort()
    for fileid, (filename, size) in items:
        configfile.write("fileid_%s\t%s\t%s\n" % \
                         (fileid, filename, _int_str(size)))

    configfile.close()


def read_config(config_filename):
    d = {}
    for line in open(config_filename, "rb").read().split("\n"):
        words = line.rstrip().split("\t")
        assert not d.has_key(words[0]), \
               "Duplicate key %r in config file: old = %r, new = %r" % \
               (words[0], d[words[0]], line)
        if words[0] in ("index", "primary_namespace", "format"):
            if len(words) != 2:
                raise AssertionError(
                    "%s should only have one value, not %r" % \
                    (words[0], words[1:]))
            d[words[0]] = words[1]
            
        elif words[0].startswith("fileid_"):
            if len(words) != 3:
                raise AssertionError(
                    "%s should only have two values, not %r" % \
                    (words[0], words[1:]))
            d[words[0]] = (words[1], long(words[2]))
        
        elif words[0] in ("secondary_namespaces",):
            # This can have 0 or more values
            d[words[0]] = words[1:]
        
        else:
            # Unknown word, save as-is
            d[words[0]] = words[1:]
    
    return d

Generated by  Doxygen 1.6.0   Back to index