Viewing file: util.py (12.16 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# # Copyright (C) 2005 Red Hat, Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. #
import os import os.path import sys import fnmatch import pwd import gettext import locale import errno import warnings import exceptions import random from config import *
# Names accepted in the SABAYON_DEBUG environment variable, one per debug
# channel.  The position of a name in this tuple is the value of the matching
# DEBUG_* constant below, so both sequences must be kept in the same order.
_DEBUG_MODULE_NAMES = (
    "deprecated",
    "user-profile",
    "storage",
    "proto-session",
    "usermod",
    "dir-monitor",
    "gconf-source",
    "panel-delegate",
    "files-source",
    "mozilla-source",
    "admin-tool",
    "user-db",
    "cache",
)

(
    DEBUG_DEPRECATED,
    DEBUG_USERPROFILE,
    DEBUG_STORAGE,
    DEBUG_PROTOSESSION,
    DEBUG_USERMOD,
    DEBUG_DIRMONITOR,
    DEBUG_GCONFSOURCE,
    DEBUG_PANELDELEGATE,
    DEBUG_FILESSOURCE,
    DEBUG_MOZILLASOURCE,
    DEBUG_ADMINTOOL,
    DEBUG_USERDB,
    DEBUG_CACHE,
) = range(len(_DEBUG_MODULE_NAMES))

# Maps every DEBUG_* id to a (name, state) pair.  Each channel starts out
# disabled (state False).
debug_modules = dict(
    (module_id, (name, False))
    for module_id, name in enumerate(_DEBUG_MODULE_NAMES)
)
def init_debug_modules():
    """Configure the debug channels from the SABAYON_DEBUG environment variable.

    Recognised values of SABAYON_DEBUG:

      unset or empty  -- leave debug_modules untouched and silence
                         DeprecationWarning
      "help"          -- print the valid channel names and exit with status 1
      "all"           -- enable every channel with the mask 0xFFFFFFFF
      "a:b=0xF8"      -- colon-separated channel names, each optionally
                         followed by "=<hex mask>"; a bare name stores True

    Unknown channel names are ignored.  An entry whose mask is not valid
    hexadecimal is reported on stderr and skipped, so that a typo in the
    environment cannot abort the import of this module.

    Unless the "deprecated" channel ends up enabled, DeprecationWarning is
    filtered out.
    """
    debug_value = os.getenv("SABAYON_DEBUG")
    if not debug_value:
        # DeprecationWarning is a builtin; the Python-2-only "exceptions"
        # module is not needed to reach it.
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        return

    if debug_value == "help":
        print("Valid options for the SABAYON_DEBUG environment variable are:\n")
        print(" all")
        # Sorted by channel id so the listing is stable between runs.
        for module in sorted(debug_modules):
            print(" %s" % debug_modules[module][0])
        print("You may supply a list of modules separated by a colon (:)")
        print("You may also supply an optional hex debug mask to a module, e.g. foo=0xF8")
        sys.exit(1)
    elif debug_value == "all":
        for module in debug_modules:
            debug_modules[module] = (debug_modules[module][0], 0xFFFFFFFF)
    else:
        # Resolve names once instead of scanning the table for every item.
        ids_by_name = dict((debug_modules[module][0], module)
                           for module in debug_modules)
        for item in debug_value.split(":"):
            fields = item.split("=")
            key = fields[0]
            value = True
            if len(fields) > 1:
                try:
                    value = int(fields[1], 16)
                except ValueError:
                    sys.stderr.write(
                        "SABAYON_DEBUG: ignoring %r: invalid hex mask\n" % (item,))
                    continue
            if key in ids_by_name:
                debug_modules[ids_by_name[key]] = (key, value)

    if not debug_modules[DEBUG_DEPRECATED][1]:
        warnings.filterwarnings("ignore", category=DeprecationWarning)
# Runs when this module is imported: applies the SABAYON_DEBUG environment
# variable to debug_modules (and may exit the process for SABAYON_DEBUG=help).
init_debug_modules ()
def debug_print(module, message, mask=~0):
    """Print *message* on stdout if debugging is enabled for *module*.

    module  -- one of the DEBUG_* ids; must be a key of debug_modules
    message -- text to print
    mask    -- bits that must overlap the module's state for the message to
               be shown; the default ~0 overlaps any non-zero state

    The line is formatted as "(<pid>) <module name>: <message>".
    """
    assert module in debug_modules
    name, state = debug_modules[module]
    # state is False (disabled), True (enabled by bare name) or an integer
    # mask.  In the bitwise test True acts as 1, so a module enabled by bare
    # name only passes masks that include bit 0.
    if not state & mask:
        return
    print("(%d) %s: %s" % (os.getpid(), name, message))
class GeneralError(Exception):
    """Exception raised by helpers in this module; carries one message."""

    def __init__(self, msg):
        # Unlike a plain Exception, the message argument is mandatory.
        Exception.__init__(self, msg)
# Home directory override consulted first by get_home_dir(); a false value
# (the default None) means "no override".
unit_tests_homedir = None


def set_home_dir_for_unit_tests(homedir):
    """Install *homedir* as the override returned by get_home_dir().

    Passing None removes the override again.
    """
    global unit_tests_homedir
    unit_tests_homedir = homedir
def get_home_dir():
    """Return the home directory of the user running this process.

    Sources are tried in this order:

      1. the override installed with set_home_dir_for_unit_tests()
      2. the passwd entry for os.getuid(), if its directory is non-empty
      3. the HOME environment variable

    Raises GeneralError when none of them yields a value.  Building that
    error message uses _(), so gettext must have been installed (see
    init_gettext) before the failure path is reached.
    """
    if unit_tests_homedir:
        return unit_tests_homedir

    # Only the lookup itself can raise KeyError (uid without passwd entry).
    try:
        pw = pwd.getpwuid(os.getuid())
    except KeyError:
        pw = None
    if pw is not None and pw.pw_dir != "":
        return pw.pw_dir

    if "HOME" in os.environ:
        return os.environ["HOME"]

    raise GeneralError(_("Cannot find home directory: not set in /etc/passwd and no value for $HOME in environment"))
def get_user_name():
    """Return the login name of the user running this process.

    The passwd entry for os.getuid() is used when it has a non-empty name;
    otherwise the USER environment variable is returned.

    Raises GeneralError when neither source yields a value.  Building that
    error message uses _(), so gettext must have been installed (see
    init_gettext) before the failure path is reached.
    """
    # Only the lookup itself can raise KeyError (uid without passwd entry).
    try:
        pw = pwd.getpwuid(os.getuid())
    except KeyError:
        pw = None
    if pw is not None and pw.pw_name != "":
        return pw.pw_name

    if "USER" in os.environ:
        return os.environ["USER"]

    raise GeneralError(_("Cannot find username: not set in /etc/passwd and no value for $USER in environment"))
def print_exception():
    """Write the traceback of the exception being handled to stderr."""
    # traceback is not imported at module level, so it is pulled in here.
    from traceback import print_exc
    print_exc(file=sys.stderr)
def init_gettext():
    """Install _() as a builtin bound to this package's translations.

    Call util.init_gettext() once at the entry point of a script; after
    that _() can be used everywhere to mark strings for translation.
    """
    # Adopt the locale configured in the user's environment.
    locale.setlocale(locale.LC_ALL, "")
    # PACKAGE and LOCALEDIR are provided by the config module.
    gettext.install(PACKAGE, localedir=LOCALEDIR)
def random_string(len):
    """Return a string of *len* random characters with code points 0-255.

    A *len* of zero or less yields the empty string.  The characters come
    from the random module, one random.getrandbits(8) call per character.
    """
    # Collect the pieces and join once; repeated "bin = bin + chr(...)"
    # copies the whole string on every iteration.
    pieces = []
    remaining = len
    while remaining > 0:
        remaining = remaining - 1
        pieces.append(chr(random.getrandbits(8)))
    return "".join(pieces)
def split_path(path, head=None, tail=None):
    """Split *path* into a (head, tail) tuple at a known head or tail.

    If *tail* is passed it is taken to be the trailing part of the path; if
    *head* is passed it is taken to be the leading part.  When both are
    given only *tail* is used.  path, head and tail are made canonical with
    os.path.normpath before they are compared, and the split must fall on a
    '/' directory separator.

    Returns the tuple (head, tail) in canonical form.

    Raises ValueError if neither head nor tail is given, if the path does
    not start with head / end with tail, or if the split point is not a
    directory separator (which includes the case where path equals head).
    """
    path = os.path.normpath(path)

    if tail is not None:
        tail = os.path.normpath(tail)
        if tail[0] == '/':
            tail = tail[1:]
        if not path.endswith(tail):
            raise ValueError("path %r does not end with %r" % (path, tail))
        dir_split = len(path) - len(tail) - 1
        if path[dir_split] != '/':
            raise ValueError("%r is not a whole trailing component of %r"
                             % (tail, path))
        return (path[:dir_split], path[dir_split + 1:])

    if head is not None:
        head = os.path.normpath(head)
        if head[-1] == '/':
            head = head[:-1]
        if not path.startswith(head):
            raise ValueError("path %r does not start with %r" % (path, head))
        dir_split = len(head)
        # When path == head there is no character at dir_split; report that
        # as the documented ValueError rather than letting IndexError escape.
        if dir_split >= len(path) or path[dir_split] != '/':
            raise ValueError("%r is not a whole leading component of %r"
                             % (head, path))
        return (path[:dir_split], path[dir_split + 1:])

    raise ValueError("either head or tail must be given")
#
# os.spawn() doesn't handle EINTR from waitpid() on Linux:
#  http://sourceforge.net/tracker/?group_id=5470&atid=105470&func=detail&aid=686667
# Best we can do is ignore the exception and carry on
# See bug #303034
#
def uninterruptible_spawnve(mode, file, args, env):
    """Run os.spawnve (or os.spawnv when *env* is None), tolerating EINTR.

    mode, file, args and env are passed through unchanged.

    Returns the value produced by os.spawnv/os.spawnve (see the os module
    documentation for what that is under each mode), or None when the call
    was interrupted with EINTR, in which case no result is available.

    Any OSError other than EINTR is re-raised.
    """
    try:
        if env is None:
            return os.spawnv(mode, file, args)
        return os.spawnve(mode, file, args, env)
    except os.error as err:
        # Use the errno attribute rather than unpacking the exception's
        # arguments, which only works when there are exactly two of them.
        if err.errno != errno.EINTR:
            raise
        return None


def uninterruptible_spawnv(mode, file, args):
    """Same as uninterruptible_spawnve() with the current environment."""
    return uninterruptible_spawnve(mode, file, args, None)
def run_unit_tests():
    """Self-test for get_home_dir(), get_user_name() and the home override."""
    real_home = get_home_dir()
    assert real_home != ""
    assert get_user_name() != ""

    # With an override installed, get_home_dir() must report it ...
    set_home_dir_for_unit_tests("foo")
    assert get_home_dir() == "foo"

    # ... and fall back to the real value once the override is cleared.
    set_home_dir_for_unit_tests(None)
    assert get_home_dir() == real_home
# ------ Class DictCompare ------
class _CallableKeyList(list):
    """A list that returns itself when called.

    DictCompare.compare() stores its results in instance attributes that
    carry the same names as the accessor methods, which hides the methods.
    Storing the results in this type keeps both spellings usable:
    ``dc.only_a`` is a list and ``dc.only_a()`` returns that same list.
    """

    def __call__(self):
        return self


class DictCompare:
    """Compare two dictionaries and derive the changes between them.

    Typical use:

        dc = DictCompare(a, b)
        dc.compare()
        changes = dc.get_change_set('a', 'b')

    compare() must have run before get_change_set(), is_equal() or dump()
    is used.
    """

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def compare(self):
        '''
        Given two dictionaries a,b analyze them for their differences and similarities.

        The results are stored on the instance as lists of keys:

        intersection - keys shared between a and b
        only_a       - keys only present in a
        only_b       - keys only present in b
        equal        - keys present in both a and b whose values are equal
        not_equal    - keys present in both a and b whose values are not equal
        '''
        self.keys_a = self.a.keys()
        self.keys_b = self.b.keys()
        self.intersection = _CallableKeyList()
        self.only_a = _CallableKeyList()
        self.only_b = _CallableKeyList()
        self.equal = _CallableKeyList()
        self.not_equal = _CallableKeyList()
        self._add = {}
        self._del = {}
        self._mod = {}

        for k in self.keys_a:
            if k in self.b:
                self.intersection.append(k)
            else:
                self.only_a.append(k)

        for k in self.keys_b:
            if k not in self.a:
                self.only_b.append(k)

        for k in self.intersection:
            if self.a[k] == self.b[k]:
                self.equal.append(k)
            else:
                self.not_equal.append(k)

    # The five accessors below are only reachable through the class while
    # compare() has not run yet; afterwards the instance attributes of the
    # same name take precedence.  Each one therefore runs compare() first
    # and then returns the freshly stored list.

    def intersection(self):
        'return list of keys shared between a and b'
        self.compare()
        return self.intersection

    def only_a(self):
        'return list of keys only present in a'
        self.compare()
        return self.only_a

    def only_b(self):
        'return list of keys only present in b'
        self.compare()
        return self.only_b

    def equal(self):
        'return list of keys present in both a and b whose values are equal'
        self.compare()
        return self.equal

    def not_equal(self):
        'return list of keys present in both a and b whose values are not equal'
        self.compare()
        return self.not_equal

    def get_change_set(self, dict_lhs, dict_rhs):
        '''Return changes necessary to make dict_lhs equivalent to dict_rhs,
        (e.g. lhs = rhs), the two dictionary parameters are specified as
        either the string 'a' or the string 'b' corresponding to the
        parameters this class was created with.

        Return value is a dictionary with 3 keys (add, del, mod) whose values
        are dictionaries containing (key,value) pairs to add, delete, or
        modify respectively in dict_lhs.

        Raises ValueError unless dict_lhs and dict_rhs are 'a' and 'b' in
        either order.'''
        # Membership in a tuple, not in the string "ab": the substring test
        # would also accept "" and "ab".
        valid = ('a', 'b')
        if dict_lhs == dict_rhs or dict_lhs not in valid or dict_rhs not in valid:
            raise ValueError("dict_lhs and dict_rhs must be 'a' and 'b' (in either order)")

        if dict_lhs == 'a':
            a = self.a
            b = self.b
            only_a = self.only_a
            only_b = self.only_b
        else:
            a = self.b
            b = self.a
            only_a = self.only_b
            only_b = self.only_a

        self._add = {}
        for k in only_b:
            self._add[k] = b[k]

        self._del = {}
        for k in only_a:
            self._del[k] = a[k]

        self._mod = {}
        for k in self.not_equal:
            self._mod[k] = b[k]

        change_set = {'add': self._add, 'del': self._del, 'mod': self._mod}
        return change_set

    def is_equal(self):
        'Return True if the last compare() found no differences.'
        return (len(self.only_a) == 0 and
                len(self.only_b) == 0 and
                len(self.not_equal) == 0)

    def _joined(self, keys):
        'Format keys as a comma separated string; keys need not be strings.'
        return ",".join(["%s" % (k,) for k in keys])

    def dump(self):
        'Print the results of the dictionary comparison'
        print("intersection = %s" % self._joined(self.intersection))
        print("only a = %s" % self._joined(self.only_a))
        print("only b = %s" % self._joined(self.only_b))
        print("equal = %s" % self._joined(self.equal))
        print("not equal = %s" % self._joined(self.not_equal))
def dump_change_set(cs):
    """Print a change set as produced by DictCompare.get_change_set().

    cs is a dictionary with the keys 'add', 'del' and 'mod', each mapping
    to a dictionary of key/value pairs.  Empty sections are not printed.
    """
    sections = (
        ("Key/Values to ADD", cs['add']),
        ("Keys to DELETE", cs['del']),
        ("Key/Values to Modify", cs['mod']),
    )

    for heading, entries in sections:
        if not len(entries.keys()):
            continue
        print(heading)
        for key in entries.keys():
            print(" %s=%s" % (key, entries[key]))
def should_ignore_dir(base_dir, ignore_dir_list, dir):
    """Return True if *dir* or any of its parent directories is ignored.

    base_dir        -- directory the ignore patterns are relative to
    ignore_dir_list -- fnmatch-style patterns; each is joined to base_dir
                       and normalised before matching
    dir             -- directory to test; normalised before matching

    The directory itself is tested first, then each parent in turn, up to
    the point where os.path.dirname no longer yields a new path ("/" for
    absolute paths, "." for relative ones).
    """
    # The patterns do not depend on the directory being tested, so
    # normalise them once rather than for every parent.
    patterns = [os.path.normpath(os.path.join(base_dir, ignore_dir))
                for ignore_dir in ignore_dir_list]

    current = os.path.normpath(dir)
    while True:
        for pattern in patterns:
            if fnmatch.fnmatch(current, pattern):
                return True

        # dirname() of a single relative component is "", which normalises
        # to "."; comparing the normalised parent makes "." a fixed point,
        # so relative paths terminate instead of looping forever.
        parent = os.path.normpath(os.path.dirname(current))
        if parent == current:
            return False
        current = parent
def should_ignore_file(base_dir, ignore_dir_list, ignore_file_list, file):
    """Return True if *file* is ignored, directly or through its directory.

    base_dir         -- directory the ignore patterns are relative to
    ignore_dir_list  -- directory patterns, passed on to should_ignore_dir()
    ignore_file_list -- fnmatch-style file patterns; each is joined to
                        base_dir and normalised before matching
    file             -- path of the file to test; normalised before matching
    """
    normalized = os.path.normpath(file)

    for entry in ignore_file_list:
        pattern = os.path.normpath(os.path.join(base_dir, entry))
        if fnmatch.fnmatch(normalized, pattern):
            return True

    # No file pattern matched: the file is ignored only if its directory is.
    return should_ignore_dir(base_dir, ignore_dir_list, os.path.dirname(normalized))
|