Commit f981f392 authored by Alan Marchiori

significant rewrite

parent dc5ec3a8
[ZeroMQQueueServer]
ServerIP = 134.82.132.36
ServerIP = amm-csr2
ServerPort = 5555
[ZeroMQClient]
@@ -16,9 +16,9 @@ class _ClientConf(object):
def __init__(self):
self.conf = ConfigParser.ConfigParser()
hasFile = False
if os.path.isfile("client.conf"):
self.conf.read("client.conf")
logging.info("Using configuration file at client.conf")
if os.path.isfile("./client.conf"):
self.conf.read("./client.conf")
logging.info("Using configuration file at {}/client.conf".format(os.getcwd()))
hasFile = True
else:
logging.warn("Could not find client.conf; using default settings")
@@ -68,6 +68,7 @@ class SensorAPI(ZeroMQClient.Client):
datas = []
self.logger.info("Put data count: {0}".format(len(putDatas)))
# logging every datum is too verbose, so only do it when DEBUG is enabled
if self.logger.isEnabledFor(logging.DEBUG):
for data in putDatas:
self.logger.debug("PutData: {0}".format(data))
@@ -7,16 +7,16 @@ import json
class Client(object):
def __init__(self):
self.REQUEST_TIMEOUT = 100000
self.REQUEST_TIMEOUT = 5000
self.REQUEST_RETRIES = 3
self.SERVER_ENDPOINT = ""
self.__client = None
self.context = None
self.__poll = None
self.log = logging.getLogger(__name__)
def _connect(self):
logging.info("Connecting to the server")
self.log.info("Connecting to the server: {}".format(self.SERVER_ENDPOINT))
self.context = zmq.Context(1)
self.__client = self.context.socket(zmq.REQ)
self.__client.connect(self.SERVER_ENDPOINT)
@@ -29,17 +29,17 @@ class Client(object):
retry = self.REQUEST_RETRIES
while retry >= 0:
request = data
logging.debug("Sending {0}".format(request))
self.log.debug("Sending {0}".format(request))
self.__client.send_json(request)
expect_reply = True
socks = dict(self.__poll.poll(self.REQUEST_TIMEOUT))
if socks.get(self.__client) == zmq.POLLIN:
reply = self.__client.recv()
logging.debug("Received {0}".format(reply))
self.log.debug("Received {0}".format(reply))
return reply
else:
logging.error("No response from server, retrying")
self.log.error("No response from server, retrying")
# Socket is confused. Close and remove it
self.__client.setsockopt(zmq.LINGER, 0)
self.__client.close()
@@ -54,7 +54,7 @@ class Client(object):
self.__poll.register(self.__client)
def _close(self):
logging.debug("Closing client")
self.log.warn("Closing client")
self.__client.close()
self.context.term()
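The send/receive retry loop above is essentially ZeroMQ's "Lazy Pirate" reliable-request pattern: poll the REQ socket with a timeout and, when the server stays silent, discard the socket with LINGER set to 0 and reconnect before retrying. A minimal standalone sketch of the same pattern (the endpoint is a placeholder):

import zmq

ENDPOINT = "tcp://localhost:5555"  # placeholder endpoint
TIMEOUT_MS = 5000
RETRIES = 3

def lazy_request(payload):
    ctx = zmq.Context(1)
    try:
        for attempt in range(RETRIES):
            sock = ctx.socket(zmq.REQ)
            sock.connect(ENDPOINT)
            sock.send_json(payload)
            if sock.poll(TIMEOUT_MS) & zmq.POLLIN:
                reply = sock.recv()
                sock.close()
                return reply
            # no reply in time: the REQ socket is now stuck, discard it
            sock.setsockopt(zmq.LINGER, 0)
            sock.close()
        raise RuntimeError("no response after %d retries" % RETRIES)
    finally:
        ctx.term()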
@@ -7,8 +7,23 @@ timeout = 30
my_node_id = 0x802
pakbus_ids = 0x001,0x002,0x003
metric_ids = CR1000,CR800_1,CR800_2
# changed the format of these settings to a JSON dict.
#pakbus_ids = 0x001,0x002,0x003
#metric_ids = CR1000,CR800_1,CR800_2
# maps datalogger name to its PakBus ID
#datalogger_pakbus_id = {"CR1000": 1,
# "CR800_1": 2,
# "CR800_2": 3}
datalogger_pakbus_id = {"CR1000": 1}
datalogger_security_code = {"CR1000": 4369,
"CR800_1": 4369,
"CR800_2": 4369}
router_node = 0x001 # routing not yet implemented; would use DstPhyAddr = 0x001
# SrcPhyAddr = 0x802
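Because these two values are JSON dicts embedded in INI options, the reader has to run json.loads over what ConfigParser returns. A minimal sketch (the config filename is an assumption; the section and option names are the ones used in this repo):

import ConfigParser  # Python 2, matching the rest of this repo
import json

cf = ConfigParser.SafeConfigParser()
cf.read('campbell.conf')  # assumed filename for this config

# decode the JSON dicts embedded in the INI values
dataloggers = json.loads(cf.get('pakbus', 'datalogger_pakbus_id'))
sec_codes = json.loads(cf.get('pakbus', 'datalogger_security_code'))
for metric, pakbus_id in dataloggers.iteritems():
    print metric, pakbus_id, sec_codes.get(metric)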
@@ -22,6 +37,6 @@ database_update_time_gap = 300
connection_retry_interval = 25
logfile = campbell.log
[ZeroMQQueueServer]
ServerIP = 134.82.132.36
ServerIP = amm-csr2
ServerPort = 5555
[ZeroMQClient]
@@ -2,36 +2,83 @@ import pakbus
from table import Table
import os
from API import *
#from API import *
import json
import requests
import sys
import time
#import logging
import logging
import json
import datetime
#logging.basicConfig(filename="PyPakLogging.log", level=logging.DEBUG)
#log = logging.getLogger("DataLogger")
# create a sensor client
client = SensorClient()
#client = SensorClient()
class tblcache:
"""need to figure out a way to cache table defs"""
def __init__(self, filename='tablecache.json'):
if os.path.exists(filename):
with open(filename, 'r') as f:
self.cache = json.loads(f.read())
else:
self.cache = {}
def add(self, metric, id, tabledef):
"metric is the datalogger name and id is the pakbus id used to match loggers to our cache"
class DataLogger:
def __init__(self, number, socket, my_node_id, metric_id, pakbus_id):
self.number = number
#def __init__(self, number, socket, my_node_id, metric_id, pakbus_id):
def __init__(self, socket, my_node_id, metric_id, pakbus_id,
data_callback = None,
security_code = 0x1111):
#self.number = number
self.log = logging.getLogger(__name__)
self.socket = socket
self.my_node_id = my_node_id
self.metric_id = metric_id
self.pakbus_id = pakbus_id
self.FileData = pakbus.fileupload(self.socket, self.pakbus_id, self.my_node_id, FileName = '.TDF')[0]
self.security_code = security_code
self.data_callback = data_callback
self.log.debug("Getting serial number for {}@{}".format(metric_id, pakbus_id))
self.serno = pakbus.get_cr1000_serial(self.socket,
self.pakbus_id,
self.my_node_id,
self.security_code)
self.log.info("{}@{} has serial number {}".format(metric_id, pakbus_id, self.serno))
self.log.debug("Getting table defs")
self.FileData = pakbus.fileupload(self.socket,
self.pakbus_id,
self.my_node_id,
FileName = '.TDF',
SecurityCode = self.security_code)[0]
#self.log.debug("Filedata = {}".format(self.FileData))
self.tabledef = pakbus.parse_tabledef(self.FileData)
self.list_of_tables = self.getTables(3)
self.log.debug("tabledef = {}".format(self.tabledef))
print self.list_of_tables
self.list_of_tables = self.getTables(3)
self.last_collect = {"ts": None,
"NextRecNbr": None}
def __repr__(self):
return "Metric ID: " + str(self.metric_id) + "; PakBus ID: " + str(self.pakbus_id) + \
@@ -42,6 +89,9 @@ class DataLogger:
count: maximum number of retries
'''
list_of_tables = []
self.log.debug('Getting tables for {}'.format( self.metric_id))
if self.FileData:
for tableno in range(len(self.tabledef)):
# ignore tables "Public" and "Status"
@@ -52,13 +102,95 @@ class DataLogger:
self.FileData = pakbus.fileupload(self.socket, self.pakbus_id, self.my_node_id, FileName = '.TDF')[0]
return self.getTables(count-1)
else:
# raise exception??
print "Could not get tables for datalogger with metric: " + str(self.metric_id) + " and pakbus: " + str(self.pakbus_id) + " after 3 tries."
return list_of_tables
def emit_all(self, tbl, recs):
#self.data_callback(tbl.dic_of_sensorTags, rec)
#if self.data_callback:
#for rec in data[0]['RecFrag']:
#self.data_callback(tbl.dic_of_sensorTags, rec)
if self.data_callback:
for rec in recs:
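# 631166400 s shifts TimeOfRec from the logger's 1990-01-01 epoch to the
# Unix epoch (631152000 s, plus what looks like a fixed 4-hour zone offset)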
timestamp = rec['TimeOfRec'][0] + 631166400
# emit record number
self.data_callback (timestamp, rec['RecNbr'], tbl.record_tag)
# and all fields
for sensor_name, values in rec['Fields'].iteritems():
self.data_callback(timestamp,
values[0],
tbl.dic_of_sensorTags[sensor_name].tag)
def collect_all(self):
for tbl in self.list_of_tables:
self.log.debug("Collecting all from {}.{}".format(self.metric_id, tbl.name))
# collect most recent record
data, more_flag = pakbus.collect_data(self.socket, self.pakbus_id,
self.my_node_id, self.tabledef,
tbl.name,
CollectMode = 0x03)
self.last_collect['ts'] = datetime.datetime.now()
# compute next record to collect
self.last_collect['NextRecNbr'] = data[0]['BegRecNbr'] + data[0]['NbrOfRecs']
self.emit_all(tbl, data[0]['RecFrag'])
#if self.data_callback:
#for rec in data[0]['RecFrag']:
#self.data_callback(self, rec)
if more_flag:
self.collect_increment()
def collect_increment(self):
for tbl in self.list_of_tables:
self.log.debug("Collecting increment from {}.{} NextRecNbr = {}".format(self.metric_id,
tbl.name,
self.last_collect['NextRecNbr']))
more_flag = True
while more_flag:
# collect from where we left off
data, more_flag = pakbus.collect_data(self.socket, self.pakbus_id,
self.my_node_id, self.tabledef,
tbl.name,
CollectMode = 0x04,
P1 = self.last_collect['NextRecNbr'])
self.last_collect['ts'] = datetime.datetime.now()
# compute next record to collect
self.last_collect['NextRecNbr'] = data[0]['BegRecNbr'] + data[0]['NbrOfRecs']
self.emit_all(tbl, data[0]['RecFrag'])
#if self.data_callback:
#for rec in data[0]['RecFrag']:
#self.data_callback(tbl.dic_of_sensorTags, rec)
def collect(self):
if self.last_collect['ts'] is None:
# initial collection, get all data
self.collect_all()
else:
# incremental collect
self.collect_increment()
def collectData(self, addition_size, num_dataloggers):
''' Collect data from datalogger up to "addition_size" number of records '''
raise Exception("deprecated")
up_to_date = False
datalogger_size = len(self.list_of_tables)
start_record_numbers = [0]*datalogger_size
@@ -82,7 +214,10 @@ class DataLogger:
for i in range(datalogger_size):
data = pakbus.collect_data(self.socket, self.pakbus_id, self.my_node_id, self.tabledef, self.list_of_tables[i].name, CollectMode = 0x05, P1 = 1)
data = pakbus.collect_data(self.socket, self.pakbus_id,
self.my_node_id, self.tabledef,
self.list_of_tables[i].name,
CollectMode = 0x05, P1 = 1)
start_time = time.time()
@@ -123,7 +258,8 @@ class DataLogger:
# to avoid problem with "NaN"
try:
long(sensors[sensor][0])
data_to_add_to_database += [(time_stamp, sensors[sensor][0], self.list_of_tables[i].dic_of_sensorTags[sensor].tag)]
data_to_add_to_database += [(time_stamp, sensors[sensor][0],
self.list_of_tables[i].dic_of_sensorTags[sensor].tag)]
except:
print 'a data point was NaN or invalid'
# exclude data point
@@ -21,6 +21,8 @@
import struct
import string
class PakBusException(Exception):
pass
#
# Global definitions
@@ -312,7 +314,8 @@ def pkt_bye_cmd(DstNodeId, SrcNodeId):
#
# Create DevConfig Get Settings Command packet
#
def pkt_devconfig_get_settings_cmd(DstNodeId, SrcNodeId, BeginSettingId = None, EndSettingId = None, SecurityCode = 0x1111):
def pkt_devconfig_get_settings_cmd(DstNodeId, SrcNodeId, BeginSettingId = None,
EndSettingId = None, SecurityCode = 0x1111):
# DstNodeId: Destination node ID (12-bit int)
# SrcNodeId: Source node ID (12-bit int)
# BeginSettingId: First setting for the datalogger to include in response
@@ -322,6 +325,7 @@ def pkt_devconfig_get_settings_cmd(DstNodeId, SrcNodeId, BeginSettingId = None,
TranNbr = newTranNbr() # Generate new transaction number
hdr = PakBus_hdr(DstNodeId, SrcNodeId, 0x0) # PakBus Control Packet
msg = encode_bin(['Byte', 'Byte'], [0x0f, TranNbr])
msg += encode_bin(['UInt2'], [SecurityCode])
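# note: the security code is encoded immediately after the message type and
# transaction number, ahead of the optional Begin/End setting ids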
if not BeginSettingId is None:
msg += encode_bin(['UInt2'], [BeginSettingId])
if not EndSettingId is None:
@@ -1552,6 +1556,8 @@ def get_TableNbr(tabledef, TableName):
return TableNbr
################################################################################
#
# Network utilities
@@ -1582,8 +1588,9 @@ def open_socket(Host, Port = 6785, Timeout = 30):
s.close()
s = None
continue
except Exception:
print 'Error here'
except Exception as x:
print 'Error opening socket:', x
s = None
continue
break
@@ -1591,6 +1598,29 @@ def open_socket(Host, Port = 6785, Timeout = 30):
return s
#get devconfig
#def pkt_devconfig_get_settings_cmd(DstNodeId, SrcNodeId, BeginSettingId = None, EndSettingId = None, SecurityCode = 0x1111):
# DstNodeId: Destination node ID (12-bit int)
# SrcNodeId: Source node ID (12-bit int)
# BeginSettingId: First setting for the datalogger to include in response
# EndSettingId: Last setting for the datalogger to include in response
# SecurityCode: 16-bit security code (optional)
def get_cr1000_serial(s, DstNodeId, SrcNodeId, SecurityCode = 0x1111):
pkt, TranNbr = pkt_devconfig_get_settings_cmd(DstNodeId,
SrcNodeId,
1, #1 = serial number
1, #1 = serial number
SecurityCode)
send(s, pkt)
hdr, msg = wait_pkt(s, DstNodeId, SrcNodeId, TranNbr)
if msg['Outcome'] == 1:
# success
return decode_bin(['UInt4'], msg['Settings'][0]['SettingValue'])[0][0]
else:
raise PakBusException("Failed to get serial, outcome was: {}".format(msg['Outcome']))
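A quick usage sketch for the new helper (the host is a placeholder; the node ids and security code are the values from the pakbus config section):

s = open_socket('192.0.2.10', 6785, Timeout = 30)  # placeholder host
try:
    serno = get_cr1000_serial(s, DstNodeId = 0x001, SrcNodeId = 0x802,
                              SecurityCode = 4369)
    print 'CR1000 serial number:', serno
except PakBusException as x:
    print 'devconfig failed:', x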
#
# Check if remote host is available
#
# code to ring a node using link-level messaging
# setup logging FIRST.
import logging
LOGFMT = '%(asctime)s %(name)-30s %(levelname)-8s %(message).160s'
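# note: the precision in %(message).160s truncates each message to 160 characters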
logging.basicConfig(level = logging.DEBUG,
format = LOGFMT)
import optparse
import ConfigParser, StringIO
from datalogger import DataLogger
#import logging
import pakbus
import time
import json
import socket
from API import SensorClient
import threading
from threading import Event
from Queue import Queue
from sensorTag import SensorTag
class DatabaseAL (threading.Thread):
"database abstraction layer"
def __init__(self):
threading.Thread.__init__(self)
self.name = "DatabaseAL worker thread"
self.setDaemon(True)
self.log = logging.getLogger('DatabaseAL')
self.client = SensorClient()
self.dataq = Queue(maxsize = 1024*1024)
self.shutdown_evt = Event()
def append(self, timestamp, values, tags):
"append data to the db"
#self.log.debug("Got data: {} {} {}".format(timestamp, values, tags))
# append a tuple
if type(tags) is SensorTag:
print tags
if self.dataq.full():
self.log.error("DataQ is full! Data is lost!")
else:
self.dataq.put( (timestamp, values, tags) )
def run(self):
while not self.shutdown_evt.wait(5):
if self.shutdown_evt.is_set():
break
if self.dataq.empty():
continue
worklist = []
while not self.dataq.empty():
worklist.append(self.dataq.get())
self.log.info("Putting a batch of {} samples".format(len(worklist)))
# now batch processes the worklist
if len(worklist) > 0:
try:
i = self.client.multiplePut(worklist)
self.log.info("Pushed {} samples to OpenTSDB.".format(i))
except Exception as x:
self.log.error("Push failed, error ={}; worklist = {}".format(x, worklist))
self.log.warn("Shutdown")
def stop(self):
self.shutdown_evt.set()
self.join(5)
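DatabaseAL decouples collection from OpenTSDB pushes: the collect path calls append(), and the worker thread drains the queue in batches on a 5-second cycle. Minimal usage, mirroring main() below (skt comes from pakbus.open_socket; the CR1000 values are the ones in the pakbus config):

db = DatabaseAL()
db.start()
dl = DataLogger(skt, 0x802, 'CR1000', 1,
                data_callback = db.append,
                security_code = 4369)
dl.collect()
db.stop()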
def main():
# Start logging
@@ -24,8 +91,14 @@ def main():
# Read configuration file
cf = ConfigParser.SafeConfigParser()
print 'configuration read from %s' % cf.read(options.config)
logging.info('configuration read from {}'.format(cf.read(options.config)))
if cf.has_option('pakbus', 'logfile'):
fh = logging.FileHandler(cf.get('pakbus', 'logfile'))
fmt = logging.Formatter(LOGFMT)
fh.setFormatter(fmt)
logging.getLogger('').addHandler(fh)
# get host and port
host = cf.get('pakbus', 'host')
port = cf.get('pakbus', 'port')
@@ -39,8 +112,6 @@ def main():
# get addition_size
addition_size = int(cf.get('pakbus', 'addition_size'), base = 0)
# open socket
socket = pakbus.open_socket(host, port, timeout)
# get time frequency to update database
database_update_time_gap = int(cf.get('pakbus', 'database_update_time_gap'), base = 0)
@@ -49,54 +120,109 @@ def main():
connection_retry_interval = int(cf.get('pakbus', 'connection_retry_interval'), base = 0)
dataloggers = json.loads(cf.get('pakbus', 'datalogger_pakbus_id'))
sec_codes = json.loads(cf.get('pakbus', 'datalogger_security_code'))
# get pakbus_id and metric_id information for dataloggers
pakbus_ids = cf.get('pakbus', 'pakbus_ids').split(',')
## print "pakbus_ids: " + str(pakbus_ids)
metric_ids = cf.get('pakbus', 'metric_ids').split(',')
## print "metric_ids: " + str(metric_ids)
#if (len(pakbus_ids) != len(metric_ids)):
#logging.error("'pakbus_ids' and 'metric_ids' need to have the same number of elements")
#logging.info("Finished reading config file. Now making datalogger instances...")
# create list datalogger instances
print 'MAKING DATALOGGERS'
datalogger_list = []
for i in range(len(pakbus_ids)):
print metric_ids[i]
tmp = [DataLogger(i, socket, my_node_id, metric_ids[i], int(pakbus_ids[i], base = 0))]
print len(tmp)
datalogger_list += [DataLogger(i, socket, my_node_id, metric_ids[i], int(pakbus_ids[i], base = 0))]
#logging.info("Finished making datalogger instances. Now collecting data...")
print 'COLLECTING DATA'
up_to_date = False;
count = 0
while True:
if up_to_date:
print "UP TO DATE; now waiting"
time.sleep(database_update_time_gap)
try:
if (count == connection_retry_interval):
datalogger_list = []
for i in range(len(pakbus_ids)):
## print metric_ids[i]
datalogger_list += [DataLogger(i, socket, my_node_id, metric_ids[i], int(pakbus_ids[i], base = 0))]
count = 0
up_to_date = True
for i in range(len(datalogger_list)):
print datalogger_list[i].metric_id
up_to_date = datalogger_list[i].collectData(addition_size, len(datalogger_list)) and up_to_date
except:
continue
count += 1
#pakbus_ids = cf.get('pakbus', 'pakbus_ids').split(',')
#print "pakbus_ids: " + str(pakbus_ids)
#metric_ids = cf.get('pakbus', 'metric_ids').split(',')
#print "metric_ids: " + str(metric_ids)
#exit()
logging.debug('Using dataloggers = {}'.format( dataloggers))
logging.getLogger("SensorAPI_API").setLevel(logging.INFO)
logging.getLogger("ZeroMQLayer.ZeroMQClient").setLevel(logging.DEBUG)
try:
db = None
db = DatabaseAL()
db.start()
while True:
try:
logging.info('MAKING CONNECTION')
# open socket
skt = pakbus.open_socket(host, port, timeout)
#if (len(pakbus_ids) != len(metric_ids)):
#logging.error("'pakbus_ids' and 'metric_ids' need to have the same number of elements")
#logging.info("Finished reading config file. Now making datalogger instances...")
# create list datalogger instances
logging.info( 'MAKING DATALOGGERS')
#datalogger_list = []
#for i in range(len(pakbus_ids)):
# print metric_ids[i]
# tmp = [DataLogger(i, socket, my_node_id, metric_ids[i], int(pakbus_ids[i], base = 0))]
# print len(tmp)
# datalogger_list += [DataLogger(i, socket, my_node_id, metric_ids[i], int(pakbus_ids[i], base = 0))]
#replace the pakbus address with an instance of datalogger
for metric, address in dataloggers.iteritems():
dataloggers[metric] = DataLogger(skt,
my_node_id,
metric,
address,
db.append,
sec_codes[metric],
)
#logging.info("Finished making datalogger instances. Now collecting data...")
logging.info( 'COLLECTING DATA')
while True:
for metric, dl in dataloggers.iteritems():
dl.collect()
logging.warn("all done, exiting")
exit();
time.sleep(database_update_time_gap)
except socket.error as msg:
logging.error("Socket died with: {}".format(msg))
skt.close()
skt = None
if 0:
up_to_date = False;
count = 0
while True:
if up_to_date:
print "UP TO DATE; now waiting"
time.sleep(database_update_time_gap)
try:
if (count == connection_retry_interval):
datalogger_list = []
for i in range(len(pakbus_ids)):
## print metric_ids[i]
datalogger_list += [DataLogger(i, socket, my_node_id, metric_ids[i], int(pakbus_ids[i], base = 0))]
count = 0
up_to_date = True
for i in range(len(datalogger_list)):
print datalogger_list[i].metric_id
up_to_date = datalogger_list[i].collectData(addition_size, len(datalogger_list)) and up_to_date
except:
continue
count += 1
finally:
if db:
db.stop()
# want addition_size to be greater than the number of records produced during one
# database_update_time_gap: e.g. a table storing a record every 10 s with a 300 s gap needs addition_size > 30
if __name__ == "__main__":
main()
from Client import *
#import logging
import logging
#logging.basicConfig(filename="PyPakLogging.log", level=logging.DEBUG)
##import pickle
@@ -10,13 +10,15 @@ class SensorTag:
list_of_all_tags = []
def __init__(self, name, units, processing, table_name):
self.log = logging.getLogger(__name__)
self.name = name
self.units = units
self.processing = processing
self.tag = self.makeTag(table_name)
SensorTag.list_of_all_tags += [self.tag]
## pickle.dump(SensorTag.list_of_all_tags, open("allTags.txt", "w"))
print "Made tag for sensors with Table: " + str(table_name) + " for sensor: " + str(self.name)
self.log.info("Made tag for sensors with Table: {} for sensor {}".format(table_name,
self.name))
def __repr__(self):
return "Sensor Name: " + str(self.name) + "; Sensor Units: " + str(self.units) + \
@@ -7,10 +7,10 @@ import requests
import sys
import logging
logging.basicConfig(filename="PyPakLogging.log", level=logging.DEBUG)
#logging.basicConfig(filename="PyPakLogging.log", level=logging.DEBUG)
# create a sensor client
client = SensorClient()
#client = SensorClient()
import string
def make_follow_tag_rules(s):
@@ -28,6 +28,7 @@ class Table: