blob: 05bc22a73c4e0a547c8abe06afb676da11fb4d19 [file] [log] [blame]
#ifndef CAFFE2_UTILS_ZMQ_HELPER_H_
#define CAFFE2_UTILS_ZMQ_HELPER_H_
#include <zmq.h>
#include "caffe2/core/logging.h"
namespace caffe2 {
class ZmqContext {
public:
explicit ZmqContext(int io_threads) : ptr_(zmq_ctx_new()) {
CAFFE_ENFORCE(ptr_ != nullptr, "Failed to create zmq context.");
int rc = zmq_ctx_set(ptr_, ZMQ_IO_THREADS, io_threads);
CAFFE_ENFORCE_EQ(rc, 0);
rc = zmq_ctx_set(ptr_, ZMQ_MAX_SOCKETS, ZMQ_MAX_SOCKETS_DFLT);
CAFFE_ENFORCE_EQ(rc, 0);
}
~ZmqContext() {
int rc = zmq_ctx_destroy(ptr_);
CAFFE_ENFORCE_EQ(rc, 0);
}
void* ptr() { return ptr_; }
private:
void* ptr_;
C10_DISABLE_COPY_AND_ASSIGN(ZmqContext);
};
class ZmqMessage {
public:
ZmqMessage() {
int rc = zmq_msg_init(&msg_);
CAFFE_ENFORCE_EQ(rc, 0);
}
~ZmqMessage() {
int rc = zmq_msg_close(&msg_);
CAFFE_ENFORCE_EQ(rc, 0);
}
zmq_msg_t* msg() { return &msg_; }
void* data() { return zmq_msg_data(&msg_); }
size_t size() { return zmq_msg_size(&msg_); }
private:
zmq_msg_t msg_;
C10_DISABLE_COPY_AND_ASSIGN(ZmqMessage);
};
class ZmqSocket {
public:
explicit ZmqSocket(int type)
: context_(1), ptr_(zmq_socket(context_.ptr(), type)) {
CAFFE_ENFORCE(ptr_ != nullptr, "Failed to create zmq socket.");
}
~ZmqSocket() {
int rc = zmq_close(ptr_);
CAFFE_ENFORCE_EQ(rc, 0);
}
void Bind(const string& addr) {
int rc = zmq_bind(ptr_, addr.c_str());
CAFFE_ENFORCE_EQ(rc, 0);
}
void Unbind(const string& addr) {
int rc = zmq_unbind(ptr_, addr.c_str());
CAFFE_ENFORCE_EQ(rc, 0);
}
void Connect(const string& addr) {
int rc = zmq_connect(ptr_, addr.c_str());
CAFFE_ENFORCE_EQ(rc, 0);
}
void Disconnect(const string& addr) {
int rc = zmq_disconnect(ptr_, addr.c_str());
CAFFE_ENFORCE_EQ(rc, 0);
}
int Send(const string& msg, int flags) {
int nbytes = zmq_send(ptr_, msg.c_str(), msg.size(), flags);
if (nbytes) {
return nbytes;
} else if (zmq_errno() == EAGAIN) {
return 0;
} else {
LOG(FATAL) << "Cannot send zmq message. Error number: "
<< zmq_errno();
return 0;
}
}
int SendTillSuccess(const string& msg, int flags) {
CAFFE_ENFORCE(msg.size(), "You cannot send an empty message.");
int nbytes = 0;
do {
nbytes = Send(msg, flags);
} while (nbytes == 0);
return nbytes;
}
int Recv(ZmqMessage* msg) {
int nbytes = zmq_msg_recv(msg->msg(), ptr_, 0);
if (nbytes >= 0) {
return nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
return 0;
} else {
LOG(FATAL) << "Cannot receive zmq message. Error number: "
<< zmq_errno();
return 0;
}
}
int RecvTillSuccess(ZmqMessage* msg) {
int nbytes = 0;
do {
nbytes = Recv(msg);
} while (nbytes == 0);
return nbytes;
}
private:
ZmqContext context_;
void* ptr_;
};
} // namespace caffe2
#endif // CAFFE2_UTILS_ZMQ_HELPER_H_