Commit 4f2aa7b0 authored by Alan Marchiori's avatar Alan Marchiori
Browse files

working on threaded socket io"

parent df77b337
Loading
Loading
Loading
Loading
(7 KiB)

File changed.

No diff preview for this file type.

+33 −121
Original line number Original line Diff line number Diff line
@@ -71,17 +71,17 @@ if not vars().has_key('transact'):




#
#
# Send packet over PakBus
# Calculate signature for PakBus packets
#
# - add signature nullifier
# - quote \xBC and \xBD characters
# - frame packet with \xBD characters
#
#
def send(s, pkt):
def calcSigFor(buff, seed = 0xAAAA):
    # s: socket object
    sig = seed
    # pkt: unquoted, unframed PakBus packet (just header + message)
    for x in buff:
    frame = quote(pkt + calcSigNullifier(calcSigFor(pkt)))
        x = ord(x)
    s.send('\xBD' + frame + '\xBD')
        j = sig
        sig = (sig <<1) & 0x1FF
        if sig >= 0x100: sig += 1
        sig = ((((sig + (j >>8) + x) & 0xFF) | (j <<8))) & 0xFFFF
    return sig




#
#
@@ -156,47 +156,9 @@ def PakBus_hdr(DstNodeId, SrcNodeId, HiProtoCode = 0x1, ExpMoreCode = 0x0, LinkS
#
#
################################################################################
################################################################################


#
# Calculate signature for PakBus packets
#
def calcSigFor(buff, seed = 0xAAAA):
    sig = seed
    for x in buff:
        x = ord(x)
        j = sig
        sig = (sig <<1) & 0x1FF
        if sig >= 0x100: sig += 1
        sig = ((((sig + (j >>8) + x) & 0xFF) | (j <<8))) & 0xFFFF
    return sig


#
# Calculate signature nullifier needed to create valid PakBus packets
#
def calcSigNullifier(sig):
    nulb = nullif = ''
    for i in 1,2:
        sig = calcSigFor(nulb, sig)
        sig2 = (sig<<1) & 0x1FF
        if sig2 >= 0x100: sig2 += 1
        nulb = chr((0x100 - (sig2 + (sig >>8))) & 0xFF)
        nullif += nulb
    return nullif


#
# Quote PakBus packet
#
def quote(pkt):
    pkt = string.replace(pkt, '\xBC', '\xBC\xDC') # quote \xBC characters
    pkt = string.replace(pkt, '\xBD', '\xBC\xDD') # quote \xBD characters
    return pkt


#
# Unquote PakBus packet
#
def unquote(pkt):
    pkt = string.replace(pkt, '\xBC\xDD', '\xBD') # unquote \xBD characters
    pkt = string.replace(pkt, '\xBC\xDC', '\xBC') # unquote \xBC characters
    return pkt




################################################################################
################################################################################
@@ -1056,7 +1018,9 @@ def decode_pkt(pkt):
    # pkt: buffer containing unquoted packet, signature nullifier stripped
    # pkt: buffer containing unquoted packet, signature nullifier stripped


    # Initialize output variables
    # Initialize output variables
    hdr = {'LinkState': None, 'DstPhyAddr': None, 'ExpMoreCode': None, 'Priority': None, 'SrcPhyAddr': None, 'HiProtoCode': None, 'DstNodeId': None, 'HopCnt': None, 'SrcNodeId': None}
    hdr = {'LinkState': None, 'DstPhyAddr': None, 'ExpMoreCode': None, 
           'Priority': None, 'SrcPhyAddr': None, 'HiProtoCode': None, 
           'DstNodeId': None, 'HopCnt': None, 'SrcNodeId': None}
    msg = {'MsgType': None, 'TranNbr': None, 'raw': None}
    msg = {'MsgType': None, 'TranNbr': None, 'raw': None}


    try:
    try:
@@ -1250,9 +1214,9 @@ def clock_sync(s, DstNodeId, SrcNodeId, SecurityCode = 0x1111, min_adjust = 0.1,
    for j in range(10):
    for j in range(10):
        pkt, TranNbr = pkt_clock_cmd(DstNodeId, SrcNodeId)
        pkt, TranNbr = pkt_clock_cmd(DstNodeId, SrcNodeId)
        t1 = time.time() # timestamp directly before sending clock command
        t1 = time.time() # timestamp directly before sending clock command
        send(s, pkt)
        s.send(pkt)
        reftime = time.time() # reference time (UTC)
        reftime = time.time() # reference time (UTC)
        hdr, msg = wait_pkt(s, DstNodeId, SrcNodeId, TranNbr)
        hdr, msg = s.wait_pkt(DstNodeId, SrcNodeId, TranNbr)
        t2 = time.time() # timestamp directly after receiving clock response
        t2 = time.time() # timestamp directly after receiving clock response


        # Calculate time difference
        # Calculate time difference
@@ -1281,8 +1245,8 @@ def clock_sync(s, DstNodeId, SrcNodeId, SecurityCode = 0x1111, min_adjust = 0.1,
            # Adjust clock
            # Adjust clock
            adjust = max(min(-tdiff, max_adjust), -max_adjust)
            adjust = max(min(-tdiff, max_adjust), -max_adjust)
            pkt, TranNbr = pkt_clock_cmd(DstNodeId, SrcNodeId, time_to_nsec(adjust, epoch = 0))
            pkt, TranNbr = pkt_clock_cmd(DstNodeId, SrcNodeId, time_to_nsec(adjust, epoch = 0))
            send(s, pkt)
            s.send(pkt)
            hdr, msg = wait_pkt(s, DstNodeId, SrcNodeId, TranNbr)
            hdr, msg = s.wait_pkt(DstNodeId, SrcNodeId, TranNbr)
        else:
        else:
            adjust = 0
            adjust = 0


@@ -1301,60 +1265,7 @@ def clock_sync(s, DstNodeId, SrcNodeId, SecurityCode = 0x1111, min_adjust = 0.1,
#
#
################################################################################
################################################################################


#
# Wait for an incoming packet
#
### added here: changed timeout from 5 to 10
def wait_pkt(s, SrcNodeId, DstNodeId, TranNbr, timeout = 10):
    # s:            socket object
    # SrcNodeId:    source node ID (12-bit int)
    # DstNodeId:    destination node ID (12-bit int)
    # TranNbr:      expected transaction number
    # timeout:      timeout in seconds

    import time, socket
    max_time = time.time() + 0.9 * timeout


    # remember current timeout setting
    s_timeout = s.gettimeout()

    # Loop until timeout is reached
    while time.time() < max_time:
        s.settimeout(timeout)
        try:
            rcv = recv(s)
        except socket.timeout:
            rcv = ''
        hdr, msg = decode_pkt(rcv)

        # ignore packets that are not for us
        if hdr['DstNodeId'] != DstNodeId or hdr['SrcNodeId'] != SrcNodeId:
            continue

        # Respond to incoming hello command packets
        if msg['MsgType'] == 0x09:
            pkt = pkt_hello_response(hdr['SrcNodeId'], hdr['DstNodeId'], msg['TranNbr'])
            send(s, pkt)
            continue

        # Handle "please wait" packets
        if msg['TranNbr'] == TranNbr and msg['MsgType'] == 0xa1:
            timeout = msg['WaitSec']
            max_time += timeout
            continue

        # this should be the packet we are waiting for
        if msg['TranNbr'] == TranNbr:
            break

    else:
        hdr = {}
        msg = {}

    # restore previous timeout setting
    s.settimeout(s_timeout)

    return hdr, msg




#
#
@@ -1384,8 +1295,8 @@ def filedownload(s, DstNodeId, SrcNodeId, FileName, FileData, SecurityCode = 0x1


        # Download Swath bytes after FileOffset from FileData
        # Download Swath bytes after FileOffset from FileData
        pkt, TranNbr = pkt_filedownload_cmd(DstNodeId, SrcNodeId, FileName, FileData[FileOffset:FileOffset+Swath], FileOffset = FileOffset, TranNbr = TranNbr, CloseFlag = CloseFlag)
        pkt, TranNbr = pkt_filedownload_cmd(DstNodeId, SrcNodeId, FileName, FileData[FileOffset:FileOffset+Swath], FileOffset = FileOffset, TranNbr = TranNbr, CloseFlag = CloseFlag)
        send(s, pkt)
        s.send(pkt)
        hdr, msg = wait_pkt(s, DstNodeId, SrcNodeId, TranNbr)
        hdr, msg = s.wait_pkt(DstNodeId, SrcNodeId, TranNbr)


        try:
        try:
            RespCode = msg['RespCode']
            RespCode = msg['RespCode']
@@ -1421,8 +1332,8 @@ def fileupload(s, DstNodeId, SrcNodeId, FileName, SecurityCode = 0x1111):


        # Upload chunk from file starting at FileOffset
        # Upload chunk from file starting at FileOffset
        pkt, TranNbr = pkt_fileupload_cmd(DstNodeId, SrcNodeId, FileName, FileOffset = FileOffset, TranNbr = TranNbr, CloseFlag = 0x00)
        pkt, TranNbr = pkt_fileupload_cmd(DstNodeId, SrcNodeId, FileName, FileOffset = FileOffset, TranNbr = TranNbr, CloseFlag = 0x00)
        send(s, pkt)
        s.send(pkt)
        hdr, msg = wait_pkt(s, DstNodeId, SrcNodeId, TranNbr)
        hdr, msg = s.wait_pkt(DstNodeId, SrcNodeId, TranNbr)


        #print 'msg = ', msg, '\n'
        #print 'msg = ', msg, '\n'


@@ -1459,8 +1370,8 @@ def getvalues(s, DstNodeId, SrcNodeId, TableName, Type, FieldName, Swath = 1, Se
##        i = 0
##        i = 0
##        while i < 100:
##        while i < 100:
        pkt, TranNbr = pkt_getvalues_cmd(DstNodeId, SrcNodeId, TableName, Type, FieldName, Swath, SecurityCode)
        pkt, TranNbr = pkt_getvalues_cmd(DstNodeId, SrcNodeId, TableName, Type, FieldName, Swath, SecurityCode)
        send(s, pkt)
        s.send(pkt)
        hdr, msg = wait_pkt(s, DstNodeId, SrcNodeId, TranNbr)
        hdr, msg = s.wait_pkt(DstNodeId, SrcNodeId, TranNbr)
        #print 'hdr = ', hdr
        #print 'hdr = ', hdr
        #print 'msg = ', msg
        #print 'msg = ', msg
        #print '\n'
        #print '\n'
@@ -1523,8 +1434,8 @@ def collect_data(s, DstNodeId, SrcNodeId, TableDef, TableName, FieldNames = [],


    # Send collect data request
    # Send collect data request
    pkt, TranNbr = pkt_collectdata_cmd(DstNodeId, SrcNodeId, tablenbr, tabledefsig, FieldNbr = fieldnbr, CollectMode = CollectMode, P1 = P1, P2 = P2, SecurityCode = SecurityCode)
    pkt, TranNbr = pkt_collectdata_cmd(DstNodeId, SrcNodeId, tablenbr, tabledefsig, FieldNbr = fieldnbr, CollectMode = CollectMode, P1 = P1, P2 = P2, SecurityCode = SecurityCode)
    send(s, pkt)
    s.send(pkt)
    hdr, msg = wait_pkt(s, DstNodeId, SrcNodeId, TranNbr)
    hdr, msg = s.wait_pkt(DstNodeId, SrcNodeId, TranNbr)
    try:
    try:
        RecData, MoreRecsExist = parse_collectdata(msg['RecData'], TableDef, FieldNbr = fieldnbr)
        RecData, MoreRecsExist = parse_collectdata(msg['RecData'], TableDef, FieldNbr = fieldnbr)
    except:
    except:
@@ -1612,15 +1523,16 @@ def get_cr1000_serial(s, DstNodeId, SrcNodeId, SecurityCode = 0x1111):
                                                  1, #1 = serial number
                                                  1, #1 = serial number
                                                  1, #1 = serial number
                                                  1, #1 = serial number
                                                  SecurityCode)
                                                  SecurityCode)
    send(s, pkt)
    s.send(pkt)
    hdr, msg = wait_pkt(s, DstNodeId, SrcNodeId, TranNbr)
    hdr, msg = s.wait_pkt(DstNodeId, SrcNodeId, TranNbr)
    
    
    if msg['Outcome'] == 1:
    if 'Outcome' in msg and msg['Outcome'] == 1:
        # success
        # success
        return decode_bin(['UInt4'], msg['Settings'][0]['SettingValue'])[0][0]
        return decode_bin(['UInt4'], msg['Settings'][0]['SettingValue'])[0][0]
    else:
    elif 'Outcome' in msg:
        raise PakBusException("Failed to get serial, outcome was: {}".format(msg['Outcome']))
        raise PakBusException("Failed to get serial, outcome was: {}".format(msg['Outcome']))
        
    else:
        raise PakBusException("Failed to get serial number.")
#
#
# Check if remote host is available
# Check if remote host is available
#
#
@@ -1631,7 +1543,7 @@ def ping_node(s, DstNodeId, SrcNodeId):


    # send hello command and wait for response packet
    # send hello command and wait for response packet
    pkt, TranNbr = pkt_hello_cmd(DstNodeId, SrcNodeId)
    pkt, TranNbr = pkt_hello_cmd(DstNodeId, SrcNodeId)
    send(s, pkt)
    s.send(pkt)
    hdr, msg = wait_pkt(s, DstNodeId, SrcNodeId, TranNbr)
    hdr, msg = s.wait_pkt(DstNodeId, SrcNodeId, TranNbr)


    return msg
    return msg
+13 −7
Original line number Original line Diff line number Diff line
@@ -14,6 +14,7 @@ import pakbus
import time
import time
import json
import json
import socket
import socket
import paksock


from API import SensorClient
from API import SensorClient


@@ -149,9 +150,12 @@ def main():
                # open socket
                # open socket
                skt = None
                skt = None
                while skt == None:
                while skt == None:
                    skt = pakbus.open_socket(host, port, timeout)
                    #skt = pakbus.open_socket(host, port, timeout)
                    skt = paksock.PakSock(host, port, timeout)
                    if skt == None:
                    if skt == None:
                        logging.error("Failed to open socket, retry")
                        logging.error("Failed to open socket, retry")
                    logging.info(" ... waiting for connect")
                    skt.have_socket_evt.wait()
                
                
                    
                    
                #if (len(pakbus_ids) != len(metric_ids)):
                #if (len(pakbus_ids) != len(metric_ids)):
@@ -193,16 +197,18 @@ def main():
                    #exit();
                    #exit();
                    
                    
                    #shutdown connection
                    #shutdown connection
                    skt.shutdown(socket.SHUT_RDWR)
                    #skt.shutdown(socket.SHUT_RDWR)
                    skt.close()
                    #skt.close()
                    skt = None                    
                    #skt = None
                    skt.shutdown()                    
                    time.sleep(database_update_time_gap)
                    time.sleep(database_update_time_gap)
                    
                    
                    # reopen socket                
                    # reopen socket                
                    while skt == None:
                    while not skt.have_socket:
                        skt = pakbus.open_socket(host, port, timeout)
                        skt.open()
                        if skt == None:
                        if not skt.have_socket:
                            logging.error("Failed to reopen socket, retry")
                            logging.error("Failed to reopen socket, retry")
                            time.sleep(5)
                    
                    
            except socket.error as msg:
            except socket.error as msg:
                logging.error("Socket died with: {}".format(msg))
                logging.error("Socket died with: {}".format(msg))
+1 −1
Original line number Original line Diff line number Diff line
Table
CR800_2
 No newline at end of file
 No newline at end of file