tcp_pump_client.cpp 5.01 KB
Newer Older
1 2 3 4
#include "tcp_pump_client.h"

using namespace CLA;

5
tcp_pump_client::tcp_pump_client(std::shared_ptr<CLA::LOGGER>& logger) : m_logger(logger) {
6 7 8 9 10 11 12
  /* we can only talk to rosbrige server using json strings; here we want to
   * 'subscribe' to 'chatter topic' publishing messages of type 'std_msgs/String'
   */
    json_message = "{\"op\":\"subscribe\", \"topic\":\"/chatter\", \"type\":\"std_msgs/Float64\"}";
    m_host = "localhost"; /* rosbridge server runs on localhost */
    port = 9090; /* rosbridge server uses this port number */
    SOCKET_TIMEOUT_SEC = 1;
13
    SOCKET_TIMEOUT_uSEC = 0;
14 15 16 17 18 19
    m_bpdrug_last_sent_rate = 0;
    m_last_sent_rate = 0;
    m_new_rate = 0;
    m_current_rate = 0;
}

20 21 22
tcp_pump_client::tcp_pump_client(const string& topic, const string& type,
std::shared_ptr<CLA::LOGGER>& logger): m_topic(move(topic)), m_type(move(type)),
m_logger(logger) {
23 24 25 26
    json_message = "{\"op\":\"subscribe\", \"topic\": \""+m_topic+"\", \"type\": \""+m_type+"\"}";
    m_host = "localhost"; /* rosbridge server runs on localhost */
    port = 9090; /* rosbridge server uses this port number */
    SOCKET_TIMEOUT_SEC = 1;
27
    SOCKET_TIMEOUT_uSEC = 0;
28 29 30 31 32 33
    m_bpdrug_last_sent_rate = 0;
    m_last_sent_rate = 0;
    m_new_rate = 0;
    m_current_rate = 0;
}

34
void tcp_pump_client::setup_connection() {
35 36
  /* create a UDP socket */
  fd = socket(AF_INET, SOCK_STREAM, 0);
37
  m_logger->debug("Connecting to rosbridge server for pump data ...");
38
  if (fd < 0) {
39 40
    std::cerr <<"can not create socket"<<std::endl;
    m_logger->error("Failed to create socket");
41 42 43 44 45
    exit(EXIT_FAILURE);
  }
  /* look up the address of the server given its name */
  hp = gethostbyname(m_host);
  if(!hp) {
46
    std::cerr << "could not obtain address " << m_host << std::endl;
47 48 49 50 51 52 53 54 55 56 57 58 59
    exit(EXIT_FAILURE);
  }
  /* initialize sockaddr_in servaddr*/
  memset((char*)&servaddr, 0, sizeof(servaddr));
  /* connect to remote host (rosbridge_server in our case)*/
  servaddr.sin_family = AF_INET;
  servaddr.sin_port = htons(port); /* convert to network byte order */
  /* put host's address into th server address struct */
  memcpy((void*)&servaddr.sin_addr, hp->h_addr_list[0], hp->h_length);
  addrlen = sizeof(servaddr);
  /* initialize buffer s*/
  memset(buff, 0 , sizeof(buff));
  if (connect(fd, (struct sockaddr*)&servaddr, addrlen) < 0) {
60
  /*connect to server */
61
     cout << "Connection Failed \n";
62
     m_logger->error("Connection Failed");
63 64 65
     exit(EXIT_FAILURE);

  }
66
  m_logger->debug("Connection established");
67 68 69
  /* send json message to server */
  int n = write(fd, json_message.c_str(), strlen(json_message.c_str()));
  if (n < 0) {
70 71
      cerr <<"Can not write to socket\n";
      m_logger->error("Can not write to socket");
72
  }
73
  m_logger->debug("subscription to "+ m_topic+" request sent to rosbridge server");
74 75 76 77 78 79 80 81 82
}

double tcp_pump_client::getCurrentInfusionRate(const string& substance) {
  timeout.tv_sec = SOCKET_TIMEOUT_SEC;
  timeout.tv_usec = SOCKET_TIMEOUT_uSEC;
  FD_ZERO(&set); /* initialize set */
  FD_SET(fd, &set);/* add descriptor to the read set */
  rec_value = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
  if (rec_value == -1 ) {
83 84
    cerr << "socket error\n";
    m_logger->error("socket error");
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
    exit(EXIT_FAILURE);
  }
  else if (rec_value == 0) {
    /* on timeout -> keep calling the waiting_for_new_rate function
     * to return the last sent value until topic publishes a new rate
     */
    m_current_rate = waiting_for_new_rate(substance);
  }
  else {
    m_current_rate = get_new_rate(substance);
  }
  return m_current_rate;
}

double tcp_pump_client::get_new_rate(const string& substance) {
  recvlen = read(fd, buff, BUFF_SIZE);
  if (recvlen > 0) {
    buff[recvlen]  = '\0'; /* adding EOL */
    document.Parse(buff);
    if (substance == "Norepinephrine") {
      m_new_rate = document["msg"]["data"].GetDouble();
      m_bpdrug_last_sent_rate = m_new_rate;
107 108
      if (m_new_rate)
        m_logger->debug("New infusion rate for Norepinephrine received: "+to_string(int(m_new_rate))+" mL/hr");
109 110 111 112
    }
    else if (substance == "Saline") {
      m_new_rate = document["msg"]["data"].GetDouble();
      m_saline_last_sent_rate = m_new_rate;
113 114
      if (m_new_rate > 0)
        m_logger->info("New infusion rate for Saline received "+to_string(int(m_new_rate))+" mL/hr");
115 116
    }
  }
117 118 119 120 121
  else {
    m_logger->error("Socket connection to rosbridge server lost");
    exit(EXIT_FAILURE);
  }

122 123 124 125 126 127 128 129
  return m_new_rate;
}

double tcp_pump_client::waiting_for_new_rate(const string& substance) {
  /* while topic is not publishing, keep
   * returning the last sent rate value
   */
  if (substance == "Norepinephrine") {
130
    m_logger->debug(substance+": waiting for new infusion rate ...");
131 132 133
    m_last_sent_rate = m_bpdrug_last_sent_rate;
  }
  else if (substance == "Saline") {
134
    m_logger->debug(substance +": waiting for new infusion rate ...");
135 136 137 138 139 140 141 142 143 144
    m_last_sent_rate = m_saline_last_sent_rate;
  }
  return m_last_sent_rate;
}

void tcp_pump_client::clear() {
  hp = nullptr;
}

tcp_pump_client::~tcp_pump_client() {
145
  m_logger->debug("Shutting down socket connection");
146 147 148 149
  close(fd);
  clear();
  delete hp;
}