Source code for climaf.driver

""" 
CliMAF driver

There is quite a lot of things to document here. Maybe at a later stage ....

"""
from __future__ import print_function

# Created : S.Senesi - 2014

import sys, os, os.path, re, posixpath, subprocess, time, shutil, copy
from string import Template
import tempfile

# Climaf modules
import climaf
import classes
import cache
import operators
import cmacro
from clogging import clogger, indent as cindent, dedent as cdedent
from climaf.netcdfbasics import varOfFile
from climaf.period import init_period, cperiod
from climaf import xdg_bin

logdir="."

def capply(climaf_operator, *operands, **parameters):
    """ Builds the object representing applying a CliMAF operator (script, function or macro)
    
    Returns results as a list of CliMAF objects and stores them if auto-store is on
    """
    res=None
    if operands is None or operands[0] is None and not allow_errors_on_ds_call : 
        raise Climaf_Driver_Error("Operands is None for operator %s"%climaf_operator)
    opds=map(str,operands)
    if climaf_operator in operators.scripts :
        #clogger.debug("applying script %s to"%climaf_operator + `opds` + `parameters`)
        res=capply_script(climaf_operator, *operands, **parameters)
        # Evaluate object right now if there is no output to manage
        op=operators.scripts[climaf_operator]
        if op.outputFormat in operators.none_formats : ceval(res,userflags=copy.copy(op.flags))
    elif climaf_operator in cmacro.cmacros :
        if (len(parameters) > 0) :
            raise Climaf_Driver_Error("Macros cannot be called with keyword args")
        clogger.debug("applying macro %s to"%climaf_operator + `opds` )
        res=cmacro.instantiate(cmacro.cmacros[climaf_operator],*operands)
    elif climaf_operator in operators.operators :
        clogger.debug("applying operator %s to"%climaf_operator + `opds` + `parameters`)
        res=capply_operator(climaf_operator,*operands, **parameters)
    else:
        clogger.error("%s is not a known operator nor script"%climaf_operator)
    return res

def capply_script (script_name, *operands, **parameters):
    """ Create object for application of a script to OPERANDS with keyword PARAMETERS."""
    
    if script_name not in operators.scripts :
        raise Climaf_Driver_Error("Script %s is not know. Consider declaring it "
                                  "with function 'cscript'", script_name)
    script=operators.scripts[script_name]
    # if len(operands) != script.inputs_number() : 
    #     raise Climaf_Driver_Error("Operator %s is "
    #                               "declared with %d input streams, while you provided %d. Get doc with 'help(%s)'"%(
    #             script_name,script.inputs_number(),len(operands), script_name ))
    #
    # Check that all parameters to the call are expected by the script 
    for para in parameters :
        if re.match(r".*\{"+para+r"\}",script.command) is None :
            if re.match(r".*\{"+para+r"_iso\}",script.command) is None :
                if para != 'member_label' :
                    raise Climaf_Driver_Error("parameter '%s' is not expected by script %s"
                                          "(which command is : %s)"%(para,script_name,script.command))
    #
    # Check that only first operand can be an ensemble
    opscopy=[ o for o in operands ]
    opscopy.remove(opscopy[0])
    for op in opscopy  :
        if isinstance(op,classes.cens ):
            raise Climaf_Driver_Error("Cannot yet have an ensemble as operand except as first one")
    # 
    #print "op(0)="+`operands[0]`
    #print "script=%s , script.flags.commuteWithEnsemble="%script_name+`script.flags.commuteWithEnsemble`
    if (isinstance(operands[0],classes.cens) and script.flags.commuteWithEnsemble) :
        # Must iterate on members
        reps=[]
        first=operands[0]
        order=first.order
        for label in order:
            member=first[label]
            clogger.debug("processing member %s : "%label+`member`)
            params=parameters.copy()
            params["member_label"]=label
            reps.append(maketree(script_name, script, member, *opscopy, **params))
        return(classes.cens(dict(zip(order,reps)),order))
    else: 
        return(maketree(script_name, script, *operands, **parameters))
            
def maketree(script_name, script, *operands, **parameters):
    rep=classes.ctree(script_name, script, *operands, **parameters)
    # TBD Analyze script inputs cardinality vs actual arguments
    # Create one child for each output
    defaultVariable=classes.varOf(operands[0])
        #defaultPeriod=operands[0].period
    for outname in script.outputs :
        if outname is None  or outname=='' :
            if "%s" in script.outputs[''] :
                rep.variable=script.outputs['']%defaultVariable
            else:
                rep.variable=script.outputs['']
            template=Template(rep.variable)
            rep.variable=template.substitute(parameters)
        else :
            son=classes.scriptChild(rep,outname)
            if "%s" in script.outputs[outname] :
                son.variable=script.outputs[outname]%defaultVariable
            else:
                son.variable=script.outputs[outname]
            template=Template(son.variable)
            son.variable=template.substitute(parameters)
            rep.outputs[outname]=son
            setattr(rep,outname,son)
    return rep

def capply_operator (climaf_operator, *operands, **parameters):
    """ 
    Create object for application of an internal OPERATOR to OPERANDS with keywords PARAMETERS.
    
    """
    clogger.error("Not yet developped - TBD")
    return None
    

def ceval(cobject, userflags=None, format="MaskedArray",
          deep=None, derived_list=[], recurse_list=[]) :
    """ 
    Actually evaluates a CliMAF object, either as an in-memory data structure or
    as a string of filenames (which either represent a superset or exactly includes
    the desired data)
    
    - with arg deep=True , re-evaluates all components
    - with arg deep=False, re-evaluates top level operation
    - without arg deep   , use cached values as far as possible

    arg derived_list is the list of variables that have been considered as 'derived'
    (i.e. not natives) in upstream evaluations. It avoids to loop endlessly
    """
    if format != 'MaskedArray' and format != 'file' and format != 'txt' : 
        raise Climaf_Driver_Error("Allowed formats yet are : 'object', 'nc', 'txt', %s"%\
                                  ', '.join([repr(x) for x in operators.graphic_formats]))
    #
    if userflags is None : userflags=operators.scriptFlags()
    #
    # Next check is too crude for dealing with use of operator 'select'
    #if cobject.crs in recurse_list :
    #    clogger.critical("INTERNAL ERROR : infinite loop on object: "+cobject.crs)
    #    return None
    cindent()
    if isinstance(cobject,classes.cdataset):
        recurse_list.append(cobject.crs)
        clogger.debug("Evaluating dataset operand " + cobject.crs + "having kvp=" + `cobject.kvp`)
        ds=cobject
        if ds.isLocal() or ds.isCached() :
            clogger.debug("Dataset %s is local or cached "%ds )
            #  if the data is local, then
            #   if the user can select the data and aggregate time, and requested format is
            #     'file' return the filenames
            #   else : read the data, create a cache file for that, and recurse
            # Go to derived variable evaluation if appicable
            #if ds.variable in operators.derived_variables and not ds.hasRawVariable() :
            if operators.is_derived_variable(ds.variable,ds.project) :
                if ds.variable in derived_list :
                    raise Climaf_Driver_Error("Loop detected while evaluating"
                         "derived variable "+ ds.variable + " " + `derived_list`)
                derived=derive_variable(ds)
                clogger.debug("evaluating derived variable %s as %s"%\
                                  (ds.variable,`derived`))
                derived_value=ceval(derived, format=format, deep=deep, 
                                    userflags=userflags,
                                    derived_list=derived_list+[ds.variable],
                                    recurse_list=recurse_list)
                if derived_value : 
                    clogger.debug("succeeded in evaluating derived variable %s as %s"%\
                                      (ds.variable,`derived`))
                    set_variable(derived_value, ds.variable, format=format)
                cdedent()
                return(derived_value)
            elif noselect(userflags, ds, format) :
                clogger.debug("Delivering file set or sets is OK for the target use")
                cdedent()
                rep=ds.baseFiles()
                if not rep : raise Climaf_Driver_Error("No file found for %s"%`ds`)
                return(rep) # a single string with all filenames,
                               #or a list of such strings in case of ensembles
            else:
                clogger.debug("Must subset and/or aggregate and/or select "+
                              "var from data files and/or get data, or provide object result")
                ## extract=cread(ds)
                ## clogger.debug(" Done with subsetting and caching data files")
                ## cstore(extract) # extract should include the dataset def
                ## return ceval(extract,userflags,format)
                if format == 'file' or format == "MaskedArray" :
                    if ds.hasOneMember() :
                        clogger.debug("Fetching/selection/aggregation is done using an external script for now - TBD")
                        extract=capply('select',ds)
                    else :
                        clogger.debug("On multi-member datafiles , fetching/selection/aggregation is done using select_member - TBD")
                        extract=capply('select_member',ds)
                    if extract is None :
                        raise Climaf_Driver_Error("Cannot access dataset" + `ds`)
                    rep=ceval(extract,userflags=userflags,format=format)
                #elif format="MaskedArray" :
                #    if ds.hasOneMember() :
                #        clogger.debug("Selection + aggregation for MA output using cread for %s"%ds)
                #        rep=cread(ds.baseFiles(),classes.varOf(ds),period="%s"%ds.period)
                #    else:
                #        raise Climaf_Driver_Error("Cannot yet read multi-member dataset in MA format" + `ds`)
                else :
                    raise Climaf_Driver_Error("Untractable output format %s"%format)
                userflags.unset_selectors()
                cdedent()
                return rep
        else :
            # else (non-local and non-cached dataset)
            #   if the user can access the dataset by one of the dataset-specific protocols
            #   then assume it can also select on time and provide it with the address
            #   else : fetch the relevant selection of the data, and store it in cache
            clogger.debug("Dataset is remote " )
            if (userflags.canOpendap and format == 'file' ) :
                clogger.debug("But user can OpenDAP " )
                cdedent()
                return(ds.adressOf())
            else :
                if noselect(userflags, ds, format) :    
                    clogger.debug("Delivering file set or sets is OK for the target use")
                    cdedent()
                    rep=ds.baseFiles()
                    if not rep : raise Climaf_Driver_Error("No file found for %s"%`ds`)
                    return(rep) 
                else:
                    clogger.debug("Must remote read and cache " )
                    rep=ceval(capply('remote_select',ds),userflags=userflags,format=format)
                    ds.files=rep
                    userflags.unset_selectors()
                    cdedent()
                    return rep
    #
    elif isinstance(cobject,classes.ctree) or isinstance(cobject,classes.scriptChild) or \
             isinstance(cobject,classes.cpage) or isinstance(cobject,classes.cpage_pdf) or \
             isinstance(cobject,classes.cens) : 
        recurse_list.append(cobject.buildcrs())
        clogger.debug("Evaluating compound object : " + `cobject`)
        #################################################################
        if (deep is not None) : cache.cdrop(cobject.crs)
        #
        clogger.debug("Searching cache for exact object : " + `cobject`)
        #################################################################
        filename=cache.hasExactObject(cobject)
        #filename=None
        if filename :
            clogger.info("Object found in cache: %s is at %s:  "%(cobject.crs,filename))
            cdedent()
            if format=='file' : return filename
            else: return cread(filename,classes.varOf(cobject))
        if not isinstance(cobject,classes.cpage) and not isinstance(cobject,classes.cpage_pdf) and \
               not isinstance(cobject,classes.cens) :
            clogger.debug("Searching cache for including object for : " + `cobject`)
            ########################################################################
            it,altperiod=cache.hasIncludingObject(cobject)
            #clogger.debug("Finished with searching cache for including object for : " + `cobject`)
            #it=None
            if it :
                clogger.info("Including object found in cache : %s"%(it.crs))
                if (format == 'file') :
                    clogger.info("Selecting "+`cobject`+" out of it")
                    # Just select (if necessary for the user) the portion relevant to the request
                    rep=ceval_select(it,cobject,userflags,format,deep,derived_list, recurse_list)
                    cdedent()
                    return rep
                else :
                    clogger.debug("Because out format %s is not (yet, TBD) supported by ceval_select, cannot use including object found for : "%format + `cobject`)
            #
            clogger.debug("Searching cache for begin  object for : " + `cobject`)
            ########################################################################
            it,comp_period=cache.hasBeginObject(cobject) 
            clogger.debug("Finished with searching cache for begin  object for : " + `cobject`)
            #it=None
            if it : 
                clogger.info("partial result found in cache for %s : %s"%\
                             (cobject.crs,it.crs))
                clogger.debug("comp_period="+`comp_period`)
                begcrs=it.crs
                # Build complement object for end, and eval it
                comp=copy.deepcopy(it)
                comp.setperiod(comp_period)
                evalcomp=ceval(comp,userflags,format,deep,derived_list,recurse_list)
                set_variable(evalcomp,cobject.variable, format=format)
                rep=cache.complement(begcrs,comp.crs,cobject.crs)
                cdedent()
                if (format == 'file') : return rep
                else : return ceval(cobject)
            #
            clogger.info("nothing relevant found in cache for %s"%cobject.crs)
        #
        if deep==False : deep=None
        if isinstance(cobject,classes.ctree)  :
            #
            # the cache doesn't have a similar tree, let us recursively eval subtrees
            ##########################################################################
            # TBD  : analyze if the dataset is remote and the remote place 'offers' the operator
            if cobject.operator in operators.scripts :
                file=ceval_script(cobject,deep,recurse_list=recurse_list) # Does return a filename, or list of filenames
                cdedent()
                if ( format == 'file' ) : return (file)
                else : return cread(file,classes.varOf(cobject))
            elif cobject.operator in operators.operators :
                obj=ceval_operator(cobject,deep)
                cdedent()
                if (format == 'file' ) : 
                    rep=cstore(obj) ; return rep
                else : return(obj)
            else : 
                raise Climaf_Driver_Error("operator %s is not a script nor known operator",str(cobject.operator))
        elif isinstance(cobject,classes.scriptChild) :
            # Force evaluation of 'father' script
            if ceval_script(cobject.father,deep,recurse_list=recurse_list)  is not None :
                # Re-evaluate, which should succeed using cache
                rep=ceval(cobject,userflags,format,deep,recurse_list=recurse_list)
                cdedent()
                return rep
            else :
                raise Climaf_Driver_Error("generating script aborted for "+cobject.father.crs)
        elif isinstance(cobject,classes.cpage) :
            file=cfilePage(cobject, deep, recurse_list=recurse_list)
            cdedent()
            if ( format == 'file' ) : return (file)
            else : return cread(file)
        elif isinstance(cobject,classes.cpage_pdf) :
            file=cfilePage_pdf(cobject, deep, recurse_list=recurse_list)
            cdedent()
            if ( format == 'file' ) : return (file)
            else : return cread(file)
        elif isinstance(cobject,classes.cens) :
            d=dict()
            for member in cobject.order :
                # print ("evaluating member %s"%member)
                d[member]=ceval(cobject[member],copy.copy(userflags),format,deep,recurse_list=recurse_list)
            if (format=="file") : return(reduce(lambda x,y : x+" "+y, [ d[m] for m in cobject.order ]))
            else : return d
        else :
            raise Climaf_Driver_Error("Internal logic error")
    elif isinstance(cobject,str) :
        clogger.debug("Evaluating object from crs : %s"%cobject)
        raise Climaf_Driver_Error("Evaluation from CRS is not yet implemented ( %s )"%cobject)
    else :
        raise Climaf_Driver_Error("argument " +`cobject`+" is not (yet) managed")



def ceval_script (scriptCall,deep,recurse_list=[]):
    """ Actually applies a CliMAF-declared script on a script_call object 
    
    Prepare operands as fiels and build command from operands and parameters list
    Assumes that scripts are described in dictionary 'scripts'  by templates as
    documented in operators.cscript
    
    Returns a CLiMAF cache data filename
    """
    script=operators.scripts[scriptCall.operator]
    template=Template(script.command)
    # Evaluate input data 
    invalues=[]
    sizes=[]
    for op in scriptCall.operands :
        if op :
            if scriptCall.operator!='remote_select' and \
               isinstance(op,classes.cdataset) and \
               not (op.isLocal() or op.isCached()) :
                inValue=ceval(op,format='file',deep=deep)
            else:
                inValue=ceval(op,userflags=scriptCall.flags,format='file',deep=deep,
                          recurse_list=recurse_list)
            clogger.debug("evaluating %s operand %s as %s"%(scriptCall.operator,op,inValue))
            if inValue is None or inValue is "" :
                raise Climaf_Driver_Error("When evaluating %s : value for %s is None"\
                                      %(scriptCall.script,`op`))
            if isinstance(inValue,list) : size=len(inValue)
            else : size=1
        else:
            inValue=''
            size=0
        sizes.append(size)
        invalues.append(inValue)
    #print("len(invalues)=%d"%len(invalues))
    #
    # Replace input data placeholders with filenames
    subdict=dict()
    opscrs=""
    if 0 in script.inputs :
        label,multiple,serie=script.inputs[0]
        op=scriptCall.operands[0]
        #print("processing 0, op=%s"%`op`)
        infile=invalues[0]
        if (scriptCall.operator != 'remote_select') and \
               not all(map(os.path.exists,infile.split(" "))) :
            raise Climaf_Driver_Error("Internal error : some input file does not exist among %s:"%(infile))
        subdict[ label ]=infile
        #if scriptCall.flags.canSelectVar :
        subdict["var"]=classes.varOf(op)
        if isinstance(op,classes.cdataset) and op.alias:
            filevar,scale,offset,units,filenameVar,missing=op.alias
            if scriptCall.flags.canAlias and "," not in classes.varOf(op) :
                #if script=="select" and ((classes.varOf(op) != filevar) or scale != 1.0 or offset != 0.) :
                subdict["var"]=Template(filevar).safe_substitute(op.kvp)
                subdict["alias"]="%s,%s,%.4g,%.4g"%(classes.varOf(op),subdict["var"],scale,offset)
            if units : subdict["units"]=units 
            if scriptCall.flags.canMissing and missing :
                subdict["missing"]=missing
        if isinstance(op,classes.cens) :
            if not multiple :
                raise Climaf_Driver_Error(
                    "Script %s 's input #%s cannot accept ensemble %s"\
                        %(scriptCall.script,0,`op`))
            #subdict["labels"]=r'"'+reduce(lambda x,y : "'"+x+"' '"+y+"'", op.labels)+r'"'
            subdict["labels"]=reduce(lambda x,y : x+"$"+y, op.order)
        if op : 
            per=timePeriod(op)
            if per and not per.fx and str(per) != "" and scriptCall.flags.canSelectTime:
                subdict["period"]=str(per)
                subdict["period_iso"]=per.iso()
        if scriptCall.flags.canSelectDomain :
            subdict["domain"]=classes.domainOf(op)
    i=0
    for op in scriptCall.operands :
        if op : opscrs += op.crs+" - "
        #print("processing %s, i=%d"%(`op`,i))
        infile=invalues[i]
        if (scriptCall.operator != 'remote_select') and infile is not '' and \
               not all(map(os.path.exists,infile.split(" "))) :
            raise Climaf_Driver_Error("Internal error : some input file "
                                      "does not exist among %s:"%(infile))
        i+=1
        if ( i> 1 or 1 in script.inputs) :
            label,multiple,serie=script.inputs[i]
            subdict[ label ]=infile
            # Provide the name of the variable in input file if script allows for
            subdict["var_%d"%i]=classes.varOf(op)
            if isinstance(op,classes.cdataset) and op.alias :
                filevar,scale,offset,units,filenameVar,missing =op.alias
                if ((classes.varOf(op) != filevar) or (scale != 1.0) or (offset != 0.)) and \
                        "," not in classes.varOf(op):
                    subdict["var_%d"%i]=Template(filevar).safe_substitute(op.kvp)
                    subdict["alias_%d"%i]="%s %s %f %f"%(classes.varOf(op),subdict["var_%d"%i],scale,offset)
                if units : subdict["units_%d"%i]=units 
                if missing : subdict["missing_%d"%i]=missing
                # Provide period selection if script allows for
            if op :
                per=timePeriod(op)
                if not per.fx and per != "":
                    subdict["period_%d"%i]=str(per)
                    subdict["period_iso_%d"%i]=per.iso()
            subdict["domain_%d"%i]=classes.domainOf(op)
    clogger.debug("subdict for operands is "+`subdict`)
    # substitution is deffered after scriptcall parameters evaluation, which may
    # redefine e.g period
    #
    # Provide one cache filename for each output and instantiates the command accordingly
    if script.outputFormat not in operators.none_formats :
        if script.outputFormat=="graph" :
            if 'format' in scriptCall.parameters :
                if scriptCall.parameters['format'] in operators.graphic_formats :
                    output_fmt=scriptCall.parameters['format']
                else: 
                    raise Climaf_Driver_Error('Allowed graphic formats yet are : %s'%', '.join([repr(x) for x in operators.graphic_formats]))
            else : #default graphic format 
                output_fmt="png"
        else:
            output_fmt=script.outputFormat
        # Compute a filename for each ouptut
        # Un-named main output
        #main_output_filename=tempfile.NamedTemporaryFile(suffix="."+output_fmt).name #cache.generateUniqueFileName(scriptCall.crs, format=output_fmt)
        tmpfile,tmpfile_fmt=os.path.splitext(cache.generateUniqueFileName(scriptCall.crs, format=output_fmt))
        main_output_filename="%s_%i%s"%(tmpfile,os.getpid(),tmpfile_fmt) 

        subdict["out"]=main_output_filename
        subdict["out_"+classes.varOf(scriptCall)]=main_output_filename

        subdict["out_final"]=cache.generateUniqueFileName(scriptCall.crs, format=output_fmt) 
        subdict["out_final_"+classes.varOf(scriptCall)]=cache.generateUniqueFileName(scriptCall.crs, format=output_fmt) 
        
        # Named outputs
        for output in scriptCall.outputs:
            #subdict["out_"+output]=tempfile.NamedTemporaryFile(suffix="."+output_fmt).name
            tmpfile,tmpfile_fmt=os.path.splitext(cache.generateUniqueFileName(scriptCall.crs+"."+output, format=output_fmt))
            subdict["out_"+output]="%s_%i%s"%(tmpfile,os.getpid(),tmpfile_fmt)
            subdict["out_final_"+output]=cache.generateUniqueFileName(scriptCall.crs+"."+output,\
                                                         format=output_fmt)

    # Account for script call parameters
    for p in scriptCall.parameters : 
        #clogger.debug("processing parameter %s=%s"%(p,scriptCall.parameters[p]))
        subdict[p]=scriptCall.parameters[p]
        if p=="period" :
            subdict["period_iso"]=init_period(scriptCall.parameters[p]).iso()
    subdict["crs"]=opscrs.replace("'","")
    #
    #print("subdict="+`subdict`) 
    # Combine CRS and possibly member_label to provide/complement title 
    if 'title' not in subdict :
        if 'member_label' in subdict :
            subdict["title"]=subdict['member_label']
#        else:
#            subdict["title"]=subdict["crs"]
    else:
        #print("Got a member label : %s"%subdict['member_label'])
        if 'member_label' in subdict :
            subdict["title"]=subdict["title"]+" "+subdict['member_label']
            subdict.pop('member_label')
    #
    # Substitute all args
    template=template.safe_substitute(subdict)
    #
    # Allowing for some formal parameters to be missing in the actual call:
    #
    # Discard remaining substrings looking like :
    #  some_word='"${some_keyword}"' , or simply : '"${some_keyword}"'
    template=re.sub(r'(\w*=)?(\'\")?\$\{\w*\}(\"\')?',r"",template)
    #
    # Discard remaining substrings looking like :
    #  some_word=${some_keyword}  or  simply : ${some_keyword}
    template=re.sub(r"(\w*=)?\$\{\w*\}",r"",template)
    #
    # Link the fixed fields needed by the script/operator
    if script.fixedfields is not None :
        #subdict_ff=dict()
        subdict_ff=scriptCall.parameters.copy()
        subdict_ff["model"]=classes.modelOf(scriptCall.operands[0])
        subdict_ff["simulation"]=classes.simulationOf(scriptCall.operands[0])
        subdict_ff["project"]=classes.projectOf(scriptCall.operands[0])
        subdict_ff["realm"]=classes.realmOf(scriptCall.operands[0])
        subdict_ff["grid"]=classes.gridOf(scriptCall.operands[0])
        l=script.fixedfields #return paths: (linkname, targetname)
        files_exist=dict()
        for ll,lt in l:
            #Replace input data placeholders with filenames for fixed fields
            template_ff_target=Template(lt).substitute(subdict_ff)
            # symlink if needed
            files_exist[ll]=False
            if os.path.isfile(ll): files_exist[ll]=True
            else: os.system("ln -s "+template_ff_target+" "+ll)   
    #
    tim1=time.time()
    clogger.info("Launching command:"+template)
    #
    logfile=open(logdir+'/last.out', 'w')
    logfile.write("\n\nstdout and stderr of script call :\n\t "+template+"\n\n")
    try :
        subprocess.check_call(template, stdout=logfile, stderr=subprocess.STDOUT, shell=True)
    except subprocess.CalledProcessError, inst :
        logfile.close()
        raise Climaf_Driver_Error("Something went wrong - Type 'cerr()' for details")

    logfile.close()
    #
    # For remote files, we supply ds.local_copies_of_remote_files
    # for local filenames in order to can use ds.check()
    if scriptCall.operator == 'remote_select' :
        local_filename=[]
        for el in scriptCall.operands[0].baseFiles().split(" "):
            local_filename.append(climaf.dataloc.remote_to_local_filename(el))            
        scriptCall.operands[0].local_copies_of_remote_files=' '.join(local_filename)
    #
    # Clean fixed fields symbolic links (linkname, targetname)
    if script.fixedfields :
        for ll,lt in script.fixedfields:
            if files_exist[ll] == False: os.system("rm -f "+ll)  
    # Handle ouptuts
    if script.outputFormat=="txt" : 
        with open(logdir+"/last.out",'r') as f:
            for line in f.readlines() :
                sys.stdout.write(line)
    if script.outputFormat in operators.none_formats : return None
    # Tagging output files with their CliMAF Reference Syntax definition
    # 1 - Un-named main output
    ok = cache.register(main_output_filename,scriptCall.crs, subdict["out_final"])
    # 2 - Named outputs
    for output in scriptCall.outputs:
        ok = ok and cache.register(subdict["out_"+output], scriptCall.crs+"."+output, subdict["out_final_"+output])
    if ok : 
        duration=time.time() - tim1
        #print(...file=sys.stderr)
        clogger.info("Done in %.1f s with script computation for "
                     "%s (command was :%s )"% (duration,`scriptCall`,template))
        return subdict["out_final"] #main_output_filename
    else : 
        raise Climaf_Driver_Error("Some output missing when executing "
                                          ": %s. \n See %s/last.out"%(template,logdir))

def timePeriod(cobject) :
    """ Returns a time period for a CliMAF object : if object is a dataset, returns
    its time period, otherwise returns time period of first operand
    """
    if isinstance(cobject,classes.cdataset) : return cobject.period
    elif isinstance(cobject,classes.ctree) :
        clogger.debug("for now, timePeriod logic for scripts output is basic (1st operand) - TBD")
        return timePeriod(cobject.operands[0])
    elif isinstance(cobject,classes.scriptChild) :
        clogger.debug("for now, timePeriod logic for scriptChilds is basic - TBD")
        return timePeriod(cobject.father)
    elif isinstance(cobject,classes.cens) :
        clogger.debug("for now, timePeriod logic for 'cens' objet is basic (1st member)- TBD")
        return timePeriod(cobject.values()[0])
    else : return None #clogger.error("unkown class for argument "+`cobject`)
 
                  
def ceval_select(includer,included,userflags,format,deep,derived_list,recurse_list) :
    """ Extract object INCLUDED from (existing) object INCLUDER,
    taking into account the capability of the user process (USERFLAGS)
    and the required delivering FORMAT(file or object)
    """
    if format=='file' : 
        if userflags.canSelectTime or userflags.canSelectDomain:
            clogger.debug("TBD - should do smthg smart when user can select time or domain")
            #includer.setperiod(included.period)
        incperiod=timePeriod(included)
	clogger.debug("extract sub period %s out of %s"%(`incperiod`,includer.crs))
        extract=capply('select',includer, period=`incperiod`)
        objfile=ceval(extract,userflags,'file',deep,derived_list,recurse_list)
	if objfile :
            crs=includer.buildcrs(period=incperiod)
            return(cache.rename(objfile,crs))
        else :
            clogger.critical("Cannot evaluate "+`extract`)
            exit()
    else :
        clogger.error("Can yet process only files - TBD")


def cread(datafile,varname=None, period=None):
    import re
    if not datafile : return(None)
    if re.findall(".png$",datafile) or \
           ( (re.findall(".pdf$",datafile) or re.findall(".eps$",datafile)) and (not xdg_bin) ):
        subprocess.Popen(["display",datafile,"&"])
    elif ( (re.findall(".pdf$",datafile) or re.findall(".eps$",datafile)) and xdg_bin ) :
        subprocess.Popen(["xdg-open",datafile])
    elif re.findall(".nc$",datafile) :
        clogger.debug("reading NetCDF file %s"%datafile)
        if varname is None: varname=varOfFile(datafile)
        if varname is None: raise Climaf_Driver_Error("")
        if period is not None : clog.warning("Cannot yet select on period (%s) using CMa for files %s - TBD"%(period,files))
        from anynetcdf import ncf
        fileobj=ncf(datafile)
        #import netCDF4
        #fileobj=netCDF4.Dataset(datafile)
        # Note taken from the CDOpy developper : .data is not backwards 
        # compatible to old scipy versions, [:] is
        data=fileobj.variables[varname][:]
        import numpy.ma
        if '_FillValue' in dir(fileobj.variables[varname]) :
            fillv=fileobj.variables[varname]._FillValue
            rep= numpy.ma.array(data,mask = data==fillv)
        else : 
            rep= numpy.ma.array(data)
        fileobj.close()
        return(rep)
    else :
        clogger.error("cannot yet handle %s"%datafile)
        return None

def cview(datafile):
    if re.findall(".png$",datafile) or \
           ( (re.findall(".pdf$",datafile) or re.findall(".eps$",datafile)) and (not xdg_bin) ) :
        subprocess.Popen(["display",datafile,"&"])
    elif ( (re.findall(".pdf$",datafile) or re.findall(".eps$",datafile)) and xdg_bin) :
        subprocess.Popen(["xdg-open",datafile])
    else :
        clogger.error("cannot yet handle %s"%datafile)
        return None

def derive_variable(ds):
    """ Assuming that variable of DS is a derived variable, returns the CliMAF object
    representing the operation needed to compute it (using information in dict 
    operators.derived_variable
    """
    if not isinstance(ds,classes.cdataset):
        raise Climaf_Driver_Error("arg is not a dataset")
    if not operators.is_derived_variable(ds.variable,ds.project) :
        raise Climaf_Driver_Error("%s is not a derived variable"%ds.variable)
    op,outname,inVarNames,params=operators.derived_variable(ds.variable,ds.project)
    inVars=[]
    for varname in inVarNames :
        dic=ds.kvp.copy()
        dic['variable']=varname
        inVars.append(classes.cdataset(**dic))
    father=capply(op,*inVars,**params)
    if (outname == "out"): rep=father
    else : rep=scriptChild(father,outname)
    rep.variable=ds.variable
    return rep

def set_variable(obj, varname, format) :
    """ Change to VARNAME the variable name for OBJ, which FORMAT 
    maybe 'file' or 'MaskedArray'. 
    Also set the variable long_name using CF convention (TBD)
    """
    if obj is None : return None
    long_name=CFlongname(varname)
    if (format=='file') :
        oldvarname=varOfFile(obj)
        if not oldvarname : 
            raise Climaf_Driver_Error("Cannot set variable name for a multi-variable dataset")
        if (oldvarname != varname) :
            command="ncrename -v %s,%s %s >/dev/null 2>&1"%(oldvarname,varname,obj)
            if ( os.system(command) != 0 ) :
                clogger.error("Issue with changing varname to %s in %s"%(varname,obj))
                return None
            clogger.debug("Varname changed to %s in %s"%(varname,obj))
            command="ncatted -a long_name,%s,o,c,%s %s"%(varname,long_name,obj)
            if ( os.system(command) != 0 ) :
                clogger.error("Issue with changing long_name for var %s in %s"%
                              (varname,obj))
                return None
            return True
    elif (format=='MaskedArray') :
        clogger.warning('TBD - Cannot yet set the varname for MaskedArray')
    else :
        clogger.error('Cannot handle format %s'%format)

def noselect(userflags, ds, format) :
    """ Check the capability of the user process (USERFLAGS)
    and a set of attribute values of dataset (DS)

    Return True if the user can select the data and aggregate time,
    and requested FORMAT is 'file', and False otherwise          
    """
    
    can_select=False
    
    if ((userflags.canSelectVar     or ds.oneVarPerFile()   ) and \
        (userflags.canSelectTime    or ds.periodIsFine()    ) and \
        (userflags.canSelectDomain  or ds.domainIsFine()    ) and \
        (userflags.canAggregateTime or ds.periodHasOneFile()) and \
        (userflags.canAlias         or ds.hasExactVariable()) and \
        (userflags.canMissing       or ds.missingIsOK())      and \
        (ds.hasOneMember()) and \
        (format == 'file')) :

        can_select=True

    return(can_select)


# Commodity functions
#########################

[docs]def cfile(object,target=None,ln=None,hard=None,deep=None) : """ Provide the filename for a CliMAF object, or copy this file to target. Launch computation if needed. Args: object (CliMAF object) : either a dataset or a 'compound' object (e.g. the result of a CliMAF operator) target (str, optional) : name of the destination file in case you really need another filename; CliMAF will then anyway also store the result in its cache, either as a copy (default), or a sym- or a hard-link (see below); ln (logical, optional) : if True, CliMAF cache file is created as a symlink to the target; this allows to cross filesystem boundaries, while still saving disk space (wrt to a copy); CliMAF will manage the broken link cases (at the expense of a new computation) hard (logical, optional) : if True, CliMAF cache file is created as a hard link to the target; this allows to save disk space, but does not allow to cross filesystem boundaries deep (logical, optional) : governs the use of cached values when computing the object: - if missing, or None : use cache as much as possible (speed up the computation) - False : make a shallow computation, i.e. do not use cached values for the top level operation - True : make a deep computation, i.e. do not use any cached value Returns: - if target is provided, returns this filename (or linkname) if computation is successful ('target' contains the result), and None otherwise; - if target is not provided, returns the filename in CliMAF cache, which contains the result (and None if failure) """ clogger.debug("cfile called on "+str(object)) result=ceval(object,format='file',deep=deep) if target is None : return result else : target=os.path.abspath(os.path.expanduser(target)) if (isinstance(object,climaf.classes.cens)) : raise Climaf_Driver_Error("Cannot yet copy or link result files for an ensemble") if result is not None : if ln or hard : if ln and hard : Climaf_Driver_Error("flags ln and hard are mutually exclusive") elif ln : if os.path.exists(target) : if not os.path.samefile(result,target): os.remove(target) shutil.move(result,target) os.symlink(target,result) else: if not os.path.exists(os.path.dirname(target)): os.makedirs(os.path.dirname(target)) shutil.move(result,target) os.symlink(target,result) else: # Must create hard link # If result is a link, follow links for finding source of hard link if os.path.islink(result) : source=os.readlink(result) else : source=result if (source == target): # This is a case where the file had already been symlinked to the same target name shutil.move(source,result) os.link(result,target) else : if os.path.exists(target) : os.remove(target) os.link(source,target) else : shutil.copyfile(result,target) return target
[docs]def cshow(obj) : """ Provide the in-memory value of a CliMAF object. For a figure object, this will lead to display it ( launch computation if needed. ) """ clogger.debug("cshow called on "+str(obj)) return climaf.driver.ceval(obj,format='MaskedArray')
[docs]def cMA(obj,deep=None) : """ Provide the Masked Array value for a CliMAF object. Launch computation if needed. Args: obj (CliMAF object) : either a datset or a 'compound' object (like the result of a CliMAF standard operator) deep (logical, optional) : governs the use of cached values when computing the object - if missing, or None : use cache as much as possible - False : make a shallow computation, i.e. do not use cached values for top level operation - True : make a deep computation, i.e. do not use any cached value Returns: a Masked Array containing the object's value """ clogger.debug("cMA called with arguments"+str(obj)) return climaf.driver.ceval(obj,format='MaskedArray',deep=deep)
[docs]def cvalue(obj,index=0) : """ Return the value of the array for an object, after MV flattening, at a given index Example: >>> data=ds(project='mine',variable='tas', ...) >>> data1=time_average(data) >>> data2=space_average(data1) >>> v=cvalue(data2) Does use the file representation of the object """ return cMA(obj).data.flat[index]
def cexport(*args,**kwargs) : """ Alias for climaf.driver.ceval. Create synonyms for arg 'format' """ clogger.debug("cexport called with arguments"+str(args)) if "format" in kwargs : if (kwargs['format']=="NetCDF" or kwargs['format']=="netcdf" or kwargs['format']=="nc" \ or kwargs['format']=="png" or kwargs['format']=="pdf" or kwargs['format']=="eps") : kwargs['format']="file" if (kwargs['format']=="MA") : kwargs['format']="MaskedArray" return climaf.driver.ceval(*args,**kwargs) def cimport(cobject,crs) : clogger.debug("cimport called with argument",cobject) clogger.debug("should check syntax of arg 'crs' -TBD") clogger.warning("cimport is not for the dummies - Playing at your own risks !") import numpy, numpy.ma if isinstance(cobject,numpy.ma.MaskedArray) : clogger.debug("for now, use a file for importing - should revisit - TBD") clogger.error("not yet implemented fro Masked Arrays - TBD") elif isinstance(cobject,str) : cache.register(cobject,crs) else : clogger.error("argument is not a Masked Array nor a filename",cobject) def cfilePage(cobj, deep, recurse_list=None) : """ Builds a page with CliMAF figures, computing associated crs Args: cobj (cpage object) Returns : the filename in CliMAF cache, which contains the result (and None if failure) """ if not isinstance(cobj,classes.cpage): raise Climaf_Driver_Error("cobj is not a cpage object") clogger.debug("Computing figure array for cpage %s"%(cobj.crs)) # # page size and creation page_size="%dx%d"%(cobj.page_width, cobj.page_height) args=["convert", "-size", page_size, "xc:white"] # # margins x_left_margin=30. # Left shift at start and end of line y_top_margin=30. # Initial vertical shift for first line x_right_margin=30. # Right shift at start and end of line y_bot_margin=30. # Vertical shift for last line xmargin=30. # Horizontal shift between figures ymargin=30. # Vertical shift between figures # if cobj.title is "": usable_height=cobj.page_height-ymargin*(len(cobj.heights)-1.)-y_top_margin -y_bot_margin else: usable_height=cobj.page_height-ymargin*(len(cobj.heights)-1.)-y_top_margin -y_bot_margin-cobj.ybox usable_width=cobj.page_width -xmargin*(len(cobj.widths)-1.) -x_left_margin-x_right_margin # # page composition y=y_top_margin for line, rheight in zip(cobj.fig_lines, cobj.heights) : # Line height in pixels height=usable_height*rheight x=x_left_margin max_old=0. for fig, rwidth in zip(line, cobj.widths) : # Figure width in pixels width=usable_width*rwidth scaling="%dx%d+%d+%d" %(width,height,x,y) if fig : figfile=ceval(fig,format="file", deep=deep, recurse_list=recurse_list) else : figfile='xc:None' clogger.debug("Compositing figure %s",fig.crs if fig else 'None') args.extend([figfile, "-geometry", scaling, "-composite" ]) # Real size of figure in pixels: [fig_width x fig_height] args_figsize=["identify", figfile] comm_figsize=subprocess.Popen(args_figsize, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output_figsize=comm_figsize.stdout.read() figsize=output_figsize.split(" ").pop(2) fig_width=figsize.split("x").pop(0) fig_height=figsize.split("x").pop(1) # Scaling and max height if float(fig_width) != 1. and float(fig_height) != 1. : if ( (float(fig_width)/float(fig_height))*float(height) ) < width: new_fig_width=( float(fig_width)/float(fig_height) )*float(height) new_fig_height=height else: new_fig_height=( float(fig_height)/float(fig_width) )*float(width) new_fig_width=width else: # for figure = 'None' new_fig_height=fig_height new_fig_width=fig_width max_fig_height=max( float(new_fig_height),max_old) max_old=float(new_fig_height) if False and cobj.fig_trim and ( float(fig_width)/float(fig_height) < width/height ): width_adj=float(fig_width)*(height/float(fig_height)) x+=width_adj+xmargin else: x+=width+xmargin if cobj.fig_trim and ( float(fig_width)/float(fig_height) > width/height ): height_adj=max_fig_height y+=height_adj+ymargin else: y+=height+ymargin out_fig=cache.generateUniqueFileName(cobj.buildcrs(), format=cobj.format) if cobj.page_trim : args.append("-trim") if cobj.title != "": splice="0x%d" %(cobj.ybox) annotate="+%d+%d" %(cobj.x, cobj.y) args.extend(["-gravity", cobj.gravity, "-background", cobj.background, \ "-splice", splice, "-font", cobj.font, "-pointsize", "%d"%cobj.pt, \ "-annotate", annotate, cobj.title]) args.append(out_fig) clogger.debug("Compositing figures : %s"%`args`) comm=subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if comm.wait()!=0 : raise Climaf_Driver_Error("Compositing failed : %s" %comm.stderr.read()) if cache.register(out_fig,cobj.crs) : clogger.debug("Registering file %s for cpage %s"%(out_fig,cobj.crs)) return out_fig def cfilePage_pdf(cobj, deep, recurse_list=None) : """ Builds a PDF page with CliMAF figures using pdfjam, computing associated crs Args: cobj (cpage_pdf object) Returns : the filename in CliMAF cache, which contains the result (and None if failure) """ if not isinstance(cobj,classes.cpage_pdf): raise Climaf_Driver_Error("cobj is not a cpage_pdf object") clogger.debug("Computing figure array for cpage %s"%(cobj.crs)) # # margins xmargin=30. # Horizontal shift between figures ymargin=30. # Vertical shift between figures # # page size and creation page_size="{%dpx,%dpx}"%(cobj.page_width, cobj.page_height) fig_nb="%dx%d"%(len(cobj.fig_lines[0]),len(cobj.fig_lines)) fig_delta="%d %d"%(xmargin, ymargin) preamb="\pagestyle{empty} \usepackage{hyperref} \usepackage{graphicx} \usepackage{geometry} \geometry{vmargin=%dcm,hmargin=2cm}"%cobj.y args=["pdfjam","--keepinfo", "--preamble", preamb, "--papersize", page_size, "--delta", fig_delta, "--nup", fig_nb] #"%s"%preamb # # page composition for line in cobj.fig_lines : for fig in line : if fig : figfile=ceval(fig,format="file", deep=deep, recurse_list=recurse_list) clogger.debug("Compositing figure %s",fig.crs) else : raise Climaf_Driver_Error("Each figure must exist ('None' figure is not accepted)") args.extend([figfile]) # # more optional options if cobj.openright is True: args.extend(["--openright", "True"]) if cobj.scale != 1.: args.extend(["--scale", "%.2f"%cobj.scale]) if cobj.title != "": if "\\" in cobj.pt: pt=cobj.pt.split("\\")[-1] else: pt=cobj.pt if cobj.titlebox : latex_command="\\begin{center} \\hspace{%dcm} \\setlength{\\fboxrule}{0.5pt} \\setlength{\\fboxsep}{2mm} \\fcolorbox{black}{%s}{\%s{\\fontfamily{%s}\\selectfont %s}} \\end{center}"%(cobj.x, cobj.background, pt, cobj.font, cobj.title) else: latex_command="\\begin{center} \\hspace{%dcm} \%s{\\fontfamily{%s}\\selectfont %s} \\end{center}"\ %(cobj.x, pt, cobj.font, cobj.title) args.extend(["--pagecommand", latex_command]) # # launch process and registering output in cache out_fig=cache.generateUniqueFileName(cobj.buildcrs(), format='pdf') args.extend(["--outfile", out_fig]) clogger.debug("Compositing figures : %s"%`args`) comm=subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if comm.wait()!=0 : raise Climaf_Driver_Error("Compositing failed : %s" %comm.stderr.read()) if cache.register(out_fig,cobj.crs) : clogger.debug("Registering file %s for cpage %s"%(out_fig,cobj.crs)) return out_fig
[docs]def calias(project,variable,fileVariable=None,**kwargs): """ See :py:func:`climaf.classes.calias` Declare that in ``project``, ``variable`` is to be computed by reading ``filevariable``; It allows to use a list of variable, given as a string where the name of variables are separated by commas """ if not "," in variable: # mono-variable classes.calias(project=project,variable=variable,fileVariable=fileVariable,**kwargs) else : #multi-variable classes.calias(project=project,variable=variable,fileVariable=fileVariable,**kwargs) list_variable=variable.split(",") for v in list_variable: operators.derive(project,v,'ccdo',variable,operator='selname,%s'%v) classes.calias(project=project,variable=v,fileVariable=None,**kwargs)
def CFlongname(varname) : """ Returns long_name of variable VARNAME after CF convention """ return("TBD_should_improve_function_climaf.driver.CFlongname")
[docs]def efile(obj, filename, forced=False) : """ Create a single file for an ensemble of CliMAF objects (launch computation if needed). This is a convenience function. Such files are not handled in CliMAF cache Args: obj (CliMAF object) : an ensemble of CliMAF objects ('cens' objet) filename (str) : output filename. It will include a field for each ensemble's member, with a variable name suffixed by the member label (e.g. : tas_CNRM-CM, tas_IPSL-CM... ) (more formally : 'var(obj.order[n])'_'obj.ens[order[n]]') forced (logical, optional) : if True, CliMAF will override the file 'filename' if it already exists """ if isinstance(obj,classes.cens) : if os.path.isfile(filename): if forced: os.system("rm -rf %s" %filename) clogger.warning("File '%s' already existed and has been overriden" %filename) else: raise Climaf_Driver_Error("File '%s' already exists: use 'forced=True' to override it" %filename) for lab in obj.order: memb=obj[lab] ffile=cfile(memb) f = tempfile.NamedTemporaryFile(suffix=".nc") command="ncrename -O -v %s,%s_%s %s %s"%(classes.varOf(memb), classes.varOf(memb), lab, ffile, f.name) if ( os.system(command) != 0 ) : raise Climaf_Driver_Error("ncrename failed : %s" %command) command2="ncks -A %s %s"%(f.name,filename) if ( os.system(command2) != 0 ) : raise Climaf_Driver_Error("Issue when merging %s and %s (using command: %s)"%(f.name,filename,command2)) f.close() else: clogger.warning("objet is not a 'cens' objet")
class Climaf_Driver_Error(Exception): def __init__(self, valeur): self.valeur = valeur clogger.error(self.__str__()) cdedent(100) def __str__(self): return `self.valeur`