"""
CliMAF handling of external scripts and binaries, and of internal operators (Python funcs)
"""


# Created : S.Senesi - 2014

import os, re, sys, subprocess
import driver
from clogging import clogger, dedent

# The next definitions could be split into a set managed by an administrator, and
# other sets managed and fed by users. But it should be enforced that no redefinition
# occurs for some really basic operators (should it ?)

# Registries used below; they are missing from this listing and are
# reconstructed here from the way the code uses them
scripts=dict()            # script-based operators, by name
operators=dict()          # internal (Python) operators, by name
derived_variables=dict()  # derived variables, by project, then by variable name
# Output formats; values assumed from the doc of argument 'format' of cscript
known_formats=['nc','graph']
graphic_formats=['png','pdf','eps']
none_formats=[None,'txt']


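class scriptFlags():
    def __init__(self,canOpendap=False, canSelectVar=False, 
                 canSelectTime=False, canSelectDomain=False, 
                 canAggregateTime=False, canAlias=False,
                 canMissing=False, commuteWithEnsemble=True,
                 commuteWithTimeConcatenation=False,
                 commuteWithSpaceConcatenation=False):
        # The end of the signature is completed from the call made in
        # cscript.__init__ ; each flag is stored as an attribute of same name
        flags=dict(locals())
        flags.pop('self')
        self.__dict__.update(flags)

    def unset_selectors(self) :
        self.canSelectVar=False
        self.canAlias=False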

class cscript():
    def __init__(self,name, command, format="nc", canOpendap=False,
                 commuteWithTimeConcatenation=False, commuteWithSpaceConcatenation=False,
                 canSelectVar=False, **kwargs):
        """
        Declare a script or binary as a 'CliMAF operator', and define a
        Python function with the same name

        Args:
          name (str): name for the CliMAF operator.
          command (str): script calling sequence, according to the syntax
            described below.
          format (str): script output format -- either 'nc', 'png', 'pdf',
            'eps', 'None' or 'graph' ('graph' lets the user choose among
            three graphic output formats: 'png', 'pdf' or 'eps') or 'txt'
            (text outputs are not managed by CliMAF, but only displayed -
            'txt' allows using e.g. 'ncdump -h' from inside CliMAF);
            defaults to 'nc'
          canOpendap (bool, optional): is the script able to use OpenDAP
            URIs ? defaults to False
          commuteWithTimeConcatenation (bool, optional): can the operation
            commute with concatenation of time periods ? set it to True
            if the operator can be applied on time chunks separately, in
            order to allow for incremental computation / time chunking;
            defaults to False
          commuteWithSpaceConcatenation (bool, optional): can the operation
            commute with concatenation of space domains ? defaults to False
            (see commuteWithTimeConcatenation)
          **kwargs : possible keyword arguments, with keys matching
            '<outname>_var', for providing a format string allowing to
            compute the variable name for output 'outname' (see below).

        Returns:
          None

        The script calling sequence pattern string (arg 'command') indicates
        how to build the system call which actually launches the script, with
        a match between python objects and formal arguments;

        For introducing the syntax, please consider this example, with the
        following commands::

          >>> cscript('mycdo','cdo ${operator} ${in} ${out}')
          >>> # define some dataset
          >>> tas_ds = ds(project='example', simulation='AMIPV6ALB2G', variable='tas', period='1980-1981')
          >>> # Apply operator 'mycdo' to dataset 'tas_ds', choosing a given 'operator' argument
          >>> tas_avg = mycdo(tas_ds,operator='timavg')

        CliMAF will later on launch this call behind the curtain::

          $ cdo timavg /home/my/tmp/climaf_cache/8a/ /home/my/tmp/climaf_cache/4e/

        where :

        - the last filename is generated by CliMAF from the formal
          expression describing 'tas_avg', and will receive the result
        - the first filename provides a file generated by CliMAF which
          includes the data required for tas_ds

        There are a number of examples declared in module
        :download:`standard_operators <../climaf/>`.

        **Detailed syntax**:

        - formal arguments appear as : ``${argument}`` (in the example :
          ``${in}``, ``${out}``, ``${operator}`` )

        - except for reserved keywords, arguments in the pattern will be
          replaced by the values for corresponding keywords used when
          invoking the diagnostic operator:

          - in the example above : argument ``operator`` is replaced by
            value ``timavg``, which is a keyword known to the external
            binary called, CDO

        - reserved argument keywords are :

          - **in, in_<digit>, ins, ins_<digit>, mmin** : they will be
            replaced by CliMAF managed filenames for input data, as
            deduced from dataset description or upstream computation;
            these filenames can actually be remote URLs (if the script
            can use OpenDAP, see args), local 'raw' data files, or CliMAF
            cache filenames

            - **in** stands for the URL of the first dataset invoked in
              the operator call

            - **in_<digit>** stands for the next ones, in the same order

            - **ins** and **ins_<digit>** stand for the case where the
              script can select input from multiple input files or URLs
              (e.g. when the whole period to process spans over multiple
              files); in that case, a single string (surrounded with
              double quotes) will carry multiple URLs

            - **mmin** stands for the case where the script accepts an
              ensemble of datasets (only for first input stream yet).
              CliMAF will replace the keyword by a string composed of the
              corresponding input filenames (not surrounded by quotes -
              please add them yourself in declaration); see also
              ``labels`` below

          - **var, var_<digit>** : when a script can select a variable in
            a multi-variable input stream, this is declared by adding this
            keyword in the calling sequence; CliMAF will replace it by the
            actual variable name to process; 'var' stands for first input
            stream, 'var_<digit>' for the next ones;

            - in the example above, we assume that external binary CDO is
              not tasked with selecting the variable, and that CliMAF must
              feed CDO with a datafile where it has already performed the
              selection

          - **period, period_<digit>** : when a script can select a time
            period in the content of a file or stream, it should declare it
            by putting this keyword in the pattern, which will be replaced
            at call time by the period written as <date1>-<date2>, where
            date is formatted as YYYYMMDD ;

            - time intervals must be interpreted as [date1, date2[

            - 'period' stands for the first input_stream,

            - 'period_<n>' for the next ones, in the order of actual call;

            - in the example above, this keyword is not used, which means
              that CliMAF has to select the period upstream of feeding CDO
              with the data

          - **period_iso, period_iso_<digit>** : as for **period** above,
            except that the date formatting fits CDO conventions :

            - date format is ISO : YYYY-MM-DDTHH:MM:SS

            - interval is [date1,date2_iso], where date2_iso is 1 minute
              before date2

            - separator between dates is : ,

          - **domain, domain_<digit>** : when a script can select a domain
            in the input grid, this is declared by adding this keyword in
            the calling sequence; CliMAF will replace it by the domain
            definition if needed, as 'latmin,latmax,lonmin,lonmax' ;
            'domain' stands for first input stream, 'domain_<digit>' for
            the next ones :

            - in the example above, we assume that external binary CDO is
              not tasked with selecting the domain, and that CliMAF must
              feed CDO with a datafile where it has already performed the
              selection

          - **out, out_<word>** : CliMAF provides file names for output
            files (if there is no such field, the script will have only
            'side effects', e.g. launch a viewer). Main output file must be
            created by the script with the name provided at the location of
            argument ${out}. Using arguments like 'out_<word>' tells CliMAF
            that the script provides some secondary output, which will be
            symbolically known in CliMAF syntax as an attribute of the main
            object; by default, the variable name of each output equals the
            name of the output (except for the main output, whose variable
            name is supposed to be the same as for the first input); for
            other cases, see argument \*\*kwargs to provide a format
            string, used to derive the variable name from first input
            variable name as in e.g. : ``output2_var='std_dev(%s)'`` for
            the output labelled output2 (i.e. declared as
            '${out_output2}')

            - in the example above, we just apply the convention used by
              CDO, which expects that you provide an output filename as
              last argument on the command line. See example mean_and_sdev
              in doc for advanced usage.

          - **crs** : will be replaced by the CliMAF Reference Syntax
            expression describing the first input stream; can be useful
            for plot title or legend

          - **alias** : used if the script can make an on-the-fly
            re-scaling and renaming of a variable. Will be replaced by a
            string whose pattern is :
            'new_varname,file_varname,scale,offset'. The script should
            then transform on reading as
            new_varname = file_varname * scale + offset

          - **units, units_<digit>** : means that the script can set the
            units on-the-fly while reading one of the input streams

          - **missing** : means that the script can make an on-the-fly
            transformation of a given constant to missing values

          - **labels** : for script accepting ensembles, CliMAF will
            replace this keyword by a string bearing the labels associated
            with the ensemble, with delimiter $ as e.g. in:
            "CNRM-CM5 is fine$IPSL-CM5-LR is not bad$CCSM-29 is ..."

        """
        # Check that script name does not clash with an existing symbol
        if name in sys.modules['__main__'].__dict__ and name not in scripts :
            clogger.error("trying to define %s as an operator, "
                          "while it exists as something else"%name)
            return None
        if name in scripts : clogger.warning("Redefining CliMAF script %s"%name)
        #
        # Check now that script is executable
        scriptcommand=command.split(' ')[0].replace("(","")
        ex=subprocess.Popen(['which',scriptcommand], stdout=subprocess.PIPE)
        if ex.wait() != 0 :
            # NB : here the error is built (hence logged), but not raised
            Climaf_Operator_Error("defining %s : command %s is not "
                                  "executable"%(name,scriptcommand))
        executable=ex.stdout.read().replace('\n','')
        #
        # Analyze inputs field keywords and populate dict
        # attribute 'inputs' with some properties
        self.inputs=dict()
        commuteWithEnsemble=True
        it=re.finditer(
            r"\${(?P<keyw>(?P<mult>mm)?in(?P<serie>s)?(_(?P<n>([\d]+)))?)}",
            command)
        for oc in it :
            if (oc.group("n") is not None) : rank=int(oc.group("n"))
            else : rank=0
            if rank in self.inputs :
                Climaf_Operator_Error(
                    "When defining %s : duplicate declaration for input #%d"%\
                    (name,rank))
            serie=(oc.group("serie") is not None)
            multiple=(oc.group("mult") is not None)
            if multiple :
                if rank != 0 :
                    raise Climaf_Operator_Error(
                        "Only first operand may accept members")
                if serie :
                    raise Climaf_Operator_Error(
                        "Operand %s cannot both accept "
                        "members and files set"%oc.group("keyw"))
                commuteWithEnsemble=False
            self.inputs[rank]=(oc.group("keyw"),multiple,serie)
        if len(self.inputs)==0 :
            Climaf_Operator_Error(
                "When defining %s : command %s must include at least one of "
                "${in} ${ins} ${mmin} or ${in_..} ... for specifying how CliMAF"
                " will provide the input filename(s)"% (name,command))
        #print self.inputs
        for i in range(len(self.inputs)) :
            if i+1 not in self.inputs and not ( i == 0 and 0 in self.inputs) :
                Climaf_Operator_Error(
                    "When defining %s : error in input sequence for rank %d"%\
                    (name,i+1))
        #
        # Check if command includes an argument allowing for
        # providing an output filename
        if command.find("${out") < 0 :
            # compare by value : 'is not' on strings tests identity only
            if format != "txt" : format=None
        #
        # Search in call arguments for keywords matching "<output_name>_var"
        # which may provide format string for 'computing' outputs variable
        # name from input variable name
        outvarnames=dict() ; pattern=r"^(.*)_var$"
        for p in kwargs :
            if re.match(pattern,p):
                outvarnames[re.findall(pattern,p)[0]]=kwargs[p]
        clogger.debug("outvarnames for script %s = %s"%(name,repr(outvarnames)))
        #
        # Analyze outputs names , associated variable names
        # (or format strings), and store it in attribute dict 'outputs'
        self.outputs=dict()
        it=re.finditer(r"\${out(_(?P<outname>[\w-]*))?}",command)
        for occ in it :
            outname=occ.group("outname")
            if outname is not None :
                if (outname in outvarnames) :
                    self.outputs[outname]=outvarnames[outname]
                else :
                    self.outputs[outname]="%s"#outname
            else:
                self.outputs[None]=outvarnames.get('',"%s")
                self.outputs['']=outvarnames.get('',"%s")
        #clogger.debug("outputs = "+`self.outputs`)
        #
        canSelectVar= canSelectVar or (command.find("${var}") > 0 )
        canAggregateTime=(command.find("${ins}") > 0 or command.find("${ins_1}") > 0)
        canAlias= (command.find("${alias}") > 0 )
        canMissing= (command.find("${missing}") > 0 )
        canSelectTime=False
        if command.find("${period}") > 0 or command.find("${period_1}") > 0 :
            canSelectTime=True
        if command.find("${period_iso}") > 0 or command.find("${period_iso_1}") > 0 :
            canSelectTime=True
        canSelectDomain=(command.find("${domain}") > 0 or command.find("${domain_1}") > 0)
        #
        self.command=command
        self.fixedfields=None
        self.flags=scriptFlags(canOpendap, canSelectVar, canSelectTime, \
            canSelectDomain, canAggregateTime, canAlias, canMissing,\
            commuteWithEnsemble,\
            commuteWithTimeConcatenation, commuteWithSpaceConcatenation )
        if format in known_formats or format in graphic_formats or format in none_formats:
            self.outputFormat=format
        else:
            raise Climaf_Operator_Error("Allowed formats yet are : 'object', 'nc', 'txt', %s"%', '.join([repr(x) for x in graphic_formats]))
        scripts[name]=self

        # Init doc string for the operator
        doc="CliMAF wrapper for command : %s"%self.command
        # try to get a better doc string from colocated doc/directory
        docfilename=os.path.dirname(__file__)+"/../doc/scripts/"+name+".rst"
        #print "docfilen= "+docfilename
        try:
            docfile=open(docfilename)
            doc=docfile.read()
            docfile.close()
        except:
            pass
        #
        # creates a function named as requested, which will invoke
        # capply with that name and same arguments
        defs='def %s(*args,**dic) :\n """%s"""\n return driver.capply("%s",*args,**dic)\n'\
                % (name,doc,name)
        exec defs in globals()
        #
        exec "from climaf.operators import %s"%name in \
            sys.modules['__main__'].__dict__
        clogger.debug("CliMAF script %s has been declared"%name)

    def __repr__(self):
        return "CliMAF operator : "

    def inputs_number(self):
        """ returns the number of distinct arguments of a script which are inputs

        """
        l=re.findall(r"\$\{(mm)?ins?(_\d*)?\}",self.command)
        ls=[]; old=None
        for e in l :
            if e != old : ls.append(e)
            old=e
        return(len(ls))

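# Illustration only (a sketch, not part of the CliMAF API) : the helper below,
# whose name is hypothetical, re-uses the regular expression of
# cscript.__init__ to show how the input keywords of a calling sequence are
# mapped to a rank and to the 'ensemble' and 'set of files' properties.
# It registers nothing and only returns a dict
import re

def sketch_classify_inputs(command):
    """ Returns a dict : rank -> (keyword, accepts_ensemble, accepts_file_set) """
    pattern=r"\${(?P<keyw>(?P<mult>mm)?in(?P<serie>s)?(_(?P<n>([\d]+)))?)}"
    inputs=dict()
    for oc in re.finditer(pattern,command) :
        # Keywords without a digit suffix (${in}, ${ins}, ${mmin}) get rank 0
        if oc.group("n") is not None : rank=int(oc.group("n"))
        else : rank=0
        inputs[rank]=(oc.group("keyw"),
                      oc.group("mult") is not None,
                      oc.group("serie") is not None)
    return inputs

# Possible use : sketch_classify_inputs('cdo sub ${in_1} ${in_2} ${out}')
# gives one entry for rank 1 and one for rank 2; ${out} is not an input
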
def fixed_fields(operator, *paths):
    """
    Declare that an operator (or a list of operators) needs fixed fields.
    CliMAF will provide them to the operator at execution time through
    symbolic links.

    This is a 'set' type of operation, not an 'add' one : only the last
    call is considered (it resets the list of fields)

    Parameters:
      operator (string, or list of strings) : name of the CliMAF operator.
      paths (couples) : a number of couples composed of the filename as
        expected by the operator and a path for the data; the path may
        use placeholders : ${model}, ${project}, ${simulation}, ${realm}
        and ${grid}, which will be replaced by the corresponding facet
        values for the first operand of the target operator.

    Returns:
      None

    Example:

    >>> fixed_fields('ccdftransport',
    ...              ('','/data/climaf/${project}/${model}/'),
    ...              ('','/data/climaf/${project}/${model}/'))
    >>> fixed_fields('plot',
    ...              ('','/cnrm/ioga/Users/chevallier/chevalli/Partage/NEMO/'))

    """
    if not isinstance(operator,list): namelist=[operator]
    else: namelist=operator
    for name_op in namelist:
        scripts[name_op].fixedfields=paths

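# Illustration only : a minimal sketch of how the placeholders allowed in the
# paths given to fixed_fields could be replaced by facet values. This is an
# assumption : the function name is hypothetical and the code actually used
# by CliMAF for this step is not shown in this module
from string import Template

def sketch_expand_fixed_path(path, facets):
    """ Replace ${model}, ${project}, ... in path by the values found in dict
    facets; placeholders without a value are left untouched
    """
    return Template(path).safe_substitute(facets)

# Possible use :
# sketch_expand_fixed_path('/data/climaf/${project}/${model}/',
#                          dict(project='CMIP5', model='CNRM-CM5'))
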
class coperator():
    def __init__(self,op, command, canOpendap=False, canSelectVar=False,
                 canSelectTime=False, canSelectDomain=False,
                 canAggregateTime=False, canAlias=False, canMissing=False,
                 commuteWithEnsemble=False):
        clogger.error("Not yet developed")

def derive(project, derivedVar, Operator, *invars, **params):
    """
    Define that 'derivedVar' is a derived variable in 'project', computed by
    applying 'Operator' to input streams which are datasets whose
    variable names take the values in ``*invars`` and the parameter/arguments
    of Operator take the values in ``**params``

    'project' may be the wildcard : '*'

    Example, assuming that operator 'minus' has been defined as ::

    >>> cscript('minus','cdo sub ${in_1} ${in_2} ${out}')

    which means that ``minus`` uses CDO for subtracting the two datasets;
    you may define, for a given project 'CMIP5', a new variable e.g.
    for cloud radiative effect at the surface, named 'rscre',
    using the difference of values of all-sky and clear-sky net
    radiation at the surface by::

    >>> derive('CMIP5', 'rscre','minus','rs','rscs')

    You may then use this variable name at any location you
    would use any other variable name

    Note : you may use wildcard '*' for the project

    Another example is rescaling or renaming some variable;
    here, let us define how variable 'ta'
    can be derived from ERAI variable 't' :

    >>> derive('erai', 'ta','rescale', 't', scale=1., offset=0.)

    **However, this is not the most efficient way to do that**.
    See :py:func:`~climaf.classes.calias()`

    Expert use : argument 'derivedVar' may be a dictionary, whose
    keys are derived variable names and values are script output names;
    example ::

    >>> cscript('vertical_interp', ' ${in} surface_pressure=${in_2} ${out_l500} ${out_l850} method=${opt}')
    >>> derive('*', {'z500' : 'l500' , 'z850' : 'l850'},'vertical_interp', 'zg', 'ps', opt='log')

    """
    # Action : register the information in a dedicated dict which keys
    # are single derived variable names, and which will be used at the
    # object evaluation step
    # Also : some consistency checks w.r.t. script definition
    if Operator in scripts :
        if not isinstance(derivedVar,dict) : derivedVar=dict(out=derivedVar)
        for outname in derivedVar :
            # NB : 'Operator' is the operator name (a string) here, so this
            # test rejects every named output other than 'out'
            if (outname != 'out' and
                (not getattr(Operator,"outvarnames",None)
                 or outname not in Operator.outvarnames )):
                raise Climaf_Operator_Error(
                    "%s is not a named output for operator %s; type help(%s)"%\
                    (outname,Operator,Operator))
            s=scripts[Operator]
            if s.inputs_number() != len(invars) :
                clogger.error("number of input variables for operator "
                              "%s is %d, which is inconsistent with "
                              "script declaration : %s"\
                              %(Operator,len(invars),s.command))
                return
            # TBD : check parameters number  ( need to build
            # its list in cscript.init() )
            if project not in derived_variables :
                derived_variables[project]=dict()
            derived_variables[project][derivedVar[outname]]=(Operator, outname, list(invars), params)
    elif Operator in operators :
        clogger.warning("Cannot yet handle derived variables based on internal operators")
    else :
        clogger.error("second argument (%s) must be a script or operator, already declared"%repr(Operator))

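# Illustration only : a sketch of the layout of the entries stored by derive,
# using a local dict rather than the module-level registry. Both function
# names are hypothetical; the lookup tries the given project first, then the
# wildcard project '*'
def sketch_register_derived(registry, project, variable, operator, invars, **params):
    """ Store for variable, in project, the tuple (operator, output name,
    input variable names, parameters), for the main output 'out'
    """
    registry.setdefault(project,dict())[variable]=(operator, 'out', list(invars), params)

def sketch_lookup_derived(registry, variable, project):
    """ Returns the entry for variable in project, else in '*', else None """
    for proj in (project,'*') :
        if proj in registry and variable in registry[proj] :
            return registry[proj][variable]
    return None

# Possible use :
# reg=dict()
# sketch_register_derived(reg,'CMIP5','rscre','minus',['rs','rscs'])
# sketch_lookup_derived(reg,'rscre','CMIP5')
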
def is_derived_variable(variable,project):
    """ True if the variable is a derived variable, either in provided project
    or in wildcard project '*'
    """
    rep= (project in derived_variables and variable in derived_variables[project] or \
          "*" in derived_variables and variable in derived_variables["*"])
    clogger.debug("Checking if variable %s is derived for project %s : %s"%(variable,project,rep))
    return(rep)

def derived_variable(variable,project):
    """ Returns the entry defining a derived variable in requested project or in wildcard project '*'
    """
    if project in derived_variables and variable in derived_variables[project] :
        rep=derived_variables[project][variable]
    elif "*" in derived_variables and variable in derived_variables["*"] :
        rep=derived_variables['*'][variable]
    else :
        rep=None
    clogger.debug("Derived variable %s for project %s is %s"%(variable,project,rep))
    return(rep)


class Climaf_Operator_Error(Exception):
    def __init__(self, valeur):
        self.valeur = valeur
        clogger.error(self.__str__())
        #clogging.dedent(100)
    def __str__(self):
        return repr(self.valeur)


if __name__ == "__main__":
    def ceval(script_name,*args,**dic) :
        print script_name, " has been called with args=",args, " and dic=",dic
        print "Command would be:",
    cscript('test_script' ,'echo $*')
    test_script(arg1=1,arg2='two')