aboutsummaryrefslogtreecommitdiff
path: root/src/heartbeat-worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/heartbeat-worker.c')
-rw-r--r--src/heartbeat-worker.c159
1 files changed, 159 insertions, 0 deletions
diff --git a/src/heartbeat-worker.c b/src/heartbeat-worker.c
new file mode 100644
index 0000000..e3e4acd
--- /dev/null
+++ b/src/heartbeat-worker.c
@@ -0,0 +1,159 @@
+#include "heartbeat-worker.h"
+#include "dvault.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+
+#define DECL_CTX_PTR(p) hb_w_ctx_t*ctx=(hb_w_ctx_t*)p
+
+typedef struct hb_w_ctx hb_w_ctx_t;
+
+struct hb_w_ctx {
+ int fd;
+ int domain;
+ uint8_t rcv_buf[256];
+ bool finalised;
+};
+
+static const uint16_t HEARTBEAT_DEFAULT_BIND_PORT = 55420;
+
+static void heartbeat_worker_free (void *in_ctx) {
+ DECL_CTX_PTR(in_ctx);
+ close(ctx->fd);
+ free(ctx);
+}
+
+static void heartbeat_worker_fin (void *in_ctx) {
+ DECL_CTX_PTR(in_ctx);
+ ctx->finalised = true;
+}
+
+static void heartbeat_worker_work (void *in_ctx, const prne_worker_sched_info_t *sched_info, prne_worker_sched_req_t *sched_req) {
+ DECL_CTX_PTR(in_ctx);
+
+ if (sched_req->pollfd_ready) {
+ const short revents = sched_req->pollfd_arr[0].revents;
+
+ if (revents & (POLLERR | POLLHUP | POLLNVAL)) {
+ ctx->finalised = true;
+ sched_req->flags = PRNE_WORKER_SCHED_FLAG_NONE;
+ return;
+ }
+ if (revents & POLLIN) {
+ socklen_t addr_len;
+
+ // TODO
+
+ if (ctx->domain == AF_INET) {
+ struct sockaddr_in remote_addr;
+
+ addr_len = sizeof(struct sockaddr_in);
+ if (recvfrom(ctx->fd, ctx->rcv_buf, sizeof(ctx->rcv_buf), 0, (struct sockaddr*)&remote_addr, &addr_len) == sizeof(ctx->rcv_buf)) {
+ prne_dvault_invert_mem(sizeof(ctx->rcv_buf) - 1, ctx->rcv_buf + 1, ctx->rcv_buf[0]);
+ sendto(ctx->fd, ctx->rcv_buf + 1, sizeof(ctx->rcv_buf) - 1, 0, (const struct sockaddr*)&remote_addr, addr_len);
+ }
+ }
+ else {
+ struct sockaddr_in6 remote_addr;
+
+ addr_len = sizeof(struct sockaddr_in6);
+ if (recvfrom(ctx->fd, ctx->rcv_buf, sizeof(ctx->rcv_buf), 0, (struct sockaddr*)&remote_addr, &addr_len) == sizeof(ctx->rcv_buf)) {
+ prne_dvault_invert_mem(sizeof(ctx->rcv_buf) - 1, ctx->rcv_buf + 1, ctx->rcv_buf[0]);
+ sendto(ctx->fd, ctx->rcv_buf + 1, sizeof(ctx->rcv_buf) - 1, 0, (const struct sockaddr*)&remote_addr, addr_len);
+ }
+ }
+ }
+ }
+
+ sched_req->flags = PRNE_WORKER_SCHED_FLAG_POLL;
+ sched_req->mem_func.alloc(sched_req, 1);
+ sched_req->pollfd_arr[0].fd = ctx->fd;
+ sched_req->pollfd_arr[0].events = POLLIN;
+}
+
+static bool heartbeat_worker_has_finalised (void *in_ctx) {
+ DECL_CTX_PTR(in_ctx);
+ return ctx->finalised;
+}
+
+
+bool prne_alloc_heartbeat_worker (prne_worker_t *w) {
+ bool ret = true;
+ hb_w_ctx_t *ctx = NULL;
+
+ ctx = (hb_w_ctx_t*)malloc(sizeof(hb_w_ctx_t));
+ if (ctx == NULL) {
+ ret = false;
+ goto END;
+ }
+ ctx->fd = -1;
+ ctx->domain = 0;
+ ctx->finalised = false;
+
+ ctx->fd = socket(AF_INET6, SOCK_DGRAM, 0);
+ if (ctx->fd < 0) {
+ ctx->fd = socket(AF_INET, SOCK_DGRAM, 0);
+
+ if (ctx->fd < 0) {
+ ret = false;
+ goto END;
+ }
+ ctx->domain = AF_INET;
+ }
+ else {
+ ctx->domain = AF_INET6;
+ }
+
+ if (fcntl(ctx->fd, F_SETFL, O_NONBLOCK) < 0) {
+ ret = false;
+ goto END;
+ }
+
+ if (ctx->domain == AF_INET) {
+ struct sockaddr_in local_addr;
+
+ memset(&local_addr, 0, sizeof(struct sockaddr_in));
+ local_addr.sin_family = AF_INET;
+ local_addr.sin_port = htons(HEARTBEAT_DEFAULT_BIND_PORT);
+ local_addr.sin_addr.s_addr = INADDR_ANY;
+
+ if (bind(ctx->fd, (const struct sockaddr*)&local_addr, sizeof(struct sockaddr_in)) < 0) {
+ ret = false;
+ goto END;
+ }
+ }
+ else {
+ struct sockaddr_in6 local_addr;
+
+ memset(&local_addr, 0, sizeof(struct sockaddr_in6));
+ local_addr.sin6_family = AF_INET6;
+ local_addr.sin6_port = htons(HEARTBEAT_DEFAULT_BIND_PORT);
+
+ if (bind(ctx->fd, (const struct sockaddr*)&local_addr, sizeof(struct sockaddr_in6)) < 0) {
+ ret = false;
+ goto END;
+ }
+ }
+
+ w->ctx = ctx;
+ w->free = heartbeat_worker_free;
+ w->fin = heartbeat_worker_fin;
+ w->work = heartbeat_worker_work;
+ w->has_finalised = heartbeat_worker_has_finalised;
+
+END:
+ if (!ret) {
+ if (ctx != NULL) {
+ close(ctx->fd);
+ }
+ free(ctx);
+ }
+
+ return ret;
+}