Commit 020219d2 authored by Alan Marchiori's avatar Alan Marchiori

threading and working

parent 479d85fe
......@@ -91,7 +91,7 @@ class PakSock(threading.Thread):
"""
def __init__(self, host, port, timeout):
def __init__(self, host, port, timeout, my_node_id):
threading.Thread.__init__(self)
self.name = "Pakbus background comm. thread"
self.setDaemon(True)
......@@ -99,6 +99,7 @@ class PakSock(threading.Thread):
self.port = port
self.tx_queue = multiprocessing.Queue()
self.rx_queue = multiprocessing.Queue()
self.my_node_id = my_node_id
self.have_socket_evt = multiprocessing.Event()
self.shutdown_evt = multiprocessing.Event()
......@@ -112,7 +113,7 @@ class PakSock(threading.Thread):
def onPacket(self, pkt):
if calcSigFor(pkt):
self.log.warn(" <- Rx [{:3}] with bad signature: {}".format(len(pkt), hexdump(pkt)))
else:
else:
self.log.debug(" Rx [{:3}]: {}".format(len(pkt), hexdump(pkt)))
#self.rx_queue.put(pkt[:-2]) # trim signature off
......@@ -120,7 +121,14 @@ class PakSock(threading.Thread):
self.log.debug(" Head: {}".format(hdr))
self.log.debug(" Msg: {}".format(msg))
self.rx_queue.put( (hdr, msg) )
# Respond to incoming hello command packets
if msg['MsgType'] == 0x09 and hdr['DstNodeId'] == self.my_node_id:
self.log.info(" Responding to HELLO")
pkt = pakbus.pkt_hello_response(hdr['SrcNodeId'], self.my_node_id, msg['TranNbr'])
self.send(pkt)
else:
self.rx_queue.put( (hdr, msg) )
def run(self):
self.shutdown_evt.clear()
......@@ -128,7 +136,7 @@ class PakSock(threading.Thread):
prd = PakReader(self.onPacket)
while not self.shutdown_evt.is_set():
self.have_socket_evt.clear()
s = None
# get a socket.
for res in socket.getaddrinfo(self.host,
......@@ -166,11 +174,12 @@ class PakSock(threading.Thread):
if s in rlist:
# receive
b = s.recv(4096)
b = s.recv(2048)
prd.recv(b)
except socket.error as msg:
logging.error("Socket died with: {}".format(msg))
break
self.shutdown_complete_evt.set()
self.log.warn("thread shutdown")
......@@ -204,6 +213,7 @@ class PakSock(threading.Thread):
# pkt: unquoted, unframed PakBus packet (just header + message)
frame = quote(pkt + calcSigNullifier(calcSigFor(pkt)))
self.tx_queue.put('\xBD' + frame + '\xBD')
def shutdown(self):
#self.shutdown_evt.set()
......@@ -221,7 +231,8 @@ class PakSock(threading.Thread):
af, socktype, proto, canonname, sa = self.res
self.log.info("Opening pakbus at tcp://{}:{}".format(sa[0], sa[1]))
self.log.info("Opening pakbus connection to {} at tcp://{}:{}".format(self.host,
sa[0], sa[1]))
try:
s = socket.socket(af, socktype, proto)
except socket.error, msg:
......@@ -231,8 +242,10 @@ class PakSock(threading.Thread):
if s:
try:
# Set timeout and try to connect to socket
s.settimeout(self.timeout)
#s.settimeout(self.timeout)
s.setblocking(1)
s.connect(sa)
s.setblocking(0)
except KeyError:
s.close()
s = None
......@@ -268,12 +281,6 @@ class PakSock(threading.Thread):
if hdr['DstNodeId'] != DstNodeId or hdr['SrcNodeId'] != SrcNodeId:
continue
# Respond to incoming hello command packets
if msg['MsgType'] == 0x09:
pkt = pakbus.pkt_hello_response(hdr['SrcNodeId'], hdr['DstNodeId'], msg['TranNbr'])
self.send(pkt)
continue
# Handle "please wait" packets
if msg['TranNbr'] == TranNbr and msg['MsgType'] == 0xa1:
timeout = msg['WaitSec']
......
......@@ -151,7 +151,7 @@ def main():
skt = None
while skt == None:
#skt = pakbus.open_socket(host, port, timeout)
skt = paksock.PakSock(host, port, timeout)
skt = paksock.PakSock(host, port, timeout, my_node_id)
if skt == None:
logging.error("Failed to open socket, retry")
logging.info(" ... waiting for connect")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment