#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CliMAF macros module :
- define macros from CliMAF objects,
- find macro patterns in CRS expressions,
- rewrite CRS expressions,
- read/write macro set
"""
from __future__ import print_function, division, unicode_literals, absolute_import
import sys
import os
import types
import six
from env.clogging import clogger, dedent
from env.environment import *
from climaf.classes import cobject, cdataset, ctree, scriptChild, cpage, allow_error_on_ds, cens, cdummy
[docs]def macro(name, cobj, lobjects=[]):
"""
Define a CliMAF macro from a CliMAF compound object.
Transform a Climaf object in a macro, replacing all datasets,
and the objects of lobjects, by a dummy argument. Register it in
dict cmacros, if name is not None
Args:
name (string) : the name you want to give to the macro; a Python
function with the same name will be defined
cobj (CliMAF object, or string) : any CliMAF object, usually
the result of a series of operators, that you would like to
repeat using other input datasets; alternatively, you can provide
the macro formula as a string (when accustomed to the syntax)
lobjects (list, optional): for expert use- a list of objects,
which are sub-objects of cobject, and which should become arguments
of the macro
Returns:
a macro; the returned value is usualy not used 'as is' : a
python function is also defined in module cmacros and in main
namespace, and you may use it in the same way as a CliMAF
operator. All the datasets involved in ``cobj`` become arguments
of the macro, which allows you to re-do the same computations
and easily define objects similar to ``cobjs``
Example::
>>> # First use and combine CliMAF operators to get some interesting result using some dataset(s)
>>> january_ta=ds(project='example',simulation='AMIPV6ALB2G',variable='ta',frequency='monthly',period='198001')
>>> ta_europe=llbox(january_ta,latmin=40,latmax=60,lonmin=-15,lonmax=25)
>>> ta_ezm=ccdo(ta_europe,operator='zonmean')
>>> fig_ezm=plot(ta_ezm)
>>> #
>>> # Using this result as an example, define a macro named 'eu_cross_section',
>>> # which arguments will be the datasets involved in this result
>>> cmacro('eu_cross_section',fig_ezm)
>>> #
>>> # You can of course apply a macro to another dataset(s) (even here to a 2D variable)
>>> pr=ds(project='example',simulation='AMIPV6ALB2G', variable='pr', frequency='monthly', period='198001')
>>> pr_ezm=eu_cross_section(pr)
>>> #
>>> # All macros are registered in dictionary climaf.cmacro.cmacros,
>>> # which is imported by climaf.api; you can list it by :
>>> cmacros
Note : macros are automatically saved in file ~/.climaf.macros, and can be edited
See also much more explanations in the example at :download:`macro.py <../examples/macro.py>`
"""
if isinstance(cobj, six.string_types):
s = cobj
# Next line used for interpreting macros's CRS
exec("from climaf.cmacro import cdummy; ARG=cdummy()",
sys.modules['__main__'].__dict__)
try:
cobj = eval(cobj, sys.modules['__main__'].__dict__)
except:
# usually case of a CRS which project is not currently defined
clogger.error(
"Cannot interpret %s with the projects currently define" % s)
return None
# print "string %s was interpreted as %s"%(s,cobj)
domatch = False
for o in lobjects:
domatch = domatch or cobj == o or \
(isinstance(cobj, cobject) and cobj.buildcrs() == o.buildcrs())
if isinstance(cobj, cdataset) or isinstance(cobj, cdummy) or domatch:
return cdummy()
elif isinstance(cobj, ctree):
rep = ctree(cobj.operator, cobj.script,
*list(map(macro, [None] *
len(cobj.operands), cobj.operands)),
**cobj.parameters)
elif isinstance(cobj, scriptChild):
rep = scriptChild(macro(None, cobj.father), cobj.varname)
elif isinstance(cobj, cpage):
rep = cpage([list(map(macro, [None] * len(line), line)) for line in cobj.fig_lines],
cobj.widths, cobj.heights)
elif isinstance(cobj, cens):
d = dict()
for k, v in zip(list(cobj), map(macro, [None] * len(cobj.values()), cobj.values())):
d[k] = v
rep = cens(d)
elif cobj is None:
return None
else:
clogger.error("Cannot yet handle object :%s", repr(cobj))
rep = None
if name and rep:
cmacros[name] = rep
doc = "A CliMAF macro, which text is " + repr(rep)
defs = 'def %s(*args) :\n """%s"""\n return instantiate(cmacros["%s"],[ x for x in args])\n' \
% (name, doc, name)
exec(defs, globals())
exec("from climaf.cmacro import %s" %
name, sys.modules['__main__'].__dict__)
clogger.debug("Macro %s has been declared" % name)
return rep
def define_new_macro(name, doc):
# Define the new macro code
codestring = compile('def %s(*args) :\n """%s"""\n return instantiate(cmacros["%s"], args)\n'
% (name, doc, name), name, "exec")
code = types.CodeType(codestring.co_argcount, codestring.co_kwonlyargcount, codestring.co_nlocals,
codestring.co_stacksize, codestring.co_flags, codestring, codestring.co_consts,
codestring.co_names, codestring.co_varnames, codestring.co_filename, codestring.co_name,
codestring.co_firstlineno, codestring.co_lnotab, codestring.co_freevars,
codestring.co_cellvars)
rep = types.FunctionType(code=code, globals=globals(), name=name)
return rep
def crewrite(crs, alsoAtTop=True):
"""
Return the crs expression with sub-trees replaced by macro equivalent
when applicable
Search order is : from CRS tree root try all macros, then do the
same for first subtree, and recursively in depth, and then go
to second subtreesecond
"""
# Next line used for interpreting macros's CRS
exec("ARG=climaf.cmacro.cdummy()", sys.modules['__main__'].__dict__)
#
allow_error_on_ds()
try:
co = eval(crs, sys.modules['__main__'].__dict__)
except:
clogger.debug("Issue when rewriting %s" % crs)
return crs
allow_error_on_ds(False)
rep = None
if isinstance(co, ctree) or isinstance(co, scriptChild) or isinstance(co, cpage):
if alsoAtTop:
for m in cmacros:
clogger.debug("looking at macro : " + m + "=" + repr(cmacros[m]) + " \ncompared to : " +
repr(macro(None, co)))
argl = cmatch(cmacros[m], co)
if len(argl) > 0:
rep = m + \
"({})".format(
",".join([crewrite(arg.buildcrs(crsrewrite=crewrite)) for arg in argl]))
# No macro matches at top level, or top level not wished.
# Let us dig a bit
if rep is None:
rep = co.buildcrs(crsrewrite=crewrite)
else:
rep = crs
return rep
def cmatch(macro, cobj):
"""
Analyze if macro does match cobj, and return the list of objects
matching macro arguments, ordered by depth-first traversal
"""
clogger.debug("matching " + repr(macro) + " and " + repr(cobj))
if isinstance(cobj, ctree) and isinstance(macro, ctree) and macro.operator == cobj.operator:
nok = False
for mpara, para in zip(sorted(list(macro.parameters)), sorted(list(cobj.parameters))):
if mpara != para or macro.parameters[para] != cobj.parameters[para]:
nok = True
if nok:
return []
argsub = []
for mop, op in zip(macro.operands, cobj.operands):
if isinstance(mop, cdummy):
argsub.append(op)
else:
argsub += cmatch(mop, op)
return argsub
elif isinstance(cobj, scriptChild) and isinstance(macro, scriptChild) and macro.varname == cobj.varname:
return cmatch(macro.father, cobj.father) # , argslist)
elif isinstance(cobj, cpage) and isinstance(macro, cpage):
argsub = []
if cobj.heights == macro.heights and cobj.widths == macro.widths and cobj.orientation == macro.orientation:
for mlines, lines in zip(macro.fig_lines, cobj.fig_lines):
for mfig, fig in zip(mlines, lines):
if isinstance(mfig, cdummy):
argsub.append(fig)
else:
argsub += cmatch(mfig, fig)
return argsub
else:
return []
def read(filename):
"""
Read macro dictionary from filename, and add it to cmacros[]
"""
import json
macros_texts = None
try:
macrofile = open(os.path.expanduser(filename), "r")
clogger.debug("Macrofile %s read" % macrofile)
macros_texts = json.load(macrofile)
clogger.debug("After reading file %s, macros=%s" %
(macrofile, repr(macros_texts)))
macrofile.close()
except:
clogger.info("Issue reading macro file %s ", filename)
if macros_texts:
for m in macros_texts:
clogger.debug("loading macro %s=%s" % (m, macros_texts[m]))
macro(str(m), str(macros_texts[m]))
def write(filename):
"""
Writes macros dictionary to disk ; should be called before exit
"""
import json
filen = os.path.expanduser(filename)
try:
os.remove(filen)
except:
pass
macrofile = open(filen, "w")
dcrs = cmacros.copy()
for m in dcrs:
dcrs[m] = dcrs[m].buildcrs()
json.dump(dcrs, macrofile, sort_keys=True, indent=4)
macrofile.close()
def show(interp=True):
"""
List the macros, searching also for macro usage in macros (except if arg ``interp`` is True)
"""
for m in cmacros:
if interp:
print("% 15s : %s" %
(m, crewrite(cmacros[m].buildcrs(), alsoAtTop=False)))
else:
print("% 15s : %s" % (m, cmacros[m]))
def instantiate(mac, operands, toplevel=True):
"""
Return a copy of macro cobject ``mac`` where arguments are instantiated by the list
`operands', used in the order of depth-first tree traversal of ``mac``.
Check that the number of operands is OK vs mac
"""
if isinstance(mac, cdummy):
if len(operands) > 0:
rep = operands.pop(0)
else:
raise Climaf_Macro_Error('no operand left')
elif isinstance(mac, ctree):
opers = []
for o in mac.operands:
opers.append(instantiate(o, operands, toplevel=False))
rep = ctree(mac.operator, mac.script, *opers, **mac.parameters)
elif isinstance(mac, scriptChild):
father = instantiate(mac.father, operands, toplevel=False)
rep = scriptChild(father, mac.variable)
elif isinstance(mac, cdataset):
rep = cdataset
if toplevel and len(operands) != 0:
raise Climaf_Macro_Error(
'too many operands; left operands are : ' + repr(operands))
return rep
class Climaf_Macro_Error(Exception):
def __init__(self, valeur):
self.valeur = valeur
clogger.error(self.__str__())
dedent(100)
def __str__(self):
return repr(self.valeur)