# Collect data from Campbell dataloggers over PakBus link-level messaging and push it to the sensor database.

# setup logging FIRST.
import logging
LOGFMT = '%(asctime)s %(name)-30s %(levelname)-8s %(message).160s'
logging.basicConfig(level = logging.DEBUG,
                    format = LOGFMT)

import optparse
import ConfigParser
from datalogger import DataLogger
import pakbus
import time
import json
import socket

from API import SensorClient

import threading
from threading import Event

from Queue import Queue

from sensorTag import SensorTag

class DatabaseAL (threading.Thread):
    "database abstraction layer"
    
    def __init__(self):
        threading.Thread.__init__(self)
        self.name = "DatabaseAL worker thread"
        self.setDaemon(True)
        self.log = logging.getLogger('DatabaseAL')
        self.client = SensorClient()
        self.dataq = Queue(maxsize = 1024*1024)
        self.shutdown_evt = Event()
        
    def append(self, timestamp, values, tags):
        "append data to the db"
        #self.log.debug("Got data: {} {} {}".format(timestamp, values, tags))
        # append a tuple
        if isinstance(tags, SensorTag):
            self.log.debug("append() received a SensorTag: {}".format(tags))
            
        if self.dataq.full():
            self.log.error("DataQ is full! Data is lost!")
        else:
            self.dataq.put( (timestamp, values, tags) )
    
    def run(self):
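        # Wake up roughly every 5 seconds (or as soon as stop() sets the
        # shutdown event), drain everything queued by append(), and push it
        # to the database as a single batch.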
        while not self.shutdown_evt.wait(5):
            if self.shutdown_evt.is_set():
                break
            if self.dataq.empty():
                continue
            
            worklist = []            
            while not self.dataq.empty():
                worklist.append(self.dataq.get())
            
            self.log.info("Putting a batch of {} samples".format(len(worklist)))            
            # now batch processes the worklist
            
            if len(worklist) > 0:
                try:
                    i = self.client.multiplePut(worklist)
                    self.log.info("Pushed {} samples to OpenTSDB.".format(i))
                except Exception as x:                    
                    self.log.error("Push failed, error ={}; worklist = {}".format(x, worklist))
                       
        self.log.warn("Shutdown")
    
    def stop(self):
        self.shutdown_evt.set()
        self.join(5)
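
# Typical usage (a sketch of what main() below does): create one DatabaseAL,
# start() it, hand db.append to each DataLogger as the data callback, and
# call db.stop() on shutdown.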
        
def main():

    # Start logging
    #logging.basicConfig(filename="PyPakLogging.log", level=logging.DEBUG)
    #logging.info("Reading config file...")

    # Parse command line arguments
    parser = optparse.OptionParser()
    parser.add_option('-c', '--config', 
                      help = 'read configuration from FILE [default: %default]', 
                      metavar = 'FILE', 
                      default = 'campbell.conf')
    (options, args) = parser.parse_args()

    # Read configuration file
    cf = ConfigParser.SafeConfigParser()
    logging.info('configuration read from {}'.format(cf.read(options.config)))

    if cf.has_option('pakbus', 'logfile'):
        fh = logging.FileHandler(cf.get('pakbus', 'logfile'))
        fmt = logging.Formatter(LOGFMT)
        fh.setFormatter(fmt)
        logging.getLogger('').addHandler(fh)
        
    # get host and port
    host = cf.get('pakbus', 'host')
    port = cf.get('pakbus', 'port')

    # get timeout
    timeout = cf.getint('pakbus', 'timeout')
    
    # get my_node_id
    my_node_id = int(cf.get('pakbus', 'my_node_id'), base = 0)

    # get addition_size
    addition_size = int(cf.get('pakbus', 'addition_size'), base = 0)


    # get interval (seconds) to wait between database update cycles
    database_update_time_gap = int(cf.get('pakbus', 'database_update_time_gap'), base = 0)

    # get how often to retry datalogger connections
    connection_retry_interval = int(cf.get('pakbus', 'connection_retry_interval'), base = 0)

    localdb = cf.get('pakbus', 'localdb')

    dataloggers = json.loads(cf.get('pakbus', 'datalogger_pakbus_id'))
    sec_codes = json.loads(cf.get('pakbus', 'datalogger_security_code'))
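
    # Example [pakbus] config section (illustrative values only; the option
    # names match the cf.get()/cf.getint() calls above):
    #
    #   [pakbus]
    #   host = 192.168.1.10
    #   port = 6785
    #   timeout = 30
    #   my_node_id = 0x801
    #   addition_size = 100
    #   database_update_time_gap = 300
    #   connection_retry_interval = 12
    #   localdb = pakbus_cache.db
    #   logfile = ring.log
    #   datalogger_pakbus_id = {"station_a": 1, "station_b": 2}
    #   datalogger_security_code = {"station_a": 0, "station_b": 0}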
    
    # get pakbus_id and metric_id information for dataloggers
    #pakbus_ids = cf.get('pakbus', 'pakbus_ids').split(',')
    #print "pakbus_ids: " + str(pakbus_ids)
    #metric_ids = cf.get('pakbus', 'metric_ids').split(',')
    #print "metric_ids: " + str(metric_ids)
    
    #exit()
    logging.debug('Using dataloggers = {}'.format( dataloggers))

    logging.getLogger("SensorAPI_API").setLevel(logging.INFO)

    logging.getLogger("ZeroMQLayer.ZeroMQClient").setLevel(logging.DEBUG)    
    try:
        db = None
        db = DatabaseAL()
        db.start()
        
        while True:
            try:
                
                logging.info('MAKING CONNECTION')
                
                # open socket
                skt = None
                while skt is None:
                    skt = pakbus.open_socket(host, port, timeout)
                    if skt is None:
                        logging.error("Failed to open socket, retrying")
                
                    
                #if (len(pakbus_ids) != len(metric_ids)):
                    #logging.error("'pakbus_ids' and 'metric_ids' need to have the same number of elements")
            
                #logging.info("Finished reading config file. Now making datalogger instances...")
            
                # create list datalogger instances
                logging.info( 'MAKING DATALOGGERS')
            
                #datalogger_list = []
                #for i in range(len(pakbus_ids)):
                #    print metric_ids[i]
                #    tmp = [DataLogger(i, socket, my_node_id, metric_ids[i], int(pakbus_ids[i], base = 0))]
                #    print len(tmp)
                #    datalogger_list += [DataLogger(i, socket, my_node_id, metric_ids[i], int(pakbus_ids[i], base = 0))]
            
                #replace the pakbus address with an instance of datalogger
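                # Constructor arguments, in the order used below: localdb path,
                # the open PakBus socket, our node id, the metric name, that
                # logger's PakBus address, DatabaseAL.append as the data
                # callback, and the logger's security code (see datalogger.py
                # for the DataLogger signature).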
                for metric, address in dataloggers.iteritems():
                    dataloggers[metric] = DataLogger(localdb,
                                                     skt, 
                                                     my_node_id,
                                                     metric,
                                                     address,
                                                     db.append,
                                                     sec_codes[metric],
                                                     )
            
                #logging.info("Finished making datalogger instances. Now collecting data...")
            
                logging.info( 'COLLECTING DATA')
                    
                while True:
                        
                    for metric, dl in dataloggers.iteritems():                                    
                        dl.collect(skt)
                    
                    #logging.warn("all done, exiting")    
                    #exit();
                    
                    #shutdown connection
                    skt.shutdown(socket.SHUT_RDWR)
                    skt.close()
                    skt = None                    
                    time.sleep(database_update_time_gap)
                    
                    # reopen socket, retrying until it succeeds
                    while skt is None:
                        skt = pakbus.open_socket(host, port, timeout)
                        if skt is None:
                            logging.error("Failed to reopen socket, retrying")
                    
            except socket.error as msg:
                logging.error("Socket died with: {}".format(msg))
                if skt is not None:
                    skt.close()
                skt = None
                            
                if 0:  # legacy recovery/retry loop, intentionally disabled
                    
                    up_to_date = False;
                    count = 0
                    while True:
                        if up_to_date:
                            print "UP TO DATE; now waiting"
                            time.sleep(database_update_time_gap)
                        try:
                            if (count == connection_retry_interval):
                                datalogger_list = []
                                for i in range(len(pakbus_ids)):
                ##                    print metric_ids[i]
                                    datalogger_list += [DataLogger(i, socket, my_node_id, metric_ids[i], int(pakbus_ids[i], base = 0))]
                                count = 0
                            up_to_date = True
                            for i in range(len(datalogger_list)):
                                print datalogger_list[i].metric_id
                                up_to_date = datalogger_list[i].collectData(addition_size, len(datalogger_list)) and up_to_date
                        except:
                            continue
                        count += 1
    finally:
        if db:
            db.stop()
            
# want addition_size to be greater than the number of records taken
# during database_update_time_gap

if __name__ == "__main__":
    main()