diff options
author | David Timber <mieabby@gmail.com> | 2021-04-09 22:41:00 +1000 |
---|---|---|
committer | David Timber <mieabby@gmail.com> | 2021-04-09 22:56:57 +1000 |
commit | f91419f6b3400a6b73f7d2b3c7b8044b6aa490db (patch) | |
tree | b1a029a8d31a9ae79d6988d7b1cb64875ec0b8a6 /src/proone-hostinfod.c | |
parent | d99b1309b1c699103c26b316a373a64f5342b905 (diff) |
Impl hostinfod, htbt
* htbt: Call `htbt_cncp_do_probe()` right after start
* hostinfod: impl DB thread
Diffstat (limited to 'src/proone-hostinfod.c')
-rw-r--r-- | src/proone-hostinfod.c | 641 |
1 files changed, 384 insertions, 257 deletions
diff --git a/src/proone-hostinfod.c b/src/proone-hostinfod.c index 0940258..f9e2d89 100644 --- a/src/proone-hostinfod.c +++ b/src/proone-hostinfod.c @@ -42,6 +42,11 @@ typedef enum { } client_state_t; typedef struct { + prne_htbt_host_info_t hi; + struct sockaddr_in6 sa; +} db_qe_t; + +typedef struct { struct timespec last_op; prne_iobuf_t ib[2]; int sck; @@ -67,8 +72,18 @@ typedef struct { bool term; } th_ctx_t; +typedef struct { + prne_llist_t q; + pthread_t th; + pthread_mutex_t lock; + pthread_cond_t cv; + MYSQL c; + bool term; +} db_ctx_t; + static const uint16_t DEFCONF_DB_PORT = 3306; static const size_t DEFCONF_MAX_CONN = SIZE_MAX; +static const size_t DEFCONF_DB_Q_SIZE = SIZE_MAX; static const struct timespec DEFCONF_REPORT_INT = { 60, 0 }; static const struct timespec DEFCONF_SCK_OP_TIMEOUT = { 5, 0 }; static const unsigned int DEFCONF_BACKLOG = 10; @@ -92,6 +107,7 @@ struct { char *key_pw; } ssl; size_t max_conn; + size_t db_q_size; struct timespec report_int; struct timespec sck_op_timeout; unsigned int nb_thread; @@ -101,10 +117,7 @@ struct { } prog_conf; struct { - struct { - MYSQL c; - pthread_mutex_t lock; - } db; + db_ctx_t db_ctx; struct { mbedtls_x509_crt ca; mbedtls_x509_crt crt; @@ -142,6 +155,7 @@ static void set_def_prog_param (void) { prog_conf.db.tbl_pfx = prne_dup_str("prne-"); prog_conf.db.port = DEFCONF_DB_PORT; prog_conf.max_conn = DEFCONF_MAX_CONN; + prog_conf.db_q_size = DEFCONF_DB_Q_SIZE; prog_conf.report_int = DEFCONF_REPORT_INT; prog_conf.sck_op_timeout = DEFCONF_SCK_OP_TIMEOUT; prog_conf.nb_thread = get_hwconc(); @@ -254,6 +268,9 @@ static bool yaml_parse_handle_node ( else if (strcmp(ctx->path, "/hostinfod/max_conn") == 0) { ctx->err = sscanf((const char*)val, "%zu", &prog_conf.max_conn) != 1; } + else if (strcmp(ctx->path, "/hostinfod/db_q_size") == 0) { + ctx->err = sscanf((const char*)val, "%zu", &prog_conf.db_q_size) != 1; + } else if (strcmp(ctx->path, "/hostinfod/report_int") == 0) { unsigned long tmp; @@ -445,6 +462,7 @@ static int setup_conf (const char *conf_path) { prne_nstrlen(prog_conf.ssl.path_key) > 0); ERR_BREAK("`ssl.dh` not set", prne_nstrlen(prog_conf.ssl.path_dh) > 0); ERR_BREAK("invalid `max_conn`", prog_conf.max_conn > 0); + ERR_BREAK("invalid `db_q_size`", prog_conf.db_q_size > 0); ERR_BREAK( "invalid `sck_op_timeout`", prne_cmp_timespec(prog_conf.sck_op_timeout, ZERO_TIMESPEC) > 0); @@ -515,9 +533,9 @@ ERR: return -1; } -static void report_mysql_err (void) { +static void report_mysql_err (MYSQL *c) { pthread_mutex_lock(&prog_g.stdio_lock); - fprintf(stderr, "* MySQL: %s\n", mysql_error(&prog_g.db.c)); + fprintf(stderr, "* MySQL: %s\n", mysql_error(c)); pthread_mutex_unlock(&prog_g.stdio_lock); } @@ -546,10 +564,10 @@ static int init_global (void) { } bov = true; - mysql_options(&prog_g.db.c, MYSQL_OPT_RECONNECT, &bov); - mysql_options(&prog_g.db.c, MYSQL_SET_CHARSET_NAME, "utf8"); + mysql_options(&prog_g.db_ctx.c, MYSQL_OPT_RECONNECT, &bov); + mysql_options(&prog_g.db_ctx.c, MYSQL_SET_CHARSET_NAME, "utf8"); if (mysql_real_connect( - &prog_g.db.c, + &prog_g.db_ctx.c, prog_conf.db.host, prog_conf.db.user, prog_conf.db.pw, @@ -559,7 +577,7 @@ static int init_global (void) { CLIENT_MULTI_STATEMENTS) == NULL) { if (prog_conf.verbose >= PRNE_VL_FATAL) { - report_mysql_err(); + report_mysql_err(&prog_g.db_ctx.c); } return 1; } @@ -666,142 +684,6 @@ static void init_signals (void) { signal(SIGTERM, handle_termsig); } -static void free_client_ctx (client_ctx_t *ctx) { - prne_free_iobuf(ctx->ib + 0); - prne_free_iobuf(ctx->ib + 1); - prne_close(ctx->sck); - mbedtls_ssl_free(&ctx->ssl); -} - -static void incre_conn_ctr (const ssize_t n) { - pthread_mutex_lock(&prog_g.conn_ctr.lock); - prog_g.conn_ctr.cnt += n; - pthread_mutex_unlock(&prog_g.conn_ctr.lock); -} - -static prne_llist_entry_t *pop_client_ctx ( - th_ctx_t *ctx, - prne_llist_entry_t *e) -{ - client_ctx_t *c = (client_ctx_t*)e->element; - prne_llist_entry_t *ret; - - free_client_ctx(c); - prne_free(c); - ret = prne_llist_erase(&ctx->c_list, e); - incre_conn_ctr(-1); - - return ret; -} - -static bool resize_pfd_arr (th_ctx_t *ctx, const size_t ny_size) { - void *ny = prne_realloc( - ctx->pfd, - sizeof(struct pollfd), - ny_size); - - if (ny_size > 0 && ny == NULL) { - if (prog_conf.verbose >= PRNE_VL_ERR) { - sync_perror("** resize_pfd_arr()"); - } - return false; - } - ctx->pfd = (struct pollfd*)ny; - return true; -} - -static void client_sync_msg (client_ctx_t *c, const char *msg) { - pthread_mutex_lock(&prog_g.stdio_lock); - fprintf(stderr, "client@%"PRIxPTR": %s\n", (uintptr_t)c, msg); - pthread_mutex_unlock(&prog_g.stdio_lock); -} - -static void client_sync_perror (client_ctx_t *c, const char *msg) { - pthread_mutex_lock(&prog_g.stdio_lock); - fprintf( - stderr, - "client@%"PRIxPTR" %s: %s\n", - (uintptr_t)c, - msg, - strerror(errno)); - pthread_mutex_unlock(&prog_g.stdio_lock); -} - -static void client_sync_mbedtls_err ( - const int err, - const char *msg, - const uintptr_t c) -{ - char str[256]; - - str[0] = 0; - mbedtls_strerror(err, str, sizeof(str)); - pthread_mutex_lock(&prog_g.stdio_lock); - fprintf( - stderr, - "client@%"PRIxPTR" %s: %s\n", - c, - msg, - str); - pthread_mutex_unlock(&prog_g.stdio_lock); -} - -static bool fab_client_status_rsp ( - client_ctx_t *c, - const uint16_t id, - const prne_htbt_status_code_t code, - const int32_t err) -{ - prne_htbt_msg_head_t mh; - prne_htbt_status_t status; - size_t msg_len = 0; - size_t actual; - bool ret; - - prne_htbt_init_msg_head(&mh); - prne_htbt_init_status(&status); - mh.is_rsp = true; - mh.id = id; - mh.op = PRNE_HTBT_OP_STATUS; - status.code = code; - status.err = err; - - prne_htbt_ser_msg_head(NULL, 0, &actual, &mh); - msg_len += actual; - prne_htbt_ser_status(NULL, 0, &actual, &status); - msg_len += actual; - - if (prne_alloc_iobuf(c->ib + 1, msg_len)) { - prne_iobuf_zero(c->ib + 1); - - prne_htbt_ser_msg_head( - c->ib[1].m + c->ib[1].len, - c->ib[1].avail, - &actual, - &mh); - prne_iobuf_shift(c->ib + 1, actual); - prne_htbt_ser_status( - c->ib[1].m + c->ib[1].len, - c->ib[1].avail, - &actual, - &status); - prne_iobuf_shift(c->ib + 1, actual); - - ret = true; - } - else { - ret = false; - if (prog_conf.verbose >= PRNE_VL_ERR) { - client_sync_perror(c, "** proc_client_stream()"); - } - } - - prne_htbt_free_msg_head(&mh); - prne_htbt_free_status(&status); - - return ret; -} - static int build_hostinfo_query_str ( const prne_htbt_host_info_t *hi, const struct sockaddr_in6 *sa, @@ -938,9 +820,15 @@ static int build_hostinfo_query_str ( prog_conf.db.tbl_pfx); } -static bool handle_hostinfo ( - client_ctx_t *client, - const prne_htbt_host_info_t *hi) +static void db_sync_msg (db_ctx_t *c, const char *msg) { + pthread_mutex_lock(&prog_g.stdio_lock); + fprintf(stderr, "db@%"PRIxPTR": %s\n", (uintptr_t)c, msg); + pthread_mutex_unlock(&prog_g.stdio_lock); +} + +static bool handle_db_qe ( + db_ctx_t *ctx, + const db_qe_t *e) { struct { char *cred_id; @@ -959,8 +847,8 @@ static bool handle_hostinfo ( // TRY if (prne_dec_host_cred( - hi->host_cred, - hi->host_cred_len, + e->hi.host_cred, + e->hi.host_cred_len, &hc) == PRNE_HTBT_SER_RC_OK) { unsigned long len; @@ -981,7 +869,7 @@ static bool handle_hostinfo ( qv.cred_id[0] = '\''; len = mysql_real_escape_string( - &prog_g.db.c, + &ctx->c, qv.cred_id + 1, hc.id, cred_l[0]); @@ -990,7 +878,7 @@ static bool handle_hostinfo ( qv.cred_pw[0] = '\''; len = mysql_real_escape_string( - &prog_g.db.c, + &ctx->c, qv.cred_pw + 1, hc.pw, cred_l[1]); @@ -1005,7 +893,7 @@ static bool handle_hostinfo ( } } - arch_str = prne_arch_tostr(hi->arch); + arch_str = prne_arch_tostr(e->hi.arch); if (arch_str != NULL) { const char *sb[] = { "'", arch_str, "'" }; qv.arch = prne_build_str(sb, sizeof(sb)/sizeof(const char*)); @@ -1036,7 +924,7 @@ static bool handle_hostinfo ( pthread_mutex_lock(&prog_g.stdio_lock); fprintf( stderr, - "client@%"PRIxPTR" [%s]:%"PRIu16" hostinfo(" + "db@%"PRIxPTR": hostinfo(" "parent_uptime = %"PRIu64", " "child_uptime = %"PRIu64", " "bne_cnt = %"PRIu64", " @@ -1050,73 +938,71 @@ static bool handle_hostinfo ( "host_cred.pw = %s, " "crash_cnt = %"PRIu32", " "arch = '%s')\n", - (uintptr_t)client, - client->ipaddr_str, - ntohs(client->sa.sin6_port), - hi->parent_uptime, - hi->child_uptime, - hi->bne_cnt, - hi->infect_cnt, - hi->parent_pid, - hi->child_pid, - hi->prog_ver[0], - hi->prog_ver[1], - hi->prog_ver[2], - hi->prog_ver[3], - hi->prog_ver[4], - hi->prog_ver[5], - hi->prog_ver[6], - hi->prog_ver[7], - hi->prog_ver[8], - hi->prog_ver[9], - hi->prog_ver[10], - hi->prog_ver[11], - hi->prog_ver[12], - hi->prog_ver[13], - hi->prog_ver[14], - hi->prog_ver[15], - hi->boot_id[0], - hi->boot_id[1], - hi->boot_id[2], - hi->boot_id[3], - hi->boot_id[4], - hi->boot_id[5], - hi->boot_id[6], - hi->boot_id[7], - hi->boot_id[8], - hi->boot_id[9], - hi->boot_id[10], - hi->boot_id[11], - hi->boot_id[12], - hi->boot_id[13], - hi->boot_id[14], - hi->boot_id[15], - hi->instance_id[0], - hi->instance_id[1], - hi->instance_id[2], - hi->instance_id[3], - hi->instance_id[4], - hi->instance_id[5], - hi->instance_id[6], - hi->instance_id[7], - hi->instance_id[8], - hi->instance_id[9], - hi->instance_id[10], - hi->instance_id[11], - hi->instance_id[12], - hi->instance_id[13], - hi->instance_id[14], - hi->instance_id[15], + (uintptr_t)ctx, + e->hi.parent_uptime, + e->hi.child_uptime, + e->hi.bne_cnt, + e->hi.infect_cnt, + e->hi.parent_pid, + e->hi.child_pid, + e->hi.prog_ver[0], + e->hi.prog_ver[1], + e->hi.prog_ver[2], + e->hi.prog_ver[3], + e->hi.prog_ver[4], + e->hi.prog_ver[5], + e->hi.prog_ver[6], + e->hi.prog_ver[7], + e->hi.prog_ver[8], + e->hi.prog_ver[9], + e->hi.prog_ver[10], + e->hi.prog_ver[11], + e->hi.prog_ver[12], + e->hi.prog_ver[13], + e->hi.prog_ver[14], + e->hi.prog_ver[15], + e->hi.boot_id[0], + e->hi.boot_id[1], + e->hi.boot_id[2], + e->hi.boot_id[3], + e->hi.boot_id[4], + e->hi.boot_id[5], + e->hi.boot_id[6], + e->hi.boot_id[7], + e->hi.boot_id[8], + e->hi.boot_id[9], + e->hi.boot_id[10], + e->hi.boot_id[11], + e->hi.boot_id[12], + e->hi.boot_id[13], + e->hi.boot_id[14], + e->hi.boot_id[15], + e->hi.instance_id[0], + e->hi.instance_id[1], + e->hi.instance_id[2], + e->hi.instance_id[3], + e->hi.instance_id[4], + e->hi.instance_id[5], + e->hi.instance_id[6], + e->hi.instance_id[7], + e->hi.instance_id[8], + e->hi.instance_id[9], + e->hi.instance_id[10], + e->hi.instance_id[11], + e->hi.instance_id[12], + e->hi.instance_id[13], + e->hi.instance_id[14], + e->hi.instance_id[15], pr[0], pr[1], - hi->crash_cnt, + e->hi.crash_cnt, qv.arch); pthread_mutex_unlock(&prog_g.stdio_lock); } f_ret = build_hostinfo_query_str( - hi, - &client->sa, + &e->hi, + &e->sa, qv.cred_id, qv.cred_pw, qv.arch, @@ -1131,42 +1017,38 @@ static bool handle_hostinfo ( } q_len = (size_t)f_ret; build_hostinfo_query_str( - hi, - &client->sa, + &e->hi, + &e->sa, qv.cred_id, qv.cred_pw, qv.arch, q_str, q_len + 1); if (prog_conf.verbose >= PRNE_VL_DBG0 + 2) { - client_sync_msg(client, q_str); + db_sync_msg(ctx, q_str); } ret = true; - pthread_mutex_lock(&prog_g.db.lock); - { - if (mysql_real_query(&prog_g.db.c, q_str, q_len)) { - goto SQL_ERR; + if (mysql_real_query(&ctx->c, q_str, q_len)) { + goto SQL_ERR; + } + while (true) { + f_ret = mysql_next_result(&ctx->c); + if (f_ret == 0) { + continue; } - while (true) { - f_ret = mysql_next_result(&prog_g.db.c); - if (f_ret == 0) { - continue; - } - else if (f_ret == -1) { - break; - } - else { - goto SQL_ERR; - } + else if (f_ret == -1) { + break; } - sql_err = false; -SQL_ERR: - if (sql_err && prog_conf.verbose >= PRNE_VL_ERR) { - report_mysql_err(); + else { + goto SQL_ERR; } } - pthread_mutex_unlock(&prog_g.db.lock); + sql_err = false; +SQL_ERR: + if (sql_err && prog_conf.verbose >= PRNE_VL_ERR) { + report_mysql_err(&ctx->c); + } END: // CATCH prne_free_host_cred(&hc); @@ -1178,6 +1060,233 @@ END: // CATCH return ret; } +static void *db_thread_main (void *ctx_p) { + db_ctx_t *ctx = (db_ctx_t*)ctx_p; + + assert(!mysql_thread_init()); + + if (prog_conf.verbose >= PRNE_VL_DBG0) { + db_sync_msg(ctx, "Loop start."); + } + + while (true) { + db_qe_t *e; + + pthread_mutex_lock(&ctx->lock); + if (ctx->q.size == 0) { + if (ctx->term) { + pthread_mutex_unlock(&ctx->lock); + break; + } + pthread_cond_wait(&ctx->cv, &ctx->lock); + } + if (ctx->q.head != NULL) { + e = (db_qe_t*)ctx->q.head->element; + prne_llist_erase(&ctx->q, ctx->q.head); + } + else { + e = NULL; + } + pthread_mutex_unlock(&ctx->lock); + + if (e != NULL) { + handle_db_qe(ctx, e); + + prne_htbt_free_host_info(&e->hi); + prne_free(e); + } + } + + if (prog_conf.verbose >= PRNE_VL_DBG0) { + db_sync_msg(ctx, "Loop end."); + } + + mysql_thread_end(); + + return NULL; +} + +static void free_client_ctx (client_ctx_t *ctx) { + prne_free_iobuf(ctx->ib + 0); + prne_free_iobuf(ctx->ib + 1); + prne_close(ctx->sck); + mbedtls_ssl_free(&ctx->ssl); +} + +static void incre_conn_ctr (const ssize_t n) { + pthread_mutex_lock(&prog_g.conn_ctr.lock); + prog_g.conn_ctr.cnt += n; + pthread_mutex_unlock(&prog_g.conn_ctr.lock); +} + +static prne_llist_entry_t *pop_client_ctx ( + th_ctx_t *ctx, + prne_llist_entry_t *e) +{ + client_ctx_t *c = (client_ctx_t*)e->element; + prne_llist_entry_t *ret; + + free_client_ctx(c); + prne_free(c); + ret = prne_llist_erase(&ctx->c_list, e); + incre_conn_ctr(-1); + + return ret; +} + +static bool resize_pfd_arr (th_ctx_t *ctx, const size_t ny_size) { + void *ny = prne_realloc( + ctx->pfd, + sizeof(struct pollfd), + ny_size); + + if (ny_size > 0 && ny == NULL) { + if (prog_conf.verbose >= PRNE_VL_ERR) { + sync_perror("** resize_pfd_arr()"); + } + return false; + } + ctx->pfd = (struct pollfd*)ny; + return true; +} + +static void client_sync_msg (client_ctx_t *c, const char *msg) { + pthread_mutex_lock(&prog_g.stdio_lock); + fprintf(stderr, "client@%"PRIxPTR": %s\n", (uintptr_t)c, msg); + pthread_mutex_unlock(&prog_g.stdio_lock); +} + +static void client_sync_perror (client_ctx_t *c, const char *msg) { + pthread_mutex_lock(&prog_g.stdio_lock); + fprintf( + stderr, + "client@%"PRIxPTR" %s: %s\n", + (uintptr_t)c, + msg, + strerror(errno)); + pthread_mutex_unlock(&prog_g.stdio_lock); +} + +static void client_sync_mbedtls_err ( + const int err, + const char *msg, + const uintptr_t c) +{ + char str[256]; + + str[0] = 0; + mbedtls_strerror(err, str, sizeof(str)); + pthread_mutex_lock(&prog_g.stdio_lock); + fprintf( + stderr, + "client@%"PRIxPTR" %s: %s\n", + c, + msg, + str); + pthread_mutex_unlock(&prog_g.stdio_lock); +} + +static bool fab_client_status_rsp ( + client_ctx_t *c, + const uint16_t id, + const prne_htbt_status_code_t code, + const int32_t err) +{ + prne_htbt_msg_head_t mh; + prne_htbt_status_t status; + size_t msg_len = 0; + size_t actual; + bool ret; + + prne_htbt_init_msg_head(&mh); + prne_htbt_init_status(&status); + mh.is_rsp = true; + mh.id = id; + mh.op = PRNE_HTBT_OP_STATUS; + status.code = code; + status.err = err; + + prne_htbt_ser_msg_head(NULL, 0, &actual, &mh); + msg_len += actual; + prne_htbt_ser_status(NULL, 0, &actual, &status); + msg_len += actual; + + if (prne_alloc_iobuf(c->ib + 1, msg_len)) { + prne_iobuf_zero(c->ib + 1); + + prne_htbt_ser_msg_head( + c->ib[1].m + c->ib[1].len, + c->ib[1].avail, + &actual, + &mh); + prne_iobuf_shift(c->ib + 1, actual); + prne_htbt_ser_status( + c->ib[1].m + c->ib[1].len, + c->ib[1].avail, + &actual, + &status); + prne_iobuf_shift(c->ib + 1, actual); + + ret = true; + } + else { + ret = false; + if (prog_conf.verbose >= PRNE_VL_ERR) { + client_sync_perror(c, "** proc_client_stream()"); + } + } + + prne_htbt_free_msg_head(&mh); + prne_htbt_free_status(&status); + + return ret; +} + +static bool queue_hostinfo ( + client_ctx_t *client, + prne_htbt_host_info_t *hi) +{ + bool ret = false; + db_qe_t *qe = (db_qe_t*)prne_malloc(sizeof(db_qe_t), 1); + pthread_mutex_t *lock = NULL; + + if (qe == NULL) { + goto END; + } + + memcpy(&qe->hi, hi, sizeof(prne_htbt_host_info_t)); + memcpy(&qe->sa, &client->sa, sizeof(struct sockaddr_in6)); + + pthread_mutex_lock(&prog_g.db_ctx.lock); + lock = &prog_g.db_ctx.lock; + if (prog_g.db_ctx.q.size > prog_conf.db_q_size) { + if (prog_conf.verbose >= PRNE_VL_WARN) { + client_sync_msg(client, "** DB queue full!"); + } + goto END; + } + if (prne_llist_append(&prog_g.db_ctx.q, (prne_llist_element_t)qe) == NULL) { + if (prog_conf.verbose >= PRNE_VL_ERR) { + client_sync_perror(client, "prne_llist_append()"); + } + goto END; + } + pthread_cond_broadcast(&prog_g.db_ctx.cv); + pthread_mutex_unlock(lock); + lock = NULL; + + qe = NULL; + prne_htbt_init_host_info(hi); + ret = true; +END: + if (lock != NULL) { + pthread_mutex_unlock(lock); + } + prne_free(qe); + + return ret; +} + static int proc_client_hostinfo ( th_ctx_t *ctx, client_ctx_t *c, @@ -1204,9 +1313,9 @@ static int proc_client_hostinfo ( prne_iobuf_shift(c->ib + 0, -(off + actual)); c->proto_state.hi_received = true; c->con_state = CS_CLOSE; - if (!handle_hostinfo(c, &hi)) { + if (!queue_hostinfo(c, &hi)) { if (prog_conf.verbose >= PRNE_VL_ERR) { - client_sync_perror(c, "** handle_hostinfo"); + client_sync_perror(c, "** queue_hostinfo()"); } } break; @@ -1459,7 +1568,7 @@ static int serve_client ( return ret; } -static void thread_tick (th_ctx_t *ctx) { +static void client_thread_tick (th_ctx_t *ctx) { const struct timespec now = prne_gettime(CLOCK_MONOTONIC); nfds_t pfd_ptr; int f_ret; @@ -1621,7 +1730,7 @@ static void thread_tick (th_ctx_t *ctx) { if (f_ret < 0) { if (errno != EINTR) { if (prog_conf.verbose >= PRNE_VL_FATAL) { - sync_perror("*** poll()@thread_tick()"); + sync_perror("*** poll()@client_thread_tick()"); } abort(); } @@ -1691,15 +1800,14 @@ ERR: // CATCH prne_llist_clear(&ctx->p_list); } -static void *thread_main (void *ctx_p) { +static void *client_thread_main (void *ctx_p) { th_ctx_t *ctx = (th_ctx_t*)ctx_p; - assert(!mysql_thread_init()); if (prog_conf.verbose >= PRNE_VL_DBG0) { pthread_mutex_lock(&prog_g.stdio_lock); fprintf( stderr, - "th@%"PRIxPTR" initialised. Loop start.\n", + "c_th@%"PRIxPTR": Loop start.\n", (uintptr_t)ctx); pthread_mutex_unlock(&prog_g.stdio_lock); } @@ -1721,14 +1829,14 @@ static void *thread_main (void *ctx_p) { abort(); } - thread_tick(ctx); + client_thread_tick(ctx); } if (prog_conf.verbose >= PRNE_VL_DBG0) { pthread_mutex_lock(&prog_g.stdio_lock); fprintf( stderr, - "th@%"PRIxPTR" loop end.\n", + "c_th@%"PRIxPTR": Loop end.\n", (uintptr_t)ctx); pthread_mutex_unlock(&prog_g.stdio_lock); } @@ -1749,6 +1857,15 @@ static int init_threads ( return 1; } + if ((errno = pthread_create( + &prog_g.db_ctx.th, + NULL, + db_thread_main, + &prog_g.db_ctx)) != 0) + { + return 1; + } + for (size_t i = 0; i < in_cnt; i += 1) { th_ctx_t *th_ctx = arr + i; @@ -1780,7 +1897,7 @@ static int init_threads ( if ((errno = pthread_create( &th_ctx->th, NULL, - thread_main, + client_thread_main, th_ctx)) != 0) { return 1; @@ -1800,7 +1917,6 @@ static void join_threads (th_ctx_t **arr, const size_t cnt) { write(ctx->ihcp[1], &sewage, 1); pthread_mutex_unlock(&ctx->lock); } - for (size_t i = 0; i < cnt; i += 1) { th_ctx_t *ctx = *arr + i; @@ -1811,9 +1927,14 @@ static void join_threads (th_ctx_t **arr, const size_t cnt) { prne_free_llist(&ctx->c_list); prne_free_rnd(&ctx->rnd); } - prne_free(*arr); *arr = NULL; + + pthread_mutex_lock(&prog_g.db_ctx.lock); + prog_g.db_ctx.term = true; + pthread_cond_broadcast(&prog_g.db_ctx.cv); + pthread_mutex_unlock(&prog_g.db_ctx.lock); + pthread_join(prog_g.db_ctx.th, NULL); } static void pass_client_conn ( @@ -1902,7 +2023,7 @@ static void pass_client_conn ( fprintf( stderr, "New client from [%s]:%"PRIu16" " - "client@%"PRIxPTR", th@%"PRIxPTR", fd:%d\n", + "client@%"PRIxPTR", c_th@%"PRIxPTR", fd:%d\n", c_ctx->ipaddr_str, ntohs(sa->sin6_port), (uintptr_t)c_ctx, @@ -1946,7 +2067,8 @@ int main (const int argc, const char **args) { return 1; } - mysql_init(&prog_g.db.c); + mysql_init(&prog_g.db_ctx.c); + prne_init_llist(&prog_g.db_ctx.q); mbedtls_x509_crt_init(&prog_g.ssl.ca); mbedtls_x509_crt_init(&prog_g.ssl.crt); mbedtls_pk_init(&prog_g.ssl.key); @@ -1955,9 +2077,11 @@ int main (const int argc, const char **args) { mbedtls_entropy_init(&prog_g.ssl.entropy); mbedtls_ctr_drbg_init(&prog_g.ssl.ctr_drbg); - if ((errno = pthread_mutex_init(&prog_g.db.lock, NULL)) != 0 || + if ((errno = pthread_mutex_init(&prog_g.db_ctx.lock, NULL)) != 0 || (errno = pthread_mutex_init(&prog_g.stdio_lock, NULL)) != 0 || - (errno = pthread_mutex_init(&prog_g.conn_ctr.lock, NULL)) != 0) + (errno = pthread_mutex_init(&prog_g.conn_ctr.lock, NULL)) != 0 || + (errno = pthread_mutex_init(&prog_g.db_ctx.lock, NULL)) != 0 || + (errno = pthread_cond_init(&prog_g.db_ctx.cv, NULL)) != 0) { if (prog_conf.verbose >= PRNE_VL_FATAL) { perror("*** pthread_mutex_init()"); @@ -1992,7 +2116,7 @@ int main (const int argc, const char **args) { if (prog_conf.verbose >= PRNE_VL_DBG0) { pthread_mutex_lock(&prog_g.stdio_lock); - fprintf(stderr, "Initialisation complete. Loop start.\n"); + fprintf(stderr, "Initialisation complete. Servicing ...\n"); pthread_mutex_unlock(&prog_g.stdio_lock); } @@ -2048,10 +2172,12 @@ END: // CATCH prne_close(fd); - mysql_close(&prog_g.db.c); - pthread_mutex_destroy(&prog_g.db.lock); + mysql_close(&prog_g.db_ctx.c); + pthread_mutex_destroy(&prog_g.db_ctx.lock); pthread_mutex_destroy(&prog_g.conn_ctr.lock); pthread_mutex_destroy(&prog_g.stdio_lock); + pthread_mutex_destroy(&prog_g.db_ctx.lock); + pthread_cond_destroy(&prog_g.db_ctx.cv); mbedtls_ssl_config_free(&prog_g.ssl.conf); mbedtls_x509_crt_free(&prog_g.ssl.ca); mbedtls_x509_crt_free(&prog_g.ssl.crt); @@ -2059,6 +2185,7 @@ END: // CATCH mbedtls_dhm_free(&prog_g.ssl.dh); mbedtls_ctr_drbg_free(&prog_g.ssl.ctr_drbg); mbedtls_entropy_free(&prog_g.ssl.entropy); + prne_free_llist(&prog_g.db_ctx.q); free_conf(); prne_close(sigpipe[0]); |