changeset 10259:a09db7a4afe0 v8.0.0027

commit https://github.com/vim/vim/commit/dc0ccaee68ca24d10050117fbec757ad33590a17 Author: Bram Moolenaar <Bram@vim.org> Date: Sun Oct 9 17:28:01 2016 +0200 patch 8.0.0027 Problem: A channel is closed when reading on stderr or stdout fails, but there may still be something to read on another part. Solution: Turn ch_to_be_closed into a bitfield. (Ozaki Kiichi)
author Christian Brabandt <cb@256bit.org>
date Sun, 09 Oct 2016 17:30:04 +0200
parents a99c7e02b61a
children 2fff06882079
files src/channel.c src/eval.c src/proto/channel.pro src/structs.h src/testdir/test_channel.vim src/version.c
diffstat 6 files changed, 161 insertions(+), 135 deletions(-) [+]
line wrap: on
line diff
--- a/src/channel.c
+++ b/src/channel.c
@@ -54,7 +54,7 @@
 # define fd_close(sd) close(sd)
 #endif
 
-static void channel_read(channel_T *channel, int part, char *func);
+static void channel_read(channel_T *channel, ch_part_T part, char *func);
 
 /* Whether a redraw is needed for appending a line to a buffer. */
 static int channel_need_redraw = FALSE;
@@ -309,7 +309,7 @@ static int next_ch_id = 0;
     channel_T *
 add_channel(void)
 {
-    int		part;
+    ch_part_T	part;
     channel_T	*channel = (channel_T *)alloc_clear((int)sizeof(channel_T));
 
     if (channel == NULL)
@@ -318,7 +318,7 @@ add_channel(void)
     channel->ch_id = next_ch_id++;
     ch_log(channel, "Created channel");
 
-    for (part = PART_SOCK; part <= PART_IN; ++part)
+    for (part = PART_SOCK; part < PART_COUNT; ++part)
     {
 	channel->ch_part[part].ch_fd = INVALID_FD;
 #ifdef FEAT_GUI_X11
@@ -421,9 +421,7 @@ channel_free(channel_T *channel)
     if (!in_free_unref_items)
     {
 	if (safe_to_invoke_callback == 0)
-	{
 	    channel->ch_to_be_freed = TRUE;
-	}
 	else
 	{
 	    channel_free_contents(channel);
@@ -511,7 +509,7 @@ free_unused_channels(int copyID, int mas
 channel_read_fd(int fd)
 {
     channel_T	*channel;
-    int		part;
+    ch_part_T	part;
 
     channel = channel_fd2channel(fd, &part);
     if (channel == NULL)
@@ -557,7 +555,7 @@ messageFromServer(gpointer clientData,
 #endif
 
     static void
-channel_gui_register_one(channel_T *channel, int part)
+channel_gui_register_one(channel_T *channel, ch_part_T part)
 {
     if (!CH_HAS_GUI)
 	return;
@@ -627,7 +625,7 @@ channel_gui_register_all(void)
 }
 
     static void
-channel_gui_unregister_one(channel_T *channel, int part)
+channel_gui_unregister_one(channel_T *channel, ch_part_T part)
 {
 # ifdef FEAT_GUI_X11
     if (channel->ch_part[part].ch_inputHandler != (XtInputId)NULL)
@@ -653,7 +651,7 @@ channel_gui_unregister_one(channel_T *ch
     static void
 channel_gui_unregister(channel_T *channel)
 {
-    int	    part;
+    ch_part_T	part;
 
     for (part = PART_SOCK; part < PART_IN; ++part)
 	channel_gui_unregister_one(channel, part);
@@ -928,6 +926,7 @@ channel_open(
     channel->ch_nb_close_cb = nb_close_cb;
     channel->ch_hostname = (char *)vim_strsave((char_u *)hostname);
     channel->ch_port = port_in;
+    channel->ch_to_be_closed |= (1 << PART_SOCK);
 
 #ifdef FEAT_GUI
     channel_gui_register_one(channel, PART_SOCK);
@@ -998,12 +997,19 @@ theend:
 }
 
     static void
-may_close_part(sock_T *fd)
+ch_close_part(channel_T *channel, ch_part_T part)
 {
+    sock_T *fd = &channel->ch_part[part].ch_fd;
+
     if (*fd != INVALID_FD)
     {
-	fd_close(*fd);
+	if (part == PART_SOCK)
+	    sock_close(*fd);
+	else
+	    fd_close(*fd);
 	*fd = INVALID_FD;
+
+	channel->ch_to_be_closed &= ~(1 << part);
     }
 }
 
@@ -1012,7 +1018,7 @@ channel_set_pipes(channel_T *channel, so
 {
     if (in != INVALID_FD)
     {
-	may_close_part(&channel->CH_IN_FD);
+	ch_close_part(channel, PART_IN);
 	channel->CH_IN_FD = in;
     }
     if (out != INVALID_FD)
@@ -1020,8 +1026,9 @@ channel_set_pipes(channel_T *channel, so
 # if defined(FEAT_GUI)
 	channel_gui_unregister_one(channel, PART_OUT);
 # endif
-	may_close_part(&channel->CH_OUT_FD);
+	ch_close_part(channel, PART_OUT);
 	channel->CH_OUT_FD = out;
+	channel->ch_to_be_closed |= (1 << PART_OUT);
 # if defined(FEAT_GUI)
 	channel_gui_register_one(channel, PART_OUT);
 # endif
@@ -1031,8 +1038,9 @@ channel_set_pipes(channel_T *channel, so
 # if defined(FEAT_GUI)
 	channel_gui_unregister_one(channel, PART_ERR);
 # endif
-	may_close_part(&channel->CH_ERR_FD);
+	ch_close_part(channel, PART_ERR);
 	channel->CH_ERR_FD = err;
+	channel->ch_to_be_closed |= (1 << PART_ERR);
 # if defined(FEAT_GUI)
 	channel_gui_register_one(channel, PART_ERR);
 # endif
@@ -1151,10 +1159,10 @@ set_callback(
     void
 channel_set_options(channel_T *channel, jobopt_T *opt)
 {
-    int		part;
+    ch_part_T	part;
 
     if (opt->jo_set & JO_MODE)
-	for (part = PART_SOCK; part <= PART_IN; ++part)
+	for (part = PART_SOCK; part < PART_COUNT; ++part)
 	    channel->ch_part[part].ch_mode = opt->jo_mode;
     if (opt->jo_set & JO_IN_MODE)
 	channel->ch_part[PART_IN].ch_mode = opt->jo_in_mode;
@@ -1164,7 +1172,7 @@ channel_set_options(channel_T *channel, 
 	channel->ch_part[PART_ERR].ch_mode = opt->jo_err_mode;
 
     if (opt->jo_set & JO_TIMEOUT)
-	for (part = PART_SOCK; part <= PART_IN; ++part)
+	for (part = PART_SOCK; part < PART_COUNT; ++part)
 	    channel->ch_part[part].ch_timeout = opt->jo_timeout;
     if (opt->jo_set & JO_OUT_TIMEOUT)
 	channel->ch_part[PART_OUT].ch_timeout = opt->jo_out_timeout;
@@ -1282,7 +1290,7 @@ channel_set_options(channel_T *channel, 
     void
 channel_set_req_callback(
 	channel_T   *channel,
-	int	    part,
+	ch_part_T   part,
 	char_u	    *callback,
 	partial_T   *partial,
 	int	    id)
@@ -1448,7 +1456,7 @@ channel_write_in(channel_T *channel)
 	ch_log(channel, "Finished writing all lines to channel");
 
 	/* Close the pipe/socket, so that the other side gets EOF. */
-	may_close_part(&channel->CH_IN_FD);
+	ch_close_part(channel, PART_IN);
     }
     else
 	ch_logn(channel, "Still %d more lines to write",
@@ -1462,10 +1470,10 @@ channel_write_in(channel_T *channel)
 channel_buffer_free(buf_T *buf)
 {
     channel_T	*channel;
-    int		part;
+    ch_part_T	part;
 
     for (channel = first_channel; channel != NULL; channel = channel->ch_next)
-	for (part = PART_SOCK; part <= PART_IN; ++part)
+	for (part = PART_SOCK; part < PART_COUNT; ++part)
 	{
 	    chanpart_T  *ch_part = &channel->ch_part[part];
 
@@ -1574,7 +1582,7 @@ invoke_callback(channel_T *channel, char
  * Returns NULL if there is nothing.
  */
     readq_T *
-channel_peek(channel_T *channel, int part)
+channel_peek(channel_T *channel, ch_part_T part)
 {
     readq_T *head = &channel->ch_part[part].ch_head;
 
@@ -1604,7 +1612,7 @@ channel_first_nl(readq_T *node)
  * Returns NULL if there is nothing.
  */
     char_u *
-channel_get(channel_T *channel, int part)
+channel_get(channel_T *channel, ch_part_T part)
 {
     readq_T *head = &channel->ch_part[part].ch_head;
     readq_T *node = head->rq_next;
@@ -1628,7 +1636,7 @@ channel_get(channel_T *channel, int part
  * Replaces NUL bytes with NL.
  */
     static char_u *
-channel_get_all(channel_T *channel, int part)
+channel_get_all(channel_T *channel, ch_part_T part)
 {
     readq_T *head = &channel->ch_part[part].ch_head;
     readq_T *node = head->rq_next;
@@ -1677,7 +1685,7 @@ channel_get_all(channel_T *channel, int 
  * Caller must check these bytes are available.
  */
     void
-channel_consume(channel_T *channel, int part, int len)
+channel_consume(channel_T *channel, ch_part_T part, int len)
 {
     readq_T *head = &channel->ch_part[part].ch_head;
     readq_T *node = head->rq_next;
@@ -1693,7 +1701,7 @@ channel_consume(channel_T *channel, int 
  * When "want_nl" is TRUE collapse more buffers until a NL is found.
  */
     int
-channel_collapse(channel_T *channel, int part, int want_nl)
+channel_collapse(channel_T *channel, ch_part_T part, int want_nl)
 {
     readq_T *head = &channel->ch_part[part].ch_head;
     readq_T *node = head->rq_next;
@@ -1753,7 +1761,7 @@ channel_collapse(channel_T *channel, int
  * Returns OK or FAIL.
  */
     static int
-channel_save(channel_T *channel, int part, char_u *buf, int len,
+channel_save(channel_T *channel, ch_part_T part, char_u *buf, int len,
 						      int prepend, char *lead)
 {
     readq_T *node;
@@ -1828,7 +1836,7 @@ channel_save(channel_T *channel, int par
 channel_fill(js_read_T *reader)
 {
     channel_T	*channel = (channel_T *)reader->js_cookie;
-    int		part = reader->js_cookie_arg;
+    ch_part_T	part = reader->js_cookie_arg;
     char_u	*next = channel_get(channel, part);
     int		unused;
     int		len;
@@ -1866,7 +1874,7 @@ channel_fill(js_read_T *reader)
  * Return TRUE if there is more to read.
  */
     static int
-channel_parse_json(channel_T *channel, int part)
+channel_parse_json(channel_T *channel, ch_part_T part)
 {
     js_read_T	reader;
     typval_T	listtv;
@@ -2046,7 +2054,7 @@ remove_json_node(jsonq_T *head, jsonq_T 
  * Return FAIL otherwise.
  */
     static int
-channel_get_json(channel_T *channel, int part, int id, typval_T **rettv)
+channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv)
 {
     jsonq_T   *head = &channel->ch_part[part].ch_json_head;
     jsonq_T   *item = head->jq_next;
@@ -2080,7 +2088,7 @@ channel_get_json(channel_T *channel, int
  * "argv[1]" etc. have further arguments, type is VAR_UNKNOWN if missing.
  */
     static void
-channel_exe_cmd(channel_T *channel, int part, typval_T *argv)
+channel_exe_cmd(channel_T *channel, ch_part_T part, typval_T *argv)
 {
     char_u  *cmd = argv[0].vval.v_string;
     char_u  *arg;
@@ -2237,7 +2245,7 @@ invoke_one_time_callback(
 }
 
     static void
-append_to_buffer(buf_T *buffer, char_u *msg, channel_T *channel, int part)
+append_to_buffer(buf_T *buffer, char_u *msg, channel_T *channel, ch_part_T part)
 {
     buf_T	*save_curbuf = curbuf;
     linenr_T    lnum = buffer->b_ml.ml_line_count;
@@ -2332,7 +2340,7 @@ append_to_buffer(buf_T *buffer, char_u *
 }
 
     static void
-drop_messages(channel_T *channel, int part)
+drop_messages(channel_T *channel, ch_part_T part)
 {
     char_u *msg;
 
@@ -2349,7 +2357,7 @@ drop_messages(channel_T *channel, int pa
  * Return TRUE when a message was handled, there might be another one.
  */
     static int
-may_invoke_callback(channel_T *channel, int part)
+may_invoke_callback(channel_T *channel, ch_part_T part)
 {
     char_u	*msg = NULL;
     typval_T	*listtv = NULL;
@@ -2596,7 +2604,7 @@ channel_is_open(channel_T *channel)
  * Return TRUE if "channel" has JSON or other typeahead.
  */
     static int
-channel_has_readahead(channel_T *channel, int part)
+channel_has_readahead(channel_T *channel, ch_part_T part)
 {
     ch_mode_T	ch_mode = channel->ch_part[part].ch_mode;
 
@@ -2617,7 +2625,7 @@ channel_has_readahead(channel_T *channel
     char *
 channel_status(channel_T *channel, int req_part)
 {
-    int part;
+    ch_part_T part;
     int has_readahead = FALSE;
 
     if (channel == NULL)
@@ -2640,7 +2648,7 @@ channel_status(channel_T *channel, int r
     {
 	if (channel_is_open(channel))
 	    return "open";
-	for (part = PART_SOCK; part <= PART_ERR; ++part)
+	for (part = PART_SOCK; part < PART_IN; ++part)
 	    if (channel_has_readahead(channel, part))
 	    {
 		has_readahead = TRUE;
@@ -2654,7 +2662,7 @@ channel_status(channel_T *channel, int r
 }
 
     static void
-channel_part_info(channel_T *channel, dict_T *dict, char *name, int part)
+channel_part_info(channel_T *channel, dict_T *dict, char *name, ch_part_T part)
 {
     chanpart_T *chanpart = &channel->ch_part[part];
     char	namebuf[20];  /* longest is "sock_timeout" */
@@ -2736,28 +2744,24 @@ channel_close(channel_T *channel, int in
     channel_gui_unregister(channel);
 #endif
 
-    if (channel->CH_SOCK_FD != INVALID_FD)
-    {
-	sock_close(channel->CH_SOCK_FD);
-	channel->CH_SOCK_FD = INVALID_FD;
-    }
-    may_close_part(&channel->CH_IN_FD);
-    may_close_part(&channel->CH_OUT_FD);
-    may_close_part(&channel->CH_ERR_FD);
+    ch_close_part(channel, PART_SOCK);
+    ch_close_part(channel, PART_IN);
+    ch_close_part(channel, PART_OUT);
+    ch_close_part(channel, PART_ERR);
 
     if (invoke_close_cb && channel->ch_close_cb != NULL)
     {
 	  typval_T	argv[1];
 	  typval_T	rettv;
 	  int		dummy;
-	  int		part;
+	  ch_part_T	part;
 
 	  /* Invoke callbacks before the close callback, since it's weird to
 	   * first invoke the close callback.  Increment the refcount to avoid
 	   * the channel being freed halfway. */
 	  ++channel->ch_refcount;
 	  ch_log(channel, "Invoking callbacks before closing");
-	  for (part = PART_SOCK; part <= PART_ERR; ++part)
+	  for (part = PART_SOCK; part < PART_IN; ++part)
 	      while (may_invoke_callback(channel, part))
 		  ;
 
@@ -2789,7 +2793,7 @@ channel_close(channel_T *channel, int in
 	  }
 
 	  /* any remaining messages are useless now */
-	  for (part = PART_SOCK; part <= PART_ERR; ++part)
+	  for (part = PART_SOCK; part < PART_IN; ++part)
 	      drop_messages(channel, part);
     }
 
@@ -2802,14 +2806,14 @@ channel_close(channel_T *channel, int in
     void
 channel_close_in(channel_T *channel)
 {
-    may_close_part(&channel->CH_IN_FD);
+    ch_close_part(channel, PART_IN);
 }
 
 /*
  * Clear the read buffer on "channel"/"part".
  */
     static void
-channel_clear_one(channel_T *channel, int part)
+channel_clear_one(channel_T *channel, ch_part_T part)
 {
     jsonq_T *json_head = &channel->ch_part[part].ch_json_head;
     cbq_T   *cb_head = &channel->ch_part[part].ch_cb_head;
@@ -3043,11 +3047,20 @@ channel_wait(channel_T *channel, sock_T 
 }
 
     static void
-channel_close_on_error(channel_T *channel, char *func)
+ch_close_part_on_error(
+	channel_T *channel, ch_part_T part, int is_err, char *func)
 {
-    /* Do not call emsg(), most likely the other end just exited. */
-    ch_errors(channel, "%s(): Cannot read from channel, will close it soon",
-									func);
+    char	msgbuf[80];
+
+    vim_snprintf(msgbuf, sizeof(msgbuf),
+	    "%%s(): Read %s from ch_part[%d], closing",
+					    (is_err ? "error" : "EOF"), part);
+
+    if (is_err)
+	/* Do not call emsg(), most likely the other end just exited. */
+	ch_errors(channel, msgbuf, func);
+    else
+	ch_logs(channel, msgbuf, func);
 
     /* Queue a "DETACH" netbeans message in the command queue in order to
      * terminate the netbeans session later. Do not end the session here
@@ -3064,21 +3077,20 @@ channel_close_on_error(channel_T *channe
 	channel_save(channel, PART_SOCK, (char_u *)DETACH_MSG_RAW,
 			      (int)STRLEN(DETACH_MSG_RAW), FALSE, "PUT ");
 
-    /* When reading from stdout is not possible, assume the other side has
-     * died.  Don't close the channel right away, it may be the wrong moment
-     * to invoke callbacks. */
-    channel->ch_to_be_closed = TRUE;
+    /* When reading is not possible close this part of the channel.  Don't
+     * close the channel yet, there may be something to read on another part. */
+    ch_close_part(channel, part);
 
 #ifdef FEAT_GUI
     /* Stop listening to GUI events right away. */
-    channel_gui_unregister(channel);
+    channel_gui_unregister_one(channel, part);
 #endif
 }
 
     static void
 channel_close_now(channel_T *channel)
 {
-    ch_log(channel, "Closing channel because of previous read error");
+    ch_log(channel, "Closing channel because all readable fds are closed");
     channel_close(channel, TRUE);
     if (channel->ch_nb_close_cb != NULL)
 	(*channel->ch_nb_close_cb)();
@@ -3090,7 +3102,7 @@ channel_close_now(channel_T *channel)
  * The data is put in the read queue.  No callbacks are invoked here.
  */
     static void
-channel_read(channel_T *channel, int part, char *func)
+channel_read(channel_T *channel, ch_part_T part, char *func)
 {
     static char_u	*buf = NULL;
     int			len = 0;
@@ -3098,14 +3110,11 @@ channel_read(channel_T *channel, int par
     sock_T		fd;
     int			use_socket = FALSE;
 
-    /* If we detected a read error don't try reading again. */
-    if (channel->ch_to_be_closed)
-	return;
-
     fd = channel->ch_part[part].ch_fd;
     if (fd == INVALID_FD)
     {
-	ch_error(channel, "channel_read() called while socket is closed");
+	ch_errors(channel, "channel_read() called while %s part is closed",
+							    part_names[part]);
 	return;
     }
     use_socket = fd == channel->CH_SOCK_FD;
@@ -3141,7 +3150,7 @@ channel_read(channel_T *channel, int par
 
     /* Reading a disconnection (readlen == 0), or an error. */
     if (readlen <= 0)
-	channel_close_on_error(channel, func);
+	ch_close_part_on_error(channel, part, (len < 0), func);
 
 #if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK)
     /* signal the main loop that there is something to read */
@@ -3157,7 +3166,7 @@ channel_read(channel_T *channel, int par
  * Returns NULL in case of error or timeout.
  */
     char_u *
-channel_read_block(channel_T *channel, int part, int timeout)
+channel_read_block(channel_T *channel, ch_part_T part, int timeout)
 {
     char_u	*buf;
     char_u	*msg;
@@ -3237,7 +3246,7 @@ channel_read_block(channel_T *channel, i
     int
 channel_read_json_block(
 	channel_T   *channel,
-	int	    part,
+	ch_part_T   part,
 	int	    timeout_arg,
 	int	    id,
 	typval_T    **rettv)
@@ -3323,7 +3332,7 @@ channel_read_json_block(
 common_channel_read(typval_T *argvars, typval_T *rettv, int raw)
 {
     channel_T	*channel;
-    int		part = -1;
+    ch_part_T	part = PART_COUNT;
     jobopt_T	opt;
     int		mode;
     int		timeout;
@@ -3344,7 +3353,7 @@ common_channel_read(typval_T *argvars, t
     channel = get_channel_arg(&argvars[0], TRUE, TRUE, part);
     if (channel != NULL)
     {
-	if (part < 0)
+	if (part == PART_COUNT)
 	    part = channel_part_read(channel);
 	mode = channel_get_mode(channel, part);
 	timeout = channel_get_timeout(channel, part);
@@ -3382,10 +3391,10 @@ theend:
  * Returns NULL when the socket isn't found.
  */
     channel_T *
-channel_fd2channel(sock_T fd, int *partp)
+channel_fd2channel(sock_T fd, ch_part_T *partp)
 {
     channel_T	*channel;
-    int		part;
+    ch_part_T	part;
 
     if (fd != INVALID_FD)
 	for (channel = first_channel; channel != NULL;
@@ -3411,17 +3420,13 @@ channel_fd2channel(sock_T fd, int *partp
 channel_handle_events(void)
 {
     channel_T	*channel;
-    int		part;
+    ch_part_T	part;
     sock_T	fd;
 
     for (channel = first_channel; channel != NULL; channel = channel->ch_next)
     {
-	/* If we detected a read error don't try reading again. */
-	if (channel->ch_to_be_closed)
-	    continue;
-
 	/* check the socket and pipes */
-	for (part = PART_SOCK; part <= PART_ERR; ++part)
+	for (part = PART_SOCK; part < PART_IN; ++part)
 	{
 	    fd = channel->ch_part[part].ch_fd;
 	    if (fd != INVALID_FD)
@@ -3431,7 +3436,8 @@ channel_handle_events(void)
 		if (r == CW_READY)
 		    channel_read(channel, part, "channel_handle_events");
 		else if (r == CW_ERROR)
-		    channel_close_on_error(channel, "channel_handle_events()");
+		    ch_close_part_on_error(channel, part, TRUE,
+						     "channel_handle_events");
 	    }
 	}
     }
@@ -3444,7 +3450,7 @@ channel_handle_events(void)
  * Return FAIL or OK.
  */
     int
-channel_send(channel_T *channel, int part, char_u *buf, int len, char *fun)
+channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun)
 {
     int		res;
     sock_T	fd;
@@ -3496,7 +3502,7 @@ channel_send(channel_T *channel, int par
  * Sets "part_read" to the read fd.
  * Otherwise returns NULL.
  */
-    channel_T *
+    static channel_T *
 send_common(
 	typval_T    *argvars,
 	char_u	    *text,
@@ -3504,10 +3510,10 @@ send_common(
 	int	    eval,
 	jobopt_T    *opt,
 	char	    *fun,
-	int	    *part_read)
+	ch_part_T   *part_read)
 {
     channel_T	*channel;
-    int		part_send;
+    ch_part_T	part_send;
 
     clear_job_options(opt);
     channel = get_channel_arg(&argvars[0], TRUE, FALSE, 0);
@@ -3550,8 +3556,8 @@ ch_expr_common(typval_T *argvars, typval
     channel_T	*channel;
     int		id;
     ch_mode_T	ch_mode;
-    int		part_send;
-    int		part_read;
+    ch_part_T	part_send;
+    ch_part_T	part_read;
     jobopt_T    opt;
     int		timeout;
 
@@ -3610,7 +3616,7 @@ ch_raw_common(typval_T *argvars, typval_
     char_u	buf[NUMBUFLEN];
     char_u	*text;
     channel_T	*channel;
-    int		part_read;
+    ch_part_T	part_read;
     jobopt_T    opt;
     int		timeout;
 
@@ -3644,7 +3650,7 @@ channel_poll_setup(int nfd_in, void *fds
     int		nfd = nfd_in;
     channel_T	*channel;
     struct	pollfd *fds = fds_in;
-    int		part;
+    ch_part_T	part;
 
     for (channel = first_channel; channel != NULL; channel = channel->ch_next)
     {
@@ -3678,7 +3684,7 @@ channel_poll_check(int ret_in, void *fds
     int		ret = ret_in;
     channel_T	*channel;
     struct	pollfd *fds = fds_in;
-    int		part;
+    ch_part_T	part;
     int		idx;
     chanpart_T	*in_part;
 
@@ -3725,7 +3731,7 @@ channel_select_setup(int maxfd_in, void 
     channel_T	*channel;
     fd_set	*rfds = rfds_in;
     fd_set	*wfds = wfds_in;
-    int		part;
+    ch_part_T	part;
 
     for (channel = first_channel; channel != NULL; channel = channel->ch_next)
     {
@@ -3757,7 +3763,7 @@ channel_select_check(int ret_in, void *r
     channel_T	*channel;
     fd_set	*rfds = rfds_in;
     fd_set	*wfds = wfds_in;
-    int		part;
+    ch_part_T	part;
     chanpart_T	*in_part;
 
     for (channel = first_channel; channel != NULL; channel = channel->ch_next)
@@ -3803,7 +3809,7 @@ channel_parse_messages(void)
     channel_T	*channel = first_channel;
     int		ret = FALSE;
     int		r;
-    int		part = PART_SOCK;
+    ch_part_T	part = PART_SOCK;
 
     ++safe_to_invoke_callback;
 
@@ -3816,9 +3822,9 @@ channel_parse_messages(void)
     }
     while (channel != NULL)
     {
-	if (channel->ch_to_be_closed)
+	if (channel->ch_to_be_closed == 0)
 	{
-	    channel->ch_to_be_closed = FALSE;
+	    channel->ch_to_be_closed = (1 << PART_COUNT);
 	    channel_close_now(channel);
 	    /* channel may have been freed, start over */
 	    channel = first_channel;
@@ -3840,7 +3846,7 @@ channel_parse_messages(void)
 	    continue;
 	}
 	if (channel->ch_part[part].ch_fd != INVALID_FD
-		|| channel_has_readahead(channel, part))
+				      || channel_has_readahead(channel, part))
 	{
 	    /* Increase the refcount, in case the handler causes the channel
 	     * to be unreferenced or closed. */
@@ -3899,7 +3905,7 @@ set_ref_in_channel(int copyID)
 /*
  * Return the "part" to write to for "channel".
  */
-    int
+    ch_part_T
 channel_part_send(channel_T *channel)
 {
     if (channel->CH_SOCK_FD == INVALID_FD)
@@ -3910,7 +3916,7 @@ channel_part_send(channel_T *channel)
 /*
  * Return the default "part" to read from for "channel".
  */
-    int
+    ch_part_T
 channel_part_read(channel_T *channel)
 {
     if (channel->CH_SOCK_FD == INVALID_FD)
@@ -3923,7 +3929,7 @@ channel_part_read(channel_T *channel)
  * If "channel" is invalid returns MODE_JSON.
  */
     ch_mode_T
-channel_get_mode(channel_T *channel, int part)
+channel_get_mode(channel_T *channel, ch_part_T part)
 {
     if (channel == NULL)
 	return MODE_JSON;
@@ -3934,7 +3940,7 @@ channel_get_mode(channel_T *channel, int
  * Return the timeout of "channel"/"part"
  */
     int
-channel_get_timeout(channel_T *channel, int part)
+channel_get_timeout(channel_T *channel, ch_part_T part)
 {
     return channel->ch_part[part].ch_timeout;
 }
@@ -3962,7 +3968,7 @@ handle_mode(typval_T *item, jobopt_T *op
 }
 
     static int
-handle_io(typval_T *item, int part, jobopt_T *opt)
+handle_io(typval_T *item, ch_part_T part, jobopt_T *opt)
 {
     char_u	*val = get_tv_string(item);
 
@@ -4045,7 +4051,7 @@ get_job_options(typval_T *tv, jobopt_T *
     dict_T	*dict;
     int		todo;
     hashitem_T	*hi;
-    int		part;
+    ch_part_T	part;
 
     opt->jo_set = 0;
     if (tv->v_type == VAR_UNKNOWN)
@@ -4343,10 +4349,10 @@ get_job_options(typval_T *tv, jobopt_T *
  * Returns NULL if the handle is invalid.
  * When "check_open" is TRUE check that the channel can be used.
  * When "reading" is TRUE "check_open" considers typeahead useful.
- * "part" is used to check typeahead, when -1 use the default part.
+ * "part" is used to check typeahead, when PART_COUNT use the default part.
  */
     channel_T *
-get_channel_arg(typval_T *tv, int check_open, int reading, int part)
+get_channel_arg(typval_T *tv, int check_open, int reading, ch_part_T part)
 {
     channel_T	*channel = NULL;
     int		has_readahead = FALSE;
@@ -4367,7 +4373,7 @@ get_channel_arg(typval_T *tv, int check_
     }
     if (channel != NULL && reading)
 	has_readahead = channel_has_readahead(channel,
-			       part >= 0 ? part : channel_part_read(channel));
+		       part != PART_COUNT ? part : channel_part_read(channel));
 
     if (check_open && (channel == NULL || (!channel_is_open(channel)
 					     && !(reading && has_readahead))))
@@ -4659,7 +4665,7 @@ job_start(typval_T *argvars)
     garray_T	ga;
 #endif
     jobopt_T	opt;
-    int		part;
+    ch_part_T	part;
 
     job = job_alloc();
     if (job == NULL)
@@ -4679,7 +4685,7 @@ job_start(typval_T *argvars)
 	goto theend;
 
     /* Check that when io is "file" that there is a file name. */
-    for (part = PART_OUT; part <= PART_IN; ++part)
+    for (part = PART_OUT; part < PART_COUNT; ++part)
 	if ((opt.jo_set & (JO_OUT_IO << (part - PART_OUT)))
 		&& opt.jo_io[part] == JIO_FILE
 		&& (!(opt.jo_set & (JO_OUT_NAME << (part - PART_OUT)))
--- a/src/eval.c
+++ b/src/eval.c
@@ -5622,7 +5622,7 @@ set_ref_in_item(
     else if (tv->v_type == VAR_CHANNEL)
     {
 	channel_T   *ch =tv->vval.v_channel;
-	int	    part;
+	ch_part_T   part;
 	typval_T    dtv;
 	jsonq_T	    *jq;
 	cbq_T	    *cq;
@@ -5630,7 +5630,7 @@ set_ref_in_item(
 	if (ch != NULL && ch->ch_copyID != copyID)
 	{
 	    ch->ch_copyID = copyID;
-	    for (part = PART_SOCK; part <= PART_IN; ++part)
+	    for (part = PART_SOCK; part < PART_COUNT; ++part)
 	    {
 		for (jq = ch->ch_part[part].ch_json_head.jq_next; jq != NULL;
 							     jq = jq->jq_next)
--- a/src/proto/channel.pro
+++ b/src/proto/channel.pro
@@ -14,15 +14,15 @@ channel_T *channel_open_func(typval_T *a
 void channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err);
 void channel_set_job(channel_T *channel, job_T *job, jobopt_T *options);
 void channel_set_options(channel_T *channel, jobopt_T *opt);
-void channel_set_req_callback(channel_T *channel, int part, char_u *callback, partial_T *partial, int id);
+void channel_set_req_callback(channel_T *channel, ch_part_T part, char_u *callback, partial_T *partial, int id);
 void channel_buffer_free(buf_T *buf);
 void channel_write_any_lines(void);
 void channel_write_new_lines(buf_T *buf);
-readq_T *channel_peek(channel_T *channel, int part);
+readq_T *channel_peek(channel_T *channel, ch_part_T part);
 char_u *channel_first_nl(readq_T *node);
-char_u *channel_get(channel_T *channel, int part);
-void channel_consume(channel_T *channel, int part, int len);
-int channel_collapse(channel_T *channel, int part, int want_nl);
+char_u *channel_get(channel_T *channel, ch_part_T part);
+void channel_consume(channel_T *channel, ch_part_T part, int len);
+int channel_collapse(channel_T *channel, ch_part_T part, int want_nl);
 int channel_can_write_to(channel_T *channel);
 int channel_is_open(channel_T *channel);
 char *channel_status(channel_T *channel, int req_part);
@@ -31,13 +31,12 @@ void channel_close(channel_T *channel, i
 void channel_close_in(channel_T *channel);
 void channel_clear(channel_T *channel);
 void channel_free_all(void);
-char_u *channel_read_block(channel_T *channel, int part, int timeout);
-int channel_read_json_block(channel_T *channel, int part, int timeout_arg, int id, typval_T **rettv);
+char_u *channel_read_block(channel_T *channel, ch_part_T part, int timeout);
+int channel_read_json_block(channel_T *channel, ch_part_T part, int timeout_arg, int id, typval_T **rettv);
 void common_channel_read(typval_T *argvars, typval_T *rettv, int raw);
-channel_T *channel_fd2channel(sock_T fd, int *partp);
+channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp);
 void channel_handle_events(void);
-int channel_send(channel_T *channel, int part, char_u *buf, int len, char *fun);
-channel_T *send_common(typval_T *argvars, char_u *text, int id, int eval, jobopt_T *opt, char *fun, int *part_read);
+int channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun);
 void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval);
 void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval);
 int channel_poll_setup(int nfd_in, void *fds_in);
@@ -46,14 +45,14 @@ int channel_select_setup(int maxfd_in, v
 int channel_select_check(int ret_in, void *rfds_in, void *wfds_in);
 int channel_parse_messages(void);
 int set_ref_in_channel(int copyID);
-int channel_part_send(channel_T *channel);
-int channel_part_read(channel_T *channel);
-ch_mode_T channel_get_mode(channel_T *channel, int part);
-int channel_get_timeout(channel_T *channel, int part);
+ch_part_T channel_part_send(channel_T *channel);
+ch_part_T channel_part_read(channel_T *channel);
+ch_mode_T channel_get_mode(channel_T *channel, ch_part_T part);
+int channel_get_timeout(channel_T *channel, ch_part_T part);
 void clear_job_options(jobopt_T *opt);
 void free_job_options(jobopt_T *opt);
 int get_job_options(typval_T *tv, jobopt_T *opt, int supported);
-channel_T *get_channel_arg(typval_T *tv, int check_open, int reading, int part);
+channel_T *get_channel_arg(typval_T *tv, int check_open, int reading, ch_part_T part);
 void job_free_all(void);
 int set_ref_in_job(int copyID);
 void job_unref(job_T *job);
--- a/src/structs.h
+++ b/src/structs.h
@@ -1499,19 +1499,21 @@ typedef enum {
 
 /* Ordering matters, it is used in for loops: IN is last, only SOCK/OUT/ERR
  * are polled. */
-#define PART_SOCK   0
+typedef enum {
+    PART_SOCK = 0,
 #define CH_SOCK_FD	ch_part[PART_SOCK].ch_fd
-
 #ifdef FEAT_JOB_CHANNEL
-# define INVALID_FD  (-1)
-
-# define PART_OUT   1
-# define PART_ERR   2
-# define PART_IN    3
+    PART_OUT,
 # define CH_OUT_FD	ch_part[PART_OUT].ch_fd
+    PART_ERR,
 # define CH_ERR_FD	ch_part[PART_ERR].ch_fd
+    PART_IN,
 # define CH_IN_FD	ch_part[PART_IN].ch_fd
 #endif
+    PART_COUNT
+} ch_part_T;
+
+#define INVALID_FD	(-1)
 
 /* The per-fd info for a channel. */
 typedef struct {
@@ -1566,14 +1568,14 @@ struct channel_S {
     int		ch_id;		/* ID of the channel */
     int		ch_last_msg_id;	/* ID of the last message */
 
-    chanpart_T	ch_part[4];	/* info for socket, out, err and in */
+    chanpart_T	ch_part[PART_COUNT]; /* info for socket, out, err and in */
 
     char	*ch_hostname;	/* only for socket, allocated */
     int		ch_port;	/* only for socket */
 
-    int		ch_to_be_closed; /* When TRUE reading or writing failed and
-				  * the channel must be closed when it's safe
-				  * to invoke callbacks. */
+    int		ch_to_be_closed; /* bitset of readable fds to be closed.
+				  * When all readable fds have been closed,
+				  * set to (1 << PART_COUNT). */
     int		ch_to_be_freed; /* When TRUE channel must be freed when it's
 				 * safe to invoke callbacks. */
     int		ch_error;	/* When TRUE an error was reported.  Avoids
--- a/src/testdir/test_channel.vim
+++ b/src/testdir/test_channel.vim
@@ -1505,6 +1505,23 @@ func Test_read_nonl_line()
   call assert_equal(3, g:linecount)
 endfunc
 
+func Test_read_from_terminated_job()
+  if !has('job')
+    return
+  endif
+
+  let g:linecount = 0
+  if has('win32')
+    " workaround: 'shellescape' does improper escaping double quotes 
+    let arg = 'import os,sys;os.close(1);sys.stderr.write(\"test\n\")'
+  else
+    let arg = 'import os,sys;os.close(1);sys.stderr.write("test\n")'
+  endif
+  call job_start([s:python, '-c', arg], {'callback': 'MyLineCountCb'})
+  call WaitFor('1 <= g:linecount')
+  call assert_equal(1, g:linecount)
+endfunc
+
 function Ch_test_close_lambda(port)
   let handle = ch_open('localhost:' . a:port, s:chopt)
   if ch_status(handle) == "fail"
--- a/src/version.c
+++ b/src/version.c
@@ -765,6 +765,8 @@ static char *(features[]) =
 static int included_patches[] =
 {   /* Add new patch number below this line */
 /**/
+    27,
+/**/
     26,
 /**/
     25,