tcp_pump_client.cpp 4.28 KB
Newer Older
1 2 3 4
#include "tcp_pump_client.h"

using namespace CLA;

5 6 7 8
tcp_pump_client::tcp_pump_client(const string& topic, const string& type, \
  const string& substance, std::shared_ptr<CLA::LOGGER>& logger): \
  m_topic(move(topic)), m_type(move(type)), m_logger(logger), \
  m_substance(substance) {
fmg005's avatar
fmg005 committed
9
    /* we can only talk to rosbrige server using json strings; here we want to
10
     * 'subscribe' to 'chatter topic' publishing messages of type 'std_msgs/Float64'
fmg005's avatar
fmg005 committed
11
     */
12
    json_message = "{\"op\":\"subscribe\", \"topic\": \""+m_topic+"\", \"type\": \""+m_type+"\"}";
13
    m_host = "cmdmaster"; /* rosbridge server runs hostname */
14
    port = 9090; /* rosbridge server uses this port number */
15
    SOCKET_TIMEOUT_SEC = 0;
fmg005's avatar
fmg005 committed
16
    SOCKET_TIMEOUT_uSEC = 100000; /* 100 millisecond*/
17
    m_substance_last_sent_rate = 0;
18 19 20 21 22
    m_last_sent_rate = 0;
    m_new_rate = 0;
    m_current_rate = 0;
}

23
void tcp_pump_client::setup_connection() {
fmg005's avatar
fmg005 committed
24
  /* create a TCP socket */
25
  fd = socket(AF_INET, SOCK_STREAM, 0);
26
    m_logger->debug("Trying to connect to "+m_substance+" pump through rosbridge server");
27

28
  if (fd < 0) {
29
    m_logger->error("Failed to create socket for "+m_substance+" pump");
30 31 32 33 34
    exit(EXIT_FAILURE);
  }
  /* look up the address of the server given its name */
  hp = gethostbyname(m_host);
  if(!hp) {
fmg005's avatar
fmg005 committed
35
    m_logger->error("could not obtain address for "+m_substance+" pump");
36 37 38 39 40 41 42 43 44 45 46 47
    exit(EXIT_FAILURE);
  }
  /* initialize sockaddr_in servaddr*/
  memset((char*)&servaddr, 0, sizeof(servaddr));
  /* connect to remote host (rosbridge_server in our case)*/
  servaddr.sin_family = AF_INET;
  servaddr.sin_port = htons(port); /* convert to network byte order */
  /* put host's address into th server address struct */
  memcpy((void*)&servaddr.sin_addr, hp->h_addr_list[0], hp->h_length);
  addrlen = sizeof(servaddr);
  /* initialize buffer s*/
  memset(buff, 0 , sizeof(buff));
48
  /*connect to server */
49 50 51
  if (connect(fd, (struct sockaddr*)&servaddr, addrlen) < 0) {
    m_logger->debug("Connection to "+m_substance+" pump failed");
    exit(EXIT_FAILURE);
52
  }
53
  m_logger->info("Connection to "+m_substance+" pump established");
54 55 56
  /* send json message to server */
  int n = write(fd, json_message.c_str(), strlen(json_message.c_str()));
  if (n < 0) {
57
      m_logger->error("Can not write to "+m_substance+" pump socket");
58
  }
59
  m_logger->debug("subscription to "+m_topic+" request sent to rosbridge server");
60 61
}

62
double tcp_pump_client::getCurrentInfusionRate(const string& substance_name) {
63 64 65 66 67 68
  timeout.tv_sec = SOCKET_TIMEOUT_SEC;
  timeout.tv_usec = SOCKET_TIMEOUT_uSEC;
  FD_ZERO(&set); /* initialize set */
  FD_SET(fd, &set);/* add descriptor to the read set */
  rec_value = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
  if (rec_value == -1 ) {
fmg005's avatar
fmg005 committed
69
    m_logger->error(m_substance+" pump connection socket error");
70 71 72 73 74 75
    exit(EXIT_FAILURE);
  }
  else if (rec_value == 0) {
    /* on timeout -> keep calling the waiting_for_new_rate function
     * to return the last sent value until topic publishes a new rate
     */
76
    m_current_rate = waiting_for_new_rate(substance_name);
77 78
  }
  else {
79
    m_current_rate = get_new_rate(substance_name);
80 81 82 83
  }
  return m_current_rate;
}

84
double tcp_pump_client::get_new_rate(const string& substance_name) {
85 86 87 88
  recvlen = read(fd, buff, BUFF_SIZE);
  if (recvlen > 0) {
    buff[recvlen]  = '\0'; /* adding EOL */
    document.Parse(buff);
89
    if (substance_name == m_substance) {
90
      m_new_rate = document["msg"]["data"].GetDouble();
91
      m_substance_last_sent_rate = m_new_rate;
92
      if (m_new_rate)
93 94
        m_logger->info("New infusion rate for "+m_substance+" received: "\
        +to_string(int(m_new_rate))+" mL/hr");
95 96
    }
  }
97
  else {
98 99
    m_logger->error("Socket connection for "+m_substance+" data to rosbridge \
    server lost");
100 101 102
    exit(EXIT_FAILURE);
  }

103 104 105
  return m_new_rate;
}

106
double tcp_pump_client::waiting_for_new_rate(const string& substance_name) {
107 108 109
  /* while topic is not publishing, keep
   * returning the last sent rate value
   */
110 111 112
  if (substance_name == m_substance) {
    m_logger->debug(m_substance+": waiting for new infusion rate ...");
    m_last_sent_rate = m_substance_last_sent_rate;
113 114 115 116 117 118 119 120 121
  }
  return m_last_sent_rate;
}

void tcp_pump_client::clear() {
  hp = nullptr;
}

tcp_pump_client::~tcp_pump_client() {
122
  m_logger->debug("Shutting down "+m_substance+" pump connection");
123 124 125 126
  close(fd);
  clear();
  delete hp;
}