changeset 12154:71e10b81226d v8.0.0957

patch 8.0.0957: a terminal job can deadlock when sending many keys commit https://github.com/vim/vim/commit/97bd5e6527bf2b48acdd1550acba161e82a5bc99 Author: Bram Moolenaar <Bram@vim.org> Date: Fri Aug 18 20:50:30 2017 +0200 patch 8.0.0957: a terminal job can deadlock when sending many keys Problem: When term_sendkeys() sends many keys it may get stuck in writing to the job. Solution: Make the write non-blocking, buffer keys to be sent.
author Christian Brabandt <cb@256bit.org>
date Fri, 18 Aug 2017 21:00:05 +0200
parents 7264e03147e1
children e5bb354c7177
files src/channel.c src/proto/channel.pro src/structs.h src/terminal.c src/testdir/test_terminal.vim src/version.c
diffstat 6 files changed, 216 insertions(+), 38 deletions(-) [+]
line wrap: on
line diff
--- a/src/channel.c
+++ b/src/channel.c
@@ -1373,7 +1373,7 @@ can_write_buf_line(channel_T *channel)
 }
 
 /*
- * Write any lines to the input channel.
+ * Write any buffer lines to the input channel.
  */
     static void
 channel_write_in(channel_T *channel)
@@ -1446,6 +1446,25 @@ channel_buffer_free(buf_T *buf)
 }
 
 /*
+ * Write any lines waiting to be written to "channel".
+ */
+    static void
+channel_write_input(channel_T *channel)
+{
+    chanpart_T	*in_part = &channel->ch_part[PART_IN];
+
+    if (in_part->ch_writeque.wq_next != NULL)
+	channel_send(channel, PART_IN, (char_u *)"", 0, "channel_write_input");
+    else if (in_part->ch_bufref.br_buf != NULL)
+    {
+	if (in_part->ch_buf_append)
+	    channel_write_new_lines(in_part->ch_bufref.br_buf);
+	else
+	    channel_write_in(channel);
+    }
+}
+
+/*
  * Write any lines waiting to be written to a channel.
  */
     void
@@ -1454,17 +1473,7 @@ channel_write_any_lines(void)
     channel_T	*channel;
 
     for (channel = first_channel; channel != NULL; channel = channel->ch_next)
-    {
-	chanpart_T  *in_part = &channel->ch_part[PART_IN];
-
-	if (in_part->ch_bufref.br_buf != NULL)
-	{
-	    if (in_part->ch_buf_append)
-		channel_write_new_lines(in_part->ch_bufref.br_buf);
-	    else
-		channel_write_in(channel);
-	}
-    }
+	channel_write_input(channel);
 }
 
 /*
@@ -2984,7 +2993,9 @@ channel_fill_wfds(int maxfd_arg, fd_set 
     {
 	chanpart_T  *in_part = &ch->ch_part[PART_IN];
 
-	if (in_part->ch_fd != INVALID_FD && in_part->ch_bufref.br_buf != NULL)
+	if (in_part->ch_fd != INVALID_FD
+		&& (in_part->ch_bufref.br_buf != NULL
+		    || in_part->ch_writeque.wq_next != NULL))
 	{
 	    FD_SET((int)in_part->ch_fd, wfds);
 	    if ((int)in_part->ch_fd >= maxfd)
@@ -3530,6 +3541,31 @@ channel_handle_events(void)
 # endif
 
 /*
+ * Set "channel"/"part" to non-blocking.
+ */
+    void
+channel_set_nonblock(channel_T *channel, ch_part_T part)
+{
+    chanpart_T *ch_part = &channel->ch_part[part];
+    int fd = ch_part->ch_fd;
+
+    if (fd != INVALID_FD)
+    {
+#ifdef _WIN32
+	if (part == PART_SOCK)
+	{
+	    u_long	val = 1;
+
+	    ioctlsocket(fd, FIONBIO, &val);
+	}
+	else
+#endif
+	    fcntl(fd, F_SETFL, O_NONBLOCK);
+	ch_part->ch_nonblocking = TRUE;
+    }
+}
+
+/*
  * Write "buf" (NUL terminated string) to "channel"/"part".
  * When "fun" is not NULL an error message might be given.
  * Return FAIL or OK.
@@ -3538,14 +3574,16 @@ channel_handle_events(void)
 channel_send(
 	channel_T *channel,
 	ch_part_T part,
-	char_u	  *buf,
-	int	  len,
+	char_u	  *buf_arg,
+	int	  len_arg,
 	char	  *fun)
 {
     int		res;
     sock_T	fd;
-
-    fd = channel->ch_part[part].ch_fd;
+    chanpart_T	*ch_part = &channel->ch_part[part];
+    int		did_use_queue = FALSE;
+
+    fd = ch_part->ch_fd;
     if (fd == INVALID_FD)
     {
 	if (!channel->ch_error && fun != NULL)
@@ -3561,29 +3599,145 @@ channel_send(
     {
 	ch_log_lead("SEND ", channel);
 	fprintf(log_fd, "'");
-	ignored = (int)fwrite(buf, len, 1, log_fd);
+	ignored = (int)fwrite(buf_arg, len_arg, 1, log_fd);
 	fprintf(log_fd, "'\n");
 	fflush(log_fd);
 	did_log_msg = TRUE;
     }
 
-    if (part == PART_SOCK)
-	res = sock_write(fd, (char *)buf, len);
-    else
-	res = fd_write(fd, (char *)buf, len);
-    if (res != len)
+    for (;;)
     {
-	if (!channel->ch_error && fun != NULL)
+	writeq_T    *wq = &ch_part->ch_writeque;
+	char_u	    *buf;
+	int	    len;
+
+	if (wq->wq_next != NULL)
+	{
+	    /* first write what was queued */
+	    buf = wq->wq_next->wq_ga.ga_data;
+	    len = wq->wq_next->wq_ga.ga_len;
+	    did_use_queue = TRUE;
+	}
+	else
+	{
+	    if (len_arg == 0)
+		/* nothing to write, called from channel_select_check() */
+		return OK;
+	    buf = buf_arg;
+	    len = len_arg;
+	}
+
+	if (part == PART_SOCK)
+	    res = sock_write(fd, (char *)buf, len);
+	else
+	    res = fd_write(fd, (char *)buf, len);
+	if (res < 0 && (errno == EWOULDBLOCK
+#ifdef EAGAIN
+			|| errno == EAGAIN
+#endif
+		    ))
+	    res = 0; /* nothing got written */
+
+	if (res >= 0 && ch_part->ch_nonblocking)
 	{
-	    ch_error(channel, "%s(): write failed", fun);
-	    EMSG2(_("E631: %s(): write failed"), fun);
+	    writeq_T *entry = wq->wq_next;
+
+	    if (did_use_queue)
+		ch_log(channel, "Sent %d bytes now", res);
+	    if (res == len)
+	    {
+		/* Wrote all the buf[len] bytes. */
+		if (entry != NULL)
+		{
+		    /* Remove the entry from the write queue. */
+		    ga_clear(&entry->wq_ga);
+		    wq->wq_next = entry->wq_next;
+		    if (wq->wq_next == NULL)
+			wq->wq_prev = NULL;
+		    else
+			wq->wq_next->wq_prev = NULL;
+		    continue;
+		}
+		if (did_use_queue)
+		    ch_log(channel, "Write queue empty");
+	    }
+	    else
+	    {
+		/* Wrote only buf[res] bytes, can't write more now. */
+		if (entry != NULL)
+		{
+		    if (res > 0)
+		    {
+			/* Remove the bytes that were written. */
+			mch_memmove(entry->wq_ga.ga_data,
+				    (char *)entry->wq_ga.ga_data + res,
+				    len - res);
+			entry->wq_ga.ga_len -= res;
+		    }
+		    buf = buf_arg;
+		    len = len_arg;
+		}
+		else
+		{
+		    buf += res;
+		    len -= res;
+		}
+		ch_log(channel, "Adding %d bytes to the write queue", len);
+
+		/* Append the not written bytes of the argument to the write
+		 * buffer.  Limit entries to 4000 bytes. */
+		if (wq->wq_prev != NULL
+			&& wq->wq_prev->wq_ga.ga_len + len < 4000)
+		{
+		    writeq_T *last = wq->wq_prev;
+
+		    /* append to the last entry */
+		    if (ga_grow(&last->wq_ga, len) == OK)
+		    {
+			mch_memmove((char *)last->wq_ga.ga_data
+							  + last->wq_ga.ga_len,
+				    buf, len);
+			last->wq_ga.ga_len += len;
+		    }
+		}
+		else
+		{
+		    writeq_T *last = (writeq_T *)alloc((int)sizeof(writeq_T));
+
+		    if (last != NULL)
+		    {
+		ch_log(channel, "Creating new entry");
+			last->wq_prev = wq->wq_prev;
+			last->wq_next = NULL;
+			if (wq->wq_prev == NULL)
+			    wq->wq_next = last;
+			else
+			    wq->wq_prev->wq_next = last;
+			wq->wq_prev = last;
+			ga_init2(&last->wq_ga, 1, 1000);
+			if (ga_grow(&last->wq_ga, len) == OK)
+			{
+			    mch_memmove(last->wq_ga.ga_data, buf, len);
+			    last->wq_ga.ga_len = len;
+			}
+		    }
+		}
+	    }
 	}
-	channel->ch_error = TRUE;
-	return FAIL;
+	else if (res != len)
+	{
+	    if (!channel->ch_error && fun != NULL)
+	    {
+		ch_error(channel, "%s(): write failed", fun);
+		EMSG2(_("E631: %s(): write failed"), fun);
+	    }
+	    channel->ch_error = TRUE;
+	    return FAIL;
+	}
+
+	channel->ch_error = FALSE;
+	return OK;
     }
-
-    channel->ch_error = FALSE;
-    return OK;
 }
 
 /*
@@ -3873,13 +4027,7 @@ channel_select_check(int ret_in, void *r
 	if (ret > 0 && in_part->ch_fd != INVALID_FD
 					    && FD_ISSET(in_part->ch_fd, wfds))
 	{
-	    if (in_part->ch_buf_append)
-	    {
-		if (in_part->ch_bufref.br_buf != NULL)
-		    channel_write_new_lines(in_part->ch_bufref.br_buf);
-	    }
-	    else
-		channel_write_in(channel);
+	    channel_write_input(channel);
 	    --ret;
 	}
     }
--- a/src/proto/channel.pro
+++ b/src/proto/channel.pro
@@ -35,6 +35,7 @@ char_u *channel_read_block(channel_T *ch
 void common_channel_read(typval_T *argvars, typval_T *rettv, int raw);
 channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp);
 void channel_handle_events(void);
+void channel_set_nonblock(channel_T *channel, ch_part_T part);
 int channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun);
 void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval);
 void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval);
--- a/src/structs.h
+++ b/src/structs.h
@@ -1196,6 +1196,7 @@ typedef struct partial_S partial_T;
 
 typedef struct jobvar_S job_T;
 typedef struct readq_S readq_T;
+typedef struct writeq_S writeq_T;
 typedef struct jsonq_S jsonq_T;
 typedef struct cbq_S cbq_T;
 typedef struct channel_S channel_T;
@@ -1512,6 +1513,13 @@ struct readq_S
     readq_T	*rq_prev;
 };
 
+struct writeq_S
+{
+    garray_T	wq_ga;
+    writeq_T	*wq_next;
+    writeq_T	*wq_prev;
+};
+
 struct jsonq_S
 {
     typval_T	*jq_value;
@@ -1601,6 +1609,8 @@ typedef struct {
 #endif
     int		ch_block_write;	/* for testing: 0 when not used, -1 when write
 				 * does not block, 1 simulate blocking */
+    int		ch_nonblocking;	/* write() is non-blocking */
+    writeq_T	ch_writeque;	/* header for write queue */
 
     cbq_T	ch_cb_head;	/* dummy node for per-request callbacks */
     char_u	*ch_callback;	/* call when a msg is not handled */
--- a/src/terminal.c
+++ b/src/terminal.c
@@ -400,6 +400,10 @@ term_start(typval_T *argvar, jobopt_T *o
 	vterm_get_size(term->tl_vterm, &term->tl_rows, &term->tl_cols);
 	term_report_winsize(term, term->tl_rows, term->tl_cols);
 
+	/* Make sure we don't get stuck on sending keys to the job, it leads to
+	 * a deadlock if the job is waiting for Vim to read. */
+	channel_set_nonblock(term->tl_job->jv_channel, PART_IN);
+
 	if (old_curbuf != NULL)
 	{
 	    --curbuf->b_nwindows;
--- a/src/testdir/test_terminal.vim
+++ b/src/testdir/test_terminal.vim
@@ -450,3 +450,16 @@ func Test_terminal_list_args()
   exe buf . 'bwipe!'
   call assert_equal("", bufname(buf))
 endfunction
+
+func Test_terminal_noblock()
+  let buf = term_start(&shell)
+
+  for c in ['a','b','c','d','e','f','g','h','i','j','k']
+    call term_sendkeys(buf, 'echo ' . repeat(c, 5000) . "\<cr>")
+  endfor
+
+  let g:job = term_getjob(buf)
+  call Stop_shell_in_terminal(buf)
+  call term_wait(buf)
+  bwipe
+endfunc
--- a/src/version.c
+++ b/src/version.c
@@ -770,6 +770,8 @@ static char *(features[]) =
 static int included_patches[] =
 {   /* Add new patch number below this line */
 /**/
+    957,
+/**/
     956,
 /**/
     955,