#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" CliMAF datasets location handling and data access module
Handles a database of attributes for describing organization and location of datasets
"""
# Created : S.Senesi - 2014
from __future__ import print_function, division, unicode_literals, absolute_import
import six
import os.path
import re
import glob
from string import Template
import env
from env.environment import *
from env.clogging import clogger
from climaf.utils import Climaf_Error
from climaf.period import init_period
from climaf.netcdfbasics import fileHasVar
from climaf.projects.optimize import cmip6_optimize_check_paths, cmip6_optimize_wildcards
from climaf.find_files import selectGenericFiles
[docs]class dataloc(object):
def __init__(self, project="*", organization='generic', url=None, model="*", simulation="*",
realm="*", table="*", frequency="*"):
"""
Create an entry in the data locations dictionary for an ensemble of datasets.
Args:
project (str,optional): project name
model (str,optional): model name
simulation (str,optional): simulation name
frequency (str,optional): frequency
organization (str): name of the organization type, among those handled by
:py:func:`~climaf.dataloc.selectFiles`
url (list of strings): list of URLS for the data root directories, local or remote
Each entry in the dictionary allows to store :
- a list of path or URLS (local or remote), which are root paths for
finding some sets of datafiles which share a file organization scheme.
- For remote data:
url is supposed to be in the format 'protocol:user@host:path', but
'protocol' and 'user' are optional. So, url can also be 'user@host:path'
or 'protocol:host:path' or 'host:path'. ftp is default protocol (and
the only one which is yet managed, AMOF).
If 'user' is given:
- if 'host' is in $HOME/.netrc file, CliMAF check if corresponding
'login == 'user'. If it is, CliMAF get associated
password; otherwise it will prompt the user for entering password;
- if 'host' is not present in $HOME/.netrc file, CliMAF will prompt
the user for entering password.
If 'user' is not given:
- if 'host' is in $HOME/.netrc file, CliMAF get corresponding 'login'
as 'user' and also get associated password;
- if 'host' is not present in $HOME/.netrc file, CliMAF prompt the
user for entering 'user' and 'password'.
Remark: The .netrc file contains login and password used by the
auto-login process. It generally resides in the user's home directory
($HOME/.netrc). So, it is highly recommended to supply this information
in .netrc file not to have to enter password in every request.
Warning: python netrc module does not handle multiple entries for a
single host. So, if netrc file has two entries for the same host, the
netrc module only returns the last entry.
We define two kinds of host: hosts with evolving files, e.g.
'beaufix'; and the others.
For any file returned by function :py:meth:`~climaf.classes.cdataset.listfiles`
which is found in cache:
- in case of hosts with dynamic files, the file is transferred only
if its date on server is more recent than that found in cache;
- for other hosts, the file found in cache is used
- the name for the corresponding data files organization scheme. The current set of known
schemes is :
- CMIP5_DRS : any datafile organized after the CMIP5 data reference syntax, such as on IPSL's Ciclad and
CNRM's Lustre
- EM : CNRM-CM post-processed outputs as organized using EM (please use a list of anyone string for arg urls)
- generic : a data organization described by the user, using patterns such as described for
:py:func:`~climaf.select_files.selectGenericFiles`. This is the default
Please ask the CliMAF dev team for implementing further organizations.
It is quite quick for data which are on the filesystem. Organizations
considered for future implementations are :
- NetCDF model outputs as available during an ECLIS or ligIGCM simulation
- ESGF
- the set of attribute values which simulation's data are
stored at that URLS and with that organization
For remote files, filename pattern must include ${varname}, which is instanciated
by variable name or filenameVar (given via :py:func:`~climaf.classes.calias()`),
for the sake of efficiency. Please complain if this is inadequate
For the sake of brievity, each attribute can have the '*'
wildcard value; when using the dictionary, the most specific
entries will be used (which means : the entry (or entries) with the lowest number of wildcards)
Example :
- Declaring that all IPSLCM-Z-HR data for project PRE_CMIP6 are stored under a single root path and folllows
organization named CMIP6_DRS::
>>> dataloc(project='PRE_CMIP6', model='IPSLCM-Z-HR', organization='CMIP6_DRS', url=['/prodigfs/esg/'])
- and declaring an exception for one simulation (here, both location and organization are supposed to be
different)::
>>> dataloc(project='PRE_CMIP6', model='IPSLCM-Z-HR', simulation='my_exp', organization='EM',
... url=['~/tmp/my_exp_data'])
- and declaring a project to access remote data (on multiple servers)::
>>> cproject('MY_REMOTE_DATA', ('frequency', 'monthly'), separator='|')
>>> dataloc(project='MY_REMOTE_DATA', organization='generic',
... url=['beaufix:/home/gmgec/mrgu/vignonl/*/${simulation}SFX${PERIOD}.nc',
... 'ftp:vignonl@hendrix:/home/vignonl/${model}/${variable}_1m_${PERIOD}_${model}.nc']),
>>> calias('MY_REMOTE_DATA','tas','tas',filenameVar='2T')
>>> tas = ds(project='MY_REMOTE_DATA', simulation='AMIPV6ALB2G', variable='tas', frequency='monthly',
... period='198101')
Please refer to the :ref:`example section <examples>` of the documentation for an example with each
organization scheme
"""
self.project = project
self.model = model
self.simulation = simulation
self.frequency = frequency
self.organization = organization
self.realm = realm
self.table = table
if organization not in ['EM', 'CMIP5_DRS', 'generic']:
raise Climaf_Error("Cannot process organization " + organization)
if isinstance(url, list):
self.urls = url
else:
if re.findall("^esgf://.*", url):
self.organization = "ESGF"
self.urls = [url]
self.urls = list(map(os.path.expanduser, self.urls))
alt = []
for u in self.urls:
# if u[0] != '$' : alt.append(os.path.abspath(u)) #lv
if u[0] != '$' and ':' not in u:
alt.append(os.path.abspath(u))
else:
alt.append(u)
# Change all datedeb-datend patterns to ${PERIOD} for upward compatibility
alt2 = []
for u in alt:
for pat in ["YYYYMMDDHHMM", "YYYYMMDDHH", "YYYYMMDD", "YYYYMM", "YYYY", "${period}"]:
u = u.replace(pat + "-" + pat, "${PERIOD}")
u = u.replace(pat + "_" + pat, "${PERIOD}")
u = u.replace(pat, "${PERIOD}")
alt2.append(u)
#
self.urls = alt2
# Register new dataloc only if not already registered
if not (any([loc == self for loc in locs])):
locs.append(self)
def derive(self, name):
"""
Create data location for a new project by copying the one existing for the current project.
:param name: name of the new project
:return: the data location for the new project
"""
return dataloc(project=name, organization=self.organization, url=self.urls, model=self.model,
simulation=self.simulation, realm=self.realm, table=self.table, frequency=self.frequency)
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.__dict__ == other.__dict__
else:
return False
def __ne__(self, other):
return not self.__eq__(other)
def __str__(self):
return self.model + self.project + self.simulation + self.frequency + self.realm + self.table + \
self.organization + repr(self.urls)
def pr(self):
print("For model " + self.model + " of project " + self.project +
" for simulation " + self.simulation + " and freq " + self.frequency +
" locations are : " + repr(self.urls) + " and org is :" + self.organization +
" and table is :" + self.table + " and realm is :" + self.realm)
def getlocs(project="*", model="*", simulation="*", frequency="*", realm="*", table="*"):
""" Returns the list of org,freq,url triples which may match the
list of given attributes values (allowing for wildcards '*') and which have
the lowest number of wildcards (*) in attributes
"""
rep = []
for loc in locs:
list_loc = [(loc.project, project), (loc.model, model), (loc.simulation, simulation),
(loc.frequency, frequency), (loc.realm, realm), (loc.table, table)]
if all([f[0] in ["*", f[1]] or f[1] == "*" for f in list_loc]):
stars = [f[0] == "*" or f[1] == "*" for f in list_loc].count(True)
rep.append((loc.organization, loc.frequency, loc.urls, stars))
# Must mimimize the number of '*' ? (allows wildcards in dir names, avoid too generic cases)
# When multiple answers with wildcards, return the ones with the lowest number
filtered = []
mini = 100
for org, freq, url, stars in rep:
if stars < mini:
mini = stars
for org, freq, url, stars in rep:
if stars == mini:
filtered.append((org, freq, url))
# Should we further filter ?
return filtered
def isLocal(project, model, simulation, frequency, realm="*", table="*"):
if project == 'file':
return True
ofu = getlocs(project=project, model=model, simulation=simulation,
frequency=frequency, realm=realm, table=table)
if len(ofu) == 0:
return False
rep = True
for org, freq, llocs in ofu:
for loc in llocs:
if re.findall(".*:.*", loc):
rep = False
return rep
[docs]def selectFiles(return_wildcards=None, merge_periods_on=None, return_combinations=None, with_periods=None,
use_frequency=False, **kwargs):
"""
Returns the shortest list of (local or remote) files which include
the data for the list of (facet,value) pairs provided
Method :
- use datalocations indexed by :py:func:`~climaf.dataloc.dataloc` to
identify data organization and data store urls for these (facet,value)
pairs
- check that data organization is as known one, i.e. is one of 'generic',
CMIP5_DRS' or 'EM'
- derive relevant filenames search function such as as :
py:func:`~climaf.dataloc.selectCmip5DrsFiles` from data
organization scheme
- pass urls and relevant facet values to this filenames search function
"""
rep = []
project = kwargs['project']
simulation = kwargs['simulation']
model = kwargs.get("model", "*")
frequency = kwargs.get("frequency", "*")
realm = kwargs.get("realm", "*")
table = kwargs.get("table", "*")
ofu = getlocs(project=project, model=model, simulation=simulation,
frequency=frequency, realm=realm, table=table)
clogger.debug("locs=" + repr(ofu))
if len(ofu) == 0:
clogger.warning("no datalocation found for %s %s %s %s %s %s " % (project, model, simulation, frequency, realm,
table))
for org, _, urls in ofu:
if org not in ['generic', ]:
clogger.warning("Organisation = %s will be deprecated quite soon." % org +
"Please refer to you CliMAF wizard for removing its use")
if return_wildcards is not None and len(return_wildcards) > 0 and org != "generic":
raise Climaf_Error(
"Can handle multiple facet query only for organization=generic ")
if return_combinations is not None and len(return_combinations) > 0 and org != "generic":
raise Climaf_Error(
"Can handle multiple facet query only for organization=generic ")
kwargs2 = kwargs.copy()
# Convert normalized frequency to project-specific frequency if applicable
if "frequency" in kwargs and project in frequencies:
normfreq = kwargs2['frequency']
if normfreq in frequencies[project]:
kwargs2['frequency'] = frequencies[project][normfreq]
# JS # Convert normalized realm to project-specific realm if applicable
if "realm" in kwargs and project in realms:
normrealm = kwargs2['realm']
if normrealm in realms[project]:
kwargs2['realm'] = realms[project][normrealm]
#
# Call organization-specific routine
if org in ["EM", ]:
rep.extend(selectEmFiles(**kwargs2))
elif org in ["CMIP5_DRS", ]:
rep.extend(selectCmip5DrsFiles(urls, **kwargs2))
elif org in ["generic", ]:
if project in ["CMIP6", ] and env.environment.optimize_cmip6_wildcards and \
cmip6_optimize_check_paths(urls):
kwargs_list = cmip6_optimize_wildcards(kwargs2)
if not with_periods and return_combinations is not None:
# Just return the list of dicts with facet values combinations
return_combinations.extend(kwargs_list)
rep.append('dummy')
else:
if return_combinations is None:
# Also for glob, but must get periods
clogger.warning("cdataset.explore doesn't anymore return choices with "
"optimized CMIP6 search. Use cdataset.glob() or set "
"env.environment.optimize_cmip6_wildcards to False")
for kwa in kwargs_list:
rep.extend(selectGenericFiles(urls, return_combinations=return_combinations,
use_frequency=use_frequency, **kwa))
else:
rep.extend(selectGenericFiles(urls, return_wildcards=return_wildcards,
return_combinations=return_combinations,
merge_periods_on=merge_periods_on, use_frequency=use_frequency,
**kwargs2))
else:
raise Climaf_Error("Cannot process organization " + org + " for simulation " + simulation + " and model " +
model + " of project " + project)
if not ofu:
return None
elif len(rep) == 0:
clogger.warning("no file found for %s, at these "
"data locations %s " % (repr(kwargs), repr(urls)))
clogger.warning("i.e. at " + str([url.replace("${PERIOD}", "$PERIOD").replace("$", "").format(**kwargs)
for url in urls]))
if env.environment.optimize_cmip6_wildcards:
clogger.warning("If you think this may be due to fresh data ingest, "
"you may wish to reset some of the tables used in "
"optimizing CMIP6 data search. See help(climaf.projects.optimize.clear_tables)")
return None
else:
# When returning strings (actually filenames), join them in a single string
if len(rep) > 0 and isinstance((rep[0]), six.string_types):
# Discard duplicates (assumes that sorting is harmless for later processing)
rep = sorted(list(set([f.strip() for f in rep])))
# Assemble filenames in one single string
rep = ' '.join(rep)
return rep
def selectEmFiles(**kwargs):
# Pour A et L : mon, day1, day2, 6hLev, 6hPlev, 3h
simulation = kwargs['simulation']
frequency = kwargs['frequency']
variable = kwargs['variable']
period = kwargs['period']
realm = kwargs['realm']
#
freqs = {"mon": "", "3h": "_3h"}
f = frequency
if f in freqs:
f = freqs[f]
rep = []
# Must look for all realms, here identified by a single letter
if realm == "*":
lrealm = ["A", "L", "O", "I"]
else:
lrealm = [realm]
for realm in lrealm:
clogger.debug("Looking for realm " + realm)
# Use EM data for finding data dir
freq_for_em = f
if realm == 'I':
freq_for_em = "" # This is a special case ...
command = ["grep", "^export EM_DIRECTORY_" + realm + freq_for_em + "=",
os.path.expanduser(os.getenv("EM_HOME")) + "/expe_" + simulation]
try:
ex = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except:
clogger.error("Issue getting archive_location for " + simulation + " for realm " + realm + " with: " +
repr(command))
break
if ex.wait() == 0:
dir = ex.stdout.read().split("=")[1].replace(
'"', "").replace("\n", "")
clogger.debug("Looking at dir " + dir)
if os.path.exists(dir):
lfiles = os.listdir(dir)
for fil in lfiles:
# clogger.debug("Looking at file "+fil)
fileperiod = periodOfEmFile(fil, realm, f)
if fileperiod and period.intersects(fileperiod):
if fileHasVar(dir + "/" + fil, variable):
rep.append(dir + "/" + fil)
# clogger.debug("Done with Looking at file "+fil)
else:
clogger.error("Directory %s does not exist for simulation %s, realm %s "
"and frequency %s" % (dir, simulation, realm, f))
else:
clogger.info("No archive location found for " + simulation + " for realm " + realm + " with: " +
repr(command))
return rep
def periodOfEmFile(filename, realm, freq):
"""
Return the period covered by a file handled by EM, based on filename
rules for EM. returns None if file frequency does not fit freq
"""
if realm == 'A' or realm == 'L':
if freq == 'mon' or freq == '':
year = re.sub(r'^.*([0-9]{4}).nc', r'\1', filename)
if year.isdigit():
speriod = "%s01-%s12" % (year, year)
return init_period(speriod)
else:
raise Climaf_Error(
"can yet handle only monthly frequency for realms A and L - TBD")
elif realm == 'O' or realm == 'I':
if freq == 'monthly' or freq == 'mon' or freq == '':
altfreq = 'm'
elif freq[0:2] == 'da':
altfreq = 'd'
else:
raise Climaf_Error(
"Can yet handle only monthly and daily frequency for realms O and I - TBD")
patt = r'^.*_1' + altfreq + r'_([0-9]{8})_*([0-9]{8}).*nc'
beg = re.sub(patt, r'\1', filename)
end = re.sub(patt, r'\2', filename)
# clogger.debug("beg=%s,end=%s,fn=%s"%(beg,end,filename))
if end == filename or beg == filename:
return None
return init_period("%s-%s" % (beg, end))
else:
raise Climaf_Error("unexpected realm " + realm)
def selectExampleFiles(urls, **kwargs):
rep = []
if kwargs['frequency'] == "monthly":
for url in urls:
for realm in ["A", "L"]:
# dir=l+"/"+realm+"/Origin/Monthly/"+simulation
dir = url + "/" + realm
clogger.debug("Looking at dir " + dir)
if os.path.exists(dir):
lfiles = os.listdir(dir)
for f in lfiles:
clogger.debug("Looking at file " + f)
fileperiod = periodOfEmFile(f, realm, 'mon')
if fileperiod and fileperiod.intersects(kwargs['period']):
if fileHasVar(dir + "/" + f, kwargs['variable']):
rep.append(dir + "/" + f)
# else: print "No var ",variable," in file", dir+"/"+f
return rep
def selectCmip5DrsFiles(urls, **kwargs):
# example for path : CMIP5/[output1/]CNRM-CERFACS/CNRM-CM5/1pctCO2/mon/atmos/
# Amon/r1i1p1/v20110701/clivi/clivi_Amon_CNRM-CM5_1pctCO2_r1i1p1_185001-189912.nc
#
# second path segment can be any string (allows for : output,output1, merge...),
# but if 'merge' exists, it is used alone
# This segment ca also be empty
#
# If version is 'last', tries provide version from directory 'last' if available,
# otherwise those of last dir
project = kwargs['project']
model = kwargs['model']
simulation = kwargs['simulation']
frequency = kwargs['frequency']
variable = kwargs['variable']
realm = kwargs['realm']
table = kwargs['table']
period = kwargs['period']
experiment = kwargs['experiment']
version = kwargs['version']
#
rep = []
frequency2drs = dict({'monthly': 'mon'})
freqd = frequency
if frequency in frequency2drs:
freqd = frequency2drs[frequency]
# TBD : analyze ambiguity of variable among realms+tables
for url in urls:
totry = ['merge/', 'output/', 'output?/', 'main/', '']
for p in totry:
pattern1 = url + "/" + project + "/" + p + \
"*/" + model # one * for modelling center
joker_version = "*"
patternv = os.sep.join(
[pattern1, experiment, freqd, realm, table, simulation, joker_version, variable])
if len(glob.glob(patternv)) > 0:
break
patternv = os.sep.join(
[pattern1, experiment, freqd, realm, table, simulation])
# Get version directories list
ldirs = glob.glob(patternv)
clogger.debug("Globbing with " + patternv + " gives:" + repr(ldirs))
for repert in ldirs:
lversions = os.listdir(repert)
lversions.sort()
# print "lversions="+`lversions`+ "while version="+version
cversion = version # initial guess of the version to use
if version == "last":
if len(lversions) == 1:
cversion = lversions[0]
elif len(lversions) > 1:
if "last" in lversions:
cversion = "last"
else:
# Assume that order provided by sort() is OK
cversion = lversions[-1]
# print "using version "+cversion+" for requested version: "+version
lfiles = glob.glob(os.sep.join(
[repert, cversion, variable, "*.nc"]))
# print "listing "+repert+"/"+cversion+"/"+variable+"/*.nc"
# print 'lfiles='+`lfiles`
for f in lfiles:
if freqd != 'fx':
# clogger.debug("checking period for "+ f)
if freqd == 'day':
regex = r'^.*([0-9]{8}-[0-9]{8}).nc$'
elif freqd == 'mon':
# regex=r'^.*([0-9]{4}[0-9]{2}-[0-9]{4}[0-9]{2}).nc$'
regex = r'^.*([0-9]{6}-[0-9]{6}).nc$'
elif freqd == 'yr':
regex = r'^.*([0-9]{4}-[0-9]{4}).nc$'
fileperiod = init_period(re.sub(regex, r'\1', f))
if fileperiod and period.intersects(fileperiod):
rep.append(f)
else:
clogger.debug("adding fixed field " + f)
rep.append(f)
return rep
def remote_to_local_filename(url):
"""
url: an url of remote data
Return local filename of remote file
"""
from env.environment import default_remote_cache
if len(url.split(":")) == 3:
k = 1
else:
k = 0
if re.findall("@", url.split(":")[k]):
hostname = url.split(":")[k].split("@")[-1]
else:
hostname = url.split(":")[k]
local_filename = os.path.expanduser(
default_remote_cache) + '/' + hostname + os.path.abspath(url.split(":")[-1])
return local_filename
def test2():
return
if __name__ == "__main__":
test2()