Commit ddd6e127 authored by wx002's avatar wx002

finish rdt project

parent 07aa3d1b
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
./tmp
./files
# misc
.idea/
This diff is collapsed.
This source diff could not be displayed because it is too large. You can view the blob instead.
import matplotlib.pyplot as plt
import numpy as np
x = ['Ideal','Good','Medium']
y = [1410.0857887050586, 135.1386281553791, 70.00680633241944]
title = 'Transfer rate in bytes per second benchmarks for RDT'
y_pos = np.arange(len(x))
plt.bar(y_pos, y, align='center')
plt.xticks(y_pos, x)
plt.ylabel('Transfer rate in bytes per second')
plt.xlabel('Network Conditions')
plt.title(title)
plt.show()
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
This is implementation of RDT (Reliable Data Transfer) protocol over UDP.
Running instructions:
0. Run udp_box.py with respective commandline arguments. Addr=127.0.0.1, port=8880, remote_port=8888
1. Run recv.py. Parameters: --addr (the address to listen to, default=127.0.0.1), --port (the port to listen to, default=8888)
2. run sender.py. Parameters: --remote_addr(the remote address to send file to, default=127.0.0.1), --remote_port (the remote port to send file to, default=8880), --file (the file that will be sending, default=sendData/alice.txt)
The default configuration is meant to run with the udp_box, in the following configuration:
sender.py <-> udp_box.py <-> recv.py
Basic benchmark using udp_box:
conditions:
- 50kbs bandwidth
- Sending file: Alice.txt, size=167546 bytes
- parameters: loss rate, out of order rate(ooo_rate), duplicate packet rate(dupe_rate), byte error rate(ber)
Tested with 4 conditions (Ideal, good, medium, and terrible)
Ideal: (--loss_rate 0 --ooo_rate 0 --dupe_rate 0 --ber 0)
- time: 118.819721s
- transfer rate: 1410.0857887050586 bytes per second
Good: (--loss_rate 0.01 --ooo_rate 0.01 --dupe_rate 0.01 --ber 1e-8)
- time: 1239.808353s
- transfer rate: 135.1386281553791 bytes per second
Medium: (--loss_rate 0.02 --ooo_rate 0.02 --dupe_rate 0.02 --ber 1e-6):
- time: 2393.281579s
- transfer rate: 70.00680633241944 bytes per second
Terrible: (--loss_rate 0.1 --ooo_rate 0.03 --dupe_rate 0.05 --ber 1e-3):
- time: Infinity
- transfer rate: SLOW
Pros:
- Ensures data is correctly transferee
- There is almost no chance of data error
- Packets arrive in sequential order
Cons:
- It is not using the maximum bandwidth at all
- Transfer rate is very low since is sending lines
- It require insane amount of time with non ideal conditions
- Under terrible conditions, the transfer will last indefinitely
Protocol sequence diagram:
Sender Receiver
| ---------------line 0------------------> |
| <--------------ACK for line 0----------- |
| ---------------line 1------------------> |
| <--------------ACK for line 1----------- |
| ---------------line 2------------------> |
| <--------------ACK for line 2----------- |
| ---------------line 3------------------> |
| <--------------ACK for line 3----------- |
| ---------------line 4------------------> |
| <--------------ACK for line 4----------- |
| ---------------line 4------------------> |
| <--------------ACK for line 4----------- |
| ---------------line 5------------------> |
| <--------------ACK for line 5----------- |
| ---------------line 6------------------> |
| <--------------ACK for line 6----------- |
| ---------------line 7------------------> |
| <--------------ACK for line 7----------- |
| ---------------line 8------------------> |
| <--------------ACK for line 8----------- |
| ---------------END ------------------> |
| <--------------END---------------------- |
Sender Receiver
Packet structure:
Each packet is representing a line of the file. Header is the index. All packets arrive at sequential order.
structure: encoded Header + \t + encoded string + \t + raw data string
Encoded Header:
- Index of the packet, encoded using base64
Encoded String:
- The content of the line, encoded using base64
Verify Method:
- Header must arrive in order, discard packet if is out of order and wait for resend
- The decoded encoded string must match the raw data string. Otherwise, packet is corrupted and ask for resend
......@@ -3,8 +3,30 @@ import re
import base64
import linecache
import sys
from datetime import datetime
import argparse
dest = ('127.0.0.1', 8888)
def get_argus():
'''
gets the arguements
:return:
'''
parser = argparse.ArgumentParser(
description="Receiver For RDT stop and wait with checksum"
)
parser.add_argument('--addr',
default='127.0.0.1',
help='The address of the sender to listen to')
parser.add_argument('--port',
default=8888, type=int,
help='The port of the sender to listen to')
return parser.parse_args()
argus = get_argus()
addr = argus.addr
port = argus.port
dest = (addr, port)
currentIndex = -1
sock = socket(AF_INET, SOCK_DGRAM)
......@@ -13,7 +35,13 @@ sock.bind(dest)
fileList = []
lineStr = ''
def PrintException():
'''
Print detail exception details
taken from: https://stackoverflow.com/questions/14519177/python-exception-handling-line-number
:return:
'''
exc_type, exc_obj, tb = sys.exc_info()
f = tb.tb_frame
lineno = tb.tb_lineno
......@@ -50,7 +78,7 @@ def verify_packet_content(stringList):
return False
startTime = datetime.now()
while True:
data, addr = sock.recvfrom(1048)
......@@ -82,6 +110,7 @@ while True:
packetList[2]))
print('sending ACK...')
sock.sendto(b'ACK\t'+ base64.b64encode(str(index).encode()), addr)
# print('recvived 1 line!')
else:
# corrupted content, ask for the same line again
......@@ -90,21 +119,20 @@ while True:
elif index == currentIndex and len(packetList) == 3: # duplicated packet, resend ack
print('resending ACK for packet index {}, next packet index is {}'.format(index, currentIndex+1))
sock.sendto(b'ACK\t'+base64.b64encode(str(currentIndex).encode()), addr)
continue
else:
print('packet error for index {} and current index {}! Ask for resend!',format(index, currentIndex))
print('packet error for index {} and current index {}! Ask for resend!',
format(str(index), str(currentIndex)))
sock.sendto(b'NNNN', addr)
lineStr = ''
except:
print(index, packetList)
PrintException()
sock.sendto(b'NNNN', addr)
# print(fileList)
lineStr = ''
#lineStr = ''
# data = ''
#print(fileList)
continue
if data == b'END':
print("got all contents!")
......@@ -114,4 +142,10 @@ while True:
file = open('recvData/recv.txt', 'w+')
file.writelines(fileList)
file.close()
endTime = datetime.now()
# print total time in sec
total_time = (endTime-startTime).total_seconds()
print('total time: {}'.format(total_time))
file_size = 167546
print('Transfer rate: {} bytes per second'.format(file_size/total_time))
exit(0)
"""
Reliable Reciever
The goal is to create a reliable transport protocol. This file is the side
that waits for a client to transfer files (server).
Data is transmitted over UDP. A separate udp_box can enforce data rate
limits and/or packet loss or corruption.
(c) Alan Marchiori 2019
"""
import argparse
import socket
import logging
import logging.config
import sys
import json
import algs
from algs.texcept import TransferFailed
# custom exception
class AlgorithmNotImplementedError(NotImplementedError): pass
def en_logging():
"setup loggers"
#https://docs.python.org/3/howto/logging-cookbook.html
logging.config.dictConfig(json.load(open('log_cfg.json', 'r')))
def parse_args():
"Sets up the argument parser"
parser = argparse.ArgumentParser(
description="Reliable Server (starter code)."
)
parser.add_argument('--outdir',
default='./tmp',
help="Directory to store recieved files [./tmp].")
parser.add_argument('--mtu',
default=100,
type=int,
help='Maximum transmition unit (MTU) (bytes) [100].')
parser.add_argument('--addr',
default = '0.0.0.0',
help='Local addres to listen on [0.0.0.0].')
parser.add_argument('--port',
help='Port to listen on [8888].',
default=8888,
type=int)
# add additional algorithms here.
parser.add_argument('--alg',
help='The algorithm to use [sw].',
default='sw',
choices=['sw', 'yours'])
return parser.parse_args()
if __name__ == "__main__":
en_logging()
args = parse_args()
# look at those args
logging.debug("Got args {}".format(args))
if args.alg == 'sw': # stop and wait protocol
# the server should never stop...
try:
algs.sw.run_server(
outdir=args.outdir,
addr=(args.addr, args.port),
mtu=args.mtu)
except Exception as x:
logging.error("Server died: {}".format(x))
raise(x)
sys.exit(-15)
else:
raise AlgorithmNotImplementedError()
"""
Reliable Sender
The goal is to create a reliable transport protocol. This file is the side
that initiates a transfer (client). The rserver.py file has the server codeself.
Data is transmitted over UDP. A separate udproxy server can enforce data rate
limits and/or packet loss or corruption.
(c) Alan Marchiori 2019
"""
import argparse
import socket
import logging
import logging.config
import sys
import json
import algs
from algs.texcept import TransferFailed
# custom exception
class AlgorithmNotImplementedError(NotImplementedError): pass
def en_logging():
"setup loggers"
#https://docs.python.org/3/howto/logging-cookbook.html
logging.config.dictConfig(json.load(open('log_cfg.json', 'r')))
def parse_args():
"Sets up the argument parser"
parser = argparse.ArgumentParser(
description="Reliable Sender (starter code)."
)
parser.add_argument('files',
default='alice.txt', nargs="+",
help="File(s) to send [alice.txt].")
parser.add_argument('--mtu',
default=100,
type=int,
help='Maximum transmition unit (MTU) (bytes) [100].')
parser.add_argument('--dst',
default = 'localhost',
help='Destination host, eg. hostname or 192.168.3.101 [localhost].')
parser.add_argument('--port',
help='Port on destination host [8888].',
default=8888,
type=int)
# add additional algorithms here.
parser.add_argument('--alg',
help='The algorithm to use [sw].',
default='sw',
choices=['sw', 'yours'])
return parser.parse_args()
if __name__ == "__main__":
en_logging()
# uncomment this to see every packet.
#logging.getLogger("algs.udp_wrapper").setLevel(logging.DEBUG)
args = parse_args()
# look at those args
logging.debug("Got args {}".format(args))
if args.alg == 'sw': # stop and wait protocol
# map files list into multiple calls
try:
result = list(map(lambda x: algs.sw.send_file(
filename=x,
dest=(args.dst, args.port),
mtu=args.mtu),
args.files))
except TransferFailed as x:
logging.error("Transfer failed: {}".format(x))
raise(x)
sys.exit(-5)
else:
raise AlgorithmNotImplementedError()
......@@ -3,7 +3,7 @@ import base64
import re
import linecache
import sys
import argparse
def PrintException():
exc_type, exc_obj, tb = sys.exc_info()
......@@ -84,10 +84,30 @@ def rdt_sendFile(network, dest, filename, size=65536):
network.close()
def get_argus():
'''
gets the arguements
:return:
'''
parser = argparse.ArgumentParser(
description="Sender For RDT stop and wait with checksum"
)
parser.add_argument('--remote_addr',
default='127.0.0.1',
help='The remote address to send to.')
parser.add_argument('--remote_port',
default=8880, type=int,
help='The remote address port')
parser.add_argument('--file',
default='sendData/alice.txt',
help='The file to send')
return parser.parse_args()
if __name__ == '__main__':
ip = '127.0.0.1'
port = 8880
argus = get_argus()
ip = argus.remote_addr
port = argus.remote_port
dest = (ip, port)
timeOut = 10
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
......
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -27,6 +27,7 @@ from algs.udp_wrapper import UdpWrapper
import select
from datetime import datetime, timedelta
import random
import time
def en_logging():
"setup loggers"
......
"""
Recieve over UDP with zmodem
"""
import argparse
import subprocess
import shlex
import socket
import select
import logging
import logging.config
import json
import fcntl
import sys
import os
logging.config.dictConfig(json.load(open('log_cfg.json', 'r')))
log = logging.getLogger()
parser = argparse.ArgumentParser(
description="Zmodem UDP receiver."
)
parser.add_argument('--addr',
help='Address to listen on [0.0.0.0].',
default='0.0.0.0')
parser.add_argument('--port',
help='Port to listen on [8888].',
default=8888,
type=int)
args = parser.parse_args()
addr = (args.addr, args.port)
log.info("Zrecieve waiting for files.")
# -y is for clobber existing files.
args = shlex.split('/usr/bin/rz -vv -y')
zproc = subprocess.Popen(args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
sock.setblocking(False)
sock.bind(addr)
# set zproc stdout to nonblocking
# https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
fl = fcntl.fcntl(zproc.stdout, fcntl.F_GETFL)
fcntl.fcntl(zproc.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
remote_addr = None
while True:
r,w,e = select.select([zproc.stdout, sock], [],[])
if sock in r:
# read from the socket ---> zproc (stdin)
d, remote_addr = sock.recvfrom(4096)
#log.debug("s->z: {}".format(d))
wr = zproc.stdin.write(d)
if (wr != len(d)):
log.error("Failed to write all bytes to zproc.stdin! [{}/{}]".format(
wr, len(d)
))
zproc.stdin.flush()
if zproc.stdout in r:
# read from zproc (stdout) ---> socket
d = zproc.stdout.read(4096)
if len(d) == 0: # is eof?
zproc.wait()
rc = zproc.returncode
if rc == 0:
log.info("Zrecv SUCCESS.")
else:
log.error("Zrecv FAILED [{}]".format(
rc
))
sys.exit(rc)
break
if remote_addr:
#log.debug("z->s: {}".format(d))
sock.sendto(d, remote_addr)
#else drop the data, zmodem can deal with this.
sock.close()
"""
Recieve over UDP with zmodem
"""
import argparse
import subprocess
import shlex
import socket
import select
import logging
import logging.config
import json
import fcntl
import sys
import os
logging.config.dictConfig(json.load(open('log_cfg.json', 'r')))
log = logging.getLogger()
parser = argparse.ArgumentParser(
description="Zmodem UDP sender."
)
parser.add_argument('files', nargs="+",
help='files to send.')
parser.add_argument('--dst',
default = 'localhost',
help='Destination host, eg. hostname or 192.168.3.101 [localhost].')
parser.add_argument('--port',
help='Port on destination host [8888].',
default=8888,
type=int)
args = parser.parse_args()
dest = (args.dst, args.port)
for fname in args.files:
log.info("ZSending {} to {}.".format(fname, dest))
args = shlex.split('/usr/bin/sz -vv {}'.format(fname))
zproc = subprocess.Popen(args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
sock.setblocking(False)
# set zproc stdout to nonblocking
# https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
fl = fcntl.fcntl(zproc.stdout, fcntl.F_GETFL)
fcntl.fcntl(zproc.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
while True:
r,w,e = select.select([zproc.stdout, sock], [],[])
if sock in r:
# read from the socket ---> zproc (stdin)
d = sock.recv(4096)
#log.debug("s->z: {}".format(d))
wr = zproc.stdin.write(d)
if (wr != len(d)):
log.error("Failed to write all bytes to zproc.stdin! [{}/{}]".format(
wr, len(d)
))
zproc.stdin.flush()
if zproc.stdout in r:
# read from zproc (stdout) ---> socket
d = zproc.stdout.read(4096)
if len(d) == 0: # is eof?
# need to wait for the process to end
# communicate does this.
#zproc.communicate()
zproc.wait()
rc = zproc.returncode
if rc == 0:
log.info("ZSend SUCCESS, {} sent to {}".format(
fname, dest
))
else:
log.error("ZSend FAILED [{}], {} likely NOT sent to {}.".format(
rc, fname, dest
))
sys.exit(rc)
break
#log.debug("z->s: {}".format(d))
sock.sendto(d, dest)
sock.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment