""" Basic types and syntax for managing time periods in CLIMAF
"""
# S.Senesi 08/2014 : created
import re, datetime
from climaf.clogging import clogger, dedent
class cperiod():
"""
A class for handling a pair of datetime objects defining a period.
Period is defined as [ date1, date2 ]. Resolution for date2 is 1 minute
Attribute 'pattern' usually provides a more condensed form
"""
def __init__(self,start,end=None,pattern=None) :
self.fx=False
if start == 'fx' :
self.fx=True
self.pattern='fx'
elif not isinstance(start,datetime.datetime) or not isinstance(end,datetime.datetime) :
raise Climaf_Period_Error("issue with start or end")
else :
self.start=start ; self.end=end ;
if pattern is None :
self.pattern=self.__repr__()
else:
self.pattern=pattern
#
def __repr__(self):
return self.pr()
#return("%04d%02d%02d%02d%02d-%04d%02d%02d%02d%02d"%(\
# self.start.year,self.start.month,self.start.day,self.start.hour,self.start.minute,
# self.end.year,self.end.month,self.end.day,self.end.hour,self.end.minute))
#
def iso(self):
""" Return isoformat(start)-isoformat(end), (with inclusive end, and 1 minute accuracy)
e.g. : 1980-01-01T00:00:00,1980-12-31T23:59:00
"""
if (self.fx) :
raise Climaf_Period_Error("There is no ISO representation for period 'fx'")
endproxy = self.end - datetime.timedelta(0,60) # substract 1 minute
return "%s,%s"%(self.start.isoformat(),endproxy.isoformat())
#
def pr(self) :
if self.fx : return 'fx'
if (self.start.minute != 0 or self.start.minute != 0):
return("%04d%02d%02d%02d%02d-%04d%02d%02d%02d%02d"%(\
self.start.year,self.start.month,self.start.day,self.start.hour,self.start.minute,
self.end.year,self.end.month,self.end.day,self.end.hour,self.end.minute))
elif (self.start.hour != 0 or self.end.hour != 0 ):
return("%04d%02d%02d%02d-%04d%02d%02d%02d"%(\
self.start.year,self.start.month,self.start.day,self.start.hour,
self.end.year,self.end.month,self.end.day,self.end.hour))
elif (self.start.day != 1 or self.end.day != 1 ):
if (self.end.day != 1 ):
d=self.end.day -1
m=self.end.month
y=self.end.year
else :
end=self.end - datetime.timedelta(1)
y=end.year ; m=end.month; d=end.day
if (self.start.year,self.start.month,self.start.day)== (y,m,d) :
return("%04d%02d%02d"%(y,m,d))
else:
return("%04d%02d%02d-%04d%02d%02d"%(\
self.start.year,self.start.month,self.start.day,
y,m,d))
elif (self.start.month != 1 or self.end.month != 1 ):
if (self.end.month != 1 ):
m=self.end.month -1
y=self.end.year
else :
m=12
y=self.end.year-1
if self.start.year==y and self.start.month==m :
return("%04d%02d"%(self.start.year,self.start.month))
else:
return("%04d%02d-%04d%02d"%(self.start.year,self.start.month,y, m))
else :
if self.start.year != self.end.year-1 :
return("%04d-%04d"%(self.start.year, self.end.year-1))
else:
return("%04d"%(self.start.year))
#
def hasFullYear(self,year):
if (self.fx) :
raise Climaf_Period_Error("Meaningless for period 'fx'")
return( int(year) >= self.start.year and int(year) < self.end.year)
#
def start_with(self,begin) :
""" If period BEGIN actually begins period SELF, returns the
complement of BEGIN in SELF; otherwise returns None """
if (self.fx) :return(False)
if self.start==begin.start and self.end >= begin.end :
return cperiod(begin.end,self.end)
#
def is_before(self,candidate) :
""" True if period SELF starts before period CANDIDATE
"""
if (self.fx) :return(False)
return self.start <= candidate.start
#
def includes(self,included) :
""" if period self does include period 'included', returns a pair of
periods which represents the difference """
if (self.fx) :return(False)
#raise Climaf_Period_Error("Meaningless for period 'fx'")
if self.start <= included.start and included.end <= self.end :
return cperiod(self.start,included.start), cperiod(included.end,self.end)
#
def intersects(self,other) :
"""
Returns the intersection of period self and period 'other' if any
"""
if (self.fx) :
raise Climaf_Period_Error("Meaningless for period 'fx'")
if other :
start=self.start
if (other.start > start) : start=other.start
end=self.end
if (other.end < end) : end=other.end
if (start < end) : return cperiod(start,end)
[docs]def init_period(dates) :
"""
Init a CliMAF 'period' object
Args:
dates (str): must match YYYY[MM[DD[HH[MM]]]][(-\|_)YYYY[MM[DD[HH[MM]]]]] , or
be 'fx' for fixed fields
Returns:
the corresponding CliMAF 'period' object
When using only YYYY, can omit some Ys (for zeros).
Cannot handle year 0000
Examples :
- a one-year long period : '1980', or '1980-1980'
- a decade : '1980-1989'
- first millenium : 1-1000 # Must have leading zeroes if you want to quote a month
- first century : 1-100
- one month : '198005'
- two months : '198003-198004'
- one day : '17890714'
- the same single day, in a more complicated way : '17890714-17890714'
CliMAF internally handles date-time values with a 1 minute accurracy; it can provide date
information to external scripts in two forms; see keywords 'period' and 'period_iso' in
:py:func:`~climaf.operators.cscript`
"""
#clogger.debug("analyzing %s"%dates)
if not type(dates) is str :
raise Climaf_Period_Error("arg is not a string : "+`dates`)
if (dates == 'fx' ) : return cperiod('fx')
start=re.sub(r'^([0-9]{1,12}).*',r'\1',dates)
# Pad with leading 0 to reach a length of 4 characters
start=(4-len(start))*"0"+start
# TBD : check that start actually matches a date
syear =int(start[0:4])
smonth =int(start[4:6]) if len(start) > 5 else 1
sday =int(start[6:8]) if len(start) > 7 else 1
shour =int(start[8:10]) if len(start) > 9 else 0
sminute=int(start[10:12])if len(start) > 11 else 0
try :
s=datetime.datetime(year=syear,month=smonth,day=sday,hour=shour,minute=sminute)
except :
raise Climaf_Period_Error("period start string %s is not a date (%s %s %s %s %s)"%(start,syear,smonth,sday,shour,sminute))
#
end=re.sub(r'.*[-_]([0-9]{1,12})$',r'\1',dates)
end=(4-len(end))*"0"+end
#clogger.debug("For dates=%s, start= %s, end=%s"%(dates,start,end))
done=False
if (end==dates) :
# No string found for end of period
if (len(start)<=4 ) : eyear=syear+1 ; emonth=1 ; eday=1 ; ehour=0
elif (len(start)==6 ) :
eyear=syear ; emonth=smonth+1 ;
if (emonth > 12) :
emonth=1
eyear=eyear+1
eday=1 ; ehour=0
elif (len(start)==8 ) :
eyear=syear ; emonth=smonth ; eday=sday ; ehour=0
if (sday > 27) :
# Must use datetime for handling month length
e=s+datetime.timedelta(1)
done=True
else : eday=sday+1
elif (len(start)==10 ) :
eyear=syear ; emonth=smonth ; eday=sday ; ehour=shour+1
if (ehour > 23) :
ehour=0
eday=eday+1
eminute = 0
else:
#clogger.debug("len(end)=%d"%len(end))
if len(start) != len(end) :
raise Climaf_Period_Error("Must have same numer of digits for start and end dates (%s and %s)"%(start,end))
if (len(end)<12) :
eminute = 0
else :
eminute=int(end[10:12])
if (len(end)==4 ) : eyear=int(end[0:4])+1 ; emonth=1 ; eday=1 ; ehour=0
elif (len(end)==6 ) :
eyear=int(end[0:4]) ; emonth=int(end[4:6])+1 ; eday=1 ; ehour=0
if (emonth > 12) :
emonth=1
eyear=eyear+1
elif (len(end)==8 ) :
eyear=int(end[0:4]) ; emonth=int(end[4:6]) ; eday=int(end[6:8]) ; ehour=0
if (eday > 27) :
try :
#print "trying %d %d %d %d %d"%(eyear,emonth,eday,ehour,eminute)
e=datetime.datetime(year=eyear,month=emonth,day=eday,hour=ehour,minute=eminute)
except:
raise Climaf_Period_Error("period end string %s is not a date"%end)
e=e+datetime.timedelta(1)
done=True
else:
eday=eday+1
elif (len(end)==10 ) :
eyear=int(end[0:4]) ; emonth=int(end[4:6]) ; eday=int(end[6:8]) ; ehour=int(end[8:10])+1
if (ehour > 23) :
ehour=0
eday=eday+1
elif (len(end)==12 ) :
eyear=int(end[0:4]) ; emonth=int(end[4:6]) ; eday=int(end[6:8]) ; ehour=int(end[8:10]) ; eminute=int(end[10:12])
#
if not done :
try :
#print "try:%d %02d %02d %02d %02d"%(eyear,emonth,eday,ehour,eminute)
e=datetime.datetime(year=eyear,month=emonth,day=eday,hour=ehour,minute=eminute)
except:
raise Climaf_Period_Error("period end string %s is not a date"%end)
if s < e :
return cperiod(s,e,None)
else :
raise Climaf_Period_Error("Must have start ("+`s`+") before,(or equal to, end ("+`e`+")")
def sort_periods_list(periods_list):
#
class SortTree() :
def __init__(self,el):
self.pivot=el
self.smaller=None
self.larger=None
#
def insert(el,tree=None):
"""
"""
if tree is None : return SortTree(el)
if (`tree.pivot`==`el`) : return tree # Discard identical periods
if el.is_before(tree.pivot) : tree.smaller=insert(el,tree.smaller)
else : tree.larger =insert(el,tree.larger )
return tree
#
def walk(tree):
if tree is None : return []
rep=walk(tree.smaller)
rep.append(tree.pivot)
rep.extend(walk(tree.larger))
return rep
#
import copy
clist=copy.copy(periods_list)
sorted_tree=SortTree(clist.pop())
while clist : insert(clist.pop(),sorted_tree)
return walk(sorted_tree)
def merge_periods(remain_to_merge, already_merged=[]):
if already_merged==[] :
if len (remain_to_merge) <2 :
return remain_to_merge
sorted=sort_periods_list(remain_to_merge)
return merge_periods(sorted[1:], [sorted[0]])
if len(remain_to_merge) > 0 :
last=already_merged[-1]
next_one=remain_to_merge.pop(0)
#print "last.end=",last.end,"next.start=",next_one.start
#if (last.end == next_one.start) :
# already_merged[-1]=cperiod(last.start,next_one.end)
if (next_one.start <= last.end) :
if next_one.end > last.end :
# the next period is not entirely included in the
# last merged one
already_merged[-1]=cperiod(last.start,next_one.end)
else:
# There is no overlap between both periods
already_merged.append(next_one)
#
if len(remain_to_merge) > 0 :
return merge_periods(remain_to_merge, already_merged)
else:
return already_merged
class Climaf_Period_Error(Exception):
def __init__(self, valeur):
self.valeur = valeur
clogger.error(self.__str__())
dedent(100)
def __str__(self):
return `self.valeur`