Source code for climaf.period

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""  Basic types and syntax for managing time periods in CLIMAF

"""

# S.Senesi 08/2014 : created

from __future__ import print_function, division, unicode_literals, absolute_import

import re
import datetime
import six
import copy

from climaf.utils import Climaf_Error
from env.clogging import clogger, dedent
from env.environment import *


class cperiod(object):
    """
    A class for handling a pair of datetime objects defining a period.

    Period is defined as [ date1, date2 ]. Resolution for date2 is 1 minute
    Attribute 'pattern' usually provides a more condensed form

    """

    def __init__(self, start, end=None, pattern=None):
        self.fx = False
        if isinstance(start, six.string_types) and start == 'fx':
            self.fx = True
            self.pattern = 'fx'
        else:
            if not isinstance(start, datetime.datetime) or not isinstance(end, datetime.datetime):
                try:
                    # Assuming start and end use Netcdf advanced calendars (e.g. noleap or 360-day)
                    # and trying to carry on anyway with dumb python datetime package
                    start = start._to_real_datetime()
                    end = end._to_real_datetime()
                except:
                    raise Climaf_Period_Error("issue with start or end, %s : %s,\n%s : %s" %
                                              (type(start), str(start), type(end), str(end)))
            if start > end:
                raise Climaf_Period_Error("Period's start (%s) must be before period's end (%s)" %
                                          (repr(start), repr(end)))
            self.start = start
            self.end = end
            if pattern is None:
                self.pattern = self.__repr__()
            else:
                self.pattern = pattern

    #
    def __eq__(self, other):
        test = not other == "*" and isinstance(other, cperiod)
        if test and self.fx != other.fx:
            test = False
        if test and self.pattern != other.pattern:
            test = False
        start_self = getattr(self, "start", None)
        start_other = getattr(other, "start", None)
        if test and start_self != start_other:
            test = False
        end_self = getattr(self, "end", None)
        end_other = getattr(other, "end", None)
        if test and end_self != end_other:
            test = False
        return test

    def __le__(self, other):
        if self.start != other.start:
            return self.start <= other.start
        else:
            return self.end <= other.end

    def __lt__(self, other):
        if self.start != other.start:
            return self.start < other.start
        else:
            return self.end < other.end

    def __ge__(self, other):
        if self.start != other.start:
            return self.start >= other.start
        else:
            return self.end >= other.end

    def __gt__(self, other):
        if self.start != other.start:
            return self.start > other.start
        else:
            return self.end > other.end

    #
    def __hash__(self):
        return hash((self.fx, self.pattern, getattr(self, "start", None), getattr(self, "end", None)))

    #
    def __repr__(self):
        return self.pr()
        # return("%04d%02d%02d%02d%02d-%04d%02d%02d%02d%02d"%(\
        #      self.start.year,self.start.month,self.start.day,self.start.hour,self.start.minute,
        #      self.end.year,self.end.month,self.end.day,self.end.hour,self.end.minute))

    #
    def iso(self):
        """ Return isoformat(start)-isoformat(end), (with inclusive end, and 1 minute accuracy)
        e.g. : 1980-01-01T00:00:00,1980-12-31T23:59:00
        """
        if self.fx:
            raise Climaf_Period_Error("There is no ISO representation for period 'fx'")
        endproxy = self.end - datetime.timedelta(0, 60)  # substract 1 minute
        return "%s,%s" % (self.start.isoformat(), endproxy.isoformat())

    #
    def pr(self):
        if self.fx:
            return 'fx'
        if self.start.minute != 0 or self.start.minute != 0:
            return ("%04d%02d%02d%02d%02d-%04d%02d%02d%02d%02d" % (self.start.year, self.start.month, self.start.day,
                                                                   self.start.hour, self.start.minute, self.end.year,
                                                                   self.end.month, self.end.day, self.end.hour,
                                                                   self.end.minute))
        elif self.start.hour != 0 or self.end.hour != 0:
            return ("%04d%02d%02d%02d-%04d%02d%02d%02d" % (self.start.year, self.start.month, self.start.day,
                                                           self.start.hour, self.end.year, self.end.month, self.end.day,
                                                           self.end.hour))
        elif self.start.day != 1 or self.end.day != 1:
            if self.end.day != 1:
                d = self.end.day - 1
                m = self.end.month
                y = self.end.year
            else:
                end = self.end - datetime.timedelta(1)
                y = end.year
                m = end.month
                d = end.day
            if (self.start.year, self.start.month, self.start.day) == (y, m, d):
                return "%04d%02d%02d" % (y, m, d)
            else:
                return "%04d%02d%02d-%04d%02d%02d" % (self.start.year, self.start.month, self.start.day, y, m, d)
        elif self.start.month != 1 or self.end.month != 1:
            if self.end.month != 1:
                m = self.end.month - 1
                y = self.end.year
            else:
                m = 12
                y = self.end.year - 1
            if self.start.year == y and self.start.month == m:
                return "%04d%02d" % (self.start.year, self.start.month)
            else:
                return "%04d%02d-%04d%02d" % (self.start.year, self.start.month, y, m)
        else:
            if self.start.year != self.end.year - 1:
                return "%04d-%04d" % (self.start.year, self.end.year - 1)
            else:
                return "%04d" % self.start.year

    #
    def hasFullYear(self, year):
        if self.fx:
            raise Climaf_Period_Error("Meaningless for period 'fx'")
        else:
            year = int(year)
            return self.start <= datetime.datetime(year=year, month=1, day=1) and \
                datetime.datetime(year=year + 1, month=1, day=1) <= self.end

    #
    def start_with(self, begin):
        """ If period BEGIN actually begins period SELF, returns the
        complement of BEGIN in SELF; otherwise returns None """
        if self.fx:
            return False
        if self.start == begin.start and self.end >= begin.end:
            return cperiod(begin.end, self.end)

    #
    def is_before(self, candidate):
        """ True if period SELF starts before period CANDIDATE
        """
        if self.fx:
            return False
        return self.start <= candidate.start

    #
    def includes(self, included):
        """ if period self does include period 'included', returns a pair of
        periods which represents the difference """
        if self.fx:
            return False
        # raise Climaf_Period_Error("Meaningless for period 'fx'")
        if self.start <= included.start and included.end <= self.end:
            return cperiod(self.start, included.start), cperiod(included.end, self.end)

    #
    def intersects(self, other):
        """
        Returns the intersection of period self and period 'other' if any
        """
        if other:
            if self.fx and other.fx:
                clogger.warning("Meaningless for period 'fx'")
                return cperiod("fx")
            elif self.fx:
                return cperiod(other.start, other.end)
            elif other.fx:
                return cperiod(self.start, self.start)
            else:
                start = self.start
                if other.start > start:
                    start = other.start
                end = self.end
                if other.end < end:
                    end = other.end
                if start < end:
                    return cperiod(start, end)
        else:
            if self.fx:
                return cperiod("fx")
            else:
                return cperiod(self.start, self.end)


[docs]def init_period(dates): """ Init a CliMAF 'period' object Args: dates (str): must match r'YYYY[MM[DD[HH[MM]]]][(-\|_)YYYY[MM[DD[HH[MM]]]]]' , or be 'fx' for fixed fields Returns: the corresponding CliMAF 'period' object When using only YYYY, can omit some Ys (for zeros). Cannot handle year 0000 Examples : - a one-year long period : '1980', or '1980-1980' - a decade : '1980-1989' - first millenium : 1-1000 # Must have leading zeroes if you want to quote a month - first century : 1-100 - one month : '198005' - two months : '198003-198004' - one day : '17890714' - the same single day, in a more complicated way : '17890714-17890714' CliMAF internally handles date-time values with a 1 minute accurracy; it can provide date information to external scripts in two forms; see keywords 'period' and 'period_iso' in :py:func:`~climaf.operators.cscript` """ def str_to_date(a_date, end=False): if a_date.startswith("-"): sign = -1 a_date = a_date[1:] elif a_date.startswith("+"): sign = 1 a_date = a_date[1:] else: sign = 1 a_date = a_date.zfill(4) year = int(a_date[0:4]) * sign month = int(a_date[4:6]) if len(a_date) > 5 else 1 day = int(a_date[6:8]) if len(a_date) > 7 else 1 hour = int(a_date[8:10]) if len(a_date) > 9 else 0 minute = int(a_date[10:12]) if len(a_date) > 11 else 0 add_day = 0 add_hour = 0 add_minute = 0 if end: if len(a_date) < 6: year += 1 elif len(a_date) < 8: month += 1 if month > 12: month = 1 year += 1 elif len(a_date) < 10: add_day = 1 elif len(a_date) < 12: add_hour = 1 else: add_minute = 1 try: if year <= 0: raise Climaf_Period_Error("Could not yet deal with negative or null years.") else: rep = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=minute) except: raise Climaf_Period_Error( "String %s is not a date (%s %s %s %s %s)" % (a_date, year, month, day, hour, minute)) if end: rep += datetime.timedelta(days=add_day, hours=add_hour, minutes=add_minute) return rep # clogger.debug("analyzing %s"%dates) if isinstance(dates, cperiod): return dates elif not isinstance(dates, six.string_types): raise Climaf_Period_Error("arg is not a string : " + repr(dates)) else: dates = str(dates) if dates in ['fx', ]: return cperiod('fx') else: period_regexp = re.compile(r"(?P<start>-?\d+)([-_](?P<end>-?\d+))?") period_match = period_regexp.match(dates) if period_match: start = period_match.groupdict()["start"] s = str_to_date(start) end = period_match.groupdict()["end"] if end is None: e = str_to_date(start, end=True) else: e = str_to_date(end, end=True) if s < e: return cperiod(s, e, None) else: raise Climaf_Period_Error("Must have start (%s) before or equal to end (%s)" % (repr(s), repr(e))) else: raise Climaf_Period_Error("Could not create a period with string %s" % dates)
def sort_periods_list(periods_list): # class SortTree(object): def __init__(self, el): self.pivot = el self.smaller = None self.larger = None # def insert(el, tree=None): """ """ if tree is None: return SortTree(el) if repr(tree.pivot) == repr(el): return tree # Discard identical periods if el.is_before(tree.pivot): tree.smaller = insert(el, tree.smaller) else: tree.larger = insert(el, tree.larger) return tree # def walk(tree): if tree is None: return [] rep = walk(tree.smaller) rep.append(tree.pivot) rep.extend(walk(tree.larger)) return rep # if isinstance(periods_list, list) and all([isinstance(elt, cperiod) for elt in periods_list]): clist = copy.copy(periods_list) sorted_tree = SortTree(clist.pop()) while clist: insert(clist.pop(), sorted_tree) return walk(sorted_tree) else: raise Climaf_Period_Error("Can not deal with something else than a list of cperiod objects.") def merge_periods(remain_to_merge, already_merged=list(), handle_360_days_year=True): """ Provided with a list of periods (even un-sorted), returns a list of periods where all consecutive periods have been merged. Argument 'already_merged' is used only in the underlying recursion, and shouldn't usually be provided Argument 'handle_360_days_year' allows to merge consecutive periods which miss only a 31st december,such as in the case with 360-days calendars. It defaults to True For dealing with very long list of periods, which do not allow for recursion, we proceed with batches of N elements """ if not (isinstance(remain_to_merge, list) and all([isinstance(elt, cperiod) for elt in remain_to_merge])): raise Climaf_Period_Error("Can not deal with something else than a list of cperiod objects.") else: N = 300 if isinstance(already_merged, list) and len(already_merged) == 0: if len(remain_to_merge) < 2: return remain_to_merge else: sorted_remain = sorted(remain_to_merge) if len(sorted_remain) <= N: return merge_periods(sorted_remain[1:], [sorted_remain[0]], handle_360_days_year) else: # Avoid too much recursion first_batch = merge_periods(sorted_remain[0:N]) return merge_periods(sorted_remain[N:], first_batch, handle_360_days_year) else: if len(remain_to_merge) > 0: last = already_merged[-1] next_one = remain_to_merge.pop(0) # print "last.end=",last.end,"next.start=",next_one.start # if (last.end == next_one.start) : # already_merged[-1]=cperiod(last.start,next_one.end) if next_one.start <= last.end or (handle_360_days_year and last.end.month == 12 and last.end.day == 31 and next_one.start.month == 1 and next_one.start.day == 1 and next_one.start.year == last.end.year + 1): if next_one.end > last.end: # the next period is not entirely included in the # last merged one already_merged[-1] = cperiod(last.start, next_one.end) else: # There is no overlap between both periods already_merged.append(next_one) # if len(remain_to_merge) > 0: return merge_periods(remain_to_merge, already_merged, handle_360_days_year) else: return already_merged def intersect_periods_list(lperiod1, lperiod2): """ Given two lists of periods, returns a list of the periods representing their intersection Algorithm : for each period in l1, compute intersection with all periods in l2, and add it in a big list; finally, merge the big list """ if not(isinstance(lperiod1, list) and [isinstance(elt, cperiod) for elt in lperiod1] and isinstance(lperiod2, list) and [isinstance(elt, cperiod) for elt in lperiod2]): raise Climaf_Period_Error("Can not deal with something else than list of cperiod objects") else: big = [] for p1 in lperiod1: for p2 in lperiod2: inter = p1.intersects(p2) if inter: big.append(inter) return merge_periods(big) def lastyears(period, nyears): """ Returns a period ending at PERIOD's end and which duration is at most NYEARS """ # print "period=",period, 'type=',type(period),'nyears=',nyears if isinstance(period, six.string_types): period = init_period(period) elif not isinstance(period, cperiod): raise Climaf_Period_Error("Can not deal with periods that are not string or cperiod objects") if not isinstance(nyears, int): raise Climaf_Period_Error("nyears must be an integer, not %s" % nyears) rep = cperiod(period.start, period.end) yend = rep.end.year ystart = rep.start.year if ystart < yend - nyears: s = rep.end rep.start = datetime.datetime(year=yend - nyears, month=s.month, day=s.day, hour=s.hour, minute=s.minute) return repr(rep) def firstyears(period, nyears): """ Returns a period beginning at PERIOD's begin and which duration is at most NYEARS """ if isinstance(period, six.string_types): period = init_period(period) elif not isinstance(period, cperiod): raise Climaf_Period_Error("Can not deal with periods that are not string or cperiod objects") if not isinstance(nyears, int): raise Climaf_Period_Error("nyears must be an integer, not %s" % nyears) rep = cperiod(period.start, period.end) yend = rep.end.year ystart = rep.start.year if yend > ystart + nyears: s = rep.start rep.end = datetime.datetime(year=ystart + nyears, month=s.month, day=s.day, hour=s.hour, minute=s.minute) # print "period=",period, 'type=',type(period),'nyears=',nyears # print rep return repr(rep) def group_periods(diclist): """Assuming DICLIST is a list of dictionnaries which include key 'period', identifies all dicts which have the same content for the other keys, merge the periosd for those dicts and returns the list of dicts with this merge periods Used e.g. on 'return_combinations' output of a series of selectGenericFiles """ tempo = dict() for dic in diclist: aperiod = dic['period'] if not isinstance(aperiod, cperiod): aperiod = init_period(aperiod) # keys = list(dic.keys()) keys.remove('period') keys.sort() tuple_key = tuple([dic[k] for k in keys]) # if tuple_key not in tempo: tempo[tuple_key] = dic.copy() tempo[tuple_key]['period'] = list() tempo[tuple_key]['period'].append(aperiod) # output = list() for key in tempo: dic = tempo[key] dic['period'] = merge_periods(dic['period']) output.append(dic) # return output def freq_to_minutes(data_freq): """ Interprets values returned by Panda's infer_freq() , such as '2D', 'H', '6MS'.. Returns duration in minutes (quite arbitrary for months) """ data_freq = data_freq.replace("mon", "MS") number = re.findall("^[0-9]*", data_freq) if len(number[0]) == 0: number = 1 else: number = int(number[0]) units = re.findall("[A-Z]*$", data_freq)[0] scale = {"M": 1, "H": 60, "D": 60 * 24, "MS": 30 * 60 * 24} if units in scale: return number * scale[units] else: raise Climaf_Error("Cannot interpret frequency %s, returning O minutes" % data_freq) class Climaf_Period_Error(Exception): def __init__(self, valeur): self.valeur = valeur clogger.error(self.__str__()) dedent(100) def __str__(self): return repr(self.valeur)