tcp_pump_client.cpp 4.72 KB
Newer Older
1 2 3 4
#include "tcp_pump_client.h"

using namespace CLA;

5
/**
 * Construct a client that uses the built-in default subscription.
 *
 * rosbridge only accepts JSON strings, so the subscription request is
 * prepared here: it subscribes to topic "/chatter" carrying messages of
 * type "std_msgs/Float64". The request is not sent until
 * setup_connection() is called.
 *
 * @param logger shared logger used for all diagnostics of this client.
 */
tcp_pump_client::tcp_pump_client(std::shared_ptr<CLA::LOGGER>& logger)
    : m_topic("/chatter"), m_type("std_msgs/Float64"), m_logger(logger) {
    /* keep m_topic/m_type in sync with the literal request below so that
     * later log messages name the topic actually subscribed to */
    json_message = "{\"op\":\"subscribe\", \"topic\":\"/chatter\", \"type\":\"std_msgs/Float64\"}";
    m_host = "localhost"; /* rosbridge server runs on localhost */
    port = 9090; /* rosbridge server uses this port number */

    /* select() timeout used when polling the socket: 0 s + 100000 us */
    SOCKET_TIMEOUT_SEC = 0;
    SOCKET_TIMEOUT_uSEC = 100000;

    /* no socket / resolved host yet; gives the destructor a defined state
     * even when setup_connection() is never called */
    fd = -1;
    hp = nullptr;

    m_substance_last_sent_rate = 0;
    m_last_sent_rate = 0;
    m_new_rate = 0;
    m_current_rate = 0;
}

20 21 22 23
/**
 * Construct a client for a specific rosbridge topic.
 *
 * Prepares the JSON subscription request for the given topic and message
 * type; the request is not sent until setup_connection() is called.
 *
 * @param topic     topic name to subscribe to (inserted verbatim into the
 *                  JSON request, no escaping is performed).
 * @param type      message type of the topic (inserted verbatim as well).
 * @param substance substance name this client reports rates for; also used
 *                  in log messages.
 * @param logger    shared logger used for all diagnostics of this client.
 */
tcp_pump_client::tcp_pump_client(const string& topic, const string& type,
                                 const string& substance,
                                 std::shared_ptr<CLA::LOGGER>& logger)
    /* the parameters are const references, so they are copied; a move()
     * here would silently degrade to a copy anyway */
    : m_topic(topic), m_type(type), m_logger(logger), m_substance(substance) {
    json_message = "{\"op\":\"subscribe\", \"topic\": \""+m_topic+"\", \"type\": \""+m_type+"\"}";
    m_host = "localhost"; /* rosbridge server runs on localhost */
    port = 9090; /* rosbridge server uses this port number */

    /* select() timeout used when polling the socket: 0 s + 100000 us */
    SOCKET_TIMEOUT_SEC = 0;
    SOCKET_TIMEOUT_uSEC = 100000;

    /* no socket / resolved host yet; gives the destructor a defined state
     * even when setup_connection() is never called */
    fd = -1;
    hp = nullptr;

    m_substance_last_sent_rate = 0;
    m_last_sent_rate = 0;
    m_new_rate = 0;
    m_current_rate = 0;
}

35
void tcp_pump_client::setup_connection() {
36 37
  /* create a UDP socket */
  fd = socket(AF_INET, SOCK_STREAM, 0);
38 39
    m_logger->debug("Connecting to rosbridge server for "+m_substance+" pump data ...");

40
  if (fd < 0) {
41
    m_logger->error("Failed to create socket for "+m_substance+" pump");
42 43 44 45 46
    exit(EXIT_FAILURE);
  }
  /* look up the address of the server given its name */
  hp = gethostbyname(m_host);
  if(!hp) {
47
    std::cerr << "could not obtain address " << m_host << std::endl;
48 49 50 51 52 53 54 55 56 57 58 59
    exit(EXIT_FAILURE);
  }
  /* initialize sockaddr_in servaddr*/
  memset((char*)&servaddr, 0, sizeof(servaddr));
  /* connect to remote host (rosbridge_server in our case)*/
  servaddr.sin_family = AF_INET;
  servaddr.sin_port = htons(port); /* convert to network byte order */
  /* put host's address into th server address struct */
  memcpy((void*)&servaddr.sin_addr, hp->h_addr_list[0], hp->h_length);
  addrlen = sizeof(servaddr);
  /* initialize buffer s*/
  memset(buff, 0 , sizeof(buff));
60
  /*connect to server */
61 62 63
  if (connect(fd, (struct sockaddr*)&servaddr, addrlen) < 0) {
    m_logger->debug("Connection to "+m_substance+" pump failed");
    exit(EXIT_FAILURE);
64
  }
65
  m_logger->info("Connection to "+m_substance+" pump established");
66 67 68
  /* send json message to server */
  int n = write(fd, json_message.c_str(), strlen(json_message.c_str()));
  if (n < 0) {
69
      m_logger->error("Can not write to "+m_substance+" pump socket");
70
  }
71
  m_logger->debug("subscription to "+m_topic+" request sent to rosbridge server");
72 73
}

74
double tcp_pump_client::getCurrentInfusionRate(const string& substance_name) {
75 76 77 78 79 80
  timeout.tv_sec = SOCKET_TIMEOUT_SEC;
  timeout.tv_usec = SOCKET_TIMEOUT_uSEC;
  FD_ZERO(&set); /* initialize set */
  FD_SET(fd, &set);/* add descriptor to the read set */
  rec_value = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
  if (rec_value == -1 ) {
81
    m_logger->error(m_substance+" connection socket error");
82 83 84 85 86 87
    exit(EXIT_FAILURE);
  }
  else if (rec_value == 0) {
    /* on timeout -> keep calling the waiting_for_new_rate function
     * to return the last sent value until topic publishes a new rate
     */
88
    m_current_rate = waiting_for_new_rate(substance_name);
89 90
  }
  else {
91
    m_current_rate = get_new_rate(substance_name);
92 93 94 95
  }
  return m_current_rate;
}

96
double tcp_pump_client::get_new_rate(const string& substance_name) {
97 98 99 100
  recvlen = read(fd, buff, BUFF_SIZE);
  if (recvlen > 0) {
    buff[recvlen]  = '\0'; /* adding EOL */
    document.Parse(buff);
101
    if (substance_name == m_substance) {
102
      m_new_rate = document["msg"]["data"].GetDouble();
103
      m_substance_last_sent_rate = m_new_rate;
104
      if (m_new_rate)
105 106
        m_logger->info("New infusion rate for "+m_substance+" received: "\
        +to_string(int(m_new_rate))+" mL/hr");
107 108
    }
  }
109
  else {
110 111
    m_logger->error("Socket connection for "+m_substance+" data to rosbridge \
    server lost");
112 113 114
    exit(EXIT_FAILURE);
  }

115 116 117
  return m_new_rate;
}

118
double tcp_pump_client::waiting_for_new_rate(const string& substance_name) {
119 120 121
  /* while topic is not publishing, keep
   * returning the last sent rate value
   */
122 123 124
  if (substance_name == m_substance) {
    m_logger->debug(m_substance+": waiting for new infusion rate ...");
    m_last_sent_rate = m_substance_last_sent_rate;
125
  }
126

127 128 129 130 131 132 133 134
  return m_last_sent_rate;
}

/**
 * Forget the resolved host entry.
 *
 * Only resets the hp pointer to null; nothing is freed or closed here.
 */
void tcp_pump_client::clear() {
  hp = nullptr;
}

/**
 * Shut the connection down: log the shutdown, close the socket if one was
 * opened, and drop the resolved host pointer.
 */
tcp_pump_client::~tcp_pump_client() {
  m_logger->debug("Shutting down "+m_substance+" pump connection");

  /* a negative descriptor means no socket was created */
  if (fd >= 0) {
    close(fd);
    fd = -1;
  }

  /* hp comes from gethostbyname(), whose result is not owned by the caller,
   * so it is only reset and must never be passed to delete */
  clear();
}