# Logo Search packages:
# Sourcecode: falcon version File versions  Download package
#
# util.py

# This file is part of the Falcon repository manager
# Copyright (C) 2005-2008 Dennis Kaarsemaker
# See the file named COPYING in the root of the source tree for license details
#
# util.py - Generic utilities

import falcon
import os, sys, time, subprocess, snack, urllib2, re, tempfile, traceback
from gettext import gettext as _

def writefile(path, data):
    """Write str(data) to path unless the file already holds that content.

    Missing parent directories of path are created first.

    Returns True when the file was written, and False when it was left
    untouched because readfile(path) already equals the new content.
    """
    parent = os.path.dirname(path)
    if parent and not os.path.exists(parent):
        os.makedirs(parent)
    data = str(data)
    if os.path.exists(path) and data == readfile(path):
        debug(_("Not overwriting unchanged file %s") % path)
        return False
    debug(_("Writing %d bytes to %s") % (len(data), path))
    fd = open(path, 'w')
    try:
        # data was converted to str above; no second conversion is needed
        fd.write(data)
    finally:
        # Release the handle even when the write itself fails
        fd.close()
    return True
 
def readfile(path, ignore_errors = True):
    """Return the content of the file at path as a string.

    When the file cannot be opened or read, an empty string is returned if
    ignore_errors is true; otherwise the exception is re-raised.
    """
    debug(_("Reading file %s") % path)
    try:
        fd = open(path, 'r')
        try:
            d = fd.read()
        finally:
            # Must be a call: a bare `fd.close` only names the method and
            # leaves the file open
            fd.close()
        return str(d)
    except Exception:
        # `except Exception` instead of a bare except, so that
        # KeyboardInterrupt and SystemExit are not reported as a read error
        debug(_("File %s can't be read") % path)
        if ignore_errors:
            return ''
        raise
    
def run(command, wd=None, buffer=True, outfile=None, err_is_out=False):
    """Run command as a child process, wait for it and return its stdout.

    command    -- handed unchanged to subprocess.Popen
    wd         -- working directory for the child (passed as cwd)
    buffer     -- when true, stdout and stderr go through pipes; when false
                  both are passed as None to Popen
    outfile    -- if given, this path is opened for writing and used as the
                  child's stdout
    err_is_out -- if true, stderr is redirected to stdout

    Returns the stdout value produced by Popen.communicate().

    Raises RuntimeError when the exit code is non-zero. The exception has
    the extra attributes code, stdout and stderr.
    """
    if wd:
        debug(_("Executing in workdir %s: %s") % (wd, command))
    else:
        debug(_("Executing: %s") % command)
    out = subprocess.PIPE
    err = subprocess.PIPE
    if not buffer:
        out = None
        err = None
    if outfile:
        out = open(outfile,'w')
    if err_is_out:
        err = subprocess.STDOUT
    try:
        process = subprocess.Popen(command, cwd = wd, stdout = out, stderr = err, stdin = subprocess.PIPE, bufsize=1)
        stdout, stderr = process.communicate()
    finally:
        # Close the output file even if the process could not be started or
        # communicating with it failed
        if outfile:
            out.close()
    ret = process.wait()
    if ret != 0:
        ex = RuntimeError(_("%s failed with code %d\n\n%s") % (command, ret, stderr))
        ex.code = ret
        ex.stdout = stdout
        ex.stderr = stderr
        raise ex
    return stdout

def intersect(a1,a2):
    """Return the items of a1 that also occur in a2, in a1's order.

    The list ['all'] acts as a wildcard: if either argument equals it, the
    other argument is returned as-is.
    """
    wildcard = ['all']
    if a1 == wildcard:
        return a2
    if a2 == wildcard:
        return a1
    common = []
    for item in a1:
        if item in a2:
            common.append(item)
    return common

def output(x):
    """Write x to standard output, prefixed with '*  ' and ending in a newline."""
    return sys.stdout.write('*  ' + x + "\n")

def warning(x):
    """Write x to standard error, prefixed with 'W: ' and ending in a newline."""
    return sys.stderr.write('W: ' + x + "\n")

def error(msg):
    """Print msg with an 'E: ' prefix, clean up and exit with status 1.

    Cleanup consists of finishing falcon.screen when it is set and calling
    falcon.unlock().
    """
    # With a single parenthesised argument this prints the same text under
    # both the print statement and the print function
    print('E: ' + msg)
    screen = falcon.screen
    if screen:
        screen.finish()
    falcon.unlock()
    sys.exit(1)
def debug(msg):
    """Print msg with a 'D: ' prefix when falcon.conf.verbose is set.

    Nothing is printed while the falcon module has no conf attribute.
    """
    if 'conf' not in dir(falcon):
        return
    if falcon.conf.verbose:
        print('D: ' + msg)

# The four message functions of this module, in the order
# output, warning, error, debug
outfunc = (
    output,
    warning,
    error,
    debug,
)

def newer(file1, file2):
    """Return True if file1 was modified more recently than file2.

    True is also returned when either modification time cannot be
    determined, which happens when a file does not exist.
    """
    try:
        return os.path.getmtime(file1) > os.path.getmtime(file2)
    except Exception:
        # Exceptions occur when files don't exist. `except Exception` rather
        # than a bare except, so KeyboardInterrupt and SystemExit still
        # propagate instead of being turned into an answer of True.
        return True

# The timer decorator
def timer(func):
    """Decorator that reports how long each call to func takes.

    While falcon.conf.timings is false the call is passed straight through.
    Otherwise the elapsed wall-clock time, together with the function name
    and its arguments, is reported through falcon.util.output after func
    returns. Nothing is reported for a call that raises.

    The returned wrapper carries over func's name and docstring.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not falcon.conf.timings:
            return func(*args, **kwargs)
        start = time.time()
        res = func(*args, **kwargs)
        delta = time.time() - start
        # __name__ rather than func_name: it exists on every Python version
        # and on built-in functions as well
        falcon.util.output(_("Function %s (args: %s, kwargs: %s) took %.3f seconds") %
                           (func.__name__, str(args), str(kwargs), delta))
        return res
    return wrapper

# Stack of (text, start time) pairs for the non-decorator timers:
# timer_start() pushes an entry, timer_stop() pops the most recent one
timerstack = list()
def timer_start(text=None):
    """Push (text, current time) onto timerstack.

    Does nothing while falcon.conf.timings is false.
    """
    if not falcon.conf.timings:
        return
    entry = (text, time.time())
    timerstack.append(entry)

def timer_stop():
    """Pop the most recent timerstack entry and report the time since it was pushed.

    Does nothing while falcon.conf.timings is false.
    """
    if not falcon.conf.timings:
        return
    label, started = timerstack.pop()
    elapsed = time.time() - started
    falcon.util.output(_("%s took %.4f seconds") % (label, elapsed))

def escapeurl(url):
    """Reduce url to alphanumerics plus '_', '%' and '.'.

    Every occurrence of 'http://', 'https://' and 'ftp://' is removed, then
    each remaining character outside the allowed set is replaced by '_'.
    """
    for scheme in ('http://', 'https://', 'ftp://'):
        url = url.replace(scheme, '')
    safe_punctuation = '_%.'
    pieces = []
    for ch in url:
        if not ch.isalnum() and ch not in safe_punctuation:
            ch = '_'
        pieces.append(ch)
    return ''.join(pieces)

def humanize(size):
    """Format a byte count as a short human-readable string.

    Sizes above 1048576 are given in MB and sizes above 1024 in KB, both
    rounded to two decimals; anything else is given in bytes.
    """
    if size > 1048576:
        # Float divisor: with two integers Python 2's `/` truncates before
        # round() ever sees the fraction
        return '%s MB' % round(size / 1048576.0, 2)
    if size > 1024:
        return '%s KB' % round(size / 1024.0, 2)
    # Single space before the unit, matching the MB and KB forms
    return '%s bytes' % size

def download(url,cache=None,store=None):
    """Fetch url and return an open file object holding the data.

    cache -- path of a local copy. If it exists and is newer than the
             Last-Modified date sent by the server, nothing is downloaded
             and the existing copy is opened. Otherwise the download is
             written to this path; when url ends in '.gz' or '.bz2' the
             file is unpacked with gunzip/bunzip2 before it is opened.
    store -- path to write the download to when no cache is given; the
             returned file object is rewound to the start.

    When neither cache nor store is given, the data goes to an anonymous
    temporary file, which is returned rewound to the start.

    A progress percentage is written to standard output while downloading.
    """
    import calendar

    mtime = 0
    if cache and os.path.exists(cache):
        mtime = os.path.getmtime(cache)
    falcon.util.output(_("Trying to download %s") % url)
    req = urllib2.urlopen(url)

    # Last-Modified is a GMT timestamp. timegm() turns it into seconds since
    # the epoch, which compares directly with getmtime(); no local-timezone
    # correction is involved. A response without the header gives None and
    # is always downloaded.
    modified = req.headers.getdate('last-modified')
    if cache and modified is not None and calendar.timegm(modified) < mtime:
        req.close()
        return open(cache)

    # Now really download. Binary mode: the payload may be compressed data.
    if cache:
        fd = open(cache,'wb')
    elif store:
        fd = open(store,'w+b')
    else:
        fd = tempfile.TemporaryFile()

    # Content-Length may be absent or malformed; 0 means "unknown" and
    # disables the percentage updates below
    try:
        size = int(req.headers.get('content-length', 0))
    except ValueError:
        size = 0
    sys.stdout.write(_("   Size: %s - downloaded   0%%") % humanize(size))
    sys.stdout.flush()

    dled = oldperc = 0
    finished = False
    try:
        while True:
            dl = req.read(2048)
            if len(dl) == 0: break
            fd.write(dl)
            dled += len(dl)
            if size > 0:
                perc = int(float(dled)/size * 100.0)
                if perc != oldperc:
                    oldperc = perc
                    sys.stdout.write("\b\b\b\b%3d%%" % perc)
                    sys.stdout.flush()
        finished = True
    finally:
        req.close()
        # The cache file is reopened below, so it is always closed here. A
        # store/temporary file stays open only when it is going to be returned.
        if cache or not finished:
            fd.close()
    print("")

    if cache:
        if url.endswith('.gz'):
            os.rename(cache, cache + '.gz')
            run(['gunzip', cache + '.gz'])
        if url.endswith('.bz2'):
            os.rename(cache, cache + '.bz2')
            run(['bunzip2', cache + '.bz2'])
        return open(cache)
    fd.seek(0)
    return fd

def debug_exception():
    """Pass the formatted traceback of the current exception to debug(), line by line."""
    trace_text = traceback.format_exc()
    for line in trace_text.split('\n'):
        debug(line)

def get_template(name):
    """Load the Django template called name and return it.

    falcon.conf.template_checked is reset to False before loading. If it is
    still false once the template has been loaded, error() is called.
    """
    from django.template import loader
    falcon.conf.template_checked = False
    template = loader.get_template(name)
    if falcon.conf.template_checked:
        return template
    error(_("Template %s did not perform an API check") % name)
    return template

def parse_uri(uri):
    """Split a database URI into its components.

    Accepted form, after stripping surrounding whitespace:

        engine://[username[:password]@]hostname[:port]/database

    where engine is 'mysql' or 'postgresql'.

    Returns an object with the attributes engine, hostname, port, database,
    username and password. Each is a string, or None for an optional part
    that is absent. When the URI does not match, a warning is issued and
    None is returned.
    """
    uri = uri.strip()
    # Hostname class: the upper-case range is A-Z. The range `A-z` would also
    # admit the punctuation between 'Z' and 'a' ('[', '\', ']', '^', '`').
    # '_' lies in that stretch too and is listed explicitly, so host names
    # containing it keep matching.
    m = re.match(r'''(?P<engine>mysql|postgresql)://
                    (?:
                      (?P<username>[^:@]+)
                      (?::
                        (?P<password>[^@]+)
                      )?
                    @)?
                    (?P<hostname>[-a-zA-Z0-9._]+)
                    (?::(?P<port>\d+))?
                    /
                    (?P<database>.+)''', uri, re.VERBOSE)
    if not m:
        warning(_("Couldn't parse database handle '%s'") % uri)
        return
    class dummy(object): pass
    ret = dummy()
    for attr in ('engine', 'hostname', 'port', 'database', 'username', 'password'):
        setattr(ret, attr, m.group(attr))
    return ret

# Generated by  Doxygen 1.6.0   Back to index