#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""

CliMAF cache module : store, retrieve and manage CliMAF objects from their CRS expression.



"""
# Created : S.Sénési - 2014

from __future__ import print_function, division, unicode_literals, absolute_import

import six
import sys
import os.path
import re
import time
import pickle
import uuid
import hashlib
import json
from operator import itemgetter

import env
from env.environment import *
from env.clogging import clogger
from env.utils import get_subprocess_output
from climaf.utils import Climaf_Cache_Error, Climaf_Error
from climaf.classes import compare_trees, cobject, cdataset, guess_projects, allow_error_on_ds, ds, cens
from climaf.cmacro import crewrite
from climaf import __path__ as cpath

# Can be False, "by_crs" or anything else. "by_crs" means that the key is a hash
# of the CRS (plus output index); any other value means that the key is the CRS itself
handle_cvalues = 'by_hash'
cvalues = dict()
#: The length for truncating the hash value of CRS expressions when forming cache filenames
fileNameLength = 60
#: Define whether we try to have safe naming of cache objects using adaptive filename length
safe = False
#: The length of subdir names when segmenting cache filenames
directoryNameLength = 5
#: The index associating CRS expressions with their cache filename (and compute cost)
crs2filename = dict()
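
# An illustrative index entry (hypothetical values; a sketch, not part of the
# module) : each CRS maps to a pair (cache filename, compute_cost object), e.g.
#   crs2filename["ds('example|AMIPV6ALB2G|tas|1980|global|monthly')"] = \
#       ("/home/user/climaf_cache/ab/cdef0123.nc", compute_cost())
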
#: The dictionary associating CRS expressions to their evaluation
crs2eval = dict()
#: The list of CRS expressions whose file has been dropped since the last synchronisation
#  between in-memory index and file index (or at least since the beginning of the session)
dropped_crs = list()

#: A dict containing cache index entries (as listed in index file), which
# were up to now not interpretable, given the set of defined projects
crs_not_yet_evaluable = dict()
dic_special = dict()


def setNewUniqueCache(path, raz=True):
    """
    Define PATH as the sole cache to use from now. And clear it
    """

    path = os.path.expanduser(path)
    env.environment.cachedirs = [path]  # The list of cache directories
    env.environment.cacheIndexFileName = path + \
        "/index"  # The place to write the index
    env.environment.currentCache = path
    if raz:
        craz(hideError=True)

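# Illustrative usage (hypothetical path; a sketch, not part of the module) :
#   >>> setNewUniqueCache("~/tmp/my_climaf_cache")   # becomes the sole cache, and is cleared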

def generateUniqueFileName(expression, format="nc", option="new", create_dirs=True):
    """
    Generate a filename path from string EXPRESSION and FILEFORMAT,
    almost unique for the expression and the cache directory

    This uses hashlib.sha224, which are truncated to fileNameLength.

    Generated names drive a structure where each directory name
    has dirNameLength characters

    If the filename already exists and is a symbolic link, check that the target
    actually exists, otherwise delete the link
    """
    #
    if format is None:
        return ""
    else:
        prefix = ""
        vhash = hashlib.sha224(expression.encode("utf-8")).hexdigest()
        rep = hash_to_path(vhash, format, option=option, prefix=prefix)
        # Create the relevant directory, so that user scripts don't have to care
        if create_dirs:
            dirn = os.path.dirname(rep)
            if not os.path.exists(dirn):
                try:
                    os.makedirs(dirn)
                except OSError:
                    if os.path.exists(dirn):
                        # Happens when two concurrent CliMAF are
                        # creating numerous cache files
                        pass
                    else:
                        raise Climaf_Cache_Error("Cannot create dir " % dirn)
        if os.path.islink(rep) and not os.path.exists(os.path.realpath(rep)):
            os.remove(rep)
        clogger.debug("returning %s" % rep)
        return rep


def hash_to_path(vhash, format, option="new", prefix=""):
    if option in ["new", ]:
        rep = os.sep.join([env.environment.currentCache,
                          prefix + vhash[0:2], vhash[2:]])
    else:
        rep = os.sep.join([env.environment.currentCache,
                           prefix + stringToPath(vhash[0:fileNameLength - 1], directoryNameLength)])
    rep = ".".join([rep, format])
    rep = os.path.expanduser(rep)
    return rep
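
# Illustrative sketch (hypothetical hash, cache root "~/climaf_cache") :
#   with option="new", hash_to_path("abcdef...", "nc") returns
#     ~/climaf_cache/ab/cdef....nc  (a 2-character top directory, then the rest)
#   with any other option, the legacy layout is used : the hash is truncated to
#   fileNameLength - 1 characters and segmented in directoryNameLength-character
#   directories, e.g. ~/climaf_cache/abcde/f0123/....nc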


def alternate_filename(fpath):
    """
    The cache tree structure has been changed from some CliMAF version.
    Function alternate_filename, when provided with an old form path,
    returns a new form path (and vice versa)
    Useful for handling transition between cache structures
    """
    # Get file format
    format = fpath.split(".")[-1]
    # Remove cache root location prefix
    relative_fpath = fpath[len(env.environment.currentCache) + 1:]
    # Get name without slashes nor extension
    vhash = relative_fpath.replace("/", "").split(".")[0]
    #
    # Test if new path, i.e. with two letters before first "/"
    if relative_fpath[2] == "/":
        option = "old"
    else:
        option = "new"
    rep = hash_to_path(vhash, format, option)
    return rep


def stringToPath(name, length):
    """ Breaks NAME to a path with LENGTH characters-long directory names , for avoiding crowded directories"""
    len_name = len(name)
    rep = ""
    i = 0
    while i + length < len_name:
        rep = rep + name[i:i + length] + "/"
        i += length
    rep += name[i:len_name]
    return rep
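
# A worked example (not part of the module) :
#   >>> stringToPath("0123456789abcd", 5)
#   '01234/56789/abcd'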


def searchFile(path):
    """ Search for first occurrence of PATH as a path in all
    directories listed in CACHEDIRS
    """
    for cdir in env.environment.cachedirs:
        candidate = os.path.expanduser(cdir + "/" + path)
        if os.path.lexists(candidate):
            # If this is a broken link, delete it (almost silently) and return None
            if not os.path.exists(candidate):
                clogger.debug("Broken link for %s was deleted" % candidate)
                os.remove(candidate)
                return None
            return candidate


def register(filename, crs, costs, outfilename=None):
    """
    Adds in FILE a metadata named 'CRS_def' and with value CRS, and a
    metadata 'CLiMAF' with CliMAF version and ref URL

    Records this FILE in dict crs2filename

    If OUTFILENAME is not None, FILENAME is a temporary file and
    it is OUTFILENAME which is recorded in dict crs2filename

    Silently skip non-existing files
    """
    def do_move(crs, filename, outfilename):
        if outfilename is None:
            clogger.info("%s registered as %s" % (crs, filename))
            crs2filename[crs] = (filename, costs)
            if crs in dropped_crs:
                dropped_crs.remove(crs)
            return True
        else:
            cmd = 'mv -f %s %s ' % (filename, outfilename)
            if os.system(cmd) == 0:
                clogger.info("moved %s as %s " % (filename, outfilename))
                clogger.info("%s registered as %s" % (crs, outfilename))
                crs2filename[crs] = (outfilename, costs)
                if crs in dropped_crs:
                    dropped_crs.remove(crs)
                return True
            else:
                # clogger.critical("cannot move by" % cmd)
                raise Climaf_Cache_Error(
                    "cannot move (possibly after stamping) by %s" % cmd)

    global dropped_crs
    #
    # It appears that we have to give the file system some time for updating its inode tables
    waited = 0
    while waited < 50 and not os.path.exists(filename):
        time.sleep(0.1)
        waited += 1
    if not os.path.exists(filename):
        raise Climaf_Cache_Error("File %s wasn't created upstream (or not quick enough). It represents %s" %
                                 (filename, crs))
    else:
        if stamping is False:
            clogger.debug('No stamping')
            return do_move(crs, filename, outfilename)
        else:
            # while time.time() < os.path.getmtime(filename) + 0.2 : time.sleep(0.2)
            if re.findall(".nc$", filename) and ncatted_software is not None:
                command = "%s -h -a CRS_def,global,o,c,\"%s\" -a CliMAF,global,o,c,\"CLImate Model Assessment " \
                          "Framework version %s (http://climaf.rtfd.org)\" %s" % (ncatted_software, crs, climaf_version,
                                                                                  filename)
            elif re.findall(".png$", filename) and convert_software is not None:
                crs2 = crs.replace(r"%", r"\%").replace(r'"', r'\"')
                command = "%s -set \"CRS_def\" \"%s\" -set \"CliMAF\" " \
                          "\"CLImate Model Assessment Framework version " \
                          "%s (http://climaf.rtfd.org)\" %s %s.png && mv -f %s.png %s" % \
                          (convert_software, crs2, climaf_version,
                           filename, filename, filename, filename)
            elif re.findall(".pdf$", filename) and pdftk_software is not None:
                tmpfile = str(uuid.uuid4())
                command = "%s %s dump_data output %s && echo -e \"InfoBegin\nInfoKey: Keywords\nInfoValue: %s\" " \
                          ">> %s && %s %s update_info %s output %s.pdf && mv -f %s.pdf %s && rm -f %s" % \
                          (pdftk_software, filename, tmpfile, crs, tmpfile, pdftk_software, filename, tmpfile, filename,
                           filename, filename, tmpfile)
            elif re.findall(".eps$", filename) and exiv2_software is not None:
                command = '%s -M"add Xmp.dc.CliMAF CLImate Model Assessment Framework version %s ' \
                          '(http://climaf.rtfd.org)" -M"add Xmp.dc.CRS_def %s" %s' % \
                          (exiv2_software, climaf_version, crs, filename)
            else:
                command = None
            if command is None:
                if stamping is None:
                    clogger.warning(
                        "Command is None and stamping is None. No stamping done.")
                    return do_move(crs, filename, outfilename)
                elif stamping is True:
                    raise Climaf_Cache_Error("Cannot stamp by command None. "
                                             "You may set climaf.cache.stamping to False or None - see doc\n%s" %
                                             command)
            else:
                clogger.debug("trying stamping by %s" % command)
                command_return = os.system(command)
                if command_return == 0 or stamping is not True:
                    return do_move(crs, filename, outfilename)
                if command_return != 0:
                    if stamping is True:
                        raise Climaf_Cache_Error("Cannot stamp by command below. "
                                                 "You may set climaf.cache.stamping to False or None - see doc\n%s" %
                                                 command)
                    elif stamping is None:
                        clogger.critical("Cannot stamp by %s" % command)
                        return True

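# Illustrative sketch (hypothetical CRS and filename; assuming ncatted_software
# resolves to 'ncatted') : for a NetCDF output, the stamping command built
# above reads like
#   ncatted -h -a CRS_def,global,o,c,"<crs>" -a CliMAF,global,o,c,"CLImate
#   Model Assessment Framework version <version> (http://climaf.rtfd.org)" <file>.nc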

def getCRS(filename):
    """ Returns the CRS expression found in FILENAME's meta-data"""
    if re.findall(".nc$", filename):
        form = 'ncdump -h %s | grep -E "CRS_def *=" | ' + \
               'sed -r -e "s/.*:CRS_def *= *\\\"(.*)\\\" *;$/\\1/" '
    elif re.findall(".png$", filename):
        form = 'identify -verbose %s | grep -E " *CRS_def: " | sed -r -e "s/.*CRS_def: *//"'
    elif re.findall(".pdf$", filename):
        form = 'pdfinfo %s | grep "Keywords" | awk -F ":" \'{print $2}\' | sed "s/^ *//g"'
    elif re.findall(".eps$", filename):
        form = 'exiv2 -p x %s | grep "CRS_def" | awk \'{for (i=4;i<=NF;i++) {print $i " "} }\' '
    else:
        clogger.error("unknown filetype for %s" % filename)
        return None
    command = form % filename
    try:
        rep = get_subprocess_output(command, to_replace=[("\n", "")])
        if (rep == "") and ('Empty.png' not in filename):
            clogger.error("file %s is not well formed (no CRS)" % filename)
        if re.findall(".nc$", filename):
            rep = rep.replace(r"\'", r"'")
    except:
        rep = "failed"
    clogger.debug("CRS expression read in %s is %s" % (filename, rep))
    return rep
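
# Illustrative sketch (hypothetical CRS) : for a NetCDF cache file stamped by
# register(), the expression read back here shows up in 'ncdump -h' output as
# a global attribute, e.g. :
#   :CRS_def = "time_average(ds('example|AMIPV6ALB2G|tas|1980|global|monthly'))" ;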


def rename(filename, crs):
    """ Rename FILENAME to match CRS. Also updates crs in file and
    crs2filename """
    newfile = generateUniqueFileName(crs, format="nc")
    if newfile:
        costs = 0.
        for c in [f for f in crs2filename if crs2filename[f][0] in [filename, alternate_filename(filename)]]:
            _, costs = crs2filename.pop(c)
        os.rename(filename, newfile)
        register(newfile, crs, costs)
        return newfile


def hasMatchingObject(cobject, ds_func):
    """
    If the cache holds a file which represents an object with the
    same nodes as COBJECT and which leaves/datasets, when paired with
    those of COBJECT and applying ds_func, returns an identical (and not
    None) value for all pairs, then returns its filename, its CRS and
    this value (for the first one in dict crs2filename)

    Can be applied for finding same object with included or including
    time-period
    """

    # First read index from file if it is yet empty - No : done at startup
    # if len(crs2filename.keys()) == 0 : cload()
    def op_squeezes_time(operator):
        return not cscripts[operator].flags.commuteWithTimeConcatenation

    #
    global crs2eval
    key_to_rm = list()
    for crs in crs2filename:
        # First, basic, screening
        if crs.split("(")[0] != cobject.crs.split("(")[0]:
            continue
        co = crs2eval.get(crs, None)
        if co is None:
            try:
                co = eval(crs, sys.modules['__main__'].__dict__)
            except:
                continue  # usually the case of a CRS whose project is not currently defined
        if co:
            crs2eval[crs] = co
            clogger.debug("Compare trees for %s and %s" % (crs, cobject.crs))
            altperiod = compare_trees(co, cobject, ds_func, op_squeezes_time)
            if altperiod:
                f, costs = crs2filename[crs]
                if os.path.exists(f) or os.path.exists(alternate_filename(f)):
                    return co, altperiod
                else:
                    clogger.debug(
                        "Removing %s from cache index, because file is missing", crs)
                    key_to_rm.append(crs)
    for el in key_to_rm:
        crs2filename.pop(el)
    return None, None


def hasIncludingObject(cobject):
    def ds_period_difference(includer, included):
        if includer.buildcrs(period="") == included.buildcrs(period=""):
            return includer.period.includes(included.period)

    clogger.debug("search for including object for " + repr(cobject))
    return hasMatchingObject(cobject, ds_period_difference)
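
# Illustrative sketch (hypothetical CRS) : if the cache holds an object built
# on ds(..., period='1980-1989') and the requested object is the same
# expression built on ds(..., period='1982-1983'), hasIncludingObject returns
# the cached object and the including period, provided no operator in the
# tree squeezes the time dimension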


def hasBeginObject(cobject):
    def ds_period_begins(begin, longer):
        if longer.buildcrs(period="") == begin.buildcrs(period=""):
            return longer.period.start_with(begin.period)

    return hasMatchingObject(cobject, ds_period_begins)


def hasExactObject(cobject):
    i = 0
    found = False
    formats_to_test = known_formats + graphic_formats
    f = None
    while not found and i < len(formats_to_test):
        f = generateUniqueFileName(
            cobject.crs, format=formats_to_test[i], create_dirs=False)
        if os.path.exists(f):
            found = True
        else:
            f = alternate_filename(f)
            if os.path.exists(f):
                found = True
            else:
                i += 1
    if found and f is not None:
        crs = cobject.crs
        # if isinstance(cobject, cdataset):
        #    crs = "select(" + crs + ")"
        if crs not in crs2filename:
            clogger.warning("Next object exists in cache but was not "
                            "registered in index. Assuming its compute cost is zero. "
                            "Use cdrop if this is inadequate. Object %s" % crs)
            crs2filename[crs] = (f, compute_cost())
        return crs2filename[crs]
        # return f
    else:
        if cobject.crs in crs2filename:
            clogger.debug("Dropping %s from cache index, because file is missing" % cobject.crs)
            crs2filename.pop(cobject.crs)
        return None, compute_cost()
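
# Illustrative sketch (hypothetical hash) : hasExactObject probes, for each
# format in known_formats + graphic_formats, first the new-layout candidate
# path (e.g. <cache>/ab/cdef....nc), then its legacy-layout alternate, and
# returns the index entry of the first existing file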


def complement(crsb, crse, crs):
    """ Extends time period of file object of CRSB (B for 'begin')
    with file object of CRSE (E for 'end') for creating file object of
    CRS. Assumes that everything is OK with args compatibility and
    file contents
    """
    fileb, costsb = crs2filename[crsb]
    if not os.path.exists(fileb):
        fileb = alternate_filename(fileb)
    filee, costse = crs2filename[crse]
    if not os.path.exists(filee):
        filee = alternate_filename(filee)
    filet = generateUniqueFileName(crs)
    costs = compute_cost()
    costs.add(costsb)
    costs.add(costse)
    tim1 = time.time()
    command = "ncrcat -O %s %s %s" % (fileb, filee, filet)
    command_status = os.system(command)
    # account for the duration of the actual merge command
    duration = time.time() - tim1
    costs.increment(duration)
    if command_status != 0:
        clogger.error("Issue when merging %s and %s in %s (using command:%s)" % (
            crsb, crse, crs, command))
        return None, costs
    else:
        cdrop(crsb)
        cdrop(crse)
        register(filet, crs, costs)
        return filet, costs
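
# Illustrative sketch (hypothetical CRS) : if CRSB stands for the cached file
# of ds(..., period='1980') and CRSE for ds(..., period='1981'), then
# complement(crsb, crse, crs) ncrcat's both files into the cache file for
# crs = ds(..., period='1980-1981'), drops the two partial objects, and
# registers the merged file with the summed costs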


def cdrop(obj, rm=True, force=False):
    """
    Deletes the cached file for a CliMAF object, if it exists

    Args:
     obj (cobject or string) : object to delete, or its string representation (CRS)
     force (bool) : should we delete the object even if it is 'protected'
     rm (bool) : for advanced use only; should we actually delete (rm) the file,
      or just forget it in the CliMAF cache index

    Returns:
     None if the object does not exist, False if failing to delete, True if OK

    Example ::

    >>> dg = ds(project='example', simulation='AMIPV6ALB2G', variable='tas', period='1980-1981')
    >>> f = cfile(dg)
    >>> os.system('ls -al ' + f)
    >>> cdrop(dg)
    """
    global crs2filename
    global dropped_crs

    if isinstance(obj, cobject):
        crs = repr(obj)
    elif isinstance(obj, six.string_types):
        crs = str(obj)
    else:
        clogger.error("%s is not a CliMAF object" % repr(obj))
        return
    if crs in crs2filename:
        clogger.info("Discarding cached value for %s (except if protected)" % crs)
        fil, _ = crs2filename[crs]
        if not os.path.exists(fil):
            fil = alternate_filename(fil)
    else:
        # In case the cache index is not up-to-date
        fil, cost = hasExactObject(obj)
        if fil:
            crs2filename[crs] = (fil, compute_cost())
    if fil:
        if rm:
            try:
                if force:
                    os.system("chmod +w " + fil)
                if not os.access(fil, os.W_OK):
                    clogger.info("Object %s is protected" % crs)
                    return
                path_file = os.path.dirname(fil)
                os.remove(fil)
                crs2filename.pop(crs)
                dropped_crs.append(crs)
                try:
                    os.rmdir(path_file)
                except OSError:
                    pass
                return True
            except:
                clogger.warning("When trying to remove %s : file does not exist in cache" % crs)
                return False
    else:
        clogger.info("%s is not cached" % crs)
        return None
def cprotect(obj, stop=False):
    """
    Protects the cache file for a given object (or stops protection with arg
    'stop=True').

    In order to erase it, argument 'force=True' must then be used with function
    :py:func:`~climaf.cache.craz` or :py:func:`~climaf.cache.cdrop`
    """
    if isinstance(obj, cobject):
        crs = repr(obj)
        if isinstance(obj, cdataset):
            crs = "select(" + crs + ")"
    elif isinstance(obj, six.string_types):
        crs = obj
    else:
        clogger.error("%s is not a CliMAF object" % repr(obj))
        return
    if crs in crs2filename:
        f, _ = crs2filename[crs]
        if not os.path.exists(f):
            f = alternate_filename(f)
        if stop is False:
            clogger.info("Protecting cached value for " + crs)
            os.system("chmod -w " + f)
        else:
            clogger.info("Stopping protection on cached value for " + crs)
            os.system("chmod +w " + f)
        return
    else:
        clogger.info("%s is not (yet) cached; use cfile() to cache it" % crs)
def csync(update=False):
    """
    Merges current in-memory cache index and current on-file cache index
    for updating both

    If arg `update` is True, additionally ensures consistency between the set
    of files and the index content, either :

    - if cache.stamping is true, by reading the CRS in all files
    - else, by removing files which are not in the index; this may erase
      result files which have been computed by another running instance of CliMAF
    """
    global dropped_crs
    global crs2filename

    # Merge index on file and index in memory
    file_index = cload(True)
    for crs in dropped_crs:
        file_index.pop(crs, None)
    crs2filename.update(file_index)

    # Check if cache index is up to date; if not, enforce consistency
    if update:
        clogger.info("Listing crs from files present in cache")
        files_in_cache = list_cache()
        files_in_cache.sort()
        files_costs = list(crs2filename.values())
        files_in_index = [f[0] for f in files_costs]
        files_in_index.sort()
        if files_in_index != files_in_cache:
            if stamping:
                clogger.info("Rebuilding cache index from file content")
                rebuild()
            else:
                clogger.warning('In no stamp mode, there is no way to seriously '
                                'identify CRS from files in cache !')
                # clogger.warning('Removing cache files which content is not known. '
                #                 'This is an issue in concurrent mode !')
                # for fil in files_in_cache:
                #     if fil not in files_in_index:
                #         os.system("rm %s" % fil)
                #     else:
                #         # Should also remove empty files, as soon as
                #         # file creation will be atomic enough
                #         pass

    # Save index to disk
    fn = os.path.expanduser(env.environment.cacheIndexFileName)
    with open(fn, "wb") as cacheIndexFile:
        pickle.dump(crs2filename, cacheIndexFile)
    dropped_crs = list()
def cload(alt=None):
    global crs2filename
    global crs_not_yet_evaluable
    rep = dict()

    if len(crs2filename) != 0 and not alt:
        raise Climaf_Cache_Error(
            "attempt to reset cache index - would lead to inconsistency !")
    cacheFilen = os.path.expanduser(env.environment.cacheIndexFileName)
    if not os.path.exists(cacheFilen):
        clogger.debug("no index file yet")
        return {}
    with open(cacheFilen, "rb") as cacheIndexFile:
        rep = pickle.load(cacheIndexFile)
    for c in rep:
        f = rep[c]
        if type(f) is tuple:
            f, costs = f
        else:
            costs = compute_cost()
        if len(f.split("/")[-2]) == directoryNameLength:
            f = alternate_filename(f)
        rep[c] = (f, costs)
    #
    if alt:
        return rep
    else:
        crs2filename = rep
    #
    must_check_index_entries = False
    if must_check_index_entries:
        # We may have some crs inherited from past sessions and for which some
        # operator may have become non-standard, or some projects are yet undeclared
        crs_not_yet_evaluable = dict()
        allow_error_on_ds()
        for crs in list(crs2filename.keys()):
            try:
                # print("evaluating crs=" + crs)
                eval(crs, sys.modules['__main__'].__dict__)
            except:
                print("Inconsistent cache object is skipped : %s" % crs)
                # clogger.debug("Inconsistent cache object is skipped : %s" % crs)
                p = guess_projects(crs)
                if p not in crs_not_yet_evaluable:
                    crs_not_yet_evaluable[p] = dict()
                crs_not_yet_evaluable[p][crs] = crs2filename[crs]
                crs2filename.pop(crs)
        # Analyze projects of inconsistent cache objects
        projects = list(crs_not_yet_evaluable)
        if projects:
            clogger.info(
                "The cache has %d objects for non-declared projects %s.\n"
                "For using it, consider including relevant project(s) "
                "declaration(s) in ~/.climaf and restarting CliMAF.\n"
                "You can also declare these projects right now and call 'csync(True)'\n"
                "Or you can erase corresponding data by 'crm(pattern=...project name...)'" %
                (len(crs_not_yet_evaluable), repr(list(projects))))
        allow_error_on_ds(False)


def cload_for_project(project):
    """ Append to the cache index dict those left index entries for 'project'
    which evaluate successfully
    """
    d = crs_not_yet_evaluable[project]
    for crs in d.copy():
        try:
            # print("evaluating crs=" + crs)
            eval(crs, sys.modules['__main__'].__dict__)
            crs2filename[crs] = d[crs]
            d.pop(crs)
        except:
            clogger.error("CRS expression %s is not valid for project %s" % (crs, project))
def craz(force=False, hideError=False):
    """
    Clear CliMAF cache : erase existing files content, reset in-memory index

    Args:
      force (bool): should we erase also all 'protected' files
      hideError (bool): if True, will not warn for non existing cache
    """
    global crs2filename
    cc = os.path.expanduser(env.environment.currentCache)
    if os.path.exists(env.environment.currentCache) or hideError is False:
        if force:
            os.system("chmod -R +w " + cc)
            os.system("rm -fR " + cc + "/*")
            crs2filename = dict()
        else:
            list_of_crs = list(crs2filename)
            for crs in list_of_crs:
                if cdrop(crs):
                    clogger.debug('Removed file: %s', generateUniqueFileName(crs))
                else:
                    clogger.debug('Could not remove file (either not existing or protected): %s',
                                  crs2filename[crs])
                    clogger.debug('Associated CRS : %s', crs)
        # os.system("ls " + cc)
def cdump(use_macro=True):
    """
    List the in-memory content of CliMAF cache index. Interpret it
    using macros, except if arg use_macro is False
    """
    for crs in crs2filename:
        if not use_macro:
            # No interpretation by macros
            # print("%s : %s" % (crs2filename[crs][-30:], crs))
            print("%s : %s" % (crs2filename[crs], crs))
        else:
            # Must update for new macros
            print("%s : %s" % (crs2filename[crs], crewrite(crs)))


def list_cache():
    """
    Return the list of files in cache directories, using `find`
    """
    find_return = ""
    for dir_cache in env.environment.cachedirs:
        rep = os.path.expanduser(dir_cache)
        filter = r" \( -name '*.png' -o -name '*.nc' -o -name '*.pdf' -o -name '*.eps' \) "
        with os.popen(r"find %s -type f " % rep + filter + " -print") as fil:
            find_return += fil.read()
    files_in_cache = find_return.split('\n')
    files_in_cache.pop(-1)
    return files_in_cache
def clist(size="", age="", access=0, pattern="", not_pattern="", usage=False, count=False,
          remove=False, CRS=False, special=False):
    """
    Internal function used by its front-ends : :py:func:`~climaf.cache.cls`,
    :py:func:`~climaf.cache.crm`, :py:func:`~climaf.cache.cdu`, :py:func:`~climaf.cache.cwc`

    List the content of CliMAF cache according to some search criteria and
    possibly operate an action (usage, count or remove) on this list.

    Please consider the cost and benefit of first updating CliMAF cache index
    (by scanning files on disk) using :py:func:`csync()`

    Args:
      size (string, optional): n[ckMG]
       Search files using more than n units of disk space, rounding up.
       The following suffixes can be used:

        - "c" for bytes (default)
        - "k" for Kilobytes (units of 1,024 bytes)
        - "M" for Megabytes (units of 1,048,576 bytes)
        - "G" for Gigabytes (units of 1,073,741,824 bytes)

      age (string, optional): Number of 24h periods. Search files which status
       was last changed n*24 hours ago. Any fractional part is ignored, so to
       match age='+1', a file has to have been changed at least two days ago.
       Numeric arguments can be specified as:

        - `+n` for greater than n,
        - `-n` for less than n,
        - `n` for exactly n.

      access (int, optional): n
       Search files which were last accessed more than n*24 hours ago. Any
       fractional part is ignored, so to match access='1', a file has to have
       been accessed at least two days ago.

      pattern (string, optional): Scan through crs and filenames looking for
       the first location where the regular expression pattern produces a match.

      not_pattern (string, optional): Scan through crs and filenames looking
       for the location where the regular expression not_pattern does not
       produce a match.

      usage (bool, optional): Estimate found files space usage, for each found
       file and total size. If count is True, estimate only found files total
       space usage.

      count (bool, optional): Return the number of found files. If CRS is True,
       also return the crs of found files.

      remove (bool, optional): Remove the found files. This argument is exclusive.

      CRS (bool, optional): if True, print also CRS expressions. Useful only if
       count is True.

    Return:
      The dictionary corresponding to the request and associated action (or the
      dictionary of CliMAF cache index if no argument is provided)

    Example, to search files using more than 3M of disk space, which status was
    last changed more than 15 days ago, and containing the pattern '1980-1981'
    either in crs or filename; for found files, we want to estimate only their
    total space usage::

     >>> clist(size='3M', age='+15', pattern='1980-1981', usage=True, count=True)

    """
    # cache directories
    # TBD: the cache only holds one directory for now => consider looping over all caches
    rep = os.path.expanduser(env.environment.cachedirs[0])

    # command for research on size/age/access
    opt_find = ""
    if size:
        if re.search('[kMG]', size) is None:
            opt_find += "-size +%sc " % size
        else:
            opt_find += "-size +%s " % size
    if age:
        opt_find += "-ctime %s " % age
    if access != 0:
        opt_find += "-atime +%s" % str(int(access))

    var_find = False
    if size or age or access != 0:
        var_find = True
        command = r"find %s -type f \( -name '*.png' -o -name '*.nc' -o -name '*.pdf' " \
                  r"-o -name '*.eps' \) %s -print" % (rep, opt_find)
        clogger.debug("Find command is :" + command)

        # construction of the new dictionary after research on size/age/access
        new_dict = dict()
        find_return = os.popen(command).read()
        list_search_files_after_find = find_return.split('\n')
        list_search_files_after_find.pop(-1)
        clogger.debug("List of search files: " + repr(list_search_files_after_find))

        # Search CRS for each found file
        for filen in list_search_files_after_find:
            for crs in crs2filename:
                if crs2filename[crs][0] == filen:
                    new_dict[crs] = crs2filename[crs]
        if len(new_dict) != 0:
            if new_dict != crs2filename:
                clogger.debug("Dictionary after find for size/age/access: " + repr(new_dict))
            else:
                clogger.debug("Size/age/access criteria do not lead to any filtering")
        else:
            clogger.debug("No file meets the size/age/access criteria")
    else:
        new_dict = crs2filename.copy()

    # size of new dictionary
    len_new_dict = len(new_dict)

    # filter on pattern
    find_pattern = False
    if pattern:
        list_crs_to_rm = list()
        for crs in new_dict:
            try:
                if re.search(pattern, crewrite(crs)) or re.search(pattern, new_dict[crs][0]):
                    clogger.debug("Pattern found in %s: %s" % (crs, new_dict[crs][0]))
                    find_pattern = True
                else:
                    # Do not remove now from new_dict, because we loop on it
                    list_crs_to_rm.append(crs)
            except:
                print("bad type for arguments to re.search : ", crewrite(crs),
                      type(crewrite(crs)), new_dict[crs][0], type(new_dict[crs][0]))
        for crs in list_crs_to_rm:
            del new_dict[crs]
        if find_pattern:
            clogger.debug("Dictionary after search for pattern: " + repr(new_dict))
        elif len_new_dict != 0:
            clogger.debug("No string found for pattern => no result")

    # update size of new dictionary
    len_new_dict = len(new_dict)

    # research on not_pattern
    find_not_pattern = False
    if not_pattern:
        list_crs_to_rm = []
        for crs in new_dict:
            if re.search(not_pattern, crewrite(crs)) is None and \
                    re.search(not_pattern, new_dict[crs][0]) is None:
                clogger.debug("Pattern not found in %s: %s" % (crs, new_dict[crs][0]))
                find_not_pattern = True
            else:
                list_crs_to_rm.append(crs)
        for crs in list_crs_to_rm:
            del new_dict[crs]
        if find_not_pattern:
            clogger.debug("Dictionary after search for not_pattern: " + repr(new_dict))
        elif len_new_dict != 0:
            clogger.debug("All strings contain not_pattern => no result")

    # update size of new dictionary
    len_new_dict = len(new_dict)

    # request on new dictionary through usage, count and remove
    work_dic = new_dict if (var_find or pattern != "" or not_pattern != "") else crs2filename
    if usage is True and len_new_dict != 0:
        # construction of a dictionary associating each crs with its disk usage
        dic_usage = dict()
        tmp = ""
        for crs in work_dic:
            if isinstance(work_dic[crs], tuple):
                tmp += work_dic[crs][0] + " "
            if isinstance(work_dic[crs], str):
                tmp += work_dic[crs] + " "
        res = os.popen("du -sc %s" % tmp).read()
        regex = re.compile('([0-9]+)\t')
        list_size = re.findall(regex, res)
        regex2 = re.compile('([0-9]+\t)')
        str_path = regex2.sub('', res)
        list_fig = str_path.split('\n')
        list_fig.pop(-1)
        for fig, size in zip(list_fig, list_size):
            if fig != "total":
                for crs in work_dic:
                    # index entries may be plain filenames or (filename, cost) pairs
                    fn = work_dic[crs][0] if isinstance(work_dic[crs], tuple) else work_dic[crs]
                    if fig == fn:
                        dic_usage[crs] = size
            else:
                dic_usage[fig] = size

        # sort the usage dictionary and convert units
        du_list_sort = list(dic_usage.items())
        du_list_sort.sort(key=itemgetter(1), reverse=False)
        unit = ["K", "M", "G", "T"]
        for n, pair in enumerate(du_list_sort):
            i = 0
            flt = float(pair[1])
            while flt >= 1024. and i < len(unit) - 1:
                flt /= 1024.
                i += 1
            du_list_sort[n] = (du_list_sort[n][0], "%6.1f%s" % (flt, unit[i]))

        if count is True:
            # Display total volume of found files
            for fig, size in du_list_sort:
                if fig == "total":
                    print("%7s : %s" % (size, fig))
        else:
            # retrieve disk usage of each found file, and total volume
            for fig, size in du_list_sort:
                print("%7s : %s" % (size, fig))

    elif count is True and len_new_dict != 0:
        print("Number of files found:", len(work_dic))
        if CRS is True:
            for crs in work_dic:
                print(crs)

    elif remove is True and len_new_dict != 0:
        print("Removed files:")
        if var_find or pattern != "" or not_pattern != "":
            list_tmp_crs = list(new_dict)
        else:
            list_tmp_crs = list(crs2filename)
        for crs in list_tmp_crs:
            cdrop(crs, rm=True)
        return list(map(crewrite, list_tmp_crs))

    else:  # usage, count and remove are False
        if var_find or pattern != "" or not_pattern != "":
            if len(new_dict) != 0:
                if new_dict != crs2filename:
                    print("Filtered objects :")
                else:
                    print("Filtered objects = cache content")
                return list(map(crewrite, new_dict))
            # else: print("No matching file")
        else:
            print("Content of CliMAF cache")
            return list(map(crewrite, crs2filename))

    # TBD
    if special is True:
        global dic_special
        dic_special = dict()
        if var_find is True or pattern != "" or not_pattern != "":
            dic_special = new_dict.copy()
        else:
            dic_special = crs2filename.copy()
        print("List of figures marked as 'special'", list(dic_special.values()))
        return dic_special
        # TBD: declare dic_special as a global variable and remove its clearing in creset

    new_dict.clear()
def cls(**kwargs):
    """
    List CliMAF cache objects. Synonym to clist(). See :py:func:`~climaf.cache.clist`
    """
    return clist(**kwargs)
def crm(**kwargs):
    """
    Remove the cache files found by 'clist()' when using the same arguments.
    See :py:func:`~climaf.cache.clist`

    Example, to remove files using more than 3M of disk space, which status was
    last changed more than 15 days ago, and containing the pattern '1980-1981'
    either in crs or filename::

     >>> crm(size='3M', age='+15', pattern='1980-1981')
    """
    kwargs['remove'] = True
    kwargs['usage'] = False
    kwargs['count'] = False
    return clist(**kwargs)
def cdu(**kwargs):
    """
    Report disk usage, for files matching some criteria, as specified for
    :py:func:`~climaf.cache.clist`. With count=True, report only total disk usage.

    Example, to search files using more than 3M of disk space, which status was
    last changed more than 15 days ago, and containing the pattern '1980-1981'
    either in crs or filename; for found files, we want to estimate only their
    total space usage::

     >>> cdu(size='3M', age='+15', pattern='1980-1981', count=True)
    """
    kwargs['usage'] = True
    kwargs['remove'] = False
    return clist(**kwargs)
def cwc(**kwargs):
    """
    Report the number of cache files matching some criteria, as specified for
    :py:func:`~climaf.cache.clist`. If CRS is True, also return the CRS
    expression of found files.

    Example, to return the number (and crs) of files using more than 3M of disk
    space, which status was last changed more than 15 days ago, and containing
    the pattern '1980-1981' either in crs or filename::

     >>> cwc(size='3M', age='+15', pattern='1980-1981', CRS=True)
    """
    kwargs['count'] = True
    kwargs['remove'] = False
    kwargs['usage'] = False
    return clist(**kwargs)
def rebuild():
    """
    Rebuild the in-memory content of CliMAF cache index
    """
    global crs2filename
    if not stamping:
        clogger.warning("Cannot rebuild cache index, because we are not in 'stamping' mode")
        return None
    files_in_cache = list_cache()
    crs2filename.clear()
    for files in files_in_cache:
        filecrs = getCRS(files)
        if filecrs:
            crs2filename[filecrs] = (files, compute_cost())
        else:
            os.system('rm -f ' + files)
            clogger.warning("File %s is removed" % files)
    return crs2filename


def store_cvalue(crs, index, value, costs):
    """
    Stores a scalar, as computed by cvalue, in a scalars cache
    """
    if handle_cvalues is not False:
        if handle_cvalues in ["by_crs", ]:
            key = hashlib.sha224((crs + "%d" % index).encode("utf-8")).hexdigest()
        else:
            key = crs + "[%d]" % index
        cvalues[key] = value, costs


def has_cvalue(crs, index):
    """
    Returns a scalar, as computed by cvalue, from the scalars cache
    (or None if not cached)
    """
    if handle_cvalues is not False:
        if handle_cvalues in ["by_crs", ]:
            key = hashlib.sha224((crs + "%d" % index).encode("utf-8")).hexdigest()
        else:
            key = crs + "[%d]" % index
        value = cvalues.get(key, None)
        if value is not None:
            if not type(value) is tuple:
                return value, compute_cost()
            else:
                return value
    return None, compute_cost()


def load_cvalues():
    """
    Load in memory the cache of 'cvalue' scalars, from json file cvalues.json
    """
    global cvalues
    if handle_cvalues is not False:
        cache_file = os.sep.join([env.environment.currentCache, "cvalues.json"])
        if os.path.exists(cache_file):
            with open(cache_file, "r") as f:
                tmp = json.load(f)
            cvalues = decode_index(tmp)
            del tmp


def sync_cvalues():
    """
    Read the on-disk cache of 'cvalue' scalars, update it with the in-memory
    one, and write the result to json file cvalues.json
    """
    global cvalues
    if handle_cvalues is not False:
        currentCache = env.environment.currentCache
        if not os.path.isdir(currentCache):
            os.makedirs(currentCache)
        ccache = os.path.sep.join([currentCache, "cvalues.json"])
        #
        if os.path.exists(ccache):
            # get pre-existing on-disk content
            with open(ccache, "r") as f:
                encoded = json.load(f)
            onfile = decode_index(encoded)
            del encoded
            onfile.update(cvalues)
            del cvalues
            cvalues = onfile
        tmp = ccache.replace(".json", ".tmp")
        with open(tmp, "w") as f:
            tofile = encode_index(cvalues)
            json.dump(tofile, f, separators=(',', ': '), indent=3, ensure_ascii=True)
            del tofile
        os.rename(tmp, ccache)


def raz_cvalues():
    """
    Clear in-memory and on-disk cache of 'cvalue' scalars
    """
    global cvalues
    if handle_cvalues is not False:
        cvalues = dict()
        sync_cvalues()


class compute_cost(object):
    """
    Handles compute costs for objects :

    - tc : a cost for all compute operations involved in the object's genesis
    - lc : another cost, for the last (top-level) operation
    """

    def __init__(self, cost=0., total_cost=None):
        """
        Create a compute_cost object.
        COST -> last_operation_cost
        TOTAL_COST -> total cost, defaults to COST
        """
        # Attribute names are kept compact for saving on index size
        # through pickle.dump
        self.lc = cost
        if total_cost is None:
            total_cost = cost
        self.tc = total_cost

    def __repr__(self):
        return "total cost : %.1f s, last operation : %.1f s" % (self.tc, self.lc)

    def add(self, other):
        """Just sum up total costs with another cost object.
        Also sum up last operation costs, but this is questionable, as you could
        consider that summing up costs implies there is some new 'last
        operation' around
        """
        self.tc = self.tc + other.tc
        self.lc = self.lc + other.lc

    def increment(self, cost):
        """Assuming that COST is the cost of a new 'last operation',
        updates the object to represent that"""
        self.tc = self.tc + cost
        self.lc = cost
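

# Illustrative sketch of cost accounting (a worked example, not part of the
# module), mirroring what complement() does when merging two cached objects :
#   c = compute_cost()               # zero cost
#   c.add(compute_cost(2., 10.))     # inherit costs of the first operand
#   c.add(compute_cost(3., 5.))      # and of the second one
#   c.increment(1.5)                 # account for the merge operation itself
#   repr(c)  ->  'total cost : 16.5 s, last operation : 1.5 s'

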
def ccost(cobject):
    """Provide the compute costs (elapsed time) for an object

    Returned value is of class compute_cost, which has fields tc (for
    total_cost) and lc (for last_op_cost) :

    - tc is the sum of all compute operations that lead to the object
    - lc is the cost for the last operation (top level)

    When a single operation provides multiple outputs, all outputs are charged
    with the cost of that operation (cost is not shared). This may lead to an
    overestimated cost when such outputs are re-used together in further
    operations
    """
    if cobject.crs in crs2filename:
        return crs2filename[cobject.crs][1]
    else:
        if type(cobject) is cens:
            cost = compute_cost()
            for member in cobject:
                if cobject[member].crs in crs2filename:
                    cost.add(crs2filename[cobject[member].crs][1])
                else:
                    raise Climaf_Error("At least one ensemble member is not cached (%s)" % member)
            return cost
        else:
            clogger.warning("Object is not (yet) cached; try cfile()")
            return None
def encode_index(values):
    """ Encode a dict like crs2filename, i.e. a dict of crs : (filename, cost),
    in a structure compatible with Json encoding"""
    ret = dict()
    for crs in values:
        if type(values[crs]) is tuple:
            fn, cost = values[crs]
        else:
            fn = values[crs]
            cost = compute_cost()
        ret[crs] = (fn, cost.lc, cost.tc)
    return ret


def decode_index(values):
    """Decode a dict of triplets into a dict like crs2filename, i.e. a dict
    of crs : (filename, cost) """
    ret = dict()
    for crs in values:
        if type(values[crs]) is list:
            fn, last_op_cost, total_cost = values[crs]
        else:
            fn = values[crs]
            last_op_cost, total_cost = 0., 0.
        ret[crs] = (fn, compute_cost(last_op_cost, total_cost))
    return ret
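

# Illustrative round-trip (hypothetical values; a sketch, not part of the module) :
#   d = {"some_crs": ("/cache/ab/cdef.nc", compute_cost(1., 3.))}
#   encode_index(d)  ->  {"some_crs": ("/cache/ab/cdef.nc", 1., 3.)}
#   after json.dump/json.load, the triplet comes back as a list (hence the
#   'type(values[crs]) is list' test above), and decode_index rebuilds
#   {"some_crs": ("/cache/ab/cdef.nc", compute_cost(1., 3.))}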