Commit eea9f298 authored by Alan Marchiori's avatar Alan Marchiori

begin adding localdatabase support

parent d09b155a
......@@ -39,4 +39,7 @@ connection_retry_interval = 25
logfile = campbell.log
localdb = campbell.db
......@@ -13,6 +13,7 @@ import logging
import json
import datetime
import sqlite3
#logging.basicConfig(filename="PyPakLogging.log", level=logging.DEBUG)
#log = logging.getLogger("DataLogger")
......@@ -41,13 +42,14 @@ class tblcache:
class DataLogger:
#def __init__(self, number, socket, my_node_id, metric_id, pakbus_id):
def __init__(self, socket, my_node_id, metric_id, pakbus_id,
def __init__(self, db, socket, my_node_id, metric_id, pakbus_id,
data_callback = None,
security_code = 0x1111):
#self.number = number
self.log = logging.getLogger(__name__)
self.db = sqlite3.connect(db)
self.socket = socket
self.my_node_id = my_node_id
self.metric_id = metric_id
......@@ -78,8 +80,67 @@ class DataLogger:
self.list_of_tables = self.getTables(3)
self.last_collect = {"ts": None,
"NextRecNbr": None}
"NextRecNbr": None}
self.check_db()
def check_db(self):
c = self.db.cursor()
sql = "CREATE TABLE IF NOT EXISTS campbell (" +\
"ID INTEGER PRIMARY KEY," +\
"SERIALNO INT NOT NULL" +\
");"
c.execute(sql)
self.db.commit()
# now get my ID for this datalogger or create
sql = "SELECT ID FROM campbell WHERE SERIALNO = ?"
c.execute(sql, (self.serno,))
rslt = c.fetchone()
if not rslt:
sql = 'INSERT INTO campbell VALUES (Null, ?)'
c.execute(sql, (self.serno,))
self.dbid = c.lastrowid
self.db.commit()
self.log.debug("Created datalogger in table with id {}".format(self.dbid))
else:
self.dbid = rslt[0]
self.log.debug("Datalogger ID in localdb is {}".format(self.dbid))
sql = "CREATE TABLE IF NOT EXISTS datalogger (" +\
"ID INTEGER PRIMARY KEY," +\
"campbell_id INTEGER NOT NULL," +\
"last_upload timestamp," +\
"last_recno INTEGER);"
c.execute(sql)
self.db.commit()
sql = "SELECT last_upload, last_recno FROM datalogger WHERE campbell_id = ?"
c.execute(sql, (self.dbid,))
rslt = c.fetchone()
if rslt and rslt[0] and rslt[1]:
self.last_collect["ts"] = rslt[0]
self.last_collect["NextRecNbr"] = rslt[1]
self.log.debug("Datalogger resuming from recnumber {} last uploaded on {}".format(rslt[1], rslt[0]))
else:
self.update_local_db("Null", "Null")
def update_local_db(self, timestamp, nextrecnumber):
sql = "INSERT INTO datalogger VALUES (Null, ?, ?, ?)"
self.db.execute(sql, (self.dbid, timestamp, nextrecnumber))
self.db.commit()
self.log.debug("Updated localdb with last record {} @ {}".format(nextrecnumber, timestamp))
def stop(self):
self.db.commit()
self.db.close()
def __repr__(self):
return "Metric ID: " + str(self.metric_id) + "; PakBus ID: " + str(self.pakbus_id) + \
"; Table List: " + str(self.list_of_tables)
......@@ -127,22 +188,26 @@ class DataLogger:
for tbl in self.list_of_tables:
self.log.debug("Collecting all from {}.{}".format(self.metric_id, tbl.name))
more_flag = True
# collect most recent record
data, more_flag = pakbus.collect_data(self.socket, self.pakbus_id,
self.my_node_id, self.tabledef,
tbl.name,
CollectMode = 0x03)
self.last_collect['ts'] = datetime.datetime.now()
# compute next record to collect
self.last_collect['NextRecNbr'] = data[0]['BegRecNbr'] + data[0]['NbrOfRecs']
if data and len(data) > 0:
self.emit_all(tbl, data[0]['RecFrag'])
#if self.data_callback:
#for rec in data[0]['RecFrag']:
#self.data_callback(self, rec)
self.last_collect['ts'] = datetime.datetime.now()
# compute next record to collect
self.last_collect['NextRecNbr'] = data[0]['BegRecNbr'] + data[0]['NbrOfRecs']
self.emit_all(tbl, data[0]['RecFrag'])
self.update_local_db(self.last_collect['ts'], self.last_collect['NextRecNbr'])
#if self.data_callback:
#for rec in data[0]['RecFrag']:
#self.data_callback(self, rec)
if more_flag:
self.collect_increment()
......@@ -163,21 +228,27 @@ class DataLogger:
tbl.name,
CollectMode = 0x04,
P1 = self.last_collect['NextRecNbr'])
self.last_collect['ts'] = datetime.datetime.now()
# compute next record to collect
self.last_collect['NextRecNbr'] = data[0]['BegRecNbr'] + data[0]['NbrOfRecs']
self.emit_all(tbl, data[0]['RecFrag'])
if more_flag:
self.log.info("{}.{} has more records to fetch".format(self.metric_id, tbl.name))
if data and len(data) > 0:
self.last_collect['ts'] = datetime.datetime.now()
# compute next record to collect
self.last_collect['NextRecNbr'] = data[0]['BegRecNbr'] + data[0]['NbrOfRecs']
# possible loss of data, we should wait for emit all to complete before
# committing this... but forget it for now.
self.update_local_db(self.last_collect['ts'], self.last_collect['NextRecNbr'])
self.emit_all(tbl, data[0]['RecFrag'])
if more_flag:
self.log.info("{}.{} has more records to fetch".format(self.metric_id, tbl.name))
else:
self.log.info("{}.{} is current".format(self.metric_id, tbl.name))
#if self.data_callback:
#for rec in data[0]['RecFrag']:
#self.data_callback(tbl.dic_of_sensorTags, rec)
else:
self.log.info("{}.{} is current".format(self.metric_id, tbl.name))
#if self.data_callback:
#for rec in data[0]['RecFrag']:
#self.data_callback(tbl.dic_of_sensorTags, rec)
self.log.warn("Collecting data but got no records.")
def collect(self):
if self.last_collect['ts'] == None:
......@@ -193,7 +264,6 @@ class DataLogger:
def collectData(self, addition_size, num_dataloggers):
''' Collect data from datalogger up to "addition_size" number of records '''
raise Exception ("depreciated")
up_to_date = False
......
No preview for this file type
......@@ -119,6 +119,7 @@ def main():
# get time frequency to update database connections
connection_retry_interval = int(cf.get('pakbus', 'connection_retry_interval'), base = 0)
localdb = cf.get('pakbus', 'localdb')
dataloggers = json.loads(cf.get('pakbus', 'datalogger_pakbus_id'))
sec_codes = json.loads(cf.get('pakbus', 'datalogger_security_code'))
......@@ -165,7 +166,8 @@ def main():
#replace the pakbus address with an instance of datalogger
for metric, address in dataloggers.iteritems():
dataloggers[metric] = DataLogger(skt,
dataloggers[metric] = DataLogger(localdb,
skt,
my_node_id,
metric,
address,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment