Source code for climaf.operators

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""

CliMAF handling of external scripts and binaries , and of internal operators (Python funcs)

"""

# Created : S.Senesi - 2014

from __future__ import print_function, division, unicode_literals, absolute_import

import re
import subprocess


from env.clogging import clogger
from env.environment import *
from env.utils import get_subprocess_output
from climaf.operators_scripts import scriptFlags
from climaf.utils import Climaf_Operator_Error
from climaf.driver import capply

# The next definition could be split into a set managed by an administrator, and
# other sets managed and fed by users. But it should be enforced that no redefinition
# occurs for some really basic operators (should it?)
# TODO : Find out whether this variable is used or not
internals = []


class cscript(object):
    # Each instance describes one external script/binary declared as a CliMAF
    # operator; successful declarations are registered in dict 'cscripts'.

    def __init__(self, name, command, format="nc", select=True, canOpendap=False,
                 commuteWithTimeConcatenation=False, commuteWithSpaceConcatenation=False,
                 canSelectVar=False, doCatTime=False, fatal=False, **kwargs):
        r"""
        Declare a script or binary as a 'CliMAF operator', and define a
        Python function with the same name

        Args:
          name (str): name for the CliMAF operator.
          command (str): script calling sequence, according to the syntax
            described below.
          format (str): script outputs format -- either 'nc', 'png', 'pdf',
            'eps', 'None' or 'graph' ('graph' allows the user to choose among
            three different graphic output formats: 'png', 'pdf' or 'eps')
            or 'txt' (the text outputs are not managed by CliMAF, but only
            displayed - 'txt' allows to use e.g. 'ncdump -h' from inside
            CliMAF); defaults to 'nc'
          select (bool, optional): should data selection/transformation be
            automatically done by CliMAF when applying this script directly
            to some dataset(s) (i.e. selection on variable, time, domain,
            aliasing ... according to the definition(s) of input
            dataset()s). Defaults to True
          canOpendap (bool, optional): is the script able to use OpenDAP
            URIs ? defaults to False
          commuteWithTimeConcatenation (bool, optional): can the operation
            commute with concatenation of time periods ? set it to true, if
            the operator can be applied on time chunks separately, in order
            to allow for incremental computation / time chunking;
            defaults to False
          commuteWithSpaceConcatenation (bool, optional): can the operation
            commute with concatenation of space domains ? defaults to False
            (see commuteWithTimeConcatenation)
          canSelectVar (bool, optional): variable selection flag passed on to
            the script flags; it is also switched on when ``command``
            includes ``${var}`` or ``${Var}``. Defaults to False
          doCatTime (bool, optional): does this script concatenate data over
            time. Defaults to False. See example in
            $CLIMAF/doc/operators_which_concatenate_over_time.html
          fatal (bool, optional): if False and the executable is not
            available, do not crash but print a warning; if True, raise
            Climaf_Operator_Error in that case. Defaults to False
          **kwargs : possible keyword arguments, with keys matching
            '<outname>_var', for providing a format string allowing to
            compute the variable name for output 'outname' (see below).

        Returns:
          None

        Raises:
          Climaf_Operator_Error: if the command declares no input, if the
            sequence of input ranks is inconsistent, if an input is declared
            both as an ensemble and as a set of files, if the output format
            is not an allowed one, or (only when ``fatal`` is True) if the
            command is not executable.

        The script calling sequence pattern string (arg 'command') indicates
        how to build the system call which actually launches the script, with
        a match between python objects and formal arguments;

        For introducing the syntax, please consider this example, with the
        following commands::

        >>> cscript('mycdo','cdo ${operator} ${in} ${out}')
        >>> # define some dataset
        >>> tas_ds = ds(project='example', simulation='AMIPV6ALB2G', variable='tas', period='1980-1981')
        >>> # Apply operator 'mycdo' to dataset 'tas_ds', choosing a given 'operator' argument
        >>> tas_avg = mycdo(tas_ds,operator='timavg')

        CliMAF will later on launch this call behind the curtain::

        $ cdo timavg /home/my/tmp/climaf_cache/8a/5.nc /home/my/tmp/climaf_cache/4e/4.nc

        where :

        - the last filename is generated by CliMAF from the formal
          expression describing 'tas_avg', and will receive the result
        - the first filename provides a file generated by CliMAF which
          includes the data required for tas_ds

        There are a number of examples declared in module
        :download:`standard_operators <../climaf/standard_operators.py>`.

        **Detailed syntax**:

        -  formal arguments appear as : ``${argument}`` (in the example :
           ``${in}``, ``${out}``, ``${operator}`` )

        -  except for reserved keywords, arguments in the pattern will be
           replaced by the values for corresponding keywords used when
           invoking the diagnostic operator:

          - in the example above : argument ``operator`` is replaced by
            value ``timavg``, which is a keyword known to the external
            binary called, CDO

        -  reserved argument keywords are :

         - **in, in_<digit>, ins, ins_<digit>, mmin** : they will be
           replaced by CliMAF managed filenames for input data, as
           deduced from dataset description or upstream computation; these
           filenames can actually be remote URLs (if the script can use
           OpenDAP, see args), local 'raw' data files, or CliMAF cache
           filenames

          -  **in** stands for the URL of the first dataset invoked in the
             operator call

          -  **in_<digit>** stands for the next ones, in the same order

          -  **ins** and **ins_<digit>** stand for the case where the script
             can select input from multiple input files or URLs (e.g. when
             the whole period to process spans over multiple files); in that
             case, a single string (surrounded with double quotes) will carry
             multiple URLs

          -  **mmin** stands for the case where the script accepts as
             argument an ensemble of datasets. CliMAF will replace the
             keyword by a string composed of the corresponding input
             filenames (not surrounded by quotes - please add them yourself
             in declaration); see also ``labels`` below

         -  **var, var_<digit>** : when a script can select a variable in a
            multi-variable input stream, this is declared by adding this
            keyword in the calling sequence; CliMAF will replace it by the
            actual variable name to process, but only if it has not already
            filtered data for that variable; 'var' stands for first input
            stream, 'var_<digit>' for the next ones;

            - in the example above, we assume that external binary CDO is
              not tasked with selecting the variable, and that CliMAF must
              feed CDO with a datafile where it has already performed the
              selection

            - if the script MUST receive the name of the variable in all
              circumstances, use keyword **Var**

         - **period, period_<digit>** : when a script can select a time
           period in the content of a file or stream, it should declare it
           by putting this keyword in the pattern, which will be replaced at
           call time by the period written as <date1>-<date2>, where date is
           formatted as YYYYMMDD ;

            - time intervals must be interpreted as [date1, date2[

            - 'period' stands for the first input_stream,

            - 'period_<n>' for the next ones, in the order of actual call;

            - in the example above, this keyword is not used, which means
              that CliMAF has to select the period upstream of feeding CDO
              with the data

         - **period_iso, period_iso_<digit>** : as for **period** above,
           except that the date formatting fits CDO conventions :

            - date format is ISO : YYYY-MM-DDTHH:MM:SS

            - interval is [date1,date2_iso], where date2_iso is 1 minute
              before date2

            - separator between dates is : ,

         - **domain, domain_<digit>** : when a script can select a domain
           in the input grid, this is declared by adding this keyword in the
           calling sequence; CliMAF will replace it by the domain definition
           if needed, as 'latmin,latmax,lonmin,lonmax' ; 'domain' stands for
           first input stream, 'domain_<digit>' for the next ones :

            - in the example above, we assume that external binary CDO is
              not tasked with selecting the domain, and that CliMAF must
              feed CDO with a datafile where it has already performed the
              selection

         - **out, out_<word>** : CliMAF provide file names for output
           files (if there is no such field, the script will have only
           'side effects', e.g. launch a viewer). Main output file must be
           created by the script with the name provided at the location of
           argument ${out}. Using arguments like 'out_<word>' tells CliMAF
           that the script provide some secondary output, which will be
           symbolically known in CliMAF syntax as an attribute of the main
           object; by default, the variable name of each output equals the
           name of the output (except for the main output, which variable
           name is supposed to be the same as for the first input); for
           other cases, see argument \*\*kwargs to provide a format string,
           used to derive the variable name from first input variable name
           as in e.g. : ``output2_var='std_dev(%s)'`` for the output
           labelled output2 (i.e. declared as '${out_output2}') or
           ``_var='std_dev(%s)'`` for the default (main) output

           - in the example above, we just apply the convention used by
             CDO, which expects that you provide an output filename as last
             argument on the command line. See example mean_and_sdev in doc
             for advanced usage.

         - **crs** : will be replaced by the CliMAF Reference Syntax
           expression describing the first input stream; can be useful for
           plot title or legend

         - **alias** : used if the script can make an on the fly re-scaling
           and renaming of a variable. Will be replaced by a string which
           pattern is : 'new_varname,file_varname,scale,offset'. The script
           should then transform on reading as new_varname =
           file_varname * scale + offset

         - **units, units_<digit>** : means that the script can set the
           units on-the-fly while reading one of the input streams

         - **missing** : means that the script can make an on-the-fly
           transformation of a given constant to missing values

         - **labels** : for script accepting ensembles, CliMAF will
           replace this keyword by a string bearing the labels associated
           with the ensemble, with delimiter $ as e.g. in:
           "CNRM-CM5 is fine$IPSL-CM5-LR is not bad$CCSM-29 is ..."

        """
        # Record the identity first, so that __repr__ also works on an object
        # whose declaration is abandoned below (name clash, missing executable)
        self.name = name
        self.command = command
        #
        # Check that script name do not clash with an existing symbol
        if name in sys.modules['__main__'].__dict__ and name not in cscripts:
            clogger.error("trying to define %s as an operator, "
                          "while it exists as smthing else" % name)
            return
        if name in cscripts:
            clogger.warning("Redefining CliMAF script %s" % name)
        #
        # Check now that script is executable. Only the 'which' call can
        # raise CalledProcessError, hence the narrow try block
        scriptcommand = command.split(' ')[0].replace("(", "")
        try:
            executable = get_subprocess_output("which {}".format(scriptcommand),
                                               to_replace=[("\n", "")])
        except subprocess.CalledProcessError:
            if fatal:
                raise Climaf_Operator_Error("defining %s : command %s is not executable" %
                                            (name, scriptcommand))
            clogger.warning("defining %s : command %s is not executable" % (name, scriptcommand))
            return
        clogger.debug("Found %s" % executable)
        #
        # Analyze inputs field keywords and populate dict
        # attribute 'inputs' with some properties (key is rank)
        self.inputs = dict()
        commuteWithEnsemble = True
        it = re.finditer(
            r"\${(?P<keyw>(?P<mult>mm)?in(?P<serie>s)?(_(?P<n>([\d]+)))?)}",
            command)
        for oc in it:
            # An input without explicit digit (e.g. ${in}) gets rank 0
            rank = int(oc.group("n")) if oc.group("n") is not None else 0
            if rank in self.inputs:
                clogger.warning("When defining %s : duplicate declaration for input #%d" %
                                (name, rank))
            serie = (oc.group("serie") is not None)
            multiple = (oc.group("mult") is not None)
            if multiple:
                if serie:
                    raise Climaf_Operator_Error(
                        "Operand %s cannot both accept "
                        "members and files set" % oc.group("keyw"))
                # An operator taking an ensemble as input cannot be mapped
                # over ensemble members
                commuteWithEnsemble = False
            self.inputs[rank] = (oc.group("keyw"), multiple, serie)
        if not self.inputs:
            raise Climaf_Operator_Error(
                "When defining %s : command %s must include at least one of "
                "${in} ${ins} ${mmin} or ${in_..} ... for specifying how CliMAF"
                " will provide the input filename(s)" % (name, command))
        # Ranks must form a sequence without hole; the first input may be
        # declared either with rank 0 (no digit) or with rank 1
        for i in range(len(self.inputs)):
            if i + 1 not in self.inputs and not (i == 0 and 0 in self.inputs):
                raise Climaf_Operator_Error(
                    "When defining %s : error in input sequence for rank %d" % (name, i + 1))
        #
        # Check if command includes an argument allowing for
        # providing an output filename
        if "${out" not in command and format not in ["txt", ]:
            format = None
        #
        # Search in call arguments for keywords matching "<output_name>_var"
        # which may provide format string for 'computing' outputs variable
        # name from input variable name
        outvarnames = dict()
        for key in kwargs:
            found = re.match(r"^(.*)_var$", key)
            if found:
                outvarnames[found.group(1)] = kwargs[key]
        clogger.debug("outvarnames for script %s = %s" % (name, repr(outvarnames)))
        #
        # Analyze outputs names , associated variable names
        # (or format strings), and store it in attribute dict 'outputs'
        self.outputs = dict()
        it = re.finditer(r"\${out(_(?P<outname>[\w-]*))?}", command)
        for occ in it:
            outname = occ.group("outname")
            if outname is not None:
                self.outputs[outname] = outvarnames.get(outname, "%s")
            else:
                # Main output is recorded under both keys None and ''
                self.outputs[None] = outvarnames.get('', "%s")
                self.outputs[''] = outvarnames.get('', "%s")
        #
        # Deduce script capabilities from the keywords used in the command
        canSelectVar = canSelectVar or ("${var}" in command) or ("${Var}" in command)
        canAggregateTime = ("${ins}" in command or "${ins_1}" in command)
        canAlias = ("${alias}" in command)
        canMissing = ("${missing}" in command)
        canSelectTime = any(keyword in command for keyword in
                            ("${period}", "${period_1}", "${period_iso}", "${period_iso_1}"))
        canSelectDomain = ("${domain}" in command or "${domain_1}" in command)
        #
        self.fixedfields = None
        if select:
            self.flags = scriptFlags(canOpendap, canSelectVar, canSelectTime,
                                     canSelectDomain, canAggregateTime, canAlias, canMissing,
                                     commuteWithEnsemble,
                                     commuteWithTimeConcatenation, commuteWithSpaceConcatenation,
                                     doCatTime)
        else:
            # With select=False, all selection capabilities are declared
            # as handled by the script itself
            self.flags = scriptFlags(True, True, True, True, True, True, True,
                                     commuteWithEnsemble,
                                     commuteWithTimeConcatenation, commuteWithSpaceConcatenation,
                                     doCatTime)
        if format in known_formats or format in graphic_formats or format in none_formats:
            self.outputFormat = format
        else:
            raise Climaf_Operator_Error("Allowed formats yet are : 'object', 'nc', 'txt', %s" %
                                        ', '.join([repr(x) for x in graphic_formats]))
        cscripts[name] = self
        #
        # Creates a function named as requested, which will invoke
        # capply with that name and same arguments
        self._declare_function(name, self._load_doc(name, command), globals())
        exec("from climaf.operators import %s" % name, sys.modules['__main__'].__dict__)
        clogger.debug("CliMAF script %s has been declared" % name)

    @staticmethod
    def _load_doc(name, command):
        """
        Return the doc string for operator ``name`` : the content of file
        doc/scripts/<name>.rst (located relative to this module) if it can be
        read, otherwise a default text quoting ``command``
        """
        doc = "CliMAF wrapper for command : %s" % command
        docfilename = os.path.dirname(__file__) + "/../doc/scripts/" + name + ".rst"
        try:
            # Read bytes and decode explicitly : this works the same way with
            # Python 2 and Python 3, whatever the locale
            with open(docfilename, "rb") as docfile:
                doc = docfile.read().decode("utf-8")
        except (IOError, OSError, UnicodeDecodeError):
            # A missing or unreadable doc file is not an error : keep default
            pass
        return doc

    @staticmethod
    def _declare_function(name, doc, namespace):
        """
        Define in dict ``namespace`` a function called ``name`` which forwards
        its arguments to ``capply`` (looked up in ``namespace`` at call time),
        and which doc string is ``doc``
        """
        defs = 'def %s(*args,**dic) :\n  return capply("%s",*args,**dic)\n' % (name, name)
        exec(defs, namespace)
        # The doc string is attached afterwards rather than pasted in the
        # generated source, where quotes or backslashes would corrupt it
        namespace[name].__doc__ = doc

    def __repr__(self):
        return "CliMAF operator : " + self.name

    def inputs_number(self):
        """
        returns the number of distinct arguments of a script which are inputs

        """
        # Occurrences are compared through their captured groups only
        # (optional 'mm' prefix and optional '_<digit>' suffix)
        return len(set(re.findall(r"\$\{(mm)?ins?(_\d*)?\}", self.command)))
def fixed_fields(operator, *paths):
    """
    Declare that an operator (or a list of operators) needs fixed fields.
    CliMAF will provide them to the operator at execution time through
    symbolic links.

    This is a 'set' type of operation, not an 'add' one : only the last
    call is considered (it resets the list of fields)

    Parameters:
      operator (string, or list of strings) : name of the CliMAF operator.
      paths (couples) : a number of couples composed of the filename as
        expected by the operator and a path for the data; the path may use
        placeholders : ${model}, ${project}, ${simulation}, ${realm} and
        ${grid}, which will be replaced by the corresponding facet values
        for the first operand of the target operator.

    Returns:
      None

    Example:

    >>> fixed_fields('ccdftransport',
    ...              ('mesh_hgr.nc','/data/climaf/${project}/${model}/ORCA1_mesh_hgr.nc'),
    ...              ('mesh_zgr.nc','/data/climaf/${project}/${model}/ORCA1_mesh_zgr.nc'))
    >>> fixed_fields('plot',
    ...              ('coordinates.nc','/cnrm/ioga/Users/chevallier/chevalli/Partage/NEMO/eORCA_R025_coordinates_v1.0.nc'))

    """
    # A single operator name is handled as a one-element list
    targets = operator if isinstance(operator, list) else [operator]
    for target in targets:
        # Overwrite (not extend) the fixed fields of the registered script
        cscripts[target].fixedfields = paths
class coperator(object):
    # Placeholder class : its constructor only logs an error message
    def __init__(self, op, command, canOpendap=False, canSelectVar=False,
                 canSelectTime=False, canSelectDomain=False, canAggregateTime=False,
                 canAlias=False, canMissing=False, commuteWithEnsemble=False):
        """
        Not implemented : all arguments are ignored and an error is logged
        """
        clogger.error("Not yet developped")


# Small demo, run only when the module is executed as a script
if __name__ == "__main__":
    def ceval(script_name, *args, **dic):
        """
        Print the name and arguments it is called with
        """
        print(script_name, " has been called with args=", args, " and dic=", dic)
        # NOTE(review): the original layout was lost; this statement is
        # assumed to belong to ceval rather than to the main block - confirm
        print("Command would be:", end=" ")

    # Declare a test operator, then call the function cscript created for it
    cscript('test_script', 'echo $*')
    test_script(arg1=1, arg2='two')