comparison src/channel.c @ 10426:acfc83aca8ee v8.0.0107

commit https://github.com/vim/vim/commit/958dc6923d341390531888058495569d73c356c3 Author: Bram Moolenaar <Bram@vim.org> Date: Thu Dec 1 15:34:12 2016 +0100 patch 8.0.0107 Problem: When reading channel output in a timer, messages may go missing. (Skywind) Solution: Add the "drop" option. Write error messages in the channel log. Don't have ch_canread() check for the channel being open.
author Christian Brabandt <cb@256bit.org>
date Thu, 01 Dec 2016 15:45:04 +0100
parents e664ee056a84
children ea7fbae33285
comparison
equal deleted inserted replaced
10425:a4e378d55cc1 10426:acfc83aca8ee
1193 &channel->ch_part[PART_ERR].ch_partial, 1193 &channel->ch_part[PART_ERR].ch_partial,
1194 opt->jo_err_cb, opt->jo_err_partial); 1194 opt->jo_err_cb, opt->jo_err_partial);
1195 if (opt->jo_set & JO_CLOSE_CALLBACK) 1195 if (opt->jo_set & JO_CLOSE_CALLBACK)
1196 set_callback(&channel->ch_close_cb, &channel->ch_close_partial, 1196 set_callback(&channel->ch_close_cb, &channel->ch_close_partial,
1197 opt->jo_close_cb, opt->jo_close_partial); 1197 opt->jo_close_cb, opt->jo_close_partial);
1198 channel->ch_drop_never = opt->jo_drop_never;
1198 1199
1199 if ((opt->jo_set & JO_OUT_IO) && opt->jo_io[PART_OUT] == JIO_BUFFER) 1200 if ((opt->jo_set & JO_OUT_IO) && opt->jo_io[PART_OUT] == JIO_BUFFER)
1200 { 1201 {
1201 buf_T *buf; 1202 buf_T *buf;
1202 1203
1916 item = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T)); 1917 item = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T));
1917 if (item == NULL) 1918 if (item == NULL)
1918 clear_tv(&listtv); 1919 clear_tv(&listtv);
1919 else 1920 else
1920 { 1921 {
1922 item->jq_no_callback = FALSE;
1921 item->jq_value = alloc_tv(); 1923 item->jq_value = alloc_tv();
1922 if (item->jq_value == NULL) 1924 if (item->jq_value == NULL)
1923 { 1925 {
1924 vim_free(item); 1926 vim_free(item);
1925 clear_tv(&listtv); 1927 clear_tv(&listtv);
2048 /* 2050 /*
2049 * Get a message from the JSON queue for channel "channel". 2051 * Get a message from the JSON queue for channel "channel".
2050 * When "id" is positive it must match the first number in the list. 2052 * When "id" is positive it must match the first number in the list.
2051 * When "id" is zero or negative jut get the first message. But not the one 2053 * When "id" is zero or negative jut get the first message. But not the one
2052 * with id ch_block_id. 2054 * with id ch_block_id.
2055 * When "without_callback" is TRUE also get messages that were pushed back.
2053 * Return OK when found and return the value in "rettv". 2056 * Return OK when found and return the value in "rettv".
2054 * Return FAIL otherwise. 2057 * Return FAIL otherwise.
2055 */ 2058 */
2056 static int 2059 static int
2057 channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv) 2060 channel_get_json(
2061 channel_T *channel,
2062 ch_part_T part,
2063 int id,
2064 int without_callback,
2065 typval_T **rettv)
2058 { 2066 {
2059 jsonq_T *head = &channel->ch_part[part].ch_json_head; 2067 jsonq_T *head = &channel->ch_part[part].ch_json_head;
2060 jsonq_T *item = head->jq_next; 2068 jsonq_T *item = head->jq_next;
2061 2069
2062 while (item != NULL) 2070 while (item != NULL)
2063 { 2071 {
2064 list_T *l = item->jq_value->vval.v_list; 2072 list_T *l = item->jq_value->vval.v_list;
2065 typval_T *tv = &l->lv_first->li_tv; 2073 typval_T *tv = &l->lv_first->li_tv;
2066 2074
2067 if ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id) 2075 if ((without_callback || !item->jq_no_callback)
2076 && ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id)
2068 || (id <= 0 && (tv->v_type != VAR_NUMBER 2077 || (id <= 0 && (tv->v_type != VAR_NUMBER
2069 || tv->vval.v_number == 0 2078 || tv->vval.v_number == 0
2070 || tv->vval.v_number != channel->ch_part[part].ch_block_id))) 2079 || tv->vval.v_number != channel->ch_part[part].ch_block_id))))
2071 { 2080 {
2072 *rettv = item->jq_value; 2081 *rettv = item->jq_value;
2073 if (tv->v_type == VAR_NUMBER) 2082 if (tv->v_type == VAR_NUMBER)
2074 ch_logn(channel, "Getting JSON message %d", tv->vval.v_number); 2083 ch_logn(channel, "Getting JSON message %d", tv->vval.v_number);
2075 remove_json_node(head, item); 2084 remove_json_node(head, item);
2076 return OK; 2085 return OK;
2077 } 2086 }
2078 item = item->jq_next; 2087 item = item->jq_next;
2079 } 2088 }
2080 return FAIL; 2089 return FAIL;
2090 }
2091
2092 /*
2093 * Put back "rettv" into the JSON queue, there was no callback for it.
2094 * Takes over the values in "rettv".
2095 */
2096 static void
2097 channel_push_json(channel_T *channel, ch_part_T part, typval_T *rettv)
2098 {
2099 jsonq_T *head = &channel->ch_part[part].ch_json_head;
2100 jsonq_T *item = head->jq_next;
2101 jsonq_T *newitem;
2102
2103 if (head->jq_prev != NULL && head->jq_prev->jq_no_callback)
2104 /* last item was pushed back, append to the end */
2105 item = NULL;
2106 else while (item != NULL && item->jq_no_callback)
2107 /* append after the last item that was pushed back */
2108 item = item->jq_next;
2109
2110 newitem = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T));
2111 if (newitem == NULL)
2112 clear_tv(rettv);
2113 else
2114 {
2115 newitem->jq_value = alloc_tv();
2116 if (newitem->jq_value == NULL)
2117 {
2118 vim_free(newitem);
2119 clear_tv(rettv);
2120 }
2121 else
2122 {
2123 newitem->jq_no_callback = FALSE;
2124 *newitem->jq_value = *rettv;
2125 if (item == NULL)
2126 {
2127 /* append to the end */
2128 newitem->jq_prev = head->jq_prev;
2129 head->jq_prev = newitem;
2130 newitem->jq_next = NULL;
2131 if (newitem->jq_prev == NULL)
2132 head->jq_next = newitem;
2133 else
2134 newitem->jq_prev->jq_next = newitem;
2135 }
2136 else
2137 {
2138 /* append after "item" */
2139 newitem->jq_prev = item;
2140 newitem->jq_next = item->jq_next;
2141 item->jq_next = newitem;
2142 if (newitem->jq_next == NULL)
2143 head->jq_prev = newitem;
2144 else
2145 newitem->jq_next->jq_prev = newitem;
2146 }
2147 }
2148 }
2081 } 2149 }
2082 2150
2083 #define CH_JSON_MAX_ARGS 4 2151 #define CH_JSON_MAX_ARGS 4
2084 2152
2085 /* 2153 /*
2408 { 2476 {
2409 listitem_T *item; 2477 listitem_T *item;
2410 int argc = 0; 2478 int argc = 0;
2411 2479
2412 /* Get any json message in the queue. */ 2480 /* Get any json message in the queue. */
2413 if (channel_get_json(channel, part, -1, &listtv) == FAIL) 2481 if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL)
2414 { 2482 {
2415 /* Parse readahead, return when there is still no message. */ 2483 /* Parse readahead, return when there is still no message. */
2416 channel_parse_json(channel, part); 2484 channel_parse_json(channel, part);
2417 if (channel_get_json(channel, part, -1, &listtv) == FAIL) 2485 if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL)
2418 return FALSE; 2486 return FALSE;
2419 } 2487 }
2420 2488
2421 for (item = listtv->vval.v_list->lv_first; 2489 for (item = listtv->vval.v_list->lv_first;
2422 item != NULL && argc < CH_JSON_MAX_ARGS; 2490 item != NULL && argc < CH_JSON_MAX_ARGS;
2452 /* If there is no callback or buffer drop the message. */ 2520 /* If there is no callback or buffer drop the message. */
2453 if (callback == NULL && buffer == NULL) 2521 if (callback == NULL && buffer == NULL)
2454 { 2522 {
2455 /* If there is a close callback it may use ch_read() to get the 2523 /* If there is a close callback it may use ch_read() to get the
2456 * messages. */ 2524 * messages. */
2457 if (channel->ch_close_cb == NULL) 2525 if (channel->ch_close_cb == NULL && !channel->ch_drop_never)
2458 drop_messages(channel, part); 2526 drop_messages(channel, part);
2459 return FALSE; 2527 return FALSE;
2460 } 2528 }
2461 2529
2462 if (ch_mode == MODE_NL) 2530 if (ch_mode == MODE_NL)
2529 2597
2530 if (seq_nr > 0) 2598 if (seq_nr > 0)
2531 { 2599 {
2532 int done = FALSE; 2600 int done = FALSE;
2533 2601
2534 /* invoke the one-time callback with the matching nr */ 2602 /* JSON or JS mode: invoke the one-time callback with the matching nr */
2535 for (cbitem = cbhead->cq_next; cbitem != NULL; cbitem = cbitem->cq_next) 2603 for (cbitem = cbhead->cq_next; cbitem != NULL; cbitem = cbitem->cq_next)
2536 if (cbitem->cq_seq_nr == seq_nr) 2604 if (cbitem->cq_seq_nr == seq_nr)
2537 { 2605 {
2538 invoke_one_time_callback(channel, cbhead, cbitem, argv); 2606 invoke_one_time_callback(channel, cbhead, cbitem, argv);
2539 done = TRUE; 2607 done = TRUE;
2540 break; 2608 break;
2541 } 2609 }
2542 if (!done) 2610 if (!done)
2543 ch_logn(channel, "Dropping message %d without callback", seq_nr); 2611 {
2612 if (channel->ch_drop_never)
2613 {
2614 /* message must be read with ch_read() */
2615 channel_push_json(channel, part, listtv);
2616 listtv = NULL;
2617 }
2618 else
2619 ch_logn(channel, "Dropping message %d without callback",
2620 seq_nr);
2621 }
2544 } 2622 }
2545 else if (callback != NULL || buffer != NULL) 2623 else if (callback != NULL || buffer != NULL)
2546 { 2624 {
2547 if (buffer != NULL) 2625 if (buffer != NULL)
2548 { 2626 {
2565 invoke_callback(channel, callback, partial, argv); 2643 invoke_callback(channel, callback, partial, argv);
2566 } 2644 }
2567 } 2645 }
2568 } 2646 }
2569 else 2647 else
2570 ch_log(channel, "Dropping message"); 2648 ch_logn(channel, "Dropping message %d", seq_nr);
2571 2649
2572 if (listtv != NULL) 2650 if (listtv != NULL)
2573 free_tv(listtv); 2651 free_tv(listtv);
2574 vim_free(msg); 2652 vim_free(msg);
2575 2653
2790 { 2868 {
2791 channel_need_redraw = FALSE; 2869 channel_need_redraw = FALSE;
2792 redraw_after_callback(); 2870 redraw_after_callback();
2793 } 2871 }
2794 2872
2795 /* any remaining messages are useless now */ 2873 if (!channel->ch_drop_never)
2796 for (part = PART_SOCK; part < PART_IN; ++part) 2874 /* any remaining messages are useless now */
2797 drop_messages(channel, part); 2875 for (part = PART_SOCK; part < PART_IN; ++part)
2876 drop_messages(channel, part);
2798 } 2877 }
2799 2878
2800 channel->ch_nb_close_cb = NULL; 2879 channel->ch_nb_close_cb = NULL;
2801 } 2880 }
2802 2881
3089 3168
3090 static void 3169 static void
3091 channel_close_now(channel_T *channel) 3170 channel_close_now(channel_T *channel)
3092 { 3171 {
3093 ch_log(channel, "Closing channel because all readable fds are closed"); 3172 ch_log(channel, "Closing channel because all readable fds are closed");
3094 channel_close(channel, TRUE);
3095 if (channel->ch_nb_close_cb != NULL) 3173 if (channel->ch_nb_close_cb != NULL)
3096 (*channel->ch_nb_close_cb)(); 3174 (*channel->ch_nb_close_cb)();
3175 channel_close(channel, TRUE);
3097 } 3176 }
3098 3177
3099 /* 3178 /*
3100 * Read from channel "channel" for as long as there is something to read. 3179 * Read from channel "channel" for as long as there is something to read.
3101 * "part" is PART_SOCK, PART_OUT or PART_ERR. 3180 * "part" is PART_SOCK, PART_OUT or PART_ERR.
3241 * Read one JSON message with ID "id" from "channel"/"part" and store the 3320 * Read one JSON message with ID "id" from "channel"/"part" and store the
3242 * result in "rettv". 3321 * result in "rettv".
3243 * When "id" is -1 accept any message; 3322 * When "id" is -1 accept any message;
3244 * Blocks until the message is received or the timeout is reached. 3323 * Blocks until the message is received or the timeout is reached.
3245 */ 3324 */
3246 int 3325 static int
3247 channel_read_json_block( 3326 channel_read_json_block(
3248 channel_T *channel, 3327 channel_T *channel,
3249 ch_part_T part, 3328 ch_part_T part,
3250 int timeout_arg, 3329 int timeout_arg,
3251 int id, 3330 int id,
3262 for (;;) 3341 for (;;)
3263 { 3342 {
3264 more = channel_parse_json(channel, part); 3343 more = channel_parse_json(channel, part);
3265 3344
3266 /* search for message "id" */ 3345 /* search for message "id" */
3267 if (channel_get_json(channel, part, id, rettv) == OK) 3346 if (channel_get_json(channel, part, id, TRUE, rettv) == OK)
3268 { 3347 {
3269 chanpart->ch_block_id = 0; 3348 chanpart->ch_block_id = 0;
3270 return OK; 3349 return OK;
3271 } 3350 }
3272 3351
4288 { 4367 {
4289 EMSG2(_(e_invarg2), "close_cb"); 4368 EMSG2(_(e_invarg2), "close_cb");
4290 return FAIL; 4369 return FAIL;
4291 } 4370 }
4292 } 4371 }
4372 else if (STRCMP(hi->hi_key, "drop") == 0)
4373 {
4374 int never = FALSE;
4375 val = get_tv_string(item);
4376
4377 if (STRCMP(val, "never") == 0)
4378 never = TRUE;
4379 else if (STRCMP(val, "auto") != 0)
4380 {
4381 EMSG2(_(e_invarg2), "drop");
4382 return FAIL;
4383 }
4384 opt->jo_drop_never = never;
4385 }
4293 else if (STRCMP(hi->hi_key, "exit_cb") == 0) 4386 else if (STRCMP(hi->hi_key, "exit_cb") == 0)
4294 { 4387 {
4295 if (!(supported & JO_EXIT_CB)) 4388 if (!(supported & JO_EXIT_CB))
4296 break; 4389 break;
4297 opt->jo_set |= JO_EXIT_CB; 4390 opt->jo_set |= JO_EXIT_CB;