Source code for climaf.driver

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CliMAF driver

There are quite a lot of things to document here. This may be done at a later stage.

"""
from __future__ import print_function, division, unicode_literals, absolute_import

# Created : S.Senesi - 2014

import re
import time
import shutil
import copy
from string import Template
import tempfile
from datetime import datetime
from functools import reduce
from six import string_types

import warnings
from xarray import open_dataset as xr_open_dataset
import subprocess

from climaf.dataloc import remote_to_local_filename
from climaf.utils import Climaf_Driver_Error, Climaf_Error

# Climaf modules
from env.environment import *
from climaf.operators_scripts import scriptFlags
from climaf.operators_derive import is_derived_variable, derived_variable, derive
from climaf import classes
from climaf.cache import compute_cost, hasExactObject, cdrop, hasIncludingObject, hasBeginObject, complement, \
    generateUniqueFileName, register, rename, has_cvalue, store_cvalue
from climaf.cmacro import instantiate
from env.clogging import clogger, indent as cindent, dedent as cdedent
from climaf.netcdfbasics import varOfFile, varsOfFile
from climaf.period import init_period, merge_periods
from climaf.classes import allow_errors_on_ds_call, cens, varOf, ctree, scriptChild, cdataset, cpage, cpage_pdf, \
    domainOf, cobject, modelOf, simulationOf, projectOf, realmOf, gridOf
from climaf.ESMValTool_diags import call_evt_script

# Silence DeprecationWarning: filterwarnings() modifies the interpreter-wide warnings filters,
# so this applies to every module, not only to this one
warnings.filterwarnings("ignore", category=DeprecationWarning)

# When evaluating an object, default behaviour is to search cache for including or begin objects
# but this could be expensive (flag read by ceval_for_ctree and ceval_for_scriptChild)
dig_hard_into_cache = True


def capply(climaf_operator, *operands, **parameters):
    """
    Builds the object representing applying a CliMAF operator (script, function or macro) by
    calling the dedicated function.

    When the operator is a script whose output format is one of ``none_formats``, the
    resulting object is evaluated right away (there is no output to manage).

    :param climaf_operator: name of a CliMAF operator (either a declared script, macro or operator)
    :param operands: operands to be passed to the climaf operator
    :param parameters: parameters to be passed to the climaf operator (not available for macros)
    :return: the CliMAF object built for the operator, or None if the operator is unknown
    :raises Climaf_Driver_Error: if no operand is given, if the first operand is None (unless
        ``climaf.classes.allow_errors_on_ds_call`` is set), or if a macro gets keyword args
    """
    res = None
    # Read the setting on module climaf.classes at call time: the name imported at the top of
    # this file is a snapshot taken at import time and would miss any later change
    allow_errors = classes.allow_errors_on_ds_call
    # 'operands' is a tuple, never None: reject an empty one explicitly rather than letting
    # operands[0] fail with an IndexError
    if len(operands) == 0 or (operands[0] is None and not allow_errors):
        raise Climaf_Driver_Error("Operands is None for operator %s" % climaf_operator)
    # String forms of the operands, used for logging only
    opds = list(map(str, operands))

    if climaf_operator in cscripts:
        res = capply_script(climaf_operator, *operands, **parameters)
        # Evaluate object right now if there is no output to manage
        op = cscripts[climaf_operator]
        if op.outputFormat in none_formats:
            ceval(res, userflags=copy.copy(op.flags))

    elif climaf_operator in cmacros:
        if len(parameters) > 0:
            raise Climaf_Driver_Error("Macros cannot be called with keyword args")
        clogger.debug("applying macro %s to" % climaf_operator + repr(opds))
        res = instantiate(cmacros[climaf_operator], *operands)

    elif climaf_operator in operators:
        clogger.debug("applying operator %s to" % climaf_operator + repr(opds) + repr(parameters))
        res = capply_operator(climaf_operator, *operands, **parameters)

    else:
        clogger.error("%s is not a known operator nor script" % climaf_operator)

    return res


def capply_script(script_name, *operands, **parameters):
    """
    Create object for application of a script to OPERANDS with keyword PARAMETERS.

    If the first operand is an ensemble (cens) and the script commutes with ensembles
    (``script.flags.commuteWithEnsemble``), the script is applied to each member of that
    ensemble, with the member label passed as parameter 'member_label'. The result is then an
    ensemble with the same member order.

    :param script_name: name of the script to be applied (declared with function 'cscript')
    :param operands: operands to be passed to the script
    :param parameters: parameters to be passed to the script
    :return: an object that represents the application of the script (a cens of such
        objects when iterating over ensemble members)
    :raises Climaf_Driver_Error: if the script is unknown, or if a parameter is not used by the
        script command
    """
    if script_name not in cscripts:
        # Single placeholder: the former message had two '%s' for one argument, which raised
        # a TypeError instead of the intended error
        raise Climaf_Driver_Error("Script %s is not known. Consider declaring it with function 'cscript'" %
                                  script_name)
    script = cscripts[script_name]
    #
    # Check that all parameters to the call are expected by the script. 'member_label' and
    # 'add_*' parameters are managed by CliMAF itself and need not appear in the command
    command = script.command
    for para in parameters:
        used_in_command = (r"{%s}" % para in command) or (r"{%s_iso}" % para in command)
        handled_by_climaf = para in ['member_label', ] or para.startswith("add_")
        if not (used_in_command or handled_by_climaf):
            raise Climaf_Driver_Error("parameter '%s' is not expected by script %s (which command is : %s)" %
                                      (para, script_name, command))
    #
    # NOTE: only the first operand is handled as an ensemble; the check that other operands
    # are not ensembles is currently disabled
    first = operands[0]
    others = list(operands[1:])
    #
    # If first operand is an ensemble, and the script commutes with ensembles, result is the
    # ensemble of applying the script to each member of first operand. Otherwise, just call maketree
    if isinstance(first, cens) and script.flags.commuteWithEnsemble:
        reps = []
        order = first.order
        for label in order:
            member = first[label]
            clogger.debug("processing member %s : " % label + repr(member))
            params = parameters.copy()
            params["member_label"] = label
            reps.append(maketree(script_name, script, member, *others, **params))
        return cens(dict(zip(order, reps)), order)
    return maketree(script_name, script, *operands, **parameters)


def _output_variable_name(pattern, default_variable, parameters):
    """Compute a script output variable name: '%s' in PATTERN is replaced by DEFAULT_VARIABLE,
    then '$name' placeholders are substituted from dict PARAMETERS."""
    if "%s" in pattern:
        name = pattern % default_variable
    else:
        name = pattern
    return Template(name).substitute(parameters)


def maketree(script_name, script, *operands, **parameters):
    """
    Create the ctree object representing the application of a script to its operands.

    maketree takes care of:
      - creating a ctree object representing the application of the script to its operands
      - checking that the time period of result makes sense (through timePeriod)
      - computing the variable name for all outputs, using dict script.outputs
      - for each secondary output, creating an attribute of the ctree named as this output

    Parameters whose name starts with 'add_' are not passed to the script. The exception is
    'add_variable', which is kept and also provides the default variable name of outputs.
    Otherwise, that default is the variable of the first operand.

    :param script_name: name of the script
    :param script: the script object (as declared by cscript)
    :param operands: operands of the script
    :param parameters: parameters of the script call
    :return: the ctree object
    """
    # Remove 'add_*' parameters (iterate on a list copy of the keys, as the dict is modified)
    add_dict = dict()
    for key in [key for key in parameters if key.startswith("add_")]:
        add_dict[key] = parameters.pop(key)
    if "add_variable" in add_dict:
        defaultVariable = add_dict["add_variable"]
        parameters["add_variable"] = defaultVariable
    else:
        defaultVariable = varOf(operands[0])
    rep = ctree(script_name, script, *operands, **parameters)
    # TBD Analyze script inputs cardinality vs actual arguments
    # Create one child for each secondary output
    for outname in script.outputs:
        if outname is None or outname == '':
            # This is the main output
            rep.variable = _output_variable_name(script.outputs[outname], defaultVariable, parameters)
        else:
            # This is a secondary output
            son = scriptChild(rep, outname)
            son.variable = _output_variable_name(script.outputs[outname], defaultVariable, parameters)
            rep.outputs[outname] = son
            setattr(rep, outname, son)
    # Check that time period of output makes sense (called for its checks; result unused)
    timePeriod(rep)
    return rep


def capply_operator(climaf_operator, *operands, **parameters):
    """
    Create the object for applying an internal OPERATOR to OPERANDS with keyword PARAMETERS.

    Internal operators are not implemented yet: this only logs an error and returns None.
    """
    not_implemented_message = "Not yet developped - TBD"
    clogger.error(not_implemented_message)
    return None


def ceval_for_cdataset(cobject, userflags=None, format="MaskedArray", deep=None, derived_list=None,
                       recurse_list=None):
    """
    Evaluate a CliMAF object of kind climaf.classes.cdataset

    If the dataset was not defined by a path and is ambiguous, it is first completed using
    method 'explore'. Then:

    - if the exact dataset is in cache, the cached value is returned;
    - if the dataset is local or cached:
        - a derived variable is evaluated through the object that derives it;
        - if ``noselect`` allows it and format is 'file', the dataset's base files are returned;
        - otherwise script 'select' (or 'select_member' for multi-member datasets) is applied
          and evaluated;
    - if the dataset is remote, then:
        - its address is returned when the caller can use OpenDAP and format is 'file';
        - otherwise, its base files are returned when ``noselect`` allows it;
        - otherwise, script 'remote_select' is applied and evaluated.

    :param cobject: the cdataset to evaluate
    :param userflags: flags (scriptFlags) of the calling script, or None
    :param format: requested format: 'file' for file names, 'MaskedArray' for data
    :param deep: deep-evaluation flag, forwarded to the evaluation of derived variables
    :param derived_list: names of variables already being derived upstream (loop detection)
    :param recurse_list: list of CRS of objects under evaluation; this dataset's CRS is appended
    :return: a pair : either a filename string or MaskedArray, and a cost object
    :raises Climaf_Driver_Error: on derived-variable loops, missing files, inaccessible datasets
        or unsupported formats
    """
    # Use fresh lists rather than mutable defaults, which would be shared (and grow) across calls
    if derived_list is None:
        derived_list = list()
    if recurse_list is None:
        recurse_list = list()
    recurse_list.append(cobject.crs)
    clogger.debug("Evaluating dataset operand " + cobject.crs + " having kvp= " + repr(cobject.kvp))
    ds = cobject
    # If the dataset was not defined by its path, check that it is completely defined
    if "path" not in ds.kvp or ds.kvp["path"] in ["", None]:
        ds_ambiguous_args = ds.explore("choices")
        if len(ds_ambiguous_args) != 0:
            ds = ds.explore("resolve")
            clogger.warning("When doing a cfile on dataset %s, we had to ensure that it "
                            "is completely defined, using  method 'explore', by updating "
                            "with the following arguments: %s" % (cobject.crs,
                                                                  str(ds_ambiguous_args)))
            clogger.warning("This was at the cost of querying the file system, more or "
                            "less heavily. You may wish to add such attributes by yourself")
    cache_value, costs = hasExactObject(ds)
    if cache_value is not None:
        clogger.debug("Dataset %s exists in cache" % ds)
        cdedent()
        if format in ['file', ]:
            return cache_value, costs
        else:
            return cread(cache_value, varOf(ds)), costs
    if ds.isLocal() or ds.isCached():
        clogger.debug("Dataset %s is local or cached " % ds)
        #  if the data is local, then
        #   if the caller operator can select the data and aggregate time, and requested format is
        #     'file' return the filenames
        #   else : read the data, create a cache file for that, and recurse
        #
        # First go to derived variable evaluation if applicable
        if is_derived_variable(ds.variable, ds.project):
            if ds.variable in derived_list:
                raise Climaf_Driver_Error("Loop detected while evaluating"
                                          "derived variable " + ds.variable + " " + repr(derived_list))
            # Create the object representing applying the operation needed to derive the var
            # and evaluate it
            derived = derive_variable(ds)
            clogger.debug("evaluating derived variable %s as %s" % (ds.variable, repr(derived)))
            derived_value, costs = ceval(derived, format=format, deep=deep, userflags=userflags,
                                         derived_list=derived_list + [ds.variable], recurse_list=recurse_list)
            if derived_value:
                clogger.debug("succeeded in evaluating derived variable %s as %s" % (ds.variable, repr(derived)))
                set_variable(derived_value, ds.variable, format=format)
            cdedent()
            return derived_value, costs
        elif noselect(userflags, ds, format) and format in ["file", ]:
            # The caller is assumed to be able to select the needed sub-period and the variable
            clogger.debug("Delivering file set or sets is OK for the target use")
            cdedent()
            rep = ds.baseFiles()
            if not rep:
                raise Climaf_Driver_Error("No file found for %s" % repr(ds))
            # rep is a single string with all filenames, or a list of such strings for ensembles
            return rep, compute_cost()
        else:
            clogger.debug("Must subset and/or aggregate and/or select " +
                          "var from data files and/or get data, or provide object result")
            if format in ['file', "MaskedArray"]:
                if ds.hasOneMember():
                    clogger.debug("Fetching/selection/aggregation is done using an external script for now - TBD")
                    extract = capply('select', ds)
                else:
                    clogger.debug("On multi-member datafiles , fetching/selection/aggregation " +
                                  "is done using select_member - TBD")
                    extract = capply('select_member', ds)
                if extract is None:
                    raise Climaf_Driver_Error("Cannot access dataset" + repr(ds))
                rep, costs = ceval(extract, userflags=userflags, format=format)
            else:
                raise Climaf_Driver_Error("Untractable output format %s" % format)
            if userflags:
                userflags.unset_selectors()
            cdedent()
            return rep, costs
    else:
        # Non-local and non-cached dataset
        #   if the user can access the dataset by one of the dataset-specific protocols
        #   then assume it can also select on time; -> just provide it with the address
        #   else : fetch the relevant selection of the data, and store it in cache
        clogger.debug("Dataset is remote ")
        # userflags may be None when this function is called directly (ceval always provides it)
        if userflags is not None and userflags.canOpendap and format == 'file':
            clogger.debug("But user can OpenDAP ")
            cdedent()
            return ds.adressOf(), compute_cost()
        else:
            if noselect(userflags, ds, format):
                # NOTE: this case has never been activated so far
                clogger.debug("Delivering file set or sets is OK for the target use")
                cdedent()
                rep = ds.baseFiles()
                if not rep:
                    raise Climaf_Driver_Error("No file found for %s" % repr(ds))
                return rep, compute_cost()
            else:
                # This matches reaching data using e.g. ftp
                clogger.debug("Must remote read and cache ")
                extract = capply('remote_select', ds)
                if extract is None:
                    raise Climaf_Driver_Error("Cannot access dataset" + repr(ds))
                rep, costs = ceval(extract, userflags=userflags, format=format)
                ds.files = rep
                if userflags is not None:
                    userflags.unset_selectors()
                cdedent()
                return rep, costs


def ceval_for_ctree(cobject, userflags=None, format="MaskedArray", deep=None, derived_list=None,
                    recurse_list=None):
    """
    Evaluate a CliMAF object of kind climaf.classes.ctree (application of an operator).

    The cache is searched first for the exact object. If module flag ``dig_hard_into_cache``
    is set, it is then searched for an including object, which is only usable when format is
    'file'. It is also searched for an object covering the beginning of the requested period,
    in which case only the complement is computed. Otherwise the operator (a declared script
    or an internal operator) is applied.

    :param cobject: the ctree to evaluate
    :param userflags: flags of the calling script, forwarded to ceval_select and to the
        evaluation of the complement object
    :param format: 'file' to get file name(s); other formats read the result with cread
    :param deep: if not None, the object is first dropped from cache; only deep=True is
        propagated downward
    :param derived_list: names of variables already being derived upstream
    :param recurse_list: list of CRS of objects under evaluation; this object's CRS is appended
    :return: either a filename string or MaskedArray, and a cost object
    :raises Climaf_Driver_Error: if the operator is neither a declared script nor a known operator
    """
    # Use fresh lists rather than mutable defaults, which would be shared (and grow) across calls
    if derived_list is None:
        derived_list = list()
    if recurse_list is None:
        recurse_list = list()
    recurse_list.append(cobject.crs)
    clogger.debug("Evaluating compound object : " + repr(cobject))
    # Any explicit 'deep' value forces re-evaluation of this object
    if deep is not None:
        cdrop(cobject)
    #
    clogger.debug("Searching cache for exact object : " + repr(cobject))
    filename, costs = hasExactObject(cobject)
    if filename:
        clogger.info("Object found in cache: %s is at %s:  " % (cobject.crs, filename))
        cdedent()
        if format == 'file':
            return filename, costs
        else:
            return cread(filename, varOf(cobject)), costs
    if dig_hard_into_cache:
        clogger.debug("Searching cache for including object for : " + repr(cobject))
        it, _ = hasIncludingObject(cobject)
        if it:
            clogger.info("Including object found in cache : %s" % it.crs)
            if format == 'file':
                clogger.info("Selecting " + repr(cobject) + " out of it")
                # Just select (if necessary for the user) the portion relevant to the request
                rep, costs = ceval_select(it, cobject, userflags, format, deep, derived_list, recurse_list)
                cdedent()
                return rep, costs
            else:
                clogger.debug("Because out format %s is not (yet, TBD) supported by ceval_select, cannot use "
                              "including object found for : " % format + repr(cobject))
        #
        clogger.debug("Searching cache for begin  object for : " + repr(cobject))
        it, comp_period = hasBeginObject(cobject)
        clogger.debug("Finished with searching cache for begin  object for : " + repr(cobject))
        if it:
            clogger.info("partial result found in cache for %s : %s" % (cobject.crs, it.crs))
            clogger.debug("comp_period=" + repr(comp_period))
            begcrs = it.crs
            # Build complement object for end, and eval it
            comp = copy.deepcopy(it)
            comp.setperiod(comp_period)
            evalcomp, _ = ceval(comp, userflags, format, deep, derived_list, recurse_list)
            set_variable(evalcomp, cobject.variable, format=format)
            rep, costs = complement(begcrs, comp.crs, cobject.crs)
            cdedent()
            if format == 'file':
                return rep, costs
            # The whole object is now in cache: read it back. ceval returns a (value, cost)
            # pair, of which only the value is wanted (previously the pair itself was returned,
            # yielding a nested tuple)
            value, _ = ceval(cobject, format=format)
            return value, costs
    clogger.info("nothing relevant found in cache for %s" % cobject.crs)
    #
    #  Only deep=True can propagate downward !
    if deep:
        down_deep = True
    else:
        down_deep = None
    #
    # the cache doesn't have a similar tree, let us recursively eval subtrees
    # TBD  : analyze if the dataset is remote and the remote place 'offers' the operator
    if cobject.operator in cscripts:
        clogger.debug("Script %s found" % cobject.operator)
        # Does return a filename, or list of filenames
        filen, costs = ceval_script(cobject, down_deep, recurse_list=recurse_list)
        cdedent()
        if format in ['file', ]:
            return filen, costs
        else:
            return cread(filen, varOf(cobject)), costs
    elif cobject.operator in operators:
        clogger.debug("Operator %s found" % cobject.operator)
        # TODO: Implement ceval_operator (and handle costs)
        obj = ceval_operator(cobject, deep)
        cdedent()
        if format in ['file', ]:
            # TODO: Implement cstore
            rep = cstore(obj)
            return rep, compute_cost()
        else:
            return obj, compute_cost()
    else:
        raise Climaf_Driver_Error("operator %s is not a script nor known operator" % str(cobject.operator))


def ceval_operator(cobject, deep, *args, **kwargs):
    """
    Evaluate a ctree whose operator is an internal CliMAF operator.

    Placeholder: internal operators are not implemented yet.

    :raises NotImplementedError: always
    """
    raise NotImplementedError


def cstore(cobject, *args, **kwargs):
    """
    Store an in-memory CliMAF object to a file and return its name.

    Placeholder: not implemented yet.

    :raises NotImplementedError: always
    """
    raise NotImplementedError


def ceval_for_scriptChild(cobject, userflags=None, format="MaskedArray", deep=None, derived_list=None,
                          recurse_list=None):
    """
    Evaluate a CliMAF object of kind climaf.classes.scriptChild (secondary output of a script).

    The cache is searched first for the exact object. If module flag ``dig_hard_into_cache``
    is set, it is then searched for including and begin objects. Otherwise the father script
    is evaluated and this object is evaluated again, which is expected to hit the cache.

    :param cobject: the scriptChild to evaluate
    :param userflags: flags of the calling script
    :param format: 'file' to get file name(s); other formats read the result with cread
    :param deep: if not None, the object is first dropped from cache; only deep=True is
        propagated downward
    :param derived_list: names of variables already being derived upstream
    :param recurse_list: list of CRS of objects under evaluation; this object's CRS is appended
    :return: either a filename string or MaskedArray, and a cost object
    :raises Climaf_Driver_Error: if the father script evaluation returns None
    """
    # Use fresh lists rather than mutable defaults, which would be shared (and grow) across calls
    if derived_list is None:
        derived_list = list()
    if recurse_list is None:
        recurse_list = list()
    recurse_list.append(cobject.crs)
    clogger.debug("Evaluating compound object : " + repr(cobject))
    # Any explicit 'deep' value forces re-evaluation of this object
    if deep is not None:
        cdrop(cobject)
    #
    clogger.debug("Searching cache for exact object : " + repr(cobject))
    filename, costs = hasExactObject(cobject)
    if filename:
        clogger.info("Object found in cache: %s is at %s:  " % (cobject.crs, filename))
        cdedent()
        if format == 'file':
            return filename, costs
        else:
            return cread(filename, varOf(cobject)), costs
    if dig_hard_into_cache:
        clogger.debug("Searching cache for including object for : " + repr(cobject))
        it, _ = hasIncludingObject(cobject)
        if it:
            clogger.info("Including object found in cache : %s" % it.crs)
            if format == 'file':
                clogger.info("Selecting " + repr(cobject) + " out of it")
                # Just select (if necessary for the user) the portion relevant to the request
                rep, costs = ceval_select(it, cobject, userflags, format, deep, derived_list, recurse_list)
                cdedent()
                return rep, costs
            else:
                clogger.debug("Because out format %s is not (yet, TBD) supported by ceval_select, cannot use "
                              "including object found for : " % format + repr(cobject))
        #
        clogger.debug("Searching cache for begin  object for : " + repr(cobject))
        it, comp_period = hasBeginObject(cobject)
        clogger.debug("Finished with searching cache for begin  object for : " + repr(cobject))
        if it:
            clogger.info("partial result found in cache for %s : %s" % (cobject.crs, it.crs))
            clogger.debug("comp_period=" + repr(comp_period))
            begcrs = it.crs
            # Build complement object for end, and eval it
            comp = copy.deepcopy(it)
            comp.setperiod(comp_period)
            evalcomp, _ = ceval(comp, userflags, format, deep, derived_list, recurse_list)
            set_variable(evalcomp, cobject.variable, format=format)
            rep, costs = complement(begcrs, comp.crs, cobject.crs)
            cdedent()
            if format in ['file', ]:
                return rep, costs
            # The whole object is now in cache: read it back, and report the cost of the
            # complement computation (as ceval_for_ctree does)
            value, _ = ceval(cobject)
            return value, costs
    clogger.info("nothing relevant found in cache for %s" % cobject.crs)
    #
    #  Only deep=True can propagate downward !
    if deep:
        down_deep = True
    else:
        down_deep = None
    # Force evaluation of 'father' script
    script_eval, _ = ceval_script(cobject.father, down_deep, recurse_list=recurse_list)
    if script_eval is not None:
        # Re-evaluate, which should succeed using cache
        rep, costs = ceval(cobject, userflags, format, None, recurse_list=recurse_list)
        cdedent()
        return rep, costs
    else:
        raise Climaf_Driver_Error("generating script aborted for " + cobject.father.crs)


def ceval_for_cpage(cobject, userflags=None, format="MaskedArray", deep=None, derived_list=None,
                    recurse_list=None):
    """
    Evaluate a CliMAF object of kind climaf.classes.cpage (a page of figures).

    The cache is searched for the exact object; otherwise the page is built by cfilePage.

    :param cobject: the cpage to evaluate
    :param userflags: unused
    :param format: 'file' to get the file name; other formats read the result with cread
    :param deep: if not None, the object is first dropped from cache; only deep=True is
        propagated downward
    :param derived_list: unused
    :param recurse_list: list of CRS of objects under evaluation; this object's CRS is appended
    :return: either a filename string or MaskedArray, and a cost object
    """
    # Use a fresh list rather than a mutable default, which would be shared (and grow) across calls
    if recurse_list is None:
        recurse_list = list()
    recurse_list.append(cobject.crs)
    clogger.debug("Evaluating compound object : " + repr(cobject))
    # Any explicit 'deep' value forces re-evaluation of this object
    if deep is not None:
        cdrop(cobject)
    #
    clogger.debug("Searching cache for exact object : " + repr(cobject))
    filename, costs = hasExactObject(cobject)
    if filename:
        clogger.info("Object found in cache: %s is at %s:  " % (cobject.crs, filename))
        cdedent()
        if format in ['file', ]:
            return filename, costs
        else:
            return cread(filename, varOf(cobject)), costs
    #
    #  Only deep=True can propagate downward !
    if deep:
        down_deep = True
    else:
        down_deep = None
    filen, costs = cfilePage(cobject, down_deep, recurse_list=recurse_list)
    cdedent()
    if format in ['file', ]:
        return filen, costs
    else:
        return cread(filen), costs  # !! Does it make sense ?


def ceval_for_cpage_pdf(cobject, userflags=None, format="MaskedArray", deep=None, derived_list=None,
                        recurse_list=None):
    """
    Evaluate a CliMAF object of kind climaf.classes.cpage_pdf (a PDF page of figures).

    The cache is searched for the exact object; otherwise the page is built by cfilePage_pdf.

    :param cobject: the cpage_pdf to evaluate
    :param userflags: unused
    :param format: 'file' to get the file name; other formats read the result with cread
    :param deep: if not None, the object is first dropped from cache; only deep=True is
        propagated downward
    :param derived_list: unused
    :param recurse_list: list of CRS of objects under evaluation; this object's CRS is appended
    :return: either a filename string or MaskedArray, and a cost object
    """
    # Use a fresh list rather than a mutable default, which would be shared (and grow) across calls
    if recurse_list is None:
        recurse_list = list()
    recurse_list.append(cobject.crs)
    clogger.debug("Evaluating compound object : " + repr(cobject))
    # Any explicit 'deep' value forces re-evaluation of this object
    if deep is not None:
        cdrop(cobject)
    #
    clogger.debug("Searching cache for exact object : " + repr(cobject))
    filename, costs = hasExactObject(cobject)
    if filename:
        clogger.info("Object found in cache: %s is at %s:  " % (cobject.crs, filename))
        cdedent()
        if format in ['file', ]:
            return filename, costs
        else:
            return cread(filename, varOf(cobject)), costs
    #
    #  Only deep=True can propagate downward !
    if deep:
        down_deep = True
    else:
        down_deep = None
    #
    filen, costs = cfilePage_pdf(cobject, down_deep, recurse_list=recurse_list)
    cdedent()
    if format in ['file', ]:
        return filen, costs
    else:
        # Return a (value, cost) pair like every other branch (the cost was formerly dropped)
        return cread(filen), costs  # !! Does it make sense ?


def ceval_for_cens(cobject, userflags=None, format="MaskedArray", deep=None, derived_list=None,
                   recurse_list=None):
    """
    Evaluate a CliMAF object of kind climaf.classes.cens (an ensemble).

    The cache is searched for the exact object; otherwise each member is evaluated, in the
    ensemble order, with its own copy of USERFLAGS.

    :param cobject: the cens to evaluate
    :param userflags: flags of the calling script (copied for each member)
    :param format: 'file' to get file names; other formats return per-member values
    :param deep: forwarded to each member evaluation; if not None, the object is first
        dropped from cache
    :param derived_list: unused
    :param recurse_list: list of CRS of objects under evaluation; this object's CRS is appended
    :return: with format 'file', a pair (space-separated file names of all members, total
        cost); otherwise a pair (dict member -> value, dict member -> cost)
    """
    # Use a fresh list rather than a mutable default, which would be shared (and grow) across calls
    if recurse_list is None:
        recurse_list = list()
    recurse_list.append(cobject.crs)
    clogger.debug("Evaluating compound object : " + repr(cobject))
    # Any explicit 'deep' value forces re-evaluation of this object
    if deep is not None:
        cdrop(cobject)
    #
    clogger.debug("Searching cache for exact object : " + repr(cobject))
    filename, costs = hasExactObject(cobject)
    if filename:
        clogger.info("Object found in cache: %s is at %s:  " % (cobject.crs, filename))
        cdedent()
        if format == 'file':
            return filename, costs
        else:
            return cread(filename, varOf(cobject)), costs
    values = dict()
    member_costs = dict()
    for member in cobject.order:
        values[member], member_costs[member] = ceval(cobject[member], copy.copy(userflags), format, deep,
                                                     recurse_list=recurse_list)

    cdedent()
    if format in ["file", ]:
        files = reduce(lambda x, y: x + " " + y, [values[m] for m in cobject.order])
        total_cost = compute_cost()
        for m in cobject.order:
            total_cost.add(member_costs[m])
        return files, total_cost
    else:
        return values, member_costs


def ceval_for_string(cobject, userflags=None, format="MaskedArray", deep=None, derived_list=list(),
                     recurse_list=list()):
    """
    Evaluate a CliMAF object given as a string (its CRS expression).

    Not implemented yet: the request is logged at debug level, then NotImplementedError is raised.
    TODO: Implement this function

    :param cobject: CRS string of the object to evaluate
    :param userflags: unused
    :param format: unused
    :param deep: unused
    :param derived_list: unused
    :param recurse_list: unused
    :raises NotImplementedError: always
    """
    crs = cobject
    clogger.debug("Evaluating object from crs : %s" % crs)
    message = "Evaluation from CRS is not yet implemented ( %s )" % crs
    raise NotImplementedError(message)


def evaluate_inputs(call, deep=False, recurse_list=None):
    """
    Evaluate, as file names, the input data of a script call (either a CliMAF-type one or an
    ESMValTool one).

    A remote dataset operand that is neither local nor cached is evaluated without the call's
    flags (except for script 'remote_select').

    :param call: the script call, whose ``operands``, ``operator``, ``flags`` and ``script``
        attributes are used
    :param deep: deep-evaluation flag forwarded to ceval
    :param recurse_list: list of CRS of objects under evaluation, forwarded to ceval
    :return: a triple (values, sizes, total cost). For each operand, values holds its file
        name(s), or '' for a missing operand. sizes holds the list length for a list value,
        1 for a single value, and 0 for a missing operand.
    :raises Climaf_Driver_Error: if an operand evaluates to None or ''
    """
    # Use a fresh list rather than a mutable default: evaluators append to it
    if recurse_list is None:
        recurse_list = []
    invalues = []
    sizes = []
    total_costs = compute_cost()
    for op in call.operands:
        if op:
            if call.operator != 'remote_select' and \
                    isinstance(op, cdataset) and \
                    not (op.isLocal() or op.isCached()):
                inValue, costs = ceval(op, format='file', deep=deep)
            else:
                inValue, costs = ceval(op, format='file', deep=deep,
                                       userflags=call.flags, recurse_list=recurse_list)
            clogger.debug("evaluating %s operand %s as %s" % (call.operator, op, inValue))
            if inValue in [None, ""]:
                raise Climaf_Driver_Error("When evaluating %s : value for %s is None" % (call.script, repr(op)))
            if isinstance(inValue, list):
                size = len(inValue)
            else:
                size = 1
            total_costs.add(costs)
        else:
            inValue = ''
            size = 0
        sizes.append(size)
        invalues.append(inValue)
    return invalues, sizes, total_costs


def ceval(cobject, userflags=None, format="MaskedArray",
          deep=None, derived_list=None, recurse_list=None):
    """
    Actually evaluates a CliMAF object, either as an in-memory data structure or
    as a string of filenames (which either represent a superset or exactly includes
    the desired data). Returns it in pair with a cost object

    - with arg deep=True , re-evaluates all components
    - with arg deep=False, re-evaluates top level operation
    - without arg deep   , use cached values as far as possible

    arg format must be one of 'MaskedArray', 'file' or 'txt'

    arg derived_list is the list of variables that have been considered as 'derived'
    (i.e. not natives) in upstream evaluations. It avoids to loop endlessly

    arg recurse_list is the list of CRS of objects under evaluation; when not provided,
    a new empty list is used for this call

    :raises Climaf_Driver_Error: if the format or the kind of object is not supported
    """
    if format not in ["MaskedArray", "file", "txt"]:
        # List the formats actually accepted by the check above
        raise Climaf_Driver_Error("Allowed formats yet are : 'MaskedArray', 'file', 'txt' (got %s)" % repr(format))
    #
    if userflags is None:
        userflags = scriptFlags()
    # Fresh lists for each call that does not provide them: mutable defaults would be shared
    # between calls, and recurse_list is appended to by the evaluators
    if derived_list is None:
        derived_list = []
    if recurse_list is None:
        recurse_list = []
    #
    # NOTE: checking cobject.crs against recurse_list to detect infinite loops is too crude
    # for dealing with use of operator 'select', hence not done
    cindent()
    if isinstance(cobject, cdataset):
        return ceval_for_cdataset(cobject=cobject, userflags=userflags, format=format, deep=deep,
                                  derived_list=derived_list, recurse_list=recurse_list)
    elif isinstance(cobject, ctree):
        return ceval_for_ctree(cobject=cobject, userflags=userflags, format=format, deep=deep,
                               derived_list=derived_list, recurse_list=recurse_list)
    elif isinstance(cobject, scriptChild):
        return ceval_for_scriptChild(cobject=cobject, userflags=userflags, format=format, deep=deep,
                                     derived_list=derived_list, recurse_list=recurse_list)
    elif isinstance(cobject, cpage):
        return ceval_for_cpage(cobject=cobject, userflags=userflags, format=format, deep=deep,
                               derived_list=derived_list, recurse_list=recurse_list)
    elif isinstance(cobject, cpage_pdf):
        return ceval_for_cpage_pdf(cobject=cobject, userflags=userflags, format=format, deep=deep,
                                   derived_list=derived_list, recurse_list=recurse_list)
    elif isinstance(cobject, cens):
        return ceval_for_cens(cobject=cobject, userflags=userflags, format=format, deep=deep,
                              derived_list=derived_list, recurse_list=recurse_list)
    elif isinstance(cobject, string_types):
        # string_types also covers unicode strings under Python 2
        return ceval_for_string(cobject=cobject, userflags=userflags, format=format, deep=deep,
                                derived_list=derived_list, recurse_list=recurse_list)
    else:
        # Restore log indentation before failing
        cdedent()
        raise Climaf_Driver_Error("argument " + repr(cobject) + " is not (yet) managed")


def ceval_script(scriptCall, deep, recurse_list=None):
    """ Actually applies a CliMAF-declared script on a script_call object

    Prepare operands as fields and build command from operands and parameters list
    Assumes that scripts are described in dictionary 'scripts'  by templates as
    documented in operators.cscript

    Args:
        scriptCall (ctree): the script call; its ``operator`` is the key of the script in ``cscripts``
        deep: passed to ``evaluate_inputs`` when evaluating the operands
        recurse_list (list, optional): passed to ``evaluate_inputs``; a new empty list
            is used when omitted (so that calls never share one default list)

    Returns:
        a tuple (CliMAF cache data filename of the main output, compute costs);
        ``(None, 0.0)`` when the script output format is in ``none_formats``

    Raises:
        Climaf_Driver_Error: when an input file is missing, when the command fails,
        or when some output cannot be registered in the cache
    """
    if recurse_list is None:
        recurse_list = []
    script = cscripts[scriptCall.operator]
    template = Template(script.command)
    total_costs = compute_cost()
    # Evaluate input data
    invalues, _, partial_cost = evaluate_inputs(scriptCall, deep, recurse_list)
    total_costs.add(partial_cost)
    #
    # Replace input data placeholders with filenames
    subdict = dict()
    opscrs = ""
    if 0 in script.inputs:
        # The first operand drives the generic (un-numbered) keywords : var, alias, period, domain...
        label, multiple, serie = script.inputs[0]
        op = scriptCall.operands[0]
        infile = invalues[0]
        if (scriptCall.operator != 'remote_select') and \
                not all(map(os.path.exists, infile.split(" "))):
            raise Climaf_Driver_Error("Internal error : for script %s and 1st operand %s, "
                                      "some input file does not exist among %s:" % (scriptCall.operator, op, infile))
        subdict[label] = infile
        subdict["var"] = varOf(op)
        subdict["Var"] = varOf(op)
        if isinstance(op, cdataset) and op.alias:
            filevar, scale, offset, units, filenameVar, missing, conditions = op.alias
            if op.matches_conditions(conditions):
                if scriptCall.flags.canAlias and "," not in varOf(op):
                    subdict["var"] = Template(filevar).safe_substitute(op.kvp)
                    subdict["alias"] = "%s,%s,%.4g,%.4g" % (varOf(op), subdict["var"], scale, offset)
                if units:
                    subdict["units"] = units
                if scriptCall.flags.canMissing and missing:
                    subdict["missing"] = missing
        if isinstance(op, cens):
            if not multiple:
                raise Climaf_Driver_Error(
                    "Script %s 's input #%s cannot accept ensemble %s" % (scriptCall.script, 0, repr(op)))
            subdict["labels"] = "$".join(op.order)
        if op:
            per = timePeriod(op)
            if per and str(per) != "" and scriptCall.flags.canSelectTime:
                if isinstance(per, string_types):
                    if per != '*':
                        clogger.error("Period type (%s) is wrong for object %s. Try method 'explore'" % (type(per), op))
                    else:
                        clogger.warning("Period is * for object %s; this may reveal an internal error" % op)
                elif not per.fx:
                    subdict["period"] = str(per)
                    subdict["period_iso"] = per.iso()
        if scriptCall.flags.canSelectDomain:
            subdict["domain"] = domainOf(op)
    else:
        subdict["var"] = varOf(scriptCall)
        subdict["Var"] = varOf(scriptCall)

    # Every operand contributes to the CRS summary; operands of rank >= 2 (or rank 1 when
    # the script declares an input #1) also get numbered keywords (var_2, period_2, ...)
    for i, op in enumerate(scriptCall.operands, start=1):
        if op:
            opscrs += op.crs + " - "
        infile = invalues[i - 1]
        if (scriptCall.operator != 'remote_select') and infile != '' and \
                not all(map(os.path.exists, infile.split(" "))):
            raise Climaf_Driver_Error("Internal error : some input file does not exist among %s:" % infile)
        if i > 1 or 1 in script.inputs:
            label, multiple, serie = script.inputs[i]
            subdict[label] = infile
            # Provide the name of the variable in input file if script allows for
            if isinstance(op, cobject):
                subdict["var_%d" % i] = varOf(op)
            if isinstance(op, cdataset) and op.alias:
                filevar, scale, offset, units, filenameVar, missing, conditions = op.alias
                if op.matches_conditions(conditions):
                    if ((varOf(op) != filevar) or (scale != 1.0) or (offset != 0.)) and \
                            "," not in varOf(op):
                        subdict["var_%d" % i] = Template(filevar).safe_substitute(op.kvp)
                        subdict["alias_%d" % i] = "%s %s %f %f" % (varOf(op),
                                                                   subdict["var_%d" % i], scale, offset)
                    if units:
                        subdict["units_%d" % i] = units
                    if missing:
                        subdict["missing_%d" % i] = missing
            # Provide period selection if script allows for
            if op:
                per = timePeriod(op)
                # As for the first operand, only a genuine period object can be used for
                # selection : None or a string period (e.g. '*') has no 'fx' attribute
                if per is not None and not isinstance(per, string_types) and not per.fx and per != "":
                    subdict["period_%d" % i] = str(per)
                    subdict["period_iso_%d" % i] = per.iso()
            subdict["domain_%d" % i] = domainOf(op)
    clogger.debug("subdict for operands is " + repr(subdict))
    # substitution is deffered after scriptcall parameters evaluation, which may
    # redefine e.g period
    #
    # Provide one cache filename for each output and instantiates the command accordingly
    if script.outputFormat not in none_formats:
        if script.outputFormat == "graph":
            if 'format' in scriptCall.parameters:
                if scriptCall.parameters['format'] in graphic_formats:
                    output_fmt = scriptCall.parameters['format']
                else:
                    raise Climaf_Driver_Error('Allowed graphic formats yet are : %s' % ', '.join(
                        [repr(x) for x in graphic_formats]))
            else:  # default graphic format
                output_fmt = "png"
        else:
            output_fmt = script.outputFormat
        # Un-named main output : the script writes to a pid-suffixed temporary name,
        # which register() later associates with the final cache filename
        tmpfile, tmpfile_fmt = os.path.splitext(generateUniqueFileName(scriptCall.crs, format=output_fmt))
        main_output_filename = "%s_%i%s" % (tmpfile, os.getpid(), tmpfile_fmt)

        subdict["out"] = main_output_filename
        subdict["out_" + varOf(scriptCall)] = main_output_filename

        subdict["out_final"] = generateUniqueFileName(scriptCall.crs, format=output_fmt)
        subdict["out_final_" + varOf(scriptCall)] = generateUniqueFileName(scriptCall.crs, format=output_fmt)

        # Named outputs
        for output in scriptCall.outputs:
            tmpfile, tmpfile_fmt = os.path.splitext(
                generateUniqueFileName(scriptCall.crs + "." + output, format=output_fmt))
            subdict["out_" + output] = "%s_%i%s" % (tmpfile, os.getpid(), tmpfile_fmt)
            subdict["out_final_" + output] = generateUniqueFileName(scriptCall.crs + "." + output, format=output_fmt)

    # Account for script call parameters
    for p in scriptCall.parameters:
        subdict[p] = scriptCall.parameters[p]
        if p == "period":
            subdict["period_iso"] = init_period(scriptCall.parameters[p]).iso()
    subdict["crs"] = opscrs.replace("'", "")
    #
    # Discard selection parameters if selection already occurred for first operand
    # TBD : manage the cases where other operands didn't undergo selection
    exact, _ = hasExactObject(scriptCall.operands[0])
    if exact:
        for key in ["period", "period_iso", "var", "domain", "missing", "alias"]:
            subdict.pop(key, None)
    #
    # Combine CRS and possibly member_label to provide/complement title
    if 'title' not in subdict:
        if 'member_label' in subdict:
            subdict["title"] = subdict['member_label']
    else:
        if 'member_label' in subdict:
            subdict["title"] = subdict["title"] + " " + subdict['member_label']
            subdict.pop('member_label')
    #
    # Substitute all args
    template = template.safe_substitute(subdict)
    #
    # Allowing for some formal parameters to be missing in the actual call:
    #
    # Discard remaining substrings looking like :
    #  some_word='"${some_keyword}"' , or simply : '"${some_keyword}"'
    template = re.sub(r'((--)?\w*=)?(\'\")?\$\{\w*\}(\"\')?', r"", template)
    #
    # Discard remaining substrings looking like :
    #  some_word=${some_keyword}  or  simply : ${some_keyword}
    template = re.sub(r"((--)?\w*=)?\$\{\w*\}", r"", template)
    #
    # Link the fixed fields needed by the script/operator
    # files_exist records, per link name, whether a regular file was already there
    # (such files are never removed afterwards)
    files_exist = dict()
    if script.fixedfields is not None:
        subdict_ff = scriptCall.parameters.copy()
        subdict_ff["model"] = modelOf(scriptCall.operands[0])
        subdict_ff["simulation"] = simulationOf(scriptCall.operands[0])
        subdict_ff["project"] = projectOf(scriptCall.operands[0])
        subdict_ff["realm"] = realmOf(scriptCall.operands[0])
        subdict_ff["grid"] = gridOf(scriptCall.operands[0])
        for ll, lt in script.fixedfields:  # (linkname, targetname)
            # Replace input data placeholders with filenames for fixed fields
            template_ff_target = Template(lt).substitute(subdict_ff)
            # symlink if needed
            files_exist[ll] = False
            if os.path.islink(ll):
                if os.path.realpath(ll) != template_ff_target:
                    os.remove(ll)
                    os.symlink(template_ff_target, ll)
            elif os.path.isfile(ll):
                files_exist[ll] = True
            else:
                os.symlink(template_ff_target, ll)
    #
    tim1 = time.time()
    clogger.info("Launching command:" + template)
    #
    with open(logdir + '/last.out', 'w') as logfile:
        logfile.write("\n\nstdout and stderr of script call :\n\t " + template + "\n\n")
        try:
            subprocess.check_call(template, stdout=logfile, stderr=subprocess.STDOUT, shell=True)
        except subprocess.CalledProcessError:
            raise Climaf_Driver_Error("Something went wrong when computing %s. See file %s/last.out for details" %
                                      (scriptCall.crs, logdir))
    #
    duration = time.time() - tim1
    total_costs.increment(duration)
    # For remote files, we supply ds.local_copies_of_remote_files
    # for local filenames in order to can use ds.check()
    if scriptCall.operator == 'remote_select':
        local_filename = [remote_to_local_filename(el) for el in scriptCall.operands[0].baseFiles().split(" ")]
        scriptCall.operands[0].local_copies_of_remote_files = ' '.join(local_filename)
    #
    # Clean fixed fields symbolic links (linkname, targetname)
    if script.fixedfields:
        for ll, _ in script.fixedfields:
            if not files_exist[ll]:
                try:
                    os.remove(ll)
                except OSError:
                    # Best effort, as the former 'rm -f' : a missing link is not an error
                    pass

    # Handle outputs
    if script.outputFormat in ["txt", ]:
        with open(logdir + "/last.out", 'r') as f:
            for line in f.readlines():
                sys.stdout.write(line)
    if script.outputFormat in none_formats:
        return None, 0.0
    # Tagging output files with their CliMAF Reference Syntax definition
    # 1 - Un-named main output
    ok = register(main_output_filename, scriptCall.crs, total_costs, subdict["out_final"])
    # 2 - Named outputs (registration stops at the first failure)
    for output in scriptCall.outputs:
        ok = ok and register(subdict["out_" + output], scriptCall.crs + "." + output, total_costs,
                             subdict["out_final_" + output])
    if ok:
        clogger.info("Done in %.1f s with script computation for "
                     "%s (command was :%s )" % (duration, repr(scriptCall), template))
        return subdict["out_final"], total_costs  # main_output_filename
    else:
        raise Climaf_Driver_Error("Some output missing when executing "
                                  ": %s. \n See %s/last.out" % (template, logdir))


def ceval_evt(climaf_name, script, *operands, **parameters):
    """
    Evaluate OPERANDS, then hand the resulting input values, together with all
    the original arguments, to :py:func:`~climaf.ESMValTool_diags.call_evt_script`.

    This function is NOT supposed to be called directly except by CliMAF driver, see doc.
    """
    # Build the script-call tree only to let evaluate_inputs compute the operands
    call_tree = ctree(script, None, *operands, **parameters)
    input_values, _sizes, _cost = evaluate_inputs(call_tree)
    return call_evt_script(climaf_name, script, input_values, *operands, **parameters)
def timePeriod(cobject):
    """
    Returns a time period for a CliMAF object : if object is a dataset, returns
    its time period, otherwise returns time period of first operand

    For a script whose flags allow time concatenation (doCatTime) and which has
    several operands, the operands' periods are merged and must be consecutive.
    Returns None for objects of unknown class.
    """
    if isinstance(cobject, cdataset):
        return cobject.period
    elif isinstance(cobject, ctree):
        clogger.debug("timePeriod : processing %s,operands=%s" % (cobject.script, repr(cobject.operands)))
        if cobject.script.flags.doCatTime and len(cobject.operands) > 1:
            clogger.debug("Building composite period for results of %s" % cobject.operator)
            periods = [timePeriod(op) for op in cobject.operands]
            merged_period = merge_periods(periods)
            if len(merged_period) > 1:
                raise Climaf_Driver_Error("Issue when time assembling with %s, periods are not consecutive : %s" %
                                          (cobject.operator, merged_period))
            return merged_period[0]
        else:
            clogger.debug("timePeriod logic for script is 'choose 1st operand' %s" % cobject.script)
            return timePeriod(cobject.operands[0])
    elif isinstance(cobject, scriptChild):
        clogger.debug("for now, timePeriod logic for scriptChilds is basic - TBD")
        return timePeriod(cobject.father)
    elif isinstance(cobject, cens):
        clogger.debug("for now, timePeriod logic for 'cens' objet is basic (1st member)- TBD")
        return timePeriod(list(cobject.values())[0])
    else:
        return None


def ceval_select(includer, included, userflags, format, deep, derived_list, recurse_list):
    """
    Extract object INCLUDED from (existing) object INCLUDER,
    taking into account the capability of the user process (USERFLAGS)
    and the required delivering FORMAT(file or object)

    Returns a tuple (filename renamed after INCLUDED's period, costs) for format 'file',
    and (None, empty costs) for any other format.

    Raises:
        Climaf_Driver_Error: if the extraction cannot be evaluated
    """
    if format == 'file':
        if userflags.canSelectTime or userflags.canSelectDomain:
            clogger.debug("TBD - should do smthg smart when user can select time or domain")
            # includer.setperiod(included.period)
        incperiod = timePeriod(included)
        clogger.debug("extract sub period %s out of %s" % (repr(incperiod), includer.crs))
        clogger.debug("Variable considered in includer: %s" % includer.variable)
        clogger.debug("Variable considered in included: %s" % included.variable)
        extract = capply('select', includer, period=repr(incperiod))
        clogger.debug("Variable considered in extract: %s" % extract.variable)
        clogger.debug("Extract crs found: %s" % extract)
        objfile, costs = ceval(extract, userflags, 'file', deep, derived_list, recurse_list)
        if objfile:
            crs = includer.buildcrs(period=incperiod)
            return rename(objfile, crs), costs
        else:
            clogger.critical("Cannot evaluate " + repr(extract))
            # A library must not terminate the interpreter : report the failure to the caller
            raise Climaf_Driver_Error("Cannot evaluate " + repr(extract))
    else:
        clogger.error("Can yet process only files - TBD")
        return None, compute_cost()


def cread(datafile, varname=None, period=None):
    """
    Display (png, pdf, eps) or read (NetCDF) DATAFILE.

    For a NetCDF file, returns the values of variable VARNAME (default : the variable
    found by varOfFile) as a masked array; PERIOD selection is not yet implemented.
    Returns None for an empty DATAFILE, for displayed figures and for unhandled types.
    """
    if not datafile:
        return None
    if datafile.endswith(".png"):
        # Popen does not wait for the viewer : no shell '&' is needed (nor understood) here
        subprocess.Popen(["display", datafile])
    elif datafile.endswith((".pdf", ".eps")):
        if xdg_bin is None:
            subprocess.Popen(["display", datafile])
        else:
            subprocess.Popen([xdg_bin, datafile])
    elif datafile.endswith(".nc"):
        clogger.debug("reading NetCDF file %s" % datafile)
        if varname is None:
            varname = varOfFile(datafile)
        if varname is None:
            raise Climaf_Error("No varname provided")
        if varname not in varsOfFile(datafile):
            raise Climaf_Error("File %s doesn't have requested variable %s" % (datafile, varname))
        if period is not None:
            clogger.warning("Cannot yet select on period (%s) using CMa for files %s - TBD" % (period, datafile))
        try:
            with xr_open_dataset(datafile, use_cftime=True, mask_and_scale=True) as f:
                return f[varname].to_masked_array(copy=False)
        except ValueError:
            # Fall back on reading without time decoding
            with xr_open_dataset(datafile, decode_times=False, mask_and_scale=True) as f:
                clogger.error("Error (but going on anyway) : cannot use cftime when reading file %s : ", datafile)
                return f[varname].to_masked_array(copy=False)
    else:
        clogger.error("cannot yet handle %s" % datafile)
        return None


def cview(datafile):
    """ Launch a viewer (non-blocking) on a png, pdf or eps DATAFILE; returns None """
    if datafile.endswith(".png"):
        subprocess.Popen(["display", datafile])
    elif datafile.endswith((".pdf", ".eps")):
        if xdg_bin is None:
            subprocess.Popen(["display", datafile])
        else:
            subprocess.Popen([xdg_bin, datafile])
    else:
        clogger.error("cannot yet handle %s" % datafile)
        return None


def derive_variable(ds):
    """
    Assuming that variable of DS is a derived variable, returns the CliMAF object
    representing the operation needed to compute it (using information in dict
    operators_derive.derived_variable

    Raises:
        Climaf_Driver_Error: if DS is not a dataset or its variable is not a derived one
    """
    if not isinstance(ds, cdataset):
        raise Climaf_Driver_Error("arg is not a dataset")
    if not is_derived_variable(ds.variable, ds.project):
        raise Climaf_Driver_Error("%s is not a derived variable" % ds.variable)
    op, outname, inVarNames, params = derived_variable(ds.variable, ds.project)
    # Work on a copy : the returned dict may be the one stored in the derived
    # variables registry, which must not be altered by this call
    params = dict(params)
    inVars = list()
    for rank, varname in enumerate(inVarNames):
        dic = copy.deepcopy(ds.kvp)
        dic['variable'] = varname
        # If the dataset has a version attribute, it should be inherited only
        # by the first input variable, an be set to "latest" for the next ones
        # (it would be tricky to do something smarter TBD)
        if rank > 0 and "version" in dic:
            dic['version'] = "latest"
        inVars.append(cdataset(**dic))
    params["add_variable"] = ds.variable
    # TODO: force the output variable to be well defined
    father = capply(op, *inVars, **params)
    clogger.debug("Father object is: %s" % repr(father))
    if outname == "out" or outname == ds.variable:
        rep = father
    else:
        rep = scriptChild(father, outname)
    clogger.debug("DEBUG variable>>> %s" % rep.variable)
    # TODO: check the type of the outputs
    rep.variable = ds.variable
    clogger.debug("DEBUG variable>>> %s" % rep.variable)
    return rep


def set_variable(obj, varname, format):
    """
    Change to VARNAME the variable name for OBJ, which FORMAT maybe 'file' or 'MaskedArray'.
    Also set the variable long_name using CF convention (TBD)

    Returns None if OBJ is None or if a NCO command fails, True on success for format 'file'.
    """
    if obj is None:
        return None
    long_name = CFlongname(varname)
    if format == 'file':
        oldvarname = varOfFile(obj)
        if not oldvarname:
            raise Climaf_Driver_Error("Cannot change variable name in file : %s " % obj)
        if oldvarname != varname:
            command = "ncrename -v %s,%s %s >/dev/null 2>&1" % (oldvarname, varname, obj)
            if os.system(command) != 0:
                clogger.error("Issue with changing varname to %s in %s" % (varname, obj))
                return None
            clogger.debug("Varname changed to %s in %s" % (varname, obj))
            command = "ncatted -a long_name,%s,o,c,%s %s" % (varname, long_name, obj)
            if os.system(command) != 0:
                clogger.error("Issue with changing long_name for var %s in %s" % (varname, obj))
                return None
        return True
    elif format == 'MaskedArray':
        clogger.warning('TBD - Cannot yet set the varname for MaskedArray')
    else:
        clogger.error('Cannot handle format %s' % format)


def noselect(userflags, ds, format):
    """
    Check the capability of the user process (USERFLAGS) and a set of attribute values of dataset (DS)
    Return True if the user can select the data and aggregate time, and requested FORMAT is 'file',
    and False otherwise
    """
    can_select = False
    if (userflags and
            (userflags.canSelectVar or ds.oneVarPerFile()) and
            (userflags.canSelectTime or ds.periodIsFine()) and
            (userflags.canSelectDomain or ds.domainIsFine()) and
            (userflags.canAggregateTime or ds.periodHasOneFile()) and
            (userflags.canAlias or ds.hasExactVariable()) and
            (userflags.canMissing or ds.missingIsOK()) and
            (ds.hasOneMember()) and
            (format in ['file', ])):
        can_select = True
    return can_select


# Commodity functions #########################
def cfile(object, target=None, ln=None, hard=None, deep=None):
    """
    Provide the filename for a CliMAF object, or copy this file to target. Launch computation if needed.

    Args:
        object (CliMAF object) : either a dataset or a 'compound' object (e.g. the result of a CliMAF operator)
        target (str, optional) : name of the destination file in case you really need another
          filename; CliMAF will then anyway also store the result in its cache, either as a copy
          (default), or a sym- or a hard-link (see below);
        ln (logical, optional) : if True, CliMAF cache file is created as a symlink to the target;
          this allows to cross filesystem boundaries, while still saving disk space (wrt to a copy);
          CliMAF will manage the broken link cases (at the expense of a new computation)
        hard (logical, optional) : if True, CliMAF cache file is created as a hard link to the target;
          this allows to save disk space, but does not allow to cross filesystem boundaries
        deep (logical, optional) : governs the use of cached values when computing the object:

          - if missing, or None : use cache as much as possible (speed up the computation)
          - False : make a shallow computation, i.e. do not use cached values for the top level operation
          - True : make a deep computation, i.e. do not use any cached value

    Returns:
        - if target is provided, returns this filename (or linkname) if computation is successful
          ('target' contains the result); None is returned for an ensemble
        - if target is not provided, returns the filename in CliMAF cache, which contains the result
          (and None if failure)

    Raises:
        Climaf_Driver_Error: when a target is requested and the evaluation failed, when both
        ``ln`` and ``hard`` are set, or when the target file could not be created
    """
    clogger.debug("cfile called on " + str(object))
    start_time = datetime.now()
    clogger.debug("Starting cfile at: " + start_time.strftime("%Y-%m-%d %H:%M:%S"))
    #
    # -- Evaluate the CliMAF object
    result, costs = ceval(object, format='file', deep=deep)
    #
    end_time = datetime.now()
    duration = end_time - start_time
    clogger.debug("cfile completed at: " + end_time.strftime("%Y-%m-%d %H:%M:%S") +
                  " : total duration = " + str(duration.total_seconds()) + ' seconds')
    if target is None:
        return result
    if isinstance(object, cens):
        clogger.error("Cannot create a single file hosting an ensemble. See function efile()")
        return None
    target = os.path.abspath(os.path.expanduser(target))
    target_dir = os.path.dirname(target)
    if result is None:
        raise Climaf_Driver_Error("Issue when evaluating %s" % object)
    if ln and hard:
        raise Climaf_Driver_Error("flags ln and hard are mutually exclusive")
    if ln:
        # The data moves to the target; the cache entry becomes a symlink to it
        if os.path.exists(target):
            if not os.path.samefile(result, target):
                os.remove(target)
                shutil.move(result, target)
                os.symlink(target, result)
        else:
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)
            shutil.move(result, target)
            os.symlink(target, result)
    elif hard:
        # Must create hard link
        # If result is a link, follow links for finding source of hard link
        if os.path.islink(result):
            source = os.readlink(result)
        else:
            source = result
        if source == target:
            # This is a case where the file had already been symlinked to the same target name
            shutil.move(source, result)
            os.link(result, target)
        else:
            if os.path.exists(target):
                os.remove(target)
            elif not os.path.exists(target_dir):
                os.makedirs(target_dir)
            os.link(source, target)
    else:
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        shutil.copyfile(result, target)
    if not os.path.exists(target):
        raise Climaf_Driver_Error("Issue during the creation of the target file %s" % target)
    return target
def cshow(obj):
    """
    Provide the in-memory value of a CliMAF object.

    For a figure object, this will lead to display it (launch computation if needed).
    """
    clogger.debug("cshow called on " + str(obj))
    in_memory, _costs = ceval(obj, format='MaskedArray')
    return in_memory
def cMA(obj, deep=None):
    """
    Provide the Masked Array value for a CliMAF object. Launch computation if needed.

    Args:
        obj (CliMAF object) : either a dataset or a 'compound' object (like the result of
          a CliMAF standard operator)
        deep (logical, optional) : governs the use of cached values when computing the object

          - if missing, or None : use cache as much as possible
          - False : make a shallow computation, i.e. do not use cached values for top level operation
          - True : make a deep computation, i.e. do not use any cached value

    Returns:
        a Masked Array containing the object's value
    """
    clogger.debug("cMA called with arguments : " + str(obj))
    masked, _costs = ceval(obj, format='MaskedArray', deep=deep)
    return masked
def cvalue(obj, index=0, deep=None, cost=None):
    """
    Return the value of the array for an object, at a given index, after MV flattening

    Arg DEEP is used as for :py:func:`climaf.driver.cfile()`

    If arg COST is True, a tuple is returned, which second element is the total
    compute cost for the value

    Example, where the target object is a scalar (0d) field:

    >>> data=ds(project='mine',variable='tas', ...)
    >>> data1=time_average(data)
    >>> data2=space_average(data1)
    >>> v=cvalue(data2)

    Does use the file representation of the object
    """
    value_at_index = None
    if deep is None:
        # Only a non-forced evaluation may reuse a previously stored value
        value_at_index, costs = has_cvalue(obj.crs, index)
    if value_at_index is None:
        masked, costs = ceval(obj, format='MaskedArray', deep=deep)
        value_at_index = float(masked.data.flat[index])
        store_cvalue(obj.crs, index, value_at_index, costs)
    return (value_at_index, costs.tc) if cost else value_at_index
def cexport(*args, **kwargs):
    """ Alias for climaf.driver.ceval. Create synonyms for arg 'format'

    'NetCDF', 'netcdf', 'nc', 'png', 'pdf' and 'eps' are mapped to 'file', and 'MA' to 'MaskedArray'.
    """
    clogger.debug("cexport called with arguments" + str(args))
    if "format" in kwargs:
        if kwargs['format'] in ["NetCDF", "netcdf", "nc", "png", "pdf", "eps"]:
            kwargs['format'] = "file"
        if kwargs['format'] == "MA":
            kwargs['format'] = "MaskedArray"
    return ceval(*args, **kwargs)


def cimport(cobject, crs):
    """
    Register an existing file COBJECT in CliMAF cache under reference syntax CRS.

    Masked Arrays are not yet supported; CRS syntax is not checked.
    """
    # Logger arguments need a matching '%s' placeholder, otherwise logging fails to format the record
    clogger.debug("cimport called with argument %s", cobject)
    clogger.debug("should check syntax of arg 'crs' -TBD")
    clogger.warning("cimport is not for the dummies - Playing at your own risks !")
    import numpy.ma
    if isinstance(cobject, numpy.ma.MaskedArray):
        clogger.debug("for now, use a file for importing - should revisit - TBD")
        clogger.error("not yet implemented fro Masked Arrays - TBD")
    elif isinstance(cobject, string_types):
        register(cobject, crs, compute_cost())
    else:
        clogger.error("argument is not a Masked Array nor a filename : %s", cobject)


def get_fig_sizes(figfile):
    """
    Return (width, height) in pixels of image FIGFILE, as reported by ImageMagick's 'identify'.

    Raises:
        ValueError: if no output line of 'identify' starts with FIGFILE
    """
    args_figsize = ["identify", figfile]
    # On some sites, getoutput first lines have warning messages
    # Furthermore, in case of missing file, last line could be an error -> only consider lines beginning with figfile
    output_figsize = getoutput(" ".join(args_figsize)).split("\n")
    with_figfile = [line for line in output_figsize if line.startswith(figfile)]
    if len(with_figfile) == 0:
        raise ValueError("No relevant line for fig size in command (%s) output %s" % (args_figsize, output_figsize))
    output_figsize = with_figfile[-1]
    # Third space-separated field of identify's output is 'WIDTHxHEIGHT'
    figsize = str(output_figsize).split(" ").pop(2)
    (fig_width, fig_height) = figsize.split("x")
    return int(fig_width), int(fig_height)


def cfilePage(cobj, deep, recurse_list=None):
    """
    Builds a page with CliMAF figures, computing associated crs

    Args:
     cobj (cpage object)

    Returns : the filename in CliMAF cache, which contains the result (and None if failure)

    Raises:
        Climaf_Driver_Error: if COBJ is not a cpage, if a figure size cannot be read,
        or if the ImageMagick compositing fails
    """
    if not isinstance(cobj, cpage):
        raise Climaf_Driver_Error("cobj is not a cpage object")
    clogger.debug("Computing figure array for cpage %s" % cobj.crs)
    #
    # page size and creation
    page_size = "%dx%d" % (cobj.page_width, cobj.page_height)
    args = ["convert", "-size", page_size, "xc:white"]
    #
    # margins
    x_left_margin = 30.  # Left shift at start and end of line
    y_top_margin = 30.  # Initial vertical shift for first line
    x_right_margin = 30.  # Right shift at start and end of line
    y_bot_margin = 30.  # Vertical shift for last line
    xmargin = 30.  # Horizontal shift between figures
    ymargin = 30.  # Vertical shift between figures
    #
    usable_height = cobj.page_height - ymargin * (len(cobj.heights) - 1.) - y_top_margin - y_bot_margin
    if cobj.title != "":
        usable_height -= cobj.ybox
    if cobj.insert != "":
        ins_base_width, ins_base_height = get_fig_sizes(cobj.insert)
        insert_height = int((float(ins_base_height) * cobj.insert_width) / float(ins_base_width))
        usable_height -= insert_height
    #
    usable_width = cobj.page_width - xmargin * (len(cobj.widths) - 1.) - x_left_margin - x_right_margin
    #
    # page composition
    y = y_top_margin
    total_costs = compute_cost()
    for line, rheight in zip(cobj.fig_lines, cobj.heights):
        # Line height in pixels
        height = usable_height * rheight
        x = x_left_margin
        max_old = 0.
        for fig, rwidth in zip(line, cobj.widths):
            # Figure width in pixels
            width = usable_width * rwidth
            scaling = "%dx%d+%d+%d" % (width, height, x, y)
            if fig:
                figfile, costs = ceval(fig, format="file", deep=deep, recurse_list=recurse_list)
            else:
                figfile = 'xc:None'
                costs = compute_cost()
            total_costs.add(costs)
            clogger.debug("Compositing figure %s", fig.crs if fig else 'None')
            args.extend([figfile, "-geometry", scaling, "-composite"])
            # Real size of figure in pixels: [fig_width x fig_height]
            try:
                fig_width, fig_height = get_fig_sizes(figfile)
            except Exception:
                raise Climaf_Driver_Error("Issue with figure " + str(fig))
            # Height of the figure once scaled to fit the (width x height) cell
            if float(fig_width) != 1. and float(fig_height) != 1.:
                if ((float(fig_width) / float(fig_height)) * float(height)) < width:
                    new_fig_height = height
                else:
                    new_fig_height = (float(fig_height) / float(fig_width)) * float(width)
            else:  # for figure = 'None'
                new_fig_height = fig_height
            # Running maximum of scaled figure heights along the line
            max_fig_height = max(float(new_fig_height), max_old)
            max_old = max_fig_height
            # A trim-based horizontal adjustment used to exist here, but was disabled
            x += width + xmargin
        if cobj.fig_trim and (float(fig_width) / float(fig_height) > width / height):
            height_adj = max_fig_height
            y += height_adj + ymargin
        else:
            y += height + ymargin
    if cobj.insert != "":
        args.extend([cobj.insert, "-geometry",
                     "x%d+%d+%d" % (insert_height, (cobj.page_width - cobj.insert_width) / 2, y),
                     "-composite"])
    out_fig = generateUniqueFileName(cobj.crs, format=cobj.format)
    if cobj.page_trim:
        args.append("-trim")
    if cobj.title != "":
        splice = "0x%d" % cobj.ybox
        annotate = "+%d+%d" % (cobj.x, cobj.y)
        args.extend(["-gravity", cobj.gravity, "-background", cobj.background, "-splice", splice, "-font",
                     cobj.font, "-pointsize", "%d" % cobj.pt, "-annotate", annotate, '"%s"' % cobj.title])
    args.append(out_fig)
    command = " ".join(args)
    clogger.debug("Compositing figures : %s" % command)
    tim1 = time.time()
    #
    try:
        with open("tmp.err", "w") as fic:
            subprocess.check_output(command, shell=True, stderr=fic)
    except subprocess.CalledProcessError:
        with open("tmp.err") as fic:
            err = fic.read()
        os.remove("tmp.err")
        raise Climaf_Driver_Error("Compositing failed : %s for %s" % (err, command))
    #
    duration = time.time() - tim1
    total_costs.increment(duration)
    #
    # There are cases where subprocess doesn't raise an Error, while compositing failed
    if not os.path.exists(out_fig):
        with open("tmp.err") as fic:
            err = fic.readlines()
        os.remove("tmp.err")
        raise Climaf_Driver_Error("Compositing failed %s for %s" % (err, command))
    os.remove("tmp.err")
    if register(out_fig, cobj.crs, total_costs):
        clogger.debug("Registering file %s for cpage %s" % (out_fig, cobj.crs))
        return out_fig, total_costs
    else:
        return None, compute_cost()


def cfilePage_pdf(cobj, deep, recurse_list=None):
    """
    Builds a PDF page with CliMAF figures using pdfjam, computing associated crs

    Args:
     cobj (cpage_pdf object)

    Returns : the filename in CliMAF cache, which contains the result (and None if failure)

    Raises:
        Climaf_Driver_Error: if COBJ is not a cpage_pdf, if a figure is None,
        or if the compositing command fails
    """
    if not isinstance(cobj, cpage_pdf):
        raise Climaf_Driver_Error("cobj is not a cpage_pdf object")
    clogger.debug("Computing figure array for cpage %s" % cobj.crs)
    #
    # margins
    xmargin = 30.  # Horizontal shift between figures
    ymargin = 30.  # Vertical shift between figures
    #
    # page size and creation
    page_size = '"{%dpx,%dpx}"' % (cobj.page_width, cobj.page_height)
    fig_nb = '"%dx%d"' % (len(cobj.fig_lines[0]), len(cobj.fig_lines))
    fig_delta = '"%d %d"' % (xmargin, ymargin)
    preamb = '"\\pagestyle{empty} \\usepackage{hyperref} \\usepackage{graphicx} \\usepackage{geometry} ' \
             '\\geometry{vmargin=%dcm,hmargin=2cm}"' % cobj.y
    args = [pdf_page_builder, "--keepinfo", "--preamble", preamb, "--papersize", page_size,
            "--delta", fig_delta, "--nup", fig_nb]
    #
    # page composition
    total_costs = compute_cost()
    for line in cobj.fig_lines:
        for fig in line:
            if fig:
                figfile, costs = ceval(fig, format="file", deep=deep, recurse_list=recurse_list)
                clogger.debug("Compositing figure %s", fig.crs)
                total_costs.add(costs)
            else:
                raise Climaf_Driver_Error("Each figure must exist ('None' figure is not accepted)")
            args.extend([figfile])
    #
    # more optional options
    if cobj.openright is True:
        args.extend(["--openright", "True"])
    if cobj.scale != 1.:
        args.extend(["--scale", "%.2f" % cobj.scale])
    if cobj.title != "":
        if "\\" in cobj.pt:
            pt = cobj.pt.split("\\")[-1]
        else:
            pt = cobj.pt
        if cobj.titlebox:
            latex_command = r'"\begin{center} \hspace{%dcm} \setlength{\fboxrule}{0.5pt} ' \
                            r'\setlength{\fboxsep}{2mm} \fcolorbox{black}{%s}{\%s{\fontfamily{%s}\selectfont %s}}' \
                            r' \end{center}"' % (cobj.x, cobj.background, pt, cobj.font, cobj.title)
        else:
            latex_command = r'"\begin{center} \hspace{%dcm} \%s{\fontfamily{%s}\selectfont %s} \end{center}"' \
                            % (cobj.x, pt, cobj.font, cobj.title)
        args.extend(["--pagecommand", latex_command])
    #
    # launch process and registering output in cache
    out_fig = generateUniqueFileName(cobj.crs, format='pdf')
    args.extend(["--outfile", out_fig])
    clogger.debug("Compositing figures : %s" % repr(args))
    tim1 = time.time()
    #
    try:
        with open("tmp.err", "w") as fic:
            subprocess.check_output(" ".join(args), shell=True, stderr=fic)
    except subprocess.CalledProcessError:
        with open("tmp.err") as fic:
            err = fic.read()
        raise Climaf_Driver_Error("Compositing failed : %s" % err)
    finally:
        os.remove("tmp.err")
    #
    duration = time.time() - tim1
    total_costs.increment(duration)
    #
    if register(out_fig, cobj.crs, total_costs):
        clogger.debug("Registering file %s for cpage %s" % (out_fig, cobj.crs))
        return out_fig, total_costs
    else:
        return None, compute_cost()
def calias(project, variable, fileVariable=None, **kwargs):
    """
    See :py:func:`climaf.classes.calias`

    Declare that in ``project``, ``variable`` is to be computed by reading ``filevariable``;

    ``variable`` may also be a list of variables, given as a string where the variable
    names are separated by commas; each of them is then additionally declared as a
    derived variable extracted (by 'ccdo' selname) from the multi-variable one
    """
    # The (possibly multi-) variable itself is always aliased
    classes.calias(project=project, variable=variable, fileVariable=fileVariable, **kwargs)
    if "," in variable:
        for single_var in variable.split(","):
            derive(project, single_var, 'ccdo', variable, operator='selname,%s' % single_var)
            classes.calias(project=project, variable=single_var, fileVariable=None, **kwargs)
def CFlongname(varname):
    """
    Returns long_name of variable VARNAME after CF convention

    For now, a fixed placeholder string is returned whatever VARNAME is.
    """
    placeholder = "TBD_should_improve_function_climaf.driver.CFlongname"
    return placeholder
def efile(obj, filename, force=False):
    """
    Create a single file for an ensemble of CliMAF objects (launch computation if needed).

    This is a convenience function. Such files are not handled in CliMAF cache

    Args:
        obj (CliMAF object) : an ensemble of CliMAF objects ('cens' objet)
        filename (str) : output filename. It will include a field for each
          ensemble's member, with a variable name suffixed by the member
          label (e.g. : tas_CNRM-CM, tas_IPSL-CM... ) (more formally :
          'var(obj.order[n])'_'obj.ens[order[n]]')
        force (logical, optional) : if True, CliMAF will overwrite the file
          'filename' if it already exists

    Returns:
        True on success, None (with a warning) if OBJ is not an ensemble

    Raises:
        Climaf_Driver_Error: if FILENAME exists and FORCE is not set, if a member cannot
        be evaluated, or if an NCO command fails
    """
    if not isinstance(obj, cens):
        clogger.warning("objet is not a 'cens' objet")
        return None
    if os.path.isfile(filename):
        if force:
            # Remove the file directly : a shell 'rm -rf' on an unquoted name could
            # delete other paths when the name contains spaces or wildcards
            os.remove(filename)
            clogger.warning("File '%s' already existed and has been overwritten" % filename)
        else:
            raise Climaf_Driver_Error("File '%s' already exists: use 'force=True' to overwrite it" % filename)
    for lab in obj.order:
        memb = obj[lab]
        ffile = cfile(memb)
        if ffile is None:
            raise Climaf_Driver_Error("Issue when evaluating %s" % memb)
        # The temporary file is removed when leaving the 'with', even on error
        with tempfile.NamedTemporaryFile(suffix=".nc") as f:
            # Commands are passed as argument lists, so that no path is re-split by a shell
            command = ["ncrename", "-O", "-v", "%s,%s_%s" % (varOf(memb), varOf(memb), lab), ffile, f.name]
            if subprocess.call(command) != 0:
                raise Climaf_Driver_Error("ncrename failed : %s" % " ".join(command))
            command2 = ["ncks", "-A", f.name, filename]
            if subprocess.call(command2) != 0:
                raise Climaf_Driver_Error(
                    "Issue when merging %s and %s (using command: %s)" % (f.name, filename, " ".join(command2)))
    return True