Commit 7364d741 authored by djk032's avatar djk032

added tcp client for publishing control messages to algorithm

parent 923f2bf2
#ifndef TCP_CONTROL_CLIENT_H
#define TCP_CONTROL_CLIENT_H
#include <sys/types.h>
#include <sys/socket.h> /* socket functions */
#include <sys/time.h> /* FD_SET, FD_ISSET, FD_ZERO, timeval, fd_set */
#include <sys/select.h>/* select() */
#include <stdio.h>
#include <stdlib.h> /* exit, EXIT_FAILURE */
#include <string.h> /* memcpy */
#include <unistd.h>
#include <netdb.h> /* gethostbyname */
#include <netinet/in.h>/* sockaddr_in */
#include <iostream>
#include <memory>
#include "CLA_Logger.h"
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
using namespace rapidjson; /* third party library for parsing json data */
using namespace std;
namespace CLA {
class tcp_control_client {
/* impementation of a non-blocking tcp client to talk
* to the rosbridge udp server
*/
private:
struct hostent *hp; /* host information */
struct sockaddr_in servaddr; /* server address information */
struct timeval timeout;
string json_message;
string m_topic;
string m_type;
const char* m_host;
int port;
int fd, rec_value,recvlen;
int SOCKET_TIMEOUT_SEC;
int SOCKET_TIMEOUT_uSEC;
socklen_t addrlen;
Document document; /* to parse json data */
Value m_patient_data;
Value _topic;
StringBuffer strbuf;
fd_set set; /* descriptor read set*/
std::shared_ptr<CLA::LOGGER> m_logger;
public:
tcp_control_client(std::shared_ptr<CLA::LOGGER>&, const string&, const string&);
~tcp_control_client();
void setup_connection();
void send_data_to_server(const string&);
void publish_control_message(const string&);
void waiting_to_send();
void clear();
};
}
#endif
#include "tcp_control_client.h"
using namespace CLA;
tcp_control_client::tcp_control_client(std::shared_ptr<CLA::LOGGER>& logger,
const string& topic, const string& type): m_topic(move(topic)), m_type(move(type)),
m_logger(logger) {
/* we can only talk to rosbrige server using json strings; here we want to
* `publish` to `control topic` -> messages of type `std_msgs/Float64`
*/
json_message = "{\"op\":\"advertise\", \"topic\": \""+m_topic+"\", \"type\": \""+m_type+"\"}";
m_host = "cmdmaster"; /* rosbridge server hostname */
port = 9090; /* rosbridge server uses this port number */
SOCKET_TIMEOUT_SEC = 0;
SOCKET_TIMEOUT_uSEC = 500000;/* block for 0.5s */
}
void tcp_control_client::setup_connection() {
/* create a TCP socket */
fd = socket(AF_INET, SOCK_STREAM, 0);
m_logger->debug("Trying to connect to rosbridge server to publish control data");
if (fd < 0) {
m_logger->error("Failed to create socket for publishing control data");
exit(EXIT_FAILURE);
}
/* look up the address of the server given its name */
hp = gethostbyname(m_host);
if(!hp) {
m_logger->error("Could not obtain address");
exit(EXIT_FAILURE);
}
/* initialize sockaddr_in servaddr*/
memset((char*)&servaddr, 0, sizeof(servaddr));
/* connect to remote host (rosbridge_server in our case)*/
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(port); /* convert to network byte order */
/* put host's address into th server address struct */
memcpy((void*)&servaddr.sin_addr, hp->h_addr_list[0], hp->h_length);
addrlen = sizeof(servaddr);
/*connect to server */
if (connect(fd, (struct sockaddr*)&servaddr, addrlen) < 0) {
m_logger->error("Connection to rosbridge server Failed, can't publish control data");
exit(EXIT_FAILURE);
}
m_logger->info("Connection to rosbrige server established, ready to publish control data");
/* send json message to server */
int n = write(fd, json_message.c_str(), strlen(json_message.c_str()));
if (n < 0) {
m_logger->error("Can not write control data to socket");
}
m_logger->debug("Advertise request to "+m_topic+" sent to rosbridge server");
}
void tcp_control_client::publish_control_message(const string& c_data) {
timeout.tv_sec = SOCKET_TIMEOUT_SEC;
timeout.tv_usec = SOCKET_TIMEOUT_uSEC;
FD_ZERO(&set); /* initialize set */
FD_SET(fd, &set);/* add descriptor to the write set */
rec_value = select(FD_SETSIZE, NULL, &set, NULL, &timeout);
if (rec_value == -1 ) {
m_logger->error("socket error while connecting to "+m_topic);
exit(EXIT_FAILURE);
}
else if (rec_value == 0) {
/* on timeout -> keep calling the waiting_to_send function
*/
waiting_to_send();
}
else {
m_logger->info("publishing control message: " + c_data);
send_data_to_server(c_data);
}
}
void tcp_control_client::send_data_to_server(const string& c_data) {
/* rapidjson storing reference to string */
m_patient_data = StringRef(c_data.c_str(), strlen(c_data.c_str()));
_topic = StringRef(m_topic.c_str(), strlen(m_topic.c_str()));
document.SetObject();
Document::AllocatorType& allocator = document.GetAllocator();
Value object(kObjectType);
document.AddMember("op", "publish", allocator);
document.AddMember("topic", _topic, allocator);
object.AddMember("data", m_patient_data, allocator);
document.AddMember("msg", object, allocator);
Writer<StringBuffer> writer(strbuf);
document.Accept(writer);
string json_msg = strbuf.GetString();
int i = send(fd, json_msg.c_str(), strlen(json_msg.c_str()), 0);
if (i < 0) {
m_logger->error("Can't write to socket to publish "+m_topic+" data");
}
strbuf.Clear();
m_logger->debug("Publishing control message to "+m_topic+" topic " );
}
void tcp_control_client::waiting_to_send() {
m_logger->info("Waiting for new control message to send ...");
}
void tcp_control_client::clear() {
hp = nullptr;
}
tcp_control_client::~tcp_control_client() {
m_logger->debug("Shutting down control socket connection");
close(fd);
clear();
delete hp;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment