#!/usr/bin/env python
import numpy as np
from scipy import interpolate
import sqlite3
import sys
from types import MethodType
#import inspect # Not needed?
import itertools
# XXX hack to turn event strings into valid Python instance names
def mangle(string):
    """Make each component of a dotted event/attribute name usable as an attribute name.

    The string is split on ".", and every component is mangled on its own:
    a component starting with a digit gets a "_" prefix, and each of the
    characters ``-``, ``(``, ``)`` and ``:`` becomes "_". The components are
    re-joined with ".", so the result has as many components as the input.
    No other characters are touched.

    Empty components (from "a..b", a trailing dot, or an empty string) are
    passed through unchanged rather than raising IndexError.
    """
    mangled = []
    for part in string.split("."):
        # part[:1] is "" for an empty component, and "".isdigit() is False,
        # so empty components no longer index past the end of the string.
        if part[:1].isdigit():
            part = "_" + part
        for bad_char in "-():":
            part = part.replace(bad_char, "_")
        mangled.append(part)
    return ".".join(mangled)
def maybefloat(x):
    """Convert ``x`` to an int if possible, else to a float, else return it unchanged.

    Values that are already floats (e.g. SQLite REAL columns) are returned
    as-is. Running them through ``int()`` would silently truncate them
    (3.7 -> 3). Only conversion failures (ValueError/TypeError) are
    treated as "not a number"; any other exception propagates.
    """
    if isinstance(x, float):
        return x
    try:
        return int(x)
    except (ValueError, TypeError):
        pass
    try:
        return float(x)
    except (ValueError, TypeError):
        return x
# Uses dotted notation to determine hierarchy
def dict_to_hierarchy(x):
    """Nest a flat ``{"a.b.c": value}`` dict into ``{"a": {"b": {"c": value}}}``.

    Special cases:

    * A dict whose only key is ``""`` collapses to its bare value
      (``{"": 1234}`` -> ``1234``).
    * When a name is both a leaf and a prefix of another name (``"a"`` and
      ``"a.b"``), the leaf's value is kept under the ``""`` key of the nested
      dict (``{"a": {"": 1, "b": 2}}``). This is the same "value of the node
      itself" convention as the single-key case. Previously this either
      raised TypeError or silently dropped the leaf, depending on key order.
    """
    if len(x) == 1:
        # Unpacking works on both a Python 2 list and a Python 3 items view.
        (only_key, only_value), = x.items()
        if only_key == "":
            return only_value
    result = {}
    for name, value in x.items():
        parts = name.split(".")
        node = result
        for part in parts[:-1]:  # all but the last component
            if part not in node:
                node[part] = {}
            elif not isinstance(node[part], dict):
                # A leaf already sits here: demote it to the "" slot.
                node[part] = {"": node[part]}
            node = node[part]
        leaf = parts[-1]
        if isinstance(node.get(leaf), dict):
            # This name is also a prefix of names seen earlier.
            node[leaf][""] = value
        else:
            node[leaf] = value
    return result
def build_times_filter(start_time, stop_time):
    """Pack optional start/stop times into a 0-, 1- or 2-element list.

    Returns ``[]`` when there is no start time (a stop time on its own is
    ignored), ``[start_time]`` when only the start is given, and
    ``[start_time, stop_time]`` when both are. This is presumably meant for
    ``query``'s ``times`` argument; confirm against callers.
    """
    if start_time is None:
        return []
    if stop_time is None:
        return [start_time]
    return [start_time, stop_time]
def step_interpolator(t,x, new_t, datamap=lambda x: x):
    """Sample the series ``(t, x)`` at each time in ``new_t`` without interpolating.

    For each requested time, the result holds ``datamap(x[i])``, where ``i``
    is the first index with ``t[i] >= time``, clamped to the last sample.
    Requests before ``t[0]`` therefore get the first sample, and requests
    after ``t[-1]`` get the last one.

    The search position only ever moves forward, so ``t`` and ``new_t`` are
    assumed to be ascending. ``t`` must be non-empty whenever ``new_t`` is.

    NOTE(review): for a time strictly between two samples this returns the
    *later* sample, not the most recent earlier one (zero-order hold).
    Confirm this is intended for state-like strings/booleans.
    """
    last_index = len(t) - 1
    position = 0
    sampled = []
    for when in new_t:
        # Skip forward past samples that are strictly older than `when`.
        while when > t[position] and position < last_index:
            position += 1
        sampled.append(datamap(x[position]))
    return sampled
def _sql_string(value):
    """Render ``value`` as a single-quoted SQL string literal (quotes doubled)."""
    return "'%s'" % ("%s" % (value,)).replace("'", "''")


def build_query(times_, system, source, key, attr):
    """Build the SQL text that fetches every attribute row of the matching events.

    Each result row is
    ``(event_id, data_time, keyname, value, system, source, key, data_type)``,
    where ``data_time`` is the value of the event's ``__data_time`` attribute.
    Rows are ordered by event_id, so all rows of one event are contiguous,
    which is what ``itertools.groupby`` in the caller needs.

    Args:
        times_: numpy array of times in seconds. Empty means no time filter,
            one element means ``origts >= t``, and two or more mean
            ``min <= origts <= max`` (``origts`` compared in nanoseconds).
        system, source, key: exact-match event filters; ``None`` skips one.
        attr: attribute name; keeps keynames equal to it or starting with
            ``attr + "."``. ``None`` keeps all keynames.

    Filter values are emitted as single-quoted, quote-escaped SQL strings.
    SQLite accepts double-quoted text as a string only when it does not
    name a column, so a value such as key ``"key"`` used to compare the
    column with itself.
    """
    event_filter = ""
    if times_.size > 0:
        times_nano = times_ * 1e9
        # %r keeps full float precision; Python 2's str() rounds to 12
        # significant digits, i.e. ~1 ms at nanosecond-epoch magnitudes.
        if times_.size > 1:
            event_filter = " AND origts >= %r AND origts <= %r" % (
                float(times_nano.min()), float(times_nano.max()))
        else:
            event_filter = " AND origts >= %r" % (float(times_nano[0]),)
    for column, wanted in (("system", system), ("source", source), ("key", key)):
        if wanted is not None:
            event_filter += " AND %s = %s" % (column, _sql_string(wanted))

    keyname_constraint = ""
    if attr is not None:
        # "attr." <= keyname < "attr/" ('/' follows '.') selects exactly the
        # names with the "attr." prefix under byte-wise comparison. LIKE
        # would treat '_'/'%' in attr as wildcards and ignore ASCII case.
        # Assumes keyname uses SQLite's default BINARY collation; TODO confirm
        # against the schema.
        keyname_constraint = "WHERE keyname = %s OR (keyname >= %s AND keyname < %s)" % (
            _sql_string(attr), _sql_string(attr + "."), _sql_string(attr + "/"))

    return """SELECT attribute.event_id, attribute.value, b.keyname, b.value, etab.system, etab.source, etab.key, b.data_type FROM attribute
INNER JOIN (SELECT event_id, source, system, key FROM event WHERE 1=1%s) AS etab
ON etab.event_id = attribute.event_id AND attribute.keyname = '__data_time'
INNER JOIN (SELECT event_id, value, keyname, data_type FROM attribute %s) AS b
ON etab.event_id = b.event_id
ORDER BY attribute.event_id
""" % (event_filter, keyname_constraint)
def rows_to_object(rows, system, source, key, attr):
    """Flatten the rows of one event into a single ``{keyname: value}`` dict.

    ``rows`` are 8-tuples
    ``(event_id, data_time, keyname, value, system, source, key, data_type)``.

    Keynames are rewritten relative to the query:

    * when ``attr`` is given, the first ``len(attr) + 1`` characters
      (``attr`` plus its dot) are cut off, so the attribute itself becomes "";
    * the event's own source, key and/or system are prepended for whichever
      of ``system``/``source``/``key`` the caller left as ``None``, keeping
      names unique across events.

    Returns ``(event_key, time, obj, key_types)``:

    * ``event_key`` is "system.source.key" taken from the last row
      (``None`` if ``rows`` is empty);
    * ``time`` is ``float`` of the last row's ``data_time`` (``0.0`` if empty);
    * ``obj`` maps keyname to value passed through ``maybefloat``;
    * ``key_types`` maps keyname to ``data_type``.
    """
    obj = {}
    key_types = {}
    tm = 0.0
    event_key = None
    for _event_id, data_time, keyname, val, esystem, esource, ekey, data_type in rows:
        if attr is not None:
            keyname = keyname[len(attr) + 1:]
        if system is None:
            keyname = "%s.%s.%s.%s" % (esystem, esource, ekey, keyname)
        elif source is None:
            keyname = "%s.%s.%s" % (esource, ekey, keyname)
        elif key is None:
            keyname = "%s.%s" % (ekey, keyname)
        # Always built from the row itself; the former per-branch variants
        # were dead stores overwritten by this line.
        event_key = "%s.%s.%s" % (esystem, esource, ekey)
        tm = float(data_time)
        obj[keyname] = maybefloat(val)
        key_types[keyname] = data_type
    return event_key, tm, obj, key_types
def _collect_series(result_iter, system, source, key, attr):
    """Group query rows per event and build one time series per event key.

    Returns ``(times, data, key_types)``:

    * ``times[event_key]`` is the list of event times, in row order;
    * ``data[event_key][keyname]`` is a list of values aligned with
      ``times[event_key]``, with ``None`` for an event that lacked ``keyname``;
    * ``key_types[event_key][keyname]`` is the last data_type seen.
    """
    times = {}
    data = {}
    key_types = {}
    # itertools.groupby only merges *consecutive* rows with the same
    # event_id, so the rows of one event must arrive contiguously.
    for _event_id, rows in itertools.groupby(result_iter, lambda row: row[0]):
        # `key` is never rebound in this function: rows_to_object uses it to
        # decide which prefix to put on keynames.
        event_key, tm, obj, obj_types = rows_to_object(rows, system, source, key, attr)

        types_for_event = key_types.setdefault(event_key, {})
        for keyname, data_type in obj_types.items():
            prev_type = types_for_event.get(keyname, data_type)
            if prev_type != data_type:
                print("Key '%s' type changed from '%s' to '%s'!" % (keyname, prev_type, data_type))
            types_for_event[keyname] = data_type

        event_times = times.setdefault(event_key, [])
        series = data.setdefault(event_key, {})
        n_before = len(event_times)
        event_times.append(tm)
        # Keep every list the same length as event_times: existing keys get
        # this event's value (or None), and new keys are back-filled with None.
        for keyname, values in series.items():
            values.append(obj.get(keyname))
        for keyname, value in obj.items():
            if keyname not in series:
                series[keyname] = [None] * n_before + [value]
    return times, data, key_types


def _events_from_series(times, data):
    """Transpose per-event-key series back into parallel per-event lists.

    Returns ``(event_times, objects)``, with one entry per event, grouped by
    event key and nested by ``dict_to_hierarchy``. Both lists are empty
    when there were no events.
    """
    event_times = []
    objects = []
    for event_key, series in data.items():
        if not series:
            continue
        flat = [{} for _ in times[event_key]]
        for keyname, values in series.items():
            for obj, value in zip(flat, values):
                obj[keyname] = value
        event_times.extend(times[event_key])
        objects.extend(dict_to_hierarchy(obj) for obj in flat)
    return event_times, objects


def _resample_series(times, data, key_types, new_times, interp_kind):
    """Resample every series onto ``new_times``.

    Returns ``{event_key: {keyname: values}}``, where each ``values`` has
    one entry per element of ``new_times``.
    """
    resampled = {}
    for event_key, event_types in key_types.items():
        t = times[event_key]
        out = resampled.setdefault(event_key, {})
        for item_key, data_type in event_types.items():
            x = data[event_key][item_key]
            if data_type == "number":
                if len(t) > 1:
                    # dtype=float turns back-fill Nones into NaN.
                    out[item_key] = interpolate.interp1d(
                        t, np.asarray(x, dtype=float),
                        bounds_error=False, kind=interp_kind)(new_times)
                else:
                    # A single sample: repeat its value at every requested time.
                    out[item_key] = [x[0]] * len(new_times)
            elif data_type == "boolean":
                out[item_key] = step_interpolator(t, x, new_times, lambda v: v == "true")
            else:
                # "string" and any unrecognised type are step-sampled, so the
                # result always lines up with new_times.
                out[item_key] = step_interpolator(t, x, new_times)
    return resampled


def query(cursor, system, source, key, attr, times, interp_kind):
    """Fetch events matching the given filters, optionally resampled in time.

    ``system``/``source``/``key``/``attr`` narrow the query; ``None`` leaves
    that level unfiltered. ``times`` is a sequence of times in seconds
    (anything ``float()`` accepts):

    * 0 values: every matching event;
    * 1 value: events with ``origts`` at or after it;
    * 2 values: events with ``origts`` between the two.

    In these cases the result is ``(event_times, objects)``: two lists with
    one entry per event, where ``event_times`` are the events'
    ``__data_time`` values. Both are empty if nothing matched.

    With more than 2 values, every attribute series is resampled at those
    times and the result is ``(times_array, objects)`` with one object per
    requested time:

    * numbers via ``scipy.interpolate.interp1d(kind=interp_kind)``, NaN
      outside the data;
    * booleans and strings via ``step_interpolator``.

    Objects are nested dicts produced by ``dict_to_hierarchy``.
    """
    # Query times are seconds; build_query scales them to nanoseconds.
    times_ = np.array([float(i) for i in times])
    result_iter = cursor.execute(build_query(times_, system, source, key, attr))
    series_times, data, key_types = _collect_series(result_iter, system, source, key, attr)

    # No times, a start time, or a start/stop pair: return the events as-is.
    if times_.size <= 2:
        return _events_from_series(series_times, data)

    resampled = _resample_series(series_times, data, key_types, times_, interp_kind)
    samples = []
    for i in range(len(times_)):
        merged = {}
        for series in resampled.values():
            for item_key, values in series.items():
                merged[item_key] = values[i]
        samples.append(dict_to_hierarchy(merged))
    return times_, samples
class attr_descriptor:
    """Callable accessor node for one attribute path of one event key.

    Calling it runs ``query`` with all four filters (system, source, key and
    the dotted attribute path) fixed to the values given at construction.
    Child nodes for deeper attribute components are attached externally
    with ``setattr``.
    """

    def __init__(self,cursor,system,source,key,attr):
        self.cursor_, self.system_, self.source_ = cursor, system, source
        self.key_, self.attr_ = key, attr

    def __call__(self, times=np.array([]), interp_kind='linear'):
        filters = (self.system_, self.source_, self.key_, self.attr_)
        return query(self.cursor_, *(filters + (times, interp_kind)))
class key_descriptor:
    """Accessor node for one system.source.key event stream.

    Calling the node runs ``query`` filtered on system, source and key, with
    no attribute filter. Attribute paths registered with ``add_attr`` hang
    off the node as nested ``attr_descriptor`` objects.
    """

    def __init__(self,cursor,system,source,key,attr):
        self.cursor_ = cursor
        self.system_ = system
        self.source_ = source
        self.key_ = key
        self.add_attr(cursor,system,source,key,attr)

    def __call__(self, times=np.array([]), interp_kind='linear'):
        return query(self.cursor_, self.system_, self.source_, self.key_,
                     None, times, interp_kind)

    def add_attr(self,cursor,system,source,key,attr):
        """Attach the dotted attribute path ``attr`` below this node.

        Each component becomes a ``mangle``-d attribute holding an
        ``attr_descriptor`` for the unmangled path up to that component. For
        stream a.b.c and attribute x.y.z this yields a.b.c.x, a.b.c.x.y and
        a.b.c.x.y.z, each callable. Components that already exist (any
        attribute with that name, per ``hasattr``) are reused, not replaced.
        """
        components = attr.split('.')
        node = self
        for depth, component in enumerate(components, 1):
            name = mangle(component)
            if hasattr(node, name):
                node = getattr(node, name)
                continue
            child = attr_descriptor(cursor, system, source, key,
                                    ".".join(components[:depth]))
            setattr(node, name, child)
            node = child
class source_descriptor:
    """Accessor node for ``<system>.<source>``.

    Calling the node runs ``query`` filtered on system and source only.
    Construction also builds the ``key_descriptor`` for ``key`` and attaches
    it under ``mangle(key)``.
    """

    def __init__(self,cursor,system,source,key,attr):
        child = key_descriptor(cursor, system, source, key, attr)
        setattr(self, mangle(key), child)
        self.cursor_, self.system_, self.source_ = cursor, system, source

    def __call__(self, times=np.array([]), interp_kind='linear'):
        return query(self.cursor_, self.system_, self.source_, None, None,
                     times, interp_kind)
class system_descriptor:
    """Accessor node for ``<system>``.

    Calling the node runs ``query`` filtered on the system only.
    Construction also builds the ``source_descriptor`` for ``source``, which
    in turn builds the key and attribute nodes, and attaches it under
    ``mangle(source)``.
    """

    def __init__(self,cursor,system,source,key,attr):
        child = source_descriptor(cursor, system, source, key, attr)
        setattr(self, mangle(source), child)
        self.cursor_, self.system_ = cursor, system

    def __call__(self, times=np.array([]), interp_kind='linear'):
        return query(self.cursor_, self.system_, None, None, None,
                     times, interp_kind)
class tcsdb:
    """Accessor for an event database stored in SQLite.

    For every distinct (system, source, key) in the ``event`` table, the
    constructor creates the attribute path ``db.<system>.<source>.<key>``
    (names passed through ``mangle``). Under each key it adds one nested
    callable per attribute keyname of that key's lowest event_id. Calling
    any node runs ``query`` with the matching filters; calling the ``tcsdb``
    instance itself queries everything.

    NOTE(review): accessor names are attached with ``setattr`` after a
    ``hasattr`` check. A system whose mangled name equals an existing
    attribute of this object (e.g. "start", "keys", "cursor_") is taken to
    be that attribute instead of getting its own accessor.
    """

    def __init__(self, db):
        # open the database.
        self.db_ = sqlite3.connect(db)
        self.db_.text_factory = str
        cursor = self.db_.cursor()
        # Without "= <bool>" this pragma only reads the current setting; it
        # does not switch read-uncommitted mode on. XXX keep this?
        cursor.execute("pragma read_uncommitted")
        # get a list of the event keys.
        cursor.execute("select distinct system, source, key from event")
        event_tokens = cursor.fetchall()
        # From here on, single-column queries return the bare value instead
        # of a 1-tuple; multi-column rows are still tuples.
        self.db_.row_factory = lambda cursor, row: row[0] if len(row) == 1 else row
        cursor = self.db_.cursor()
        self.cursor_ = cursor
        self.events_ = []
        self.event_ids_ = []
        self.event_keys_ = {}
        for system, source, key in event_tokens:
            self._add_event(cursor, system, source, key)
        # MIN/MAX ignore NULL origts and return NULL (None) for an empty
        # table. "order by ... limit 1" returned a NULL first and then
        # crashed dividing None by 1e9.
        cursor.execute("select min(origts), max(origts) from event")
        first, last = cursor.fetchone()
        self.start_ = None if first is None else first / 1e9
        self.stop_ = None if last is None else last / 1e9

    def _add_event(self, cursor, system, source, key):
        """Record one system.source.key stream and build its accessor nodes."""
        event_name = system + "." + source + "." + key
        self.events_.append(event_name)
        # Bound parameters instead of string-built SQL. Quotes in names no
        # longer break the statement, and a name equal to a column name
        # (e.g. key "key") is no longer read as that column. SQLite takes
        # double-quoted text as a string only when it names no column.
        cursor.execute("select event_id from event "
                       "where system = ? and source = ? and key = ? "
                       "order by event_id asc limit 1", (system, source, key))
        event_id = cursor.fetchone()
        cursor.execute("select keyname from attribute where event_id = ?", (event_id,))
        event_keys = cursor.fetchall()
        self.event_keys_[event_name] = list(event_keys)
        if not event_keys:
            # The descriptor constructors need at least one attribute name.
            return
        first_attr = event_keys[0]

        # XXX mangle the strings so that this works.
        system_mangled = mangle(system)
        source_mangled = mangle(source)
        key_mangled = mangle(key)
        # build accessor methods in an object hierarchy, reusing nodes that
        # an earlier (system, source, key) already created.
        if hasattr(self, system_mangled):
            system_object = getattr(self, system_mangled)
        else:
            system_object = system_descriptor(cursor, system, source, key, first_attr)
            setattr(self, system_mangled, system_object)
        if hasattr(system_object, source_mangled):
            source_object = getattr(system_object, source_mangled)
        else:
            source_object = source_descriptor(cursor, system, source, key, first_attr)
            setattr(system_object, source_mangled, source_object)
        if hasattr(source_object, key_mangled):
            key_object = getattr(source_object, key_mangled)
        else:
            key_object = key_descriptor(cursor, system, source, key, first_attr)
            setattr(source_object, key_mangled, key_object)
        for attribute in event_keys:
            key_object.add_attr(cursor, system, source, key, attribute)

    def keys(self, e):
        """Return the attribute keynames recorded for event name ``e`` ("system.source.key")."""
        return self.event_keys_[e]

    def events(self):
        """Return the "system.source.key" names, in discovery order."""
        return self.events_

    def start(self):
        """Return the smallest origts in seconds, or None if there is none."""
        return self.start_

    def stop(self):
        """Return the largest origts in seconds, or None if there is none."""
        return self.stop_

    def __call__(self, times=np.array([]), interp_kind='linear'):
        return query(self.cursor_, None, None, None, None, times, interp_kind)

    def __del__(self):
        # Best-effort close. db_ may be missing if __init__ failed early, and
        # __del__ must not raise.
        try:
            self.db_.close()
        except Exception:
            pass