# HG changeset patch # User Christian Brabandt # Date 1503082805 -7200 # Node ID 71e10b81226d793881ce7f7fed508c3296f631f3 # Parent 7264e03147e132894efbdf5aca8ae4a11460b1c6 patch 8.0.0957: a terminal job can deadlock when sending many keys commit https://github.com/vim/vim/commit/97bd5e6527bf2b48acdd1550acba161e82a5bc99 Author: Bram Moolenaar Date: Fri Aug 18 20:50:30 2017 +0200 patch 8.0.0957: a terminal job can deadlock when sending many keys Problem: When term_sendkeys() sends many keys it may get stuck in writing to the job. Solution: Make the write non-blocking, buffer keys to be sent. diff --git a/src/channel.c b/src/channel.c --- a/src/channel.c +++ b/src/channel.c @@ -1373,7 +1373,7 @@ can_write_buf_line(channel_T *channel) } /* - * Write any lines to the input channel. + * Write any buffer lines to the input channel. */ static void channel_write_in(channel_T *channel) @@ -1446,6 +1446,25 @@ channel_buffer_free(buf_T *buf) } /* + * Write any lines waiting to be written to "channel". + */ + static void +channel_write_input(channel_T *channel) +{ + chanpart_T *in_part = &channel->ch_part[PART_IN]; + + if (in_part->ch_writeque.wq_next != NULL) + channel_send(channel, PART_IN, (char_u *)"", 0, "channel_write_input"); + else if (in_part->ch_bufref.br_buf != NULL) + { + if (in_part->ch_buf_append) + channel_write_new_lines(in_part->ch_bufref.br_buf); + else + channel_write_in(channel); + } +} + +/* * Write any lines waiting to be written to a channel. */ void @@ -1454,17 +1473,7 @@ channel_write_any_lines(void) channel_T *channel; for (channel = first_channel; channel != NULL; channel = channel->ch_next) - { - chanpart_T *in_part = &channel->ch_part[PART_IN]; - - if (in_part->ch_bufref.br_buf != NULL) - { - if (in_part->ch_buf_append) - channel_write_new_lines(in_part->ch_bufref.br_buf); - else - channel_write_in(channel); - } - } + channel_write_input(channel); } /* @@ -2984,7 +2993,9 @@ channel_fill_wfds(int maxfd_arg, fd_set { chanpart_T *in_part = &ch->ch_part[PART_IN]; - if (in_part->ch_fd != INVALID_FD && in_part->ch_bufref.br_buf != NULL) + if (in_part->ch_fd != INVALID_FD + && (in_part->ch_bufref.br_buf != NULL + || in_part->ch_writeque.wq_next != NULL)) { FD_SET((int)in_part->ch_fd, wfds); if ((int)in_part->ch_fd >= maxfd) @@ -3530,6 +3541,31 @@ channel_handle_events(void) # endif /* + * Set "channel"/"part" to non-blocking. + */ + void +channel_set_nonblock(channel_T *channel, ch_part_T part) +{ + chanpart_T *ch_part = &channel->ch_part[part]; + int fd = ch_part->ch_fd; + + if (fd != INVALID_FD) + { +#ifdef _WIN32 + if (part == PART_SOCK) + { + u_long val = 1; + + ioctlsocket(fd, FIONBIO, &val); + } + else +#endif + fcntl(fd, F_SETFL, O_NONBLOCK); + ch_part->ch_nonblocking = TRUE; + } +} + +/* * Write "buf" (NUL terminated string) to "channel"/"part". * When "fun" is not NULL an error message might be given. * Return FAIL or OK. @@ -3538,14 +3574,16 @@ channel_handle_events(void) channel_send( channel_T *channel, ch_part_T part, - char_u *buf, - int len, + char_u *buf_arg, + int len_arg, char *fun) { int res; sock_T fd; - - fd = channel->ch_part[part].ch_fd; + chanpart_T *ch_part = &channel->ch_part[part]; + int did_use_queue = FALSE; + + fd = ch_part->ch_fd; if (fd == INVALID_FD) { if (!channel->ch_error && fun != NULL) @@ -3561,29 +3599,145 @@ channel_send( { ch_log_lead("SEND ", channel); fprintf(log_fd, "'"); - ignored = (int)fwrite(buf, len, 1, log_fd); + ignored = (int)fwrite(buf_arg, len_arg, 1, log_fd); fprintf(log_fd, "'\n"); fflush(log_fd); did_log_msg = TRUE; } - if (part == PART_SOCK) - res = sock_write(fd, (char *)buf, len); - else - res = fd_write(fd, (char *)buf, len); - if (res != len) + for (;;) { - if (!channel->ch_error && fun != NULL) + writeq_T *wq = &ch_part->ch_writeque; + char_u *buf; + int len; + + if (wq->wq_next != NULL) + { + /* first write what was queued */ + buf = wq->wq_next->wq_ga.ga_data; + len = wq->wq_next->wq_ga.ga_len; + did_use_queue = TRUE; + } + else + { + if (len_arg == 0) + /* nothing to write, called from channel_select_check() */ + return OK; + buf = buf_arg; + len = len_arg; + } + + if (part == PART_SOCK) + res = sock_write(fd, (char *)buf, len); + else + res = fd_write(fd, (char *)buf, len); + if (res < 0 && (errno == EWOULDBLOCK +#ifdef EAGAIN + || errno == EAGAIN +#endif + )) + res = 0; /* nothing got written */ + + if (res >= 0 && ch_part->ch_nonblocking) { - ch_error(channel, "%s(): write failed", fun); - EMSG2(_("E631: %s(): write failed"), fun); + writeq_T *entry = wq->wq_next; + + if (did_use_queue) + ch_log(channel, "Sent %d bytes now", res); + if (res == len) + { + /* Wrote all the buf[len] bytes. */ + if (entry != NULL) + { + /* Remove the entry from the write queue. */ + ga_clear(&entry->wq_ga); + wq->wq_next = entry->wq_next; + if (wq->wq_next == NULL) + wq->wq_prev = NULL; + else + wq->wq_next->wq_prev = NULL; + continue; + } + if (did_use_queue) + ch_log(channel, "Write queue empty"); + } + else + { + /* Wrote only buf[res] bytes, can't write more now. */ + if (entry != NULL) + { + if (res > 0) + { + /* Remove the bytes that were written. */ + mch_memmove(entry->wq_ga.ga_data, + (char *)entry->wq_ga.ga_data + res, + len - res); + entry->wq_ga.ga_len -= res; + } + buf = buf_arg; + len = len_arg; + } + else + { + buf += res; + len -= res; + } + ch_log(channel, "Adding %d bytes to the write queue", len); + + /* Append the not written bytes of the argument to the write + * buffer. Limit entries to 4000 bytes. */ + if (wq->wq_prev != NULL + && wq->wq_prev->wq_ga.ga_len + len < 4000) + { + writeq_T *last = wq->wq_prev; + + /* append to the last entry */ + if (ga_grow(&last->wq_ga, len) == OK) + { + mch_memmove((char *)last->wq_ga.ga_data + + last->wq_ga.ga_len, + buf, len); + last->wq_ga.ga_len += len; + } + } + else + { + writeq_T *last = (writeq_T *)alloc((int)sizeof(writeq_T)); + + if (last != NULL) + { + ch_log(channel, "Creating new entry"); + last->wq_prev = wq->wq_prev; + last->wq_next = NULL; + if (wq->wq_prev == NULL) + wq->wq_next = last; + else + wq->wq_prev->wq_next = last; + wq->wq_prev = last; + ga_init2(&last->wq_ga, 1, 1000); + if (ga_grow(&last->wq_ga, len) == OK) + { + mch_memmove(last->wq_ga.ga_data, buf, len); + last->wq_ga.ga_len = len; + } + } + } + } } - channel->ch_error = TRUE; - return FAIL; + else if (res != len) + { + if (!channel->ch_error && fun != NULL) + { + ch_error(channel, "%s(): write failed", fun); + EMSG2(_("E631: %s(): write failed"), fun); + } + channel->ch_error = TRUE; + return FAIL; + } + + channel->ch_error = FALSE; + return OK; } - - channel->ch_error = FALSE; - return OK; } /* @@ -3873,13 +4027,7 @@ channel_select_check(int ret_in, void *r if (ret > 0 && in_part->ch_fd != INVALID_FD && FD_ISSET(in_part->ch_fd, wfds)) { - if (in_part->ch_buf_append) - { - if (in_part->ch_bufref.br_buf != NULL) - channel_write_new_lines(in_part->ch_bufref.br_buf); - } - else - channel_write_in(channel); + channel_write_input(channel); --ret; } } diff --git a/src/proto/channel.pro b/src/proto/channel.pro --- a/src/proto/channel.pro +++ b/src/proto/channel.pro @@ -35,6 +35,7 @@ char_u *channel_read_block(channel_T *ch void common_channel_read(typval_T *argvars, typval_T *rettv, int raw); channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp); void channel_handle_events(void); +void channel_set_nonblock(channel_T *channel, ch_part_T part); int channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun); void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval); void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval); diff --git a/src/structs.h b/src/structs.h --- a/src/structs.h +++ b/src/structs.h @@ -1196,6 +1196,7 @@ typedef struct partial_S partial_T; typedef struct jobvar_S job_T; typedef struct readq_S readq_T; +typedef struct writeq_S writeq_T; typedef struct jsonq_S jsonq_T; typedef struct cbq_S cbq_T; typedef struct channel_S channel_T; @@ -1512,6 +1513,13 @@ struct readq_S readq_T *rq_prev; }; +struct writeq_S +{ + garray_T wq_ga; + writeq_T *wq_next; + writeq_T *wq_prev; +}; + struct jsonq_S { typval_T *jq_value; @@ -1601,6 +1609,8 @@ typedef struct { #endif int ch_block_write; /* for testing: 0 when not used, -1 when write * does not block, 1 simulate blocking */ + int ch_nonblocking; /* write() is non-blocking */ + writeq_T ch_writeque; /* header for write queue */ cbq_T ch_cb_head; /* dummy node for per-request callbacks */ char_u *ch_callback; /* call when a msg is not handled */ diff --git a/src/terminal.c b/src/terminal.c --- a/src/terminal.c +++ b/src/terminal.c @@ -400,6 +400,10 @@ term_start(typval_T *argvar, jobopt_T *o vterm_get_size(term->tl_vterm, &term->tl_rows, &term->tl_cols); term_report_winsize(term, term->tl_rows, term->tl_cols); + /* Make sure we don't get stuck on sending keys to the job, it leads to + * a deadlock if the job is waiting for Vim to read. */ + channel_set_nonblock(term->tl_job->jv_channel, PART_IN); + if (old_curbuf != NULL) { --curbuf->b_nwindows; diff --git a/src/testdir/test_terminal.vim b/src/testdir/test_terminal.vim --- a/src/testdir/test_terminal.vim +++ b/src/testdir/test_terminal.vim @@ -450,3 +450,16 @@ func Test_terminal_list_args() exe buf . 'bwipe!' call assert_equal("", bufname(buf)) endfunction + +func Test_terminal_noblock() + let buf = term_start(&shell) + + for c in ['a','b','c','d','e','f','g','h','i','j','k'] + call term_sendkeys(buf, 'echo ' . repeat(c, 5000) . "\") + endfor + + let g:job = term_getjob(buf) + call Stop_shell_in_terminal(buf) + call term_wait(buf) + bwipe +endfunc diff --git a/src/version.c b/src/version.c --- a/src/version.c +++ b/src/version.c @@ -770,6 +770,8 @@ static char *(features[]) = static int included_patches[] = { /* Add new patch number below this line */ /**/ + 957, +/**/ 956, /**/ 955,