# HG changeset patch # User Christian Brabandt # Date 1455989404 -3600 # Node ID aa845d10c6fb8dd1bc258cb6c3d6001145b845af # Parent 85b476dac933c46bd171b06ef681feed1790d59c commit https://github.com/vim/vim/commit/42d38a2db17e70312d073095257555c27a5f9443 Author: Bram Moolenaar Date: Sat Feb 20 18:18:59 2016 +0100 patch 7.4.1369 Problem: Channels don't have a queue for stderr. Solution: Have a queue for each part of the channel. diff --git a/src/channel.c b/src/channel.c --- a/src/channel.c +++ b/src/channel.c @@ -274,7 +274,7 @@ static int next_ch_id = 0; channel_T * add_channel(void) { - int which; + int part; channel_T *channel = (channel_T *)alloc_clear((int)sizeof(channel_T)); if (channel == NULL) @@ -284,25 +284,24 @@ add_channel(void) ch_log(channel, "Created channel"); #ifdef CHANNEL_PIPES - for (which = CHAN_SOCK; which <= CHAN_IN; ++which) + for (part = PART_SOCK; part <= PART_IN; ++part) #else - which = CHAN_SOCK; + part = PART_SOCK; #endif { - channel->ch_pfd[which].ch_fd = CHAN_FD_INVALID; + channel->ch_part[part].ch_fd = INVALID_FD; #ifdef FEAT_GUI_X11 - channel->ch_pfd[which].ch_inputHandler = (XtInputId)NULL; + channel->ch_part[part].ch_inputHandler = (XtInputId)NULL; #endif #ifdef FEAT_GUI_GTK - channel->ch_pfd[which].ch_inputHandler = 0; + channel->ch_part[part].ch_inputHandler = 0; #endif #ifdef FEAT_GUI_W32 - channel->ch_pfd[which].ch_inputHandler = -1; + channel->ch_part[part].ch_inputHandler = -1; #endif + channel->ch_part[part].ch_timeout = 2000; } - channel->ch_timeout = 2000; - if (first_channel != NULL) { first_channel->ch_prev = channel; @@ -349,17 +348,26 @@ channel_from_id(int id) static void channel_read_netbeans(int id) { - channel_T *channel = channel_from_id(id); + channel_T *channel = channel_from_id(id); + int part; if (channel == NULL) ch_errorn(NULL, "Channel %d not found", id); else - channel_read(channel, -1, "messageFromNetbeans"); + { + /* TODO: check stderr */ + if (channel->CH_SOCK_FD != INVALID_FD) + part = PART_SOCK; + else + part = PART_OUT; + channel_read(channel, part, "messageFromNetbeans"); + } } #endif /* * Read a command from netbeans. + * TODO: instead of channel ID use the FD. */ #ifdef FEAT_GUI_X11 static void @@ -382,15 +390,15 @@ messageFromNetbeans(gpointer clientData, #endif static void -channel_gui_register_one(channel_T *channel, int which) +channel_gui_register_one(channel_T *channel, int part) { # ifdef FEAT_GUI_X11 /* Tell notifier we are interested in being called * when there is input on the editor connection socket. */ - if (channel->ch_pfd[which].ch_inputHandler == (XtInputId)NULL) - channel->ch_pfd[which].ch_inputHandler = XtAppAddInput( + if (channel->ch_part[part].ch_inputHandler == (XtInputId)NULL) + channel->ch_part[part].ch_inputHandler = XtAppAddInput( (XtAppContext)app_context, - channel->ch_pfd[which].ch_fd, + channel->ch_part[part].ch_fd, (XtPointer)(XtInputReadMask + XtInputExceptMask), messageFromNetbeans, (XtPointer)(long)channel->ch_id); @@ -398,9 +406,9 @@ channel_gui_register_one(channel_T *chan # ifdef FEAT_GUI_GTK /* Tell gdk we are interested in being called when there * is input on the editor connection socket. */ - if (channel->ch_pfd[which].ch_inputHandler == 0) - channel->ch_pfd[which].ch_inputHandler = gdk_input_add( - (gint)channel->ch_pfd[which].ch_fd, + if (channel->ch_part[part].ch_inputHandler == 0) + channel->ch_part[part].ch_inputHandler = gdk_input_add( + (gint)channel->ch_part[part].ch_fd, (GdkInputCondition) ((int)GDK_INPUT_READ + (int)GDK_INPUT_EXCEPTION), messageFromNetbeans, @@ -409,9 +417,9 @@ channel_gui_register_one(channel_T *chan # ifdef FEAT_GUI_W32 /* Tell Windows we are interested in receiving message when there * is input on the editor connection socket. */ - if (channel->ch_pfd[which].ch_inputHandler == -1) - channel->ch_pfd[which].ch_inputHandler = WSAAsyncSelect( - channel->ch_pfd[which].ch_fd, + if (channel->ch_part[part].ch_inputHandler == -1) + channel->ch_part[part].ch_inputHandler = WSAAsyncSelect( + channel->ch_part[part].ch_fd, s_hwnd, WM_NETBEANS, FD_READ); # endif # endif @@ -424,13 +432,13 @@ channel_gui_register(channel_T *channel) if (!CH_HAS_GUI) return; - if (channel->CH_SOCK != CHAN_FD_INVALID) - channel_gui_register_one(channel, CHAN_SOCK); + if (channel->CH_SOCK_FD != INVALID_FD) + channel_gui_register_one(channel, PART_SOCK); # ifdef CHANNEL_PIPES - if (channel->CH_OUT != CHAN_FD_INVALID) - channel_gui_register_one(channel, CHAN_OUT); - if (channel->CH_ERR != CHAN_FD_INVALID) - channel_gui_register_one(channel, CHAN_ERR); + if (channel->CH_OUT_FD != INVALID_FD) + channel_gui_register_one(channel, PART_OUT); + if (channel->CH_ERR_FD != INVALID_FD) + channel_gui_register_one(channel, PART_ERR); # endif } @@ -450,33 +458,33 @@ channel_gui_register_all(void) static void channel_gui_unregister(channel_T *channel) { - int which; + int part; #ifdef CHANNEL_PIPES - for (which = CHAN_SOCK; which < CHAN_IN; ++which) + for (part = PART_SOCK; part < PART_IN; ++part) #else - which = CHAN_SOCK; + part = PART_SOCK; #endif { # ifdef FEAT_GUI_X11 - if (channel->ch_pfd[which].ch_inputHandler != (XtInputId)NULL) + if (channel->ch_part[part].ch_inputHandler != (XtInputId)NULL) { - XtRemoveInput(channel->ch_pfd[which].ch_inputHandler); - channel->ch_pfd[which].ch_inputHandler = (XtInputId)NULL; + XtRemoveInput(channel->ch_part[part].ch_inputHandler); + channel->ch_part[part].ch_inputHandler = (XtInputId)NULL; } # else # ifdef FEAT_GUI_GTK - if (channel->ch_pfd[which].ch_inputHandler != 0) + if (channel->ch_part[part].ch_inputHandler != 0) { - gdk_input_remove(channel->ch_pfd[which].ch_inputHandler); - channel->ch_pfd[which].ch_inputHandler = 0; + gdk_input_remove(channel->ch_part[part].ch_inputHandler); + channel->ch_part[part].ch_inputHandler = 0; } # else # ifdef FEAT_GUI_W32 - if (channel->ch_pfd[which].ch_inputHandler == 0) + if (channel->ch_part[part].ch_inputHandler == 0) { - WSAAsyncSelect(channel->ch_pfd[which].ch_fd, s_hwnd, 0, 0); - channel->ch_pfd[which].ch_inputHandler = -1; + WSAAsyncSelect(channel->ch_part[part].ch_fd, s_hwnd, 0, 0); + channel->ch_part[part].ch_inputHandler = -1; } # endif # endif @@ -721,7 +729,7 @@ channel_open(char *hostname, int port_in #endif } - channel->CH_SOCK = (sock_T)sd; + channel->CH_SOCK_FD = (sock_T)sd; channel->ch_close_cb = close_cb; #ifdef FEAT_GUI @@ -735,9 +743,9 @@ channel_open(char *hostname, int port_in void channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err) { - channel->CH_IN = in; - channel->CH_OUT = out; - channel->CH_ERR = err; + channel->CH_IN_FD = in; + channel->CH_OUT_FD = out; + channel->CH_ERR_FD = err; } #endif @@ -753,10 +761,14 @@ channel_set_job(channel_T *channel, job_ void channel_set_options(channel_T *channel, jobopt_T *options) { + int part; + if (options->jo_set & JO_MODE) - channel->ch_mode = options->jo_mode; + for (part = PART_SOCK; part <= PART_IN; ++part) + channel->ch_part[part].ch_mode = options->jo_mode; if (options->jo_set & JO_TIMEOUT) - channel->ch_timeout = options->jo_timeout; + for (part = PART_SOCK; part <= PART_IN; ++part) + channel->ch_part[part].ch_timeout = options->jo_timeout; if (options->jo_set & JO_CALLBACK) { @@ -769,12 +781,16 @@ channel_set_options(channel_T *channel, } /* - * Set the callback for channel "channel" for the response with "id". + * Set the callback for "channel"/"part" for the response with "id". */ void -channel_set_req_callback(channel_T *channel, char_u *callback, int id) +channel_set_req_callback( + channel_T *channel, + int part, + char_u *callback, + int id) { - cbq_T *head = &channel->ch_cb_head; + cbq_T *head = &channel->ch_part[part].ch_cb_head; cbq_T *item = (cbq_T *)alloc((int)sizeof(cbq_T)); if (item != NULL) @@ -813,14 +829,14 @@ invoke_callback(channel_T *channel, char } /* - * Return the first buffer from the channel and remove it. + * Return the first buffer from channel "channel"/"part" and remove it. * The caller must free it. * Returns NULL if there is nothing. */ char_u * -channel_get(channel_T *channel) +channel_get(channel_T *channel, int part) { - readq_T *head = &channel->ch_head; + readq_T *head = &channel->ch_part[part].ch_head; readq_T *node = head->rq_next; char_u *p; @@ -838,26 +854,26 @@ channel_get(channel_T *channel) } /* - * Returns the whole buffer contents concatenated. + * Returns the whole buffer contents concatenated for "channel"/"part". */ static char_u * -channel_get_all(channel_T *channel) +channel_get_all(channel_T *channel, int part) { /* Concatenate everything into one buffer. * TODO: avoid multiple allocations. */ - while (channel_collapse(channel) == OK) + while (channel_collapse(channel, part) == OK) ; - return channel_get(channel); + return channel_get(channel, part); } /* - * Collapses the first and second buffer in the channel "channel". + * Collapses the first and second buffer for "channel"/"part". * Returns FAIL if that is not possible. */ int -channel_collapse(channel_T *channel) +channel_collapse(channel_T *channel, int part) { - readq_T *head = &channel->ch_head; + readq_T *head = &channel->ch_part[part].ch_head; readq_T *node = head->rq_next; char_u *p; @@ -882,31 +898,88 @@ channel_collapse(channel_T *channel) } /* - * Use the read buffer of channel "channel" and parse a JSON messages that is + * Store "buf[len]" on "channel"/"part". + * Returns OK or FAIL. + */ + static int +channel_save(channel_T *channel, int part, char_u *buf, int len) +{ + readq_T *node; + readq_T *head = &channel->ch_part[part].ch_head; + char_u *p; + int i; + + node = (readq_T *)alloc(sizeof(readq_T)); + if (node == NULL) + return FAIL; /* out of memory */ + node->rq_buffer = alloc(len + 1); + if (node->rq_buffer == NULL) + { + vim_free(node); + return FAIL; /* out of memory */ + } + + if (channel->ch_part[part].ch_mode == MODE_NL) + { + /* Drop any CR before a NL. */ + p = node->rq_buffer; + for (i = 0; i < len; ++i) + if (buf[i] != CAR || i + 1 >= len || buf[i + 1] != NL) + *p++ = buf[i]; + *p = NUL; + } + else + { + mch_memmove(node->rq_buffer, buf, len); + node->rq_buffer[len] = NUL; + } + + /* append node to the tail of the queue */ + node->rq_next = NULL; + node->rq_prev = head->rq_prev; + if (head->rq_prev == NULL) + head->rq_next = node; + else + head->rq_prev->rq_next = node; + head->rq_prev = node; + + if (log_fd != NULL) + { + ch_log_lead("RECV ", channel); + fprintf(log_fd, "'"); + if (fwrite(buf, len, 1, log_fd) != 1) + return FAIL; + fprintf(log_fd, "'\n"); + } + return OK; +} + +/* + * Use the read buffer of "channel"/"part" and parse a JSON messages that is * complete. The messages are added to the queue. * Return TRUE if there is more to read. */ static int -channel_parse_json(channel_T *channel) +channel_parse_json(channel_T *channel, int part) { js_read_T reader; typval_T listtv; jsonq_T *item; - jsonq_T *head = &channel->ch_json_head; + jsonq_T *head = &channel->ch_part[part].ch_json_head; int ret; - if (channel_peek(channel) == NULL) + if (channel_peek(channel, part) == NULL) return FALSE; /* TODO: make reader work properly */ - /* reader.js_buf = channel_peek(channel); */ - reader.js_buf = channel_get_all(channel); + /* reader.js_buf = channel_peek(channel, part); */ + reader.js_buf = channel_get_all(channel, part); reader.js_used = 0; reader.js_fill = NULL; /* reader.js_fill = channel_fill; */ reader.js_cookie = channel; ret = json_decode(&reader, &listtv, - channel->ch_mode == MODE_JS ? JSON_JS : 0); + channel->ch_part[part].ch_mode == MODE_JS ? JSON_JS : 0); if (ret == OK) { /* Only accept the response when it is a list with at least two @@ -948,7 +1021,7 @@ channel_parse_json(channel_T *channel) * TODO: insert in front */ if (reader.js_buf[reader.js_used] != NUL) { - channel_save(channel, reader.js_buf + reader.js_used, + channel_save(channel, part, reader.js_buf + reader.js_used, (int)(reader.js_end - reader.js_buf) - reader.js_used); ret = TRUE; } @@ -1002,9 +1075,9 @@ remove_json_node(jsonq_T *head, jsonq_T * Return FAIL otherwise. */ static int -channel_get_json(channel_T *channel, int id, typval_T **rettv) +channel_get_json(channel_T *channel, int part, int id, typval_T **rettv) { - jsonq_T *head = &channel->ch_json_head; + jsonq_T *head = &channel->ch_part[part].ch_json_head; jsonq_T *item = head->jq_next; while (item != NULL) @@ -1014,8 +1087,8 @@ channel_get_json(channel_T *channel, int if ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id) || (id <= 0 && (tv->v_type != VAR_NUMBER - || tv->vval.v_number == 0 - || tv->vval.v_number != channel->ch_block_id))) + || tv->vval.v_number == 0 + || tv->vval.v_number != channel->ch_part[part].ch_block_id))) { *rettv = item->jq_value; remove_json_node(head, item); @@ -1027,12 +1100,17 @@ channel_get_json(channel_T *channel, int } /* - * Execute a command received over channel "channel". + * Execute a command received over "channel"/"part" * "cmd" is the command string, "arg2" the second argument. * "arg3" is the third argument, NULL if missing. */ static void -channel_exe_cmd(channel_T *channel, char_u *cmd, typval_T *arg2, typval_T *arg3) +channel_exe_cmd( + channel_T *channel, + int part, + char_u *cmd, + typval_T *arg2, + typval_T *arg3) { char_u *arg; @@ -1090,7 +1168,8 @@ channel_exe_cmd(channel_T *channel, char typval_T *tv; typval_T err_tv; char_u *json = NULL; - int options = channel->ch_mode == MODE_JS ? JSON_JS : 0; + int options = channel->ch_part[part].ch_mode == MODE_JS + ? JSON_JS : 0; /* Don't pollute the display with errors. */ ++emsg_skip; @@ -1114,7 +1193,7 @@ channel_exe_cmd(channel_T *channel, char } if (json != NULL) { - channel_send(channel, json, "eval"); + channel_send(channel, part, json, "eval"); vim_free(json); } } @@ -1128,12 +1207,11 @@ channel_exe_cmd(channel_T *channel, char } /* - * Invoke a callback for channel "channel" if needed. - * TODO: add "which" argument, read stderr. + * Invoke a callback for "channel"/"part" if needed. * Return TRUE when a message was handled, there might be another one. */ static int -may_invoke_callback(channel_T *channel) +may_invoke_callback(channel_T *channel, int part) { char_u *msg = NULL; typval_T *listtv = NULL; @@ -1141,20 +1219,26 @@ may_invoke_callback(channel_T *channel) typval_T *typetv; typval_T argv[3]; int seq_nr = -1; - ch_mode_T ch_mode = channel->ch_mode; + ch_mode_T ch_mode = channel->ch_part[part].ch_mode; + char_u *callback = NULL; if (channel->ch_close_cb != NULL) /* this channel is handled elsewhere (netbeans) */ return FALSE; + if (channel->ch_part[part].ch_callback != NULL) + callback = channel->ch_part[part].ch_callback; + else + callback = channel->ch_callback; + if (ch_mode == MODE_JSON || ch_mode == MODE_JS) { /* Get any json message in the queue. */ - if (channel_get_json(channel, -1, &listtv) == FAIL) + if (channel_get_json(channel, part, -1, &listtv) == FAIL) { /* Parse readahead, return when there is still no message. */ - channel_parse_json(channel); - if (channel_get_json(channel, -1, &listtv) == FAIL) + channel_parse_json(channel, part); + if (channel_get_json(channel, part, -1, &listtv) == FAIL) return FALSE; } @@ -1170,7 +1254,7 @@ may_invoke_callback(channel_T *channel) if (list->lv_len == 3) arg3 = &list->lv_last->li_tv; ch_logs(channel, "Executing %s command", (char *)cmd); - channel_exe_cmd(channel, cmd, &argv[1], arg3); + channel_exe_cmd(channel, part, cmd, &argv[1], arg3); free_tv(listtv); return TRUE; } @@ -1184,7 +1268,7 @@ may_invoke_callback(channel_T *channel) } seq_nr = typetv->vval.v_number; } - else if (channel_peek(channel) == NULL) + else if (channel_peek(channel, part) == NULL) { /* nothing to read on RAW or NL channel */ return FALSE; @@ -1192,9 +1276,9 @@ may_invoke_callback(channel_T *channel) else { /* If there is no callback drop the message. */ - if (channel->ch_callback == NULL) + if (callback == NULL) { - while ((msg = channel_get(channel)) != NULL) + while ((msg = channel_get(channel, part)) != NULL) vim_free(msg); return FALSE; } @@ -1208,16 +1292,16 @@ may_invoke_callback(channel_T *channel) * not try to concatenate the first and the second buffer. */ while (TRUE) { - buf = channel_peek(channel); + buf = channel_peek(channel, part); nl = vim_strchr(buf, NL); if (nl != NULL) break; - if (channel_collapse(channel) == FAIL) + if (channel_collapse(channel, part) == FAIL) return FALSE; /* incomplete message */ } if (nl[1] == NUL) /* get the whole buffer */ - msg = channel_get(channel); + msg = channel_get(channel, part); else { /* Copy the message into allocated memory and remove it from @@ -1229,7 +1313,7 @@ may_invoke_callback(channel_T *channel) else /* For a raw channel we don't know where the message ends, just * get everything we have. */ - msg = channel_get_all(channel); + msg = channel_get_all(channel, part); argv[1].v_type = VAR_STRING; argv[1].vval.v_string = msg; @@ -1237,7 +1321,7 @@ may_invoke_callback(channel_T *channel) if (seq_nr > 0) { - cbq_T *head = &channel->ch_cb_head; + cbq_T *head = &channel->ch_part[part].ch_cb_head; cbq_T *item = head->cq_next; int done = FALSE; @@ -1261,11 +1345,11 @@ may_invoke_callback(channel_T *channel) if (!done) ch_log(channel, "Dropping message without callback"); } - else if (channel->ch_callback != NULL) + else if (callback != NULL) { /* invoke the channel callback */ ch_log(channel, "Invoking channel callback"); - invoke_callback(channel, channel->ch_callback, argv); + invoke_callback(channel, callback, argv); } else ch_log(channel, "Dropping message"); @@ -1284,9 +1368,9 @@ may_invoke_callback(channel_T *channel) int channel_can_write_to(channel_T *channel) { - return channel != NULL && (channel->CH_SOCK != CHAN_FD_INVALID + return channel != NULL && (channel->CH_SOCK_FD != INVALID_FD #ifdef CHANNEL_PIPES - || channel->CH_IN != CHAN_FD_INVALID + || channel->CH_IN_FD != INVALID_FD #endif ); } @@ -1298,11 +1382,11 @@ channel_can_write_to(channel_T *channel) int channel_is_open(channel_T *channel) { - return channel != NULL && (channel->CH_SOCK != CHAN_FD_INVALID + return channel != NULL && (channel->CH_SOCK_FD != INVALID_FD #ifdef CHANNEL_PIPES - || channel->CH_IN != CHAN_FD_INVALID - || channel->CH_OUT != CHAN_FD_INVALID - || channel->CH_ERR != CHAN_FD_INVALID + || channel->CH_IN_FD != INVALID_FD + || channel->CH_OUT_FD != INVALID_FD + || channel->CH_ERR_FD != INVALID_FD #endif ); } @@ -1333,26 +1417,26 @@ channel_close(channel_T *channel) channel_gui_unregister(channel); #endif - if (channel->CH_SOCK != CHAN_FD_INVALID) + if (channel->CH_SOCK_FD != INVALID_FD) { - sock_close(channel->CH_SOCK); - channel->CH_SOCK = CHAN_FD_INVALID; + sock_close(channel->CH_SOCK_FD); + channel->CH_SOCK_FD = INVALID_FD; } #if defined(CHANNEL_PIPES) - if (channel->CH_IN != CHAN_FD_INVALID) + if (channel->CH_IN_FD != INVALID_FD) { - fd_close(channel->CH_IN); - channel->CH_IN = CHAN_FD_INVALID; + fd_close(channel->CH_IN_FD); + channel->CH_IN_FD = INVALID_FD; } - if (channel->CH_OUT != CHAN_FD_INVALID) + if (channel->CH_OUT_FD != INVALID_FD) { - fd_close(channel->CH_OUT); - channel->CH_OUT = CHAN_FD_INVALID; + fd_close(channel->CH_OUT_FD); + channel->CH_OUT_FD = INVALID_FD; } - if (channel->CH_ERR != CHAN_FD_INVALID) + if (channel->CH_ERR_FD != INVALID_FD) { - fd_close(channel->CH_ERR); - channel->CH_ERR = CHAN_FD_INVALID; + fd_close(channel->CH_ERR_FD); + channel->CH_ERR_FD = INVALID_FD; } #endif @@ -1361,70 +1445,13 @@ channel_close(channel_T *channel) } /* - * Store "buf[len]" on channel "channel". - * Returns OK or FAIL. - */ - int -channel_save(channel_T *channel, char_u *buf, int len) -{ - readq_T *node; - readq_T *head = &channel->ch_head; - char_u *p; - int i; - - node = (readq_T *)alloc(sizeof(readq_T)); - if (node == NULL) - return FAIL; /* out of memory */ - node->rq_buffer = alloc(len + 1); - if (node->rq_buffer == NULL) - { - vim_free(node); - return FAIL; /* out of memory */ - } - - if (channel->ch_mode == MODE_NL) - { - /* Drop any CR before a NL. */ - p = node->rq_buffer; - for (i = 0; i < len; ++i) - if (buf[i] != CAR || i + 1 >= len || buf[i + 1] != NL) - *p++ = buf[i]; - *p = NUL; - } - else - { - mch_memmove(node->rq_buffer, buf, len); - node->rq_buffer[len] = NUL; - } - - /* append node to the tail of the queue */ - node->rq_next = NULL; - node->rq_prev = head->rq_prev; - if (head->rq_prev == NULL) - head->rq_next = node; - else - head->rq_prev->rq_next = node; - head->rq_prev = node; - - if (log_fd != NULL) - { - ch_log_lead("RECV ", channel); - fprintf(log_fd, "'"); - if (fwrite(buf, len, 1, log_fd) != 1) - return FAIL; - fprintf(log_fd, "'\n"); - } - return OK; -} - -/* - * Return the first buffer from the channel without removing it. + * Return the first buffer from "channel"/"part" without removing it. * Returns NULL if there is nothing. */ char_u * -channel_peek(channel_T *channel) +channel_peek(channel_T *channel, int part) { - readq_T *head = &channel->ch_head; + readq_T *head = &channel->ch_part[part].ch_head; if (head->rq_next == NULL) return NULL; @@ -1432,16 +1459,16 @@ channel_peek(channel_T *channel) } /* - * Clear the read buffer on channel "channel". + * Clear the read buffer on "channel"/"part". */ - void -channel_clear(channel_T *channel) + static void +channel_clear_one(channel_T *channel, int part) { - jsonq_T *json_head = &channel->ch_json_head; - cbq_T *cb_head = &channel->ch_cb_head; + jsonq_T *json_head = &channel->ch_part[part].ch_json_head; + cbq_T *cb_head = &channel->ch_part[part].ch_cb_head; - while (channel_peek(channel) != NULL) - vim_free(channel_get(channel)); + while (channel_peek(channel, part) != NULL) + vim_free(channel_get(channel, part)); while (cb_head->cq_next != NULL) { @@ -1458,6 +1485,21 @@ channel_clear(channel_T *channel) remove_json_node(json_head, json_head->jq_next); } + vim_free(channel->ch_part[part].ch_callback); + channel->ch_part[part].ch_callback = NULL; +} + +/* + * Clear all the read buffers on "channel". + */ + void +channel_clear(channel_T *channel) +{ + channel_clear_one(channel, PART_SOCK); +#ifdef CHANNEL_PIPES + channel_clear_one(channel, PART_OUT); + channel_clear_one(channel, PART_ERR); +#endif vim_free(channel->ch_callback); channel->ch_callback = NULL; } @@ -1492,7 +1534,7 @@ channel_wait(channel_T *channel, sock_T ch_logn(channel, "Waiting for up to %d msec", timeout); # ifdef WIN32 - if (fd != channel->CH_SOCK) + if (fd != channel->CH_SOCK_FD) { DWORD nread; int diff; @@ -1567,30 +1609,12 @@ channel_get_id(void) } /* - * Get the file descriptor to read from, either the socket or stdout. - * TODO: should have a way to read stderr. - */ - static sock_T -get_read_fd(channel_T *channel) -{ - if (channel->CH_SOCK != CHAN_FD_INVALID) - return channel->CH_SOCK; -#if defined(CHANNEL_PIPES) - if (channel->CH_OUT != CHAN_FD_INVALID) - return channel->CH_OUT; -#endif - ch_error(channel, "channel_read() called while socket is closed"); - return CHAN_FD_INVALID; -} - -/* * Read from channel "channel" for as long as there is something to read. - * "which" is CHAN_SOCK, CHAN_OUT or CHAN_ERR. When -1 use CHAN_SOCK or - * CHAN_OUT, the one that is open. + * "part" is PART_SOCK, PART_OUT or PART_ERR. * The data is put in the read queue. */ void -channel_read(channel_T *channel, int which, char *func) +channel_read(channel_T *channel, int part, char *func) { static char_u *buf = NULL; int len = 0; @@ -1598,13 +1622,13 @@ channel_read(channel_T *channel, int whi sock_T fd; int use_socket = FALSE; - if (which < 0) - fd = get_read_fd(channel); - else - fd = channel->ch_pfd[which].ch_fd; - if (fd == CHAN_FD_INVALID) + fd = channel->ch_part[part].ch_fd; + if (fd == INVALID_FD) + { + ch_error(channel, "channel_read() called while socket is closed"); return; - use_socket = fd == channel->CH_SOCK; + } + use_socket = fd == channel->CH_SOCK_FD; /* Allocate a buffer to read into. */ if (buf == NULL) @@ -1629,7 +1653,7 @@ channel_read(channel_T *channel, int whi break; /* error or nothing more to read */ /* Store the read message in the queue. */ - channel_save(channel, buf, len); + channel_save(channel, part, buf, len); readlen += len; if (len < MAXMSGSIZE) break; /* did read everything that's available */ @@ -1660,7 +1684,8 @@ channel_read(channel_T *channel, int whi * -> channel_read() */ ch_errors(channel, "%s(): Cannot read", func); - channel_save(channel, (char_u *)DETACH_MSG, (int)STRLEN(DETACH_MSG)); + channel_save(channel, part, + (char_u *)DETACH_MSG, (int)STRLEN(DETACH_MSG)); /* TODO: When reading from stdout is not possible, should we try to * keep stdin and stderr open? Probably not, assume the other side @@ -1684,43 +1709,43 @@ channel_read(channel_T *channel, int whi } /* - * Read from RAW or NL channel "channel". Blocks until there is something to + * Read from RAW or NL "channel"/"part". Blocks until there is something to * read or the timeout expires. - * TODO: add "which" argument and read from stderr. * Returns what was read in allocated memory. * Returns NULL in case of error or timeout. */ char_u * -channel_read_block(channel_T *channel) +channel_read_block(channel_T *channel, int part) { char_u *buf; char_u *msg; - ch_mode_T mode = channel->ch_mode; - sock_T fd = get_read_fd(channel); + ch_mode_T mode = channel->ch_part[part].ch_mode; + int timeout = channel->ch_part[part].ch_timeout; + sock_T fd = channel->ch_part[part].ch_fd; char_u *nl; ch_logsn(channel, "Blocking %s read, timeout: %d msec", - mode == MODE_RAW ? "RAW" : "NL", channel->ch_timeout); + mode == MODE_RAW ? "RAW" : "NL", timeout); while (TRUE) { - buf = channel_peek(channel); + buf = channel_peek(channel, part); if (buf != NULL && (mode == MODE_RAW || (mode == MODE_NL && vim_strchr(buf, NL) != NULL))) break; - if (buf != NULL && channel_collapse(channel) == OK) + if (buf != NULL && channel_collapse(channel, part) == OK) continue; /* Wait for up to the channel timeout. */ - if (fd == CHAN_FD_INVALID - || channel_wait(channel, fd, channel->ch_timeout) == FAIL) + if (fd == INVALID_FD + || channel_wait(channel, fd, timeout) == FAIL) return NULL; - channel_read(channel, -1, "channel_read_block"); + channel_read(channel, part, "channel_read_block"); } if (mode == MODE_RAW) { - msg = channel_get_all(channel); + msg = channel_get_all(channel, part); } else { @@ -1728,7 +1753,7 @@ channel_read_block(channel_T *channel) if (nl[1] == NUL) { /* get the whole buffer */ - msg = channel_get(channel); + msg = channel_get(channel, part); *nl = NUL; } else @@ -1745,26 +1770,26 @@ channel_read_block(channel_T *channel) } /* - * Read one JSON message with ID "id" from channel "channel" and store the + * Read one JSON message with ID "id" from "channel"/"part" and store the * result in "rettv". * Blocks until the message is received or the timeout is reached. */ int -channel_read_json_block(channel_T *channel, int id, typval_T **rettv) +channel_read_json_block(channel_T *channel, int part, int id, typval_T **rettv) { int more; sock_T fd; ch_log(channel, "Reading JSON"); - channel->ch_block_id = id; + channel->ch_part[part].ch_block_id = id; for (;;) { - more = channel_parse_json(channel); + more = channel_parse_json(channel, part); /* search for messsage "id" */ - if (channel_get_json(channel, id, rettv) == OK) + if (channel_get_json(channel, part, id, rettv) == OK) { - channel->ch_block_id = 0; + channel->ch_part[part].ch_block_id = 0; return OK; } @@ -1776,40 +1801,40 @@ channel_read_json_block(channel_T *chann continue; /* Wait for up to the channel timeout. */ - fd = get_read_fd(channel); - if (fd == CHAN_FD_INVALID - || channel_wait(channel, fd, channel->ch_timeout) == FAIL) + fd = channel->ch_part[part].ch_fd; + if (fd == INVALID_FD || channel_wait(channel, fd, + channel->ch_part[part].ch_timeout) == FAIL) break; - channel_read(channel, -1, "channel_read_json_block"); + channel_read(channel, part, "channel_read_json_block"); } } - channel->ch_block_id = 0; + channel->ch_part[part].ch_block_id = 0; return FAIL; } # if defined(WIN32) || defined(PROTO) /* - * Lookup the channel from the socket. Set "which" to the fd index. + * Lookup the channel from the socket. Set "part" to the fd index. * Returns NULL when the socket isn't found. */ channel_T * -channel_fd2channel(sock_T fd, int *whichp) +channel_fd2channel(sock_T fd, int *part) { channel_T *channel; - int i; + int part; - if (fd != CHAN_FD_INVALID) + if (fd != INVALID_FD) for (channel = first_channel; channel != NULL; channel = channel->ch_next) { # ifdef CHANNEL_PIPES - for (i = CHAN_SOCK; i < CHAN_IN; ++i) + for (part = PART_SOCK; part < PART_IN; ++part) # else - i = CHAN_SOCK; + part = PART_SOCK; # endif - if (channel->ch_pfd[i].ch_fd == fd) + if (channel->ch_part[part].ch_fd == fd) { - *whichp = i; + *part = part; return channel; } } @@ -1820,7 +1845,7 @@ channel_fd2channel(sock_T fd, int *which channel_handle_events(void) { channel_T *channel; - int which; + int part; static int loop = 0; /* Skip heavily polling */ @@ -1831,44 +1856,35 @@ channel_handle_events(void) { # ifdef FEAT_GUI_W32 /* only check the pipes */ - for (which = CHAN_OUT; which < CHAN_ERR; ++which) + for (part = PART_OUT; part <= PART_ERR; ++part) # else # ifdef CHANNEL_PIPES /* check the socket and pipes */ - for (which = CHAN_SOCK; which < CHAN_ERR; ++which) + for (part = PART_SOCK; part <= PART_ERR; ++part) # else /* only check the socket */ - which = CHAN_SOCK; + part = PART_SOCK; # endif # endif - channel_read(channel, which, "channel_handle_events"); + channel_read(channel, part, "channel_handle_events"); } } # endif /* - * Write "buf" (NUL terminated string) to channel "channel". + * Write "buf" (NUL terminated string) to "channel"/"part". * When "fun" is not NULL an error message might be given. * Return FAIL or OK. */ int -channel_send(channel_T *channel, char_u *buf, char *fun) +channel_send(channel_T *channel, int part, char_u *buf, char *fun) { int len = (int)STRLEN(buf); int res; - sock_T fd = CHAN_FD_INVALID; - int use_socket = FALSE; + sock_T fd; - if (channel->CH_SOCK != CHAN_FD_INVALID) - { - fd = channel->CH_SOCK; - use_socket = TRUE; - } -#if defined(CHANNEL_PIPES) - else if (channel->CH_IN != CHAN_FD_INVALID) - fd = channel->CH_IN; -#endif - if (fd == CHAN_FD_INVALID) + fd = channel->ch_part[part].ch_fd; + if (fd == INVALID_FD) { if (!channel->ch_error && fun != NULL) { @@ -1888,7 +1904,7 @@ channel_send(channel_T *channel, char_u fflush(log_fd); } - if (use_socket) + if (part == PART_SOCK) res = sock_write(fd, (char *)buf, len); else res = fd_write(fd, (char *)buf, len); @@ -1919,25 +1935,25 @@ channel_poll_setup(int nfd_in, void *fds int nfd = nfd_in; channel_T *channel; struct pollfd *fds = fds_in; - int which; + int part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { # ifdef CHANNEL_PIPES - for (which = CHAN_SOCK; which < CHAN_IN; ++which) + for (part = PART_SOCK; part < PART_IN; ++part) # else - which = CHAN_SOCK; + part = PART_SOCK; # endif { - if (channel->ch_pfd[which].ch_fd != CHAN_FD_INVALID) + if (channel->ch_part[part].ch_fd != INVALID_FD) { - channel->ch_pfd[which].ch_poll_idx = nfd; - fds[nfd].fd = channel->ch_pfd[which].ch_fd; + channel->ch_part[part].ch_poll_idx = nfd; + fds[nfd].fd = channel->ch_part[part].ch_fd; fds[nfd].events = POLLIN; nfd++; } else - channel->ch_pfd[which].ch_poll_idx = -1; + channel->ch_part[part].ch_poll_idx = -1; } } @@ -1953,21 +1969,21 @@ channel_poll_check(int ret_in, void *fds int ret = ret_in; channel_T *channel; struct pollfd *fds = fds_in; - int which; + int part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { # ifdef CHANNEL_PIPES - for (which = CHAN_SOCK; which < CH_IN; ++which) + for (part = PART_SOCK; part < PART_IN; ++part) # else - which = CHAN_SOCK; + part = PART_SOCK; # endif { - int idx = channel->ch_pfd[which].ch_poll_idx; + int idx = channel->ch_part[part].ch_poll_idx; if (ret > 0 && idx != -1 && fds[idx].revents & POLLIN) { - channel_read(channel, which, "channel_poll_check"); + channel_read(channel, part, "channel_poll_check"); --ret; } } @@ -1987,19 +2003,19 @@ channel_select_setup(int maxfd_in, void int maxfd = maxfd_in; channel_T *channel; fd_set *rfds = rfds_in; - int which; + int part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { # ifdef CHANNEL_PIPES - for (which = CHAN_SOCK; which < CHAN_IN; ++which) + for (part = PART_SOCK; part < PART_IN; ++part) # else - which = CHAN_SOCK; + part = PART_SOCK; # endif { - sock_T fd = channel->ch_pfd[which].ch_fd; + sock_T fd = channel->ch_part[part].ch_fd; - if (fd != CHAN_FD_INVALID) + if (fd != INVALID_FD) { FD_SET((int)fd, rfds); if (maxfd < (int)fd) @@ -2020,21 +2036,21 @@ channel_select_check(int ret_in, void *r int ret = ret_in; channel_T *channel; fd_set *rfds = rfds_in; - int which; + int part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { # ifdef CHANNEL_PIPES - for (which = CHAN_SOCK; which < CHAN_IN; ++which) + for (part = PART_SOCK; part < PART_IN; ++part) # else - which = CHAN_SOCK; + part = PART_SOCK; # endif { - sock_T fd = channel->ch_pfd[which].ch_fd; + sock_T fd = channel->ch_part[part].ch_fd; - if (ret > 0 && fd != CHAN_FD_INVALID && FD_ISSET(fd, rfds)) + if (ret > 0 && fd != INVALID_FD && FD_ISSET(fd, rfds)) { - channel_read(channel, which, "channel_select_check"); + channel_read(channel, part, "channel_select_check"); --ret; } } @@ -2055,24 +2071,35 @@ channel_parse_messages(void) channel_T *channel = first_channel; int ret = FALSE; int r; + int part = PART_SOCK; while (channel != NULL) { - /* Increase the refcount, in case the handler causes the channel to be - * unreferenced or closed. */ - ++channel->ch_refcount; - r = may_invoke_callback(channel); - if (channel_unref(channel)) - /* channel was freed, start over */ - channel = first_channel; - - if (r == OK) + if (channel->ch_part[part].ch_fd != INVALID_FD) { - channel = first_channel; /* something was done, start over */ - ret = TRUE; + /* Increase the refcount, in case the handler causes the channel + * to be unreferenced or closed. */ + ++channel->ch_refcount; + r = may_invoke_callback(channel, part); + if (r == OK) + ret = TRUE; + if (channel_unref(channel) || r == OK) + { + /* channel was freed or something was done, start over */ + channel = first_channel; + part = PART_SOCK; + continue; + } } +#ifdef CHANNEL_PIPES + if (part < PART_ERR) + ++part; else +#endif + { channel = channel->ch_next; + part = PART_SOCK; + } } return ret; } @@ -2085,37 +2112,71 @@ set_ref_in_channel(int copyID) { int abort = FALSE; channel_T *channel; + int part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { - jsonq_T *head = &channel->ch_json_head; - jsonq_T *item = head->jq_next; - - while (item != NULL) +#ifdef CHANNEL_PIPES + for (part = PART_SOCK; part < PART_IN; ++part) +#else + part = PART_SOCK; +#endif { - list_T *l = item->jq_value->vval.v_list; + jsonq_T *head = &channel->ch_part[part].ch_json_head; + jsonq_T *item = head->jq_next; - if (l->lv_copyID != copyID) + while (item != NULL) { - l->lv_copyID = copyID; - abort = abort || set_ref_in_list(l, copyID, NULL); + list_T *l = item->jq_value->vval.v_list; + + if (l->lv_copyID != copyID) + { + l->lv_copyID = copyID; + abort = abort || set_ref_in_list(l, copyID, NULL); + } + item = item->jq_next; } - item = item->jq_next; } } return abort; } /* - * Return the mode of channel "channel". + * Return the "part" to write to for "channel". + */ + int +channel_part_send(channel_T *channel) +{ +#ifdef CHANNEL_PIPES + if (channel->CH_SOCK_FD == INVALID_FD) + return PART_IN; +#endif + return PART_SOCK; +} + +/* + * Return the default "part" to read from for "channel". + */ + int +channel_part_read(channel_T *channel) +{ +#ifdef CHANNEL_PIPES + if (channel->CH_SOCK_FD == INVALID_FD) + return PART_OUT; +#endif + return PART_SOCK; +} + +/* + * Return the mode of "channel"/"part" * If "channel" is invalid returns MODE_JSON. */ ch_mode_T -channel_get_mode(channel_T *channel) +channel_get_mode(channel_T *channel, int part) { if (channel == NULL) return MODE_JSON; - return channel->ch_mode; + return channel->ch_part[part].ch_mode; } #endif /* FEAT_CHANNEL */ diff --git a/src/eval.c b/src/eval.c --- a/src/eval.c +++ b/src/eval.c @@ -10112,33 +10112,42 @@ f_ch_open(typval_T *argvars, typval_T *r static void f_ch_readraw(typval_T *argvars, typval_T *rettv) { - channel_T *channel; + channel_T *channel; + int part; /* return an empty string by default */ rettv->v_type = VAR_STRING; rettv->vval.v_string = NULL; /* TODO: use timeout from the options */ + /* TODO: read from stderr */ channel = get_channel_arg(&argvars[0]); if (channel != NULL) - rettv->vval.v_string = channel_read_block(channel); + { + part = channel_part_read(channel); + rettv->vval.v_string = channel_read_block(channel, part); + } } /* * common for "sendexpr()" and "sendraw()" * Returns the channel if the caller should read the response. + * Sets "part_read" to the the read fd. * Otherwise returns NULL. */ static channel_T * -send_common(typval_T *argvars, char_u *text, int id, char *fun) +send_common(typval_T *argvars, char_u *text, int id, char *fun, int *part_read) { channel_T *channel; jobopt_T opt; + int part_send; channel = get_channel_arg(&argvars[0]); if (channel == NULL) return NULL; + part_send = channel_part_send(channel); + *part_read = channel_part_read(channel); opt.jo_callback = NULL; if (get_job_options(&argvars[2], &opt, JO_CALLBACK) == FAIL) @@ -10147,9 +10156,10 @@ send_common(typval_T *argvars, char_u *t /* Set the callback. An empty callback means no callback and not reading * the response. */ if (opt.jo_callback != NULL && *opt.jo_callback != NUL) - channel_set_req_callback(channel, opt.jo_callback, id); - - if (channel_send(channel, text, fun) == OK && opt.jo_callback == NULL) + channel_set_req_callback(channel, part_send, opt.jo_callback, id); + + if (channel_send(channel, part_send, text, fun) == OK + && opt.jo_callback == NULL) return channel; return NULL; } @@ -10165,6 +10175,8 @@ f_ch_sendexpr(typval_T *argvars, typval_ channel_T *channel; int id; ch_mode_T ch_mode; + int part_send; + int part_read; /* return an empty string by default */ rettv->v_type = VAR_STRING; @@ -10173,11 +10185,12 @@ f_ch_sendexpr(typval_T *argvars, typval_ channel = get_channel_arg(&argvars[0]); if (channel == NULL) return; - - ch_mode = channel_get_mode(channel); - if (ch_mode == MODE_RAW) - { - EMSG(_("E912: cannot use ch_sendexpr() with a raw channel")); + part_send = channel_part_send(channel); + + ch_mode = channel_get_mode(channel, part_send); + if (ch_mode == MODE_RAW || ch_mode == MODE_NL) + { + EMSG(_("E912: cannot use ch_sendexpr() with a raw or nl channel")); return; } @@ -10187,11 +10200,11 @@ f_ch_sendexpr(typval_T *argvars, typval_ if (text == NULL) return; - channel = send_common(argvars, text, id, "sendexpr"); + channel = send_common(argvars, text, id, "sendexpr", &part_read); vim_free(text); if (channel != NULL) { - if (channel_read_json_block(channel, id, &listtv) == OK) + if (channel_read_json_block(channel, part_read, id, &listtv) == OK) { list_T *list = listtv->vval.v_list; @@ -10213,15 +10226,16 @@ f_ch_sendraw(typval_T *argvars, typval_T char_u buf[NUMBUFLEN]; char_u *text; channel_T *channel; + int part_read; /* return an empty string by default */ rettv->v_type = VAR_STRING; rettv->vval.v_string = NULL; text = get_tv_string_buf(&argvars[1], buf); - channel = send_common(argvars, text, 0, "sendraw"); + channel = send_common(argvars, text, 0, "sendraw", &part_read); if (channel != NULL) - rettv->vval.v_string = channel_read_block(channel); + rettv->vval.v_string = channel_read_block(channel, part_read); } /* diff --git a/src/gui_w32.c b/src/gui_w32.c --- a/src/gui_w32.c +++ b/src/gui_w32.c @@ -1930,15 +1930,15 @@ process_message(void) #ifdef FEAT_CHANNEL if (msg.message == WM_NETBEANS) { - int what; - channel_T *channel = channel_fd2channel((sock_T)msg.wParam, &what); + int part; + channel_T *channel = channel_fd2channel((sock_T)msg.wParam, &part); if (channel != NULL) { /* Disable error messages, they can mess up the display and throw * an exception. */ ++emsg_off; - channel_read(channel, what, "process_message"); + channel_read(channel, part, "process_message"); --emsg_off; } return; diff --git a/src/netbeans.c b/src/netbeans.c --- a/src/netbeans.c +++ b/src/netbeans.c @@ -385,7 +385,7 @@ netbeans_parse_messages(void) while (nb_channel != NULL) { - buffer = channel_peek(nb_channel); + buffer = channel_peek(nb_channel, PART_SOCK); if (buffer == NULL) break; /* nothing to read */ @@ -396,7 +396,7 @@ netbeans_parse_messages(void) /* Command isn't complete. If there is no following buffer, * return (wait for more). If there is another buffer following, * prepend the text to that buffer and delete this one. */ - if (channel_collapse(nb_channel) == FAIL) + if (channel_collapse(nb_channel, PART_SOCK) == FAIL) return; } else @@ -409,7 +409,7 @@ netbeans_parse_messages(void) if (*p == NUL) { own_node = TRUE; - channel_get(nb_channel); + channel_get(nb_channel, PART_SOCK); } else own_node = FALSE; @@ -757,7 +757,7 @@ netbeans_end(void) nb_send(char *buf, char *fun) { if (nb_channel != NULL) - channel_send(nb_channel, (char_u *)buf, fun); + channel_send(nb_channel, PART_SOCK, (char_u *)buf, fun); } /* diff --git a/src/proto/channel.pro b/src/proto/channel.pro --- a/src/proto/channel.pro +++ b/src/proto/channel.pro @@ -11,29 +11,30 @@ channel_T *channel_open(char *hostname, void channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err); void channel_set_job(channel_T *channel, job_T *job); void channel_set_options(channel_T *channel, jobopt_T *options); -void channel_set_req_callback(channel_T *channel, char_u *callback, int id); -char_u *channel_get(channel_T *channel); -int channel_collapse(channel_T *channel); +void channel_set_req_callback(channel_T *channel, int part, char_u *callback, int id); +char_u *channel_get(channel_T *channel, int part); +int channel_collapse(channel_T *channel, int part); int channel_can_write_to(channel_T *channel); int channel_is_open(channel_T *channel); char *channel_status(channel_T *channel); void channel_close(channel_T *channel); -int channel_save(channel_T *channel, char_u *buf, int len); -char_u *channel_peek(channel_T *channel); +char_u *channel_peek(channel_T *channel, int part); void channel_clear(channel_T *channel); void channel_free_all(void); int channel_get_id(void); -void channel_read(channel_T *channel, int which, char *func); -char_u *channel_read_block(channel_T *channel); -int channel_read_json_block(channel_T *channel, int id, typval_T **rettv); -channel_T *channel_fd2channel(sock_T fd, int *whichp); +void channel_read(channel_T *channel, int part, char *func); +char_u *channel_read_block(channel_T *channel, int part); +int channel_read_json_block(channel_T *channel, int part, int id, typval_T **rettv); +channel_T *channel_fd2channel(sock_T fd, int *part); void channel_handle_events(void); -int channel_send(channel_T *channel, char_u *buf, char *fun); +int channel_send(channel_T *channel, int part, char_u *buf, char *fun); int channel_poll_setup(int nfd_in, void *fds_in); int channel_poll_check(int ret_in, void *fds_in); int channel_select_setup(int maxfd_in, void *rfds_in); int channel_select_check(int ret_in, void *rfds_in); int channel_parse_messages(void); int set_ref_in_channel(int copyID); -ch_mode_T channel_get_mode(channel_T *channel); +int channel_part_send(channel_T *channel); +int channel_part_read(channel_T *channel); +ch_mode_T channel_get_mode(channel_T *channel, int part); /* vim: set ft=c : */ diff --git a/src/structs.h b/src/structs.h --- a/src/structs.h +++ b/src/structs.h @@ -1303,19 +1303,19 @@ typedef enum /* Ordering matters, it is used in for loops: IN is last, only SOCK/OUT/ERR * are polled. */ -#define CHAN_SOCK 0 -#define CH_SOCK ch_pfd[CHAN_SOCK].ch_fd +#define PART_SOCK 0 +#define CH_SOCK_FD ch_part[PART_SOCK].ch_fd #if defined(UNIX) || defined(WIN32) # define CHANNEL_PIPES -# define CHAN_FD_INVALID (-1) +# define INVALID_FD (-1) -# define CHAN_OUT 1 -# define CHAN_ERR 2 -# define CHAN_IN 3 -# define CH_OUT ch_pfd[CHAN_OUT].ch_fd -# define CH_ERR ch_pfd[CHAN_ERR].ch_fd -# define CH_IN ch_pfd[CHAN_IN].ch_fd +# define PART_OUT 1 +# define PART_ERR 2 +# define PART_IN 3 +# define CH_OUT_FD ch_part[PART_OUT].ch_fd +# define CH_ERR_FD ch_part[PART_ERR].ch_fd +# define CH_IN_FD ch_part[PART_IN].ch_fd #endif /* The per-fd info for a channel. */ @@ -1335,7 +1335,18 @@ typedef struct { #ifdef WIN32 int ch_inputHandler; /* ret.value of WSAAsyncSelect() */ #endif -} chan_fd_T; + + ch_mode_T ch_mode; + int ch_timeout; /* request timeout in msec */ + + readq_T ch_head; /* header for circular raw read queue */ + jsonq_T ch_json_head; /* header for circular json read queue */ + int ch_block_id; /* ID that channel_read_json_block() is + waiting for */ + + cbq_T ch_cb_head; /* dummy node for per-request callbacks */ + char_u *ch_callback; /* call when a msg is not handled */ +} chanpart_T; struct channel_S { channel_T *ch_next; @@ -1343,9 +1354,7 @@ struct channel_S { int ch_id; /* ID of the channel */ - chan_fd_T ch_pfd[4]; /* info for socket, out, err and in */ - - readq_T ch_head; /* dummy node, header for circular queue */ + chanpart_T ch_part[4]; /* info for socket, out, err and in */ int ch_error; /* When TRUE an error was reported. Avoids * giving pages full of error messages when @@ -1355,15 +1364,7 @@ struct channel_S { void (*ch_close_cb)(void); /* callback for when channel is closed */ - int ch_block_id; /* ID that channel_read_json_block() is - waiting for */ - char_u *ch_callback; /* function to call when a msg is not handled */ - cbq_T ch_cb_head; /* dummy node for pre-request callbacks */ - - ch_mode_T ch_mode; - jsonq_T ch_json_head; /* dummy node, header for circular queue */ - - int ch_timeout; /* request timeout in msec */ + char_u *ch_callback; /* call when any msg is not handled */ job_T *ch_job; /* Job that uses this channel; this does not * count as a reference to avoid a circular @@ -1372,10 +1373,10 @@ struct channel_S { int ch_refcount; /* reference count */ }; -#define JO_MODE 1 -#define JO_CALLBACK 2 -#define JO_WAITTIME 4 -#define JO_TIMEOUT 8 +#define JO_MODE 1 /* all modes */ +#define JO_CALLBACK 2 /* channel callback */ +#define JO_WAITTIME 4 /* only for ch_open() */ +#define JO_TIMEOUT 8 /* all timeouts */ #define JO_ALL 0xffffff /* diff --git a/src/version.c b/src/version.c --- a/src/version.c +++ b/src/version.c @@ -748,6 +748,8 @@ static char *(features[]) = static int included_patches[] = { /* Add new patch number below this line */ /**/ + 1369, +/**/ 1368, /**/ 1367,