# Source code for tcsdb

#!/usr/bin/env python

import numpy as np
from scipy import interpolate
import sqlite3
import sys
from types import MethodType
#import inspect # Not needed?
import itertools

# XXX hack to turn event strings into valid Python instance names
def mangle(string):
    """Turn a dotted event string into a dotted, attribute-safe name.

    Each dot-separated component is processed on its own: a component that
    starts with a digit gets a leading underscore, and the characters
    ``-``, ``(``, ``)`` and ``:`` are each replaced by an underscore.

    Empty components (as in ``""`` or ``"a..b"``) are passed through
    unchanged instead of raising ``IndexError``.
    """
    mangled = []
    for part in string.split("."):
        # An empty component has no first character to inspect.
        if part and part[0].isdigit():
            part = "_" + part
        for ch in "-():":
            part = part.replace(ch, "_")
        mangled.append(part)
    return ".".join(mangled)

def maybefloat(x):
    """Convert ``x`` to a number when possible, otherwise return it unchanged.

    ``int`` is tried first, then ``float``. A value that is already a float
    is returned as-is, so its fractional part is not truncated by ``int``.
    Only conversion failures are caught; unrelated exceptions such as
    ``KeyboardInterrupt`` propagate.
    """
    if isinstance(x, float):
        return x
    for convert in (int, float):
        try:
            return convert(x)
        except (ValueError, TypeError, OverflowError):
            continue
    return x

# Uses dotted notation to determine hierarchy
def dict_to_hierarchy(x):
    """Expand a flat dict with dotted keys into nested dicts.

    ``{'a.b': 1, 'a.c': 2}`` becomes ``{'a': {'b': 1, 'c': 2}}``. A dict
    whose only key is the empty string, e.g. ``{'': 1234}``, collapses to
    the bare value ``1234``.

    The single-key check uses ``next(iter(x))`` rather than indexing
    ``x.keys()``, so it works with both list-returning and view-returning
    dict implementations.
    """
    if len(x) == 1:
        only_key = next(iter(x))
        if len(only_key) == 0:
            return x[only_key]
    result = {}
    for dotted, value in x.items():
        parts = dotted.split(".")
        node = result
        # Walk (creating as needed) every level except the leaf.
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return result

def build_times_filter(start_time, stop_time):
    """Return the list of time bounds that were actually supplied.

    No start time gives ``[]`` (the stop time is ignored in that case), a
    start time alone gives ``[start_time]``, and both give
    ``[start_time, stop_time]``.
    """
    if start_time is None:
        return []
    if stop_time is None:
        return [start_time]
    return [start_time, stop_time]

def step_interpolator(t,x, new_t, datamap=lambda x: x):
    """Resample ``x`` (sampled at ``t``) onto ``new_t`` without blending values.

    For each requested time the sample index is advanced while the requested
    time is greater than ``t[index]``, stopping at the last sample. The index
    never moves backwards between requested times. Each selected value is
    passed through ``datamap`` before being collected.
    """
    def advance(pos, target):
        # Move forward until t[pos] is no longer below target, or we hit the end.
        while target > t[pos] and pos < len(t) - 1:
            pos += 1
        return pos

    pos = 0
    resampled = []
    for target in new_t:
        pos = advance(pos, target)
        resampled.append(datamap(x[pos]))
    return resampled

def _sql_literal(value):
    """Render ``value`` as a single-quoted SQL string literal.

    Embedded single quotes are doubled. Single quotes are used because
    SQLite resolves a double-quoted token as an identifier first, so a
    value that happens to equal a column name would be compared as a column.
    """
    text = "%s" % (value,)
    return "'" + text.replace("'", "''") + "'"


def build_query(times_, system, source, key, attr):
    """Build the SQL text that selects attribute rows for the given filters.

    Parameters:
        times_: numpy array of times in seconds. They are scaled by 1e9
            before being compared with ``origts``. Empty means no time
            constraint, one element is a lower bound, and more than one
            bounds the query by the minimum and maximum.
        system, source, key: optional event filters. ``source`` is applied
            only when ``system`` is given, and ``key`` only when ``source``
            is given.
        attr: optional attribute name; restricts rows to ``attr`` itself and
            to key names starting with ``attr.``.

    Returns:
        The query string. Each result row is (event_id, __data_time value,
        keyname, value, system, source, key, data_type).
    """
    time_constraint = ""
    if times_.size > 0:
        times_nano = times_ * 1e9
        # repr(float(...)) keeps full precision; str() of a float can round
        # large values to 12 significant digits on Python 2.
        time_constraint = ' and origts >= ' + repr(float(times_nano.min()))
        if times_.size > 1:
            time_constraint += ' and origts <= ' + repr(float(times_nano.max()))

    keyname_constraint = ''
    if attr is not None:
        # NOTE(review): '_' and '%' inside attr still act as LIKE wildcards.
        keyname_constraint = 'WHERE keyname LIKE %s OR keyname = %s' % (
            _sql_literal("%s.%%" % attr), _sql_literal(attr))

    source_constraint = ''
    system_constraint = ''
    key_constraint = ''
    if system is not None:
        system_constraint = 'AND system = %s' % _sql_literal(system)
        if source is not None:
            source_constraint = 'AND source = %s' % _sql_literal(source)
            if key is not None:
                key_constraint = 'AND key = %s' % _sql_literal(key)

    tq = ("""SELECT attribute.event_id, attribute.value, b.keyname, b.value, etab.system, etab.source, etab.key, b.data_type FROM attribute
             INNER JOIN (SELECT event_id, source, system, key FROM event WHERE 1=1 %s %s %s %s) AS etab
                ON etab.event_id = attribute.event_id AND attribute.keyname = '__data_time'
             INNER JOIN (SELECT event_id, value, keyname, data_type FROM attribute %s) AS b
                ON etab.event_id = b.event_id
    """%(time_constraint, system_constraint, source_constraint, key_constraint, keyname_constraint))
    return tq

def rows_to_object(rows, system, source, key, attr):
    """Fold the result rows of one event into a flat ``{keyname: value}`` dict.

    Parameters:
        rows: iterable of 8-tuples (event_id, time, keyname, value, system,
            source, key, data_type), as selected by ``build_query``.
        system, source, key: the filters used for the query. Whichever of
            them is ``None`` decides how much of the row's own
            system/source/key is prefixed to each key name.
        attr: the attribute filter; when given, ``len(attr) + 1`` leading
            characters are stripped from each key name.

    Returns:
        ``(eventKey, tm, obj, types)``: the ``system.source.key`` string and
        the time (as float) of the last row seen, the converted values by key
        name, and the data type by key name. For an empty ``rows`` this is
        ``(None, 0.0, {}, {})``.
    """
    obj = {}
    types = {}
    tm = 0.0
    eventKey = None
    for event, time, keyname, val, esystem, esource, ekey, data_type in rows:
        # Subtract the keyname we started with
        if attr is not None:
            keyname = keyname[len(attr)+1:]

        # Qualify the key name with every level the caller left unspecified.
        if system is None:
            keyname = "%s.%s.%s.%s"%(esystem, esource, ekey, keyname)
        elif source is None:
            keyname = "%s.%s.%s"%(esource, ekey, keyname)
        elif key is None:
            keyname = "%s.%s"%(ekey, keyname)

        # The event key always comes from the row itself; the per-branch
        # assignments that used to precede this were overwritten here.
        eventKey = "%s.%s.%s"%(esystem, esource, ekey)

        tm = float(time)
        obj[keyname] = maybefloat(val)
        types[keyname] = data_type
    return eventKey, tm, obj, types

def query(cursor,system,source,key,attr,times,interp_kind):
    """Run an attribute query and return the times with their data objects.

    Parameters:
        cursor: database cursor used to execute the generated SQL.
        system, source, key, attr: optional filters; see ``build_query``.
        times: iterable of times in seconds. With at most two entries the
            stored samples are returned as-is; with more, every item is
            resampled onto these times.
        interp_kind: ``kind`` argument passed to ``scipy.interpolate.interp1d``
            for items of data type "number".

    Returns:
        ``(times, objects)``. In the direct case both are lists, one entry
        per stored event (both empty when nothing matched). In the resampled
        case ``times`` is the numpy array of requested times and ``objects``
        has one entry per requested time. Each object is the result of
        ``dict_to_hierarchy``.
    """
    # assume fixed precision seconds and convert to floating point nanoseconds for the query
    times_ = np.array([float(i) for i in times])
    tq = build_query(times_, system, source, key, attr)
    result_iter = cursor.execute(tq)

    # To account for inhomogeneity, each system.source.key has its own time
    # list and its own per-item value lists.
    event_times = {}
    data = {}
    key_types = {}

    # Group the data coming from the DB by the event
    for _, rows in itertools.groupby(result_iter, lambda row: row[0]):
        eventKey, tm, obj, objTypes = rows_to_object(rows, system, source, key, attr)

        event_types = key_types.setdefault(eventKey, {})
        for keyname, data_type in objTypes.items():
            # Update the datatype of this key appropriately
            prev_type = event_types.get(keyname, data_type)
            if prev_type != data_type:
                print("Key '%s' type changed from '%s' to '%s'!"%(keyname, prev_type, data_type))
            event_types[keyname] = data_type

        event_times.setdefault(eventKey, []).append(tm)
        if eventKey not in data:
            # Loop names here must not reuse ``key``: it is still needed by
            # rows_to_object for the remaining groups.
            data[eventKey] = dict((name, []) for name in obj)
        for name, val in obj.items():
            data[eventKey][name].append(val)

    # If we have no times or a start & stop time, return the results directly:
    if times_.size <= 2:
        # We have to transpose from object of arrays to array of objects.
        objects = []
        for eventKey, eventStructure in data.items():
            event = []
            if eventStructure:
                event = [(stamp, {}) for stamp in event_times[eventKey]]
            for name, vals in eventStructure.items():
                for i, val in enumerate(vals):
                    event[i][1][name] = val
            objects += event
        if not objects:
            # Nothing matched; zip(*[]) cannot be unpacked into two names.
            return [], []
        out_times, out_data = zip(*objects)
        return list(out_times), [dict_to_hierarchy(item) for item in out_data]

    # Interpolate all of the data
    for eventKey, eventTypes in key_types.items():
        for itemKey, data_type in eventTypes.items():
            t = event_times[eventKey]
            x = data[eventKey][itemKey]
            if data_type == "string":
                data[eventKey][itemKey] = step_interpolator(t,x, times_)
            elif data_type == "boolean":
                data[eventKey][itemKey] = step_interpolator(t,x, times_, lambda x: x == "true")
            elif data_type == "number":
                # Interpolate as a number
                if len(t) > 1:
                    data[eventKey][itemKey] = interpolate.interp1d(t, x, bounds_error=False, kind=interp_kind)(times_)
                else:
                    # A single sample cannot be interpolated; repeat its
                    # value (not the one-element list) for every time.
                    data[eventKey][itemKey] = [x[0]]*len(times_)
            # Any other data type is left as the raw list of samples.

    # Transpose the data from {eventKey: {itemKey: [...]}} to [{itemKey: } ... ]
    result = []
    for i in range(len(times_)):
        newobj = {}
        for eventKey, event in data.items():
            for itemKey, d in event.items():
                newobj[itemKey] = d[i]
        result.append(newobj)
    return times_, [dict_to_hierarchy(item) for item in result]

class attr_descriptor:
    """Callable handle for one attribute path of a system/source/key event."""

    def __init__(self,cursor,system,source,key,attr):
        """Remember the cursor and the four filter values for later queries."""
        self.cursor_, self.system_ = cursor, system
        self.source_, self.key_ = source, key
        self.attr_ = attr

    def __call__(self, times=np.array([]), interp_kind='linear'):
        """Query this attribute; returns whatever ``query`` returns."""
        filters = (self.system_, self.source_, self.key_, self.attr_)
        return query(self.cursor_, filters[0], filters[1], filters[2], filters[3], times, interp_kind)

class key_descriptor:
    """Callable handle for one system/source/key event and its attribute tree."""

    def __init__(self,cursor,system,source,key,attr):
        """Store the query context and register the first attribute."""
        self.cursor_, self.system_ = cursor, system
        self.source_, self.key_ = source, key
        self.add_attr(cursor,system,source,key,attr)

    def __call__(self, times=np.array([]), interp_kind='linear'):
        """Query every attribute of this event; returns whatever ``query`` returns."""
        no_attr = None
        return query(self.cursor_, self.system_, self.source_, self.key_, no_attr, times, interp_kind)

    def add_attr(self,cursor,system,source,key,attr):
        """Attach ``attr`` to this object as a chain of nested descriptors.

        The dotted attribute is split into components; each component is
        reachable under its mangled name from the level above. For event
        a.b.c with attribute x.y.z this makes ``a.b.c.x.y.z()`` callable,
        and each intermediate level queries the dotted prefix up to it.
        Levels that already exist are reused rather than replaced.
        """
        node = self
        path = []
        for component in attr.split('.'):
            path.append(component)
            name = mangle(component)
            if hasattr(node, name):
                # Already registered by an earlier attribute: descend into it.
                node = getattr(node, name)
                continue
            child = attr_descriptor(cursor,system,source,key,".".join(path))
            setattr(node, name, child)
            node = child

class source_descriptor:
    """Callable handle for one system/source pair, holding its key descriptors."""

    def __init__(self,cursor,system,source,key,attr):
        """Create the first key descriptor, then store the query context."""
        # The child is attached before the context fields are assigned.
        setattr(self, mangle(key), key_descriptor(cursor,system,source,key,attr))
        self.cursor_, self.system_, self.source_ = cursor, system, source

    def __call__(self, times=np.array([]), interp_kind='linear'):
        """Query every key of this source; returns whatever ``query`` returns."""
        any_key = any_attr = None
        return query(self.cursor_, self.system_, self.source_, any_key, any_attr, times, interp_kind)

class system_descriptor:
    """Callable handle for one system, holding its source descriptors."""

    def __init__(self,cursor,system,source,key,attr):
        """Create the first source descriptor, then store the query context."""
        # The child is attached before the context fields are assigned.
        setattr(self, mangle(source), source_descriptor(cursor,system,source,key,attr))
        self.cursor_, self.system_ = cursor, system

    def __call__(self, times=np.array([]), interp_kind='linear'):
        """Query every source of this system; returns whatever ``query`` returns."""
        unset = None
        return query(self.cursor_, self.system_, unset, unset, unset, times, interp_kind)

class tcsdb:
    """Read-only accessor for an SQLite event database.

    On construction the distinct (system, source, key) events are read from
    the ``event`` table and exposed as a hierarchy of callable descriptors,
    so that ``db.<system>.<source>.<key>.<attr>()`` runs a query. Names are
    passed through ``mangle`` to make them usable as attribute names.
    Calling the instance itself queries across all events.
    """

    def __init__(self, db):
        """Open the database file ``db`` and build the accessor hierarchy."""
        # open the database.
        self.db_ = sqlite3.connect(db)
        self.db_.text_factory = str
        cursor = self.db_.cursor()

        # XXX keep this?
        cursor.execute( "pragma read_uncommitted" )

        # get a list of the event keys.
        cursor.execute( "select distinct system, source, key from event" )
        event_tokens = cursor.fetchall()

        # since we are working with single attribute queries from here on, implement
        # a row_factory that will return us a list from each query, rather than a list
        # of tuples.
        self.db_.row_factory = lambda cursor, row: row[0] if len(row) == 1 else row
        cursor = self.db_.cursor()

        self.cursor_ = cursor
        self.events_ = []
        self.event_ids_ = []
        self.event_keys_ = {}

        # build a dictionary of event keys to event contents.
        for system, source, key in event_tokens:
            event_name = system + "." + source + "." + key
            self.events_.append( event_name )

            # Values are bound as parameters rather than spliced into the SQL
            # text, so quotes inside a name cannot break the statement.
            cursor.execute( "select distinct event_id from event where "
                            "system=? and source=? and key=? "
                            "order by event_id asc limit 1",
                            (system, source, key) )
            event_id = cursor.fetchone()
            cursor.execute( "select keyname from attribute where event_id=?",
                            (event_id,) )
            event_keys = cursor.fetchall()
            self.event_keys_[event_name] = []

            # An event without attributes has nothing to expose; it stays
            # listed in events() with an empty key list.
            if not event_keys:
                continue

            # XXX mangle the strings so that this works.
            system_mangled = mangle(system)
            source_mangled = mangle(source)
            key_mangled = mangle(key)

            # build accessor methods in an object hierarchy.
            if not hasattr(self, system_mangled):
                system_object = system_descriptor(cursor,system,source,key,event_keys[0])
                setattr(self, system_mangled, system_object)
            else:
                system_object = getattr(self, system_mangled)

            if not hasattr(system_object, source_mangled):
                source_object = source_descriptor(cursor,system,source,key,event_keys[0])
                setattr(system_object, source_mangled, source_object)
            else:
                source_object = getattr(system_object, source_mangled)

            if not hasattr(source_object, key_mangled):
                key_object = key_descriptor(cursor,system,source,key,event_keys[0])
                setattr(source_object, key_mangled, key_object)
            else:
                key_object = getattr(source_object, key_mangled)

            for E in event_keys:
                self.event_keys_[event_name].append(E)
                key_object.add_attr(cursor, system, source, key, E)

        # Earliest and latest origts, divided by 1e9; None when the event
        # table has no rows.
        cursor.execute( "select origts from event order by origts asc limit 1" )
        first = cursor.fetchone()
        self.start_ = first / 1e9 if first is not None else None
        cursor.execute( "select origts from event order by origts desc limit 1" )
        last = cursor.fetchone()
        self.stop_ = last / 1e9 if last is not None else None

    def keys(self,e):
        """Return the attribute key names recorded for event name ``e``."""
        return self.event_keys_[e]

    def events(self):
        """Return the list of ``system.source.key`` event names."""
        return self.events_

    def start(self):
        """Return the smallest ``origts`` divided by 1e9, or None if there are no events."""
        return self.start_

    def stop(self):
        """Return the largest ``origts`` divided by 1e9, or None if there are no events."""
        return self.stop_

    def __call__(self, times=np.array([]), interp_kind='linear'):
        """Query across all events; returns whatever ``query`` returns."""
        return query(self.cursor_, None, None, None, None, times, interp_kind)

    def __del__(self):
        """Close the database connection, ignoring any failure to do so."""
        try:
            self.db_.close()
        except Exception:
            # Best effort: the connection may never have been opened.
            pass