diff src/channel.c @ 10426:acfc83aca8ee v8.0.0107

commit https://github.com/vim/vim/commit/958dc6923d341390531888058495569d73c356c3 Author: Bram Moolenaar <Bram@vim.org> Date: Thu Dec 1 15:34:12 2016 +0100 patch 8.0.0107 Problem: When reading channel output in a timer, messages may go missing. (Skywind) Solution: Add the "drop" option. Write error messages in the channel log. Don't have ch_canread() check for the channel being open.
author Christian Brabandt <cb@256bit.org>
date Thu, 01 Dec 2016 15:45:04 +0100
parents e664ee056a84
children ea7fbae33285
line wrap: on
line diff
--- a/src/channel.c
+++ b/src/channel.c
@@ -1195,6 +1195,7 @@ channel_set_options(channel_T *channel, 
     if (opt->jo_set & JO_CLOSE_CALLBACK)
 	set_callback(&channel->ch_close_cb, &channel->ch_close_partial,
 		opt->jo_close_cb, opt->jo_close_partial);
+    channel->ch_drop_never = opt->jo_drop_never;
 
     if ((opt->jo_set & JO_OUT_IO) && opt->jo_io[PART_OUT] == JIO_BUFFER)
     {
@@ -1918,6 +1919,7 @@ channel_parse_json(channel_T *channel, c
 		clear_tv(&listtv);
 	    else
 	    {
+		item->jq_no_callback = FALSE;
 		item->jq_value = alloc_tv();
 		if (item->jq_value == NULL)
 		{
@@ -2050,11 +2052,17 @@ remove_json_node(jsonq_T *head, jsonq_T 
  * When "id" is positive it must match the first number in the list.
  * When "id" is zero or negative jut get the first message.  But not the one
  * with id ch_block_id.
+ * When "without_callback" is TRUE also get messages that were pushed back.
  * Return OK when found and return the value in "rettv".
  * Return FAIL otherwise.
  */
     static int
-channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv)
+channel_get_json(
+	channel_T   *channel,
+	ch_part_T   part,
+	int	    id,
+	int	    without_callback,
+	typval_T    **rettv)
 {
     jsonq_T   *head = &channel->ch_part[part].ch_json_head;
     jsonq_T   *item = head->jq_next;
@@ -2064,10 +2072,11 @@ channel_get_json(channel_T *channel, ch_
 	list_T	    *l = item->jq_value->vval.v_list;
 	typval_T    *tv = &l->lv_first->li_tv;
 
-	if ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id)
+	if ((without_callback || !item->jq_no_callback)
+	    && ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id)
 	      || (id <= 0 && (tv->v_type != VAR_NUMBER
 		 || tv->vval.v_number == 0
-		 || tv->vval.v_number != channel->ch_part[part].ch_block_id)))
+		 || tv->vval.v_number != channel->ch_part[part].ch_block_id))))
 	{
 	    *rettv = item->jq_value;
 	    if (tv->v_type == VAR_NUMBER)
@@ -2080,6 +2089,65 @@ channel_get_json(channel_T *channel, ch_
     return FAIL;
 }
 
+/*
+ * Put back "rettv" into the JSON queue, there was no callback for it.
+ * Takes over the values in "rettv".
+ */
+    static void
+channel_push_json(channel_T *channel, ch_part_T part, typval_T *rettv)
+{
+    jsonq_T   *head = &channel->ch_part[part].ch_json_head;
+    jsonq_T   *item = head->jq_next;
+    jsonq_T   *newitem;
+
+    if (head->jq_prev != NULL && head->jq_prev->jq_no_callback)
+	/* last item was pushed back, append to the end */
+	item = NULL;
+    else while (item != NULL && item->jq_no_callback)
+	/* append after the last item that was pushed back */
+	item = item->jq_next;
+
+    newitem = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T));
+    if (newitem == NULL)
+	clear_tv(rettv);
+    else
+    {
+	newitem->jq_value = alloc_tv();
+	if (newitem->jq_value == NULL)
+	{
+	    vim_free(newitem);
+	    clear_tv(rettv);
+	}
+	else
+	{
+	    newitem->jq_no_callback = FALSE;
+	    *newitem->jq_value = *rettv;
+	    if (item == NULL)
+	    {
+		/* append to the end */
+		newitem->jq_prev = head->jq_prev;
+		head->jq_prev = newitem;
+		newitem->jq_next = NULL;
+		if (newitem->jq_prev == NULL)
+		    head->jq_next = newitem;
+		else
+		    newitem->jq_prev->jq_next = newitem;
+	    }
+	    else
+	    {
+		/* append after "item" */
+		newitem->jq_prev = item;
+		newitem->jq_next = item->jq_next;
+		item->jq_next = newitem;
+		if (newitem->jq_next == NULL)
+		    head->jq_prev = newitem;
+		else
+		    newitem->jq_next->jq_prev = newitem;
+	    }
+	}
+    }
+}
+
 #define CH_JSON_MAX_ARGS 4
 
 /*
@@ -2410,11 +2478,11 @@ may_invoke_callback(channel_T *channel, 
 	int		argc = 0;
 
 	/* Get any json message in the queue. */
-	if (channel_get_json(channel, part, -1, &listtv) == FAIL)
+	if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL)
 	{
 	    /* Parse readahead, return when there is still no message. */
 	    channel_parse_json(channel, part);
-	    if (channel_get_json(channel, part, -1, &listtv) == FAIL)
+	    if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL)
 		return FALSE;
 	}
 
@@ -2454,7 +2522,7 @@ may_invoke_callback(channel_T *channel, 
 	{
 	    /* If there is a close callback it may use ch_read() to get the
 	     * messages. */
-	    if (channel->ch_close_cb == NULL)
+	    if (channel->ch_close_cb == NULL && !channel->ch_drop_never)
 		drop_messages(channel, part);
 	    return FALSE;
 	}
@@ -2531,7 +2599,7 @@ may_invoke_callback(channel_T *channel, 
     {
 	int	done = FALSE;
 
-	/* invoke the one-time callback with the matching nr */
+	/* JSON or JS mode: invoke the one-time callback with the matching nr */
 	for (cbitem = cbhead->cq_next; cbitem != NULL; cbitem = cbitem->cq_next)
 	    if (cbitem->cq_seq_nr == seq_nr)
 	    {
@@ -2540,7 +2608,17 @@ may_invoke_callback(channel_T *channel, 
 		break;
 	    }
 	if (!done)
-	    ch_logn(channel, "Dropping message %d without callback", seq_nr);
+	{
+	    if (channel->ch_drop_never)
+	    {
+		/* message must be read with ch_read() */
+		channel_push_json(channel, part, listtv);
+		listtv = NULL;
+	    }
+	    else
+		ch_logn(channel, "Dropping message %d without callback",
+								       seq_nr);
+	}
     }
     else if (callback != NULL || buffer != NULL)
     {
@@ -2567,7 +2645,7 @@ may_invoke_callback(channel_T *channel, 
 	}
     }
     else
-	ch_log(channel, "Dropping message");
+	ch_logn(channel, "Dropping message %d", seq_nr);
 
     if (listtv != NULL)
 	free_tv(listtv);
@@ -2792,9 +2870,10 @@ channel_close(channel_T *channel, int in
 	      redraw_after_callback();
 	  }
 
-	  /* any remaining messages are useless now */
-	  for (part = PART_SOCK; part < PART_IN; ++part)
-	      drop_messages(channel, part);
+	  if (!channel->ch_drop_never)
+	      /* any remaining messages are useless now */
+	      for (part = PART_SOCK; part < PART_IN; ++part)
+		  drop_messages(channel, part);
     }
 
     channel->ch_nb_close_cb = NULL;
@@ -3091,9 +3170,9 @@ ch_close_part_on_error(
 channel_close_now(channel_T *channel)
 {
     ch_log(channel, "Closing channel because all readable fds are closed");
-    channel_close(channel, TRUE);
     if (channel->ch_nb_close_cb != NULL)
 	(*channel->ch_nb_close_cb)();
+    channel_close(channel, TRUE);
 }
 
 /*
@@ -3243,7 +3322,7 @@ channel_read_block(channel_T *channel, c
  * When "id" is -1 accept any message;
  * Blocks until the message is received or the timeout is reached.
  */
-    int
+    static int
 channel_read_json_block(
 	channel_T   *channel,
 	ch_part_T   part,
@@ -3264,7 +3343,7 @@ channel_read_json_block(
 	more = channel_parse_json(channel, part);
 
 	/* search for message "id" */
-	if (channel_get_json(channel, part, id, rettv) == OK)
+	if (channel_get_json(channel, part, id, TRUE, rettv) == OK)
 	{
 	    chanpart->ch_block_id = 0;
 	    return OK;
@@ -4290,6 +4369,20 @@ get_job_options(typval_T *tv, jobopt_T *
 		    return FAIL;
 		}
 	    }
+	    else if (STRCMP(hi->hi_key, "drop") == 0)
+	    {
+		int never = FALSE;
+		val = get_tv_string(item);
+
+		if (STRCMP(val, "never") == 0)
+		    never = TRUE;
+		else if (STRCMP(val, "auto") != 0)
+		{
+		    EMSG2(_(e_invarg2), "drop");
+		    return FAIL;
+		}
+		opt->jo_drop_never = never;
+	    }
 	    else if (STRCMP(hi->hi_key, "exit_cb") == 0)
 	    {
 		if (!(supported & JO_EXIT_CB))