# HG changeset patch # User Christian Brabandt # Date 1480603504 -3600 # Node ID acfc83aca8ee209485e388844e2030d6fda1faf6 # Parent a4e378d55cc1f78bda41f5f5469a72bd481e25cd commit https://github.com/vim/vim/commit/958dc6923d341390531888058495569d73c356c3 Author: Bram Moolenaar Date: Thu Dec 1 15:34:12 2016 +0100 patch 8.0.0107 Problem: When reading channel output in a timer, messages may go missing. (Skywind) Solution: Add the "drop" option. Write error messages in the channel log. Don't have ch_canread() check for the channel being open. diff --git a/runtime/doc/channel.txt b/runtime/doc/channel.txt --- a/runtime/doc/channel.txt +++ b/runtime/doc/channel.txt @@ -155,7 +155,13 @@ Use |ch_status()| to see if the channel func MyCloseHandler(channel) < Vim will invoke callbacks that handle data before invoking close_cb, thus when this function is called no more data will - be received. + be passed to the callbacks. + *channel-drop* +"drop" Specifies when to drop messages: + "auto" When there is no callback to handle a message. + The "close_cb" is also considered for this. + "never" All messages will be kept. + *waittime* "waittime" The time to wait for the connection to be made in milliseconds. A negative number waits forever. diff --git a/src/channel.c b/src/channel.c --- a/src/channel.c +++ b/src/channel.c @@ -1195,6 +1195,7 @@ channel_set_options(channel_T *channel, if (opt->jo_set & JO_CLOSE_CALLBACK) set_callback(&channel->ch_close_cb, &channel->ch_close_partial, opt->jo_close_cb, opt->jo_close_partial); + channel->ch_drop_never = opt->jo_drop_never; if ((opt->jo_set & JO_OUT_IO) && opt->jo_io[PART_OUT] == JIO_BUFFER) { @@ -1918,6 +1919,7 @@ channel_parse_json(channel_T *channel, c clear_tv(&listtv); else { + item->jq_no_callback = FALSE; item->jq_value = alloc_tv(); if (item->jq_value == NULL) { @@ -2050,11 +2052,17 @@ remove_json_node(jsonq_T *head, jsonq_T * When "id" is positive it must match the first number in the list. * When "id" is zero or negative jut get the first message. But not the one * with id ch_block_id. + * When "without_callback" is TRUE also get messages that were pushed back. * Return OK when found and return the value in "rettv". * Return FAIL otherwise. */ static int -channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv) +channel_get_json( + channel_T *channel, + ch_part_T part, + int id, + int without_callback, + typval_T **rettv) { jsonq_T *head = &channel->ch_part[part].ch_json_head; jsonq_T *item = head->jq_next; @@ -2064,10 +2072,11 @@ channel_get_json(channel_T *channel, ch_ list_T *l = item->jq_value->vval.v_list; typval_T *tv = &l->lv_first->li_tv; - if ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id) + if ((without_callback || !item->jq_no_callback) + && ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id) || (id <= 0 && (tv->v_type != VAR_NUMBER || tv->vval.v_number == 0 - || tv->vval.v_number != channel->ch_part[part].ch_block_id))) + || tv->vval.v_number != channel->ch_part[part].ch_block_id)))) { *rettv = item->jq_value; if (tv->v_type == VAR_NUMBER) @@ -2080,6 +2089,65 @@ channel_get_json(channel_T *channel, ch_ return FAIL; } +/* + * Put back "rettv" into the JSON queue, there was no callback for it. + * Takes over the values in "rettv". + */ + static void +channel_push_json(channel_T *channel, ch_part_T part, typval_T *rettv) +{ + jsonq_T *head = &channel->ch_part[part].ch_json_head; + jsonq_T *item = head->jq_next; + jsonq_T *newitem; + + if (head->jq_prev != NULL && head->jq_prev->jq_no_callback) + /* last item was pushed back, append to the end */ + item = NULL; + else while (item != NULL && item->jq_no_callback) + /* append after the last item that was pushed back */ + item = item->jq_next; + + newitem = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T)); + if (newitem == NULL) + clear_tv(rettv); + else + { + newitem->jq_value = alloc_tv(); + if (newitem->jq_value == NULL) + { + vim_free(newitem); + clear_tv(rettv); + } + else + { + newitem->jq_no_callback = FALSE; + *newitem->jq_value = *rettv; + if (item == NULL) + { + /* append to the end */ + newitem->jq_prev = head->jq_prev; + head->jq_prev = newitem; + newitem->jq_next = NULL; + if (newitem->jq_prev == NULL) + head->jq_next = newitem; + else + newitem->jq_prev->jq_next = newitem; + } + else + { + /* append after "item" */ + newitem->jq_prev = item; + newitem->jq_next = item->jq_next; + item->jq_next = newitem; + if (newitem->jq_next == NULL) + head->jq_prev = newitem; + else + newitem->jq_next->jq_prev = newitem; + } + } + } +} + #define CH_JSON_MAX_ARGS 4 /* @@ -2410,11 +2478,11 @@ may_invoke_callback(channel_T *channel, int argc = 0; /* Get any json message in the queue. */ - if (channel_get_json(channel, part, -1, &listtv) == FAIL) + if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL) { /* Parse readahead, return when there is still no message. */ channel_parse_json(channel, part); - if (channel_get_json(channel, part, -1, &listtv) == FAIL) + if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL) return FALSE; } @@ -2454,7 +2522,7 @@ may_invoke_callback(channel_T *channel, { /* If there is a close callback it may use ch_read() to get the * messages. */ - if (channel->ch_close_cb == NULL) + if (channel->ch_close_cb == NULL && !channel->ch_drop_never) drop_messages(channel, part); return FALSE; } @@ -2531,7 +2599,7 @@ may_invoke_callback(channel_T *channel, { int done = FALSE; - /* invoke the one-time callback with the matching nr */ + /* JSON or JS mode: invoke the one-time callback with the matching nr */ for (cbitem = cbhead->cq_next; cbitem != NULL; cbitem = cbitem->cq_next) if (cbitem->cq_seq_nr == seq_nr) { @@ -2540,7 +2608,17 @@ may_invoke_callback(channel_T *channel, break; } if (!done) - ch_logn(channel, "Dropping message %d without callback", seq_nr); + { + if (channel->ch_drop_never) + { + /* message must be read with ch_read() */ + channel_push_json(channel, part, listtv); + listtv = NULL; + } + else + ch_logn(channel, "Dropping message %d without callback", + seq_nr); + } } else if (callback != NULL || buffer != NULL) { @@ -2567,7 +2645,7 @@ may_invoke_callback(channel_T *channel, } } else - ch_log(channel, "Dropping message"); + ch_logn(channel, "Dropping message %d", seq_nr); if (listtv != NULL) free_tv(listtv); @@ -2792,9 +2870,10 @@ channel_close(channel_T *channel, int in redraw_after_callback(); } - /* any remaining messages are useless now */ - for (part = PART_SOCK; part < PART_IN; ++part) - drop_messages(channel, part); + if (!channel->ch_drop_never) + /* any remaining messages are useless now */ + for (part = PART_SOCK; part < PART_IN; ++part) + drop_messages(channel, part); } channel->ch_nb_close_cb = NULL; @@ -3091,9 +3170,9 @@ ch_close_part_on_error( channel_close_now(channel_T *channel) { ch_log(channel, "Closing channel because all readable fds are closed"); - channel_close(channel, TRUE); if (channel->ch_nb_close_cb != NULL) (*channel->ch_nb_close_cb)(); + channel_close(channel, TRUE); } /* @@ -3243,7 +3322,7 @@ channel_read_block(channel_T *channel, c * When "id" is -1 accept any message; * Blocks until the message is received or the timeout is reached. */ - int + static int channel_read_json_block( channel_T *channel, ch_part_T part, @@ -3264,7 +3343,7 @@ channel_read_json_block( more = channel_parse_json(channel, part); /* search for message "id" */ - if (channel_get_json(channel, part, id, rettv) == OK) + if (channel_get_json(channel, part, id, TRUE, rettv) == OK) { chanpart->ch_block_id = 0; return OK; @@ -4290,6 +4369,20 @@ get_job_options(typval_T *tv, jobopt_T * return FAIL; } } + else if (STRCMP(hi->hi_key, "drop") == 0) + { + int never = FALSE; + val = get_tv_string(item); + + if (STRCMP(val, "never") == 0) + never = TRUE; + else if (STRCMP(val, "auto") != 0) + { + EMSG2(_(e_invarg2), "drop"); + return FAIL; + } + opt->jo_drop_never = never; + } else if (STRCMP(hi->hi_key, "exit_cb") == 0) { if (!(supported & JO_EXIT_CB)) diff --git a/src/evalfunc.c b/src/evalfunc.c --- a/src/evalfunc.c +++ b/src/evalfunc.c @@ -1786,7 +1786,7 @@ f_ceil(typval_T *argvars, typval_T *rett static void f_ch_canread(typval_T *argvars, typval_T *rettv) { - channel_T *channel = get_channel_arg(&argvars[0], TRUE, TRUE, 0); + channel_T *channel = get_channel_arg(&argvars[0], FALSE, FALSE, 0); rettv->vval.v_number = 0; if (channel != NULL) diff --git a/src/message.c b/src/message.c --- a/src/message.c +++ b/src/message.c @@ -42,6 +42,9 @@ static int confirm_msg_used = FALSE; /* static char_u *confirm_msg = NULL; /* ":confirm" message */ static char_u *confirm_msg_tail; /* tail of confirm_msg */ #endif +#ifdef FEAT_JOB_CHANNEL +static int emsg_to_channel_log = FALSE; +#endif struct msg_hist { @@ -166,6 +169,14 @@ msg_attr_keep( && STRCMP(s, last_msg_hist->msg))) add_msg_hist(s, -1, attr); +#ifdef FEAT_JOB_CHANNEL + if (emsg_to_channel_log) + { + /* Write message in the channel log. */ + ch_logs(NULL, "ERROR: %s", (char *)s); + } +#endif + /* When displaying keep_msg, don't let msg_start() free it, caller must do * that. */ if (s == keep_msg) @@ -556,6 +567,7 @@ emsg(char_u *s) { int attr; char_u *p; + int r; #ifdef FEAT_EVAL int ignore = FALSE; int severe; @@ -624,6 +636,9 @@ emsg(char_u *s) } redir_write(s, -1); } +#ifdef FEAT_JOB_CHANNEL + ch_logs(NULL, "ERROR: %s", (char *)s); +#endif return TRUE; } @@ -650,6 +665,9 @@ emsg(char_u *s) * and a redraw is expected because * msg_scrolled is non-zero */ +#ifdef FEAT_JOB_CHANNEL + emsg_to_channel_log = TRUE; +#endif /* * Display name and line number for the source of the error. */ @@ -659,7 +677,12 @@ emsg(char_u *s) * Display the error message itself. */ msg_nowait = FALSE; /* wait for this msg */ - return msg_attr(s, attr); + r = msg_attr(s, attr); + +#ifdef FEAT_JOB_CHANNEL + emsg_to_channel_log = FALSE; +#endif + return r; } diff --git a/src/proto/channel.pro b/src/proto/channel.pro --- a/src/proto/channel.pro +++ b/src/proto/channel.pro @@ -33,7 +33,6 @@ void channel_close_in(channel_T *channel void channel_clear(channel_T *channel); void channel_free_all(void); char_u *channel_read_block(channel_T *channel, ch_part_T part, int timeout); -int channel_read_json_block(channel_T *channel, ch_part_T part, int timeout_arg, int id, typval_T **rettv); void common_channel_read(typval_T *argvars, typval_T *rettv, int raw); channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp); void channel_handle_events(void); diff --git a/src/structs.h b/src/structs.h --- a/src/structs.h +++ b/src/structs.h @@ -1474,6 +1474,7 @@ struct jsonq_S typval_T *jq_value; jsonq_T *jq_next; jsonq_T *jq_prev; + int jq_no_callback; /* TRUE when no callback was found */ }; struct cbq_S @@ -1597,6 +1598,7 @@ struct channel_S { partial_T *ch_partial; char_u *ch_close_cb; /* call when channel is closed */ partial_T *ch_close_partial; + int ch_drop_never; job_T *ch_job; /* Job that uses this channel; this does not * count as a reference to avoid a circular @@ -1684,6 +1686,7 @@ typedef struct partial_T *jo_close_partial; /* not referenced! */ char_u *jo_exit_cb; /* not allocated! */ partial_T *jo_exit_partial; /* not referenced! */ + int jo_drop_never; int jo_waittime; int jo_timeout; int jo_out_timeout; diff --git a/src/version.c b/src/version.c --- a/src/version.c +++ b/src/version.c @@ -765,6 +765,8 @@ static char *(features[]) = static int included_patches[] = { /* Add new patch number below this line */ /**/ + 107, +/**/ 106, /**/ 105,