Mercurial > vim
diff src/channel.c @ 7788:192ae655ac91 v7.4.1191
commit https://github.com/vim/vim/commit/3b5f929b18492fec291d1ec95a91f54e5912c03b
Author: Bram Moolenaar <Bram@vim.org>
Date: Thu Jan 28 22:37:01 2016 +0100
patch 7.4.1191
Problem: The channel feature isn't working yet.
Solution: Add the connect(), disconnect(), sendexpr() and sendraw()
functions. Add initial documentation. Add a demo server.
author | Christian Brabandt <cb@256bit.org> |
---|---|
date | Thu, 28 Jan 2016 22:45:04 +0100 |
parents | e09af43f98f7 |
children | 2981a37cec61 |
line wrap: on
line diff
--- a/src/channel.c +++ b/src/channel.c @@ -77,11 +77,11 @@ struct readqueue typedef struct readqueue queue_T; typedef struct { - sock_T ch_fd; /* the socket, -1 for a closed channel */ - int ch_idx; /* used by channel_poll_setup() */ - queue_T ch_head; /* dummy node, header for circular queue */ + sock_T ch_fd; /* the socket, -1 for a closed channel */ + int ch_idx; /* used by channel_poll_setup() */ + queue_T ch_head; /* dummy node, header for circular queue */ - int ch_error; /* When TRUE an error was reported. Avoids giving + int ch_error; /* When TRUE an error was reported. Avoids giving * pages full of error messages when the other side * has exited, only mention the first error until the * connection works again. */ @@ -89,13 +89,19 @@ typedef struct { XtInputId ch_inputHandler; /* Cookie for input */ #endif #ifdef FEAT_GUI_GTK - gint ch_inputHandler; /* Cookie for input */ + gint ch_inputHandler; /* Cookie for input */ #endif #ifdef FEAT_GUI_W32 - int ch_inputHandler; /* simply ret.value of WSAAsyncSelect() */ + int ch_inputHandler; /* simply ret.value of WSAAsyncSelect() */ #endif - void (*ch_close_cb)(void); /* callback invoked when channel is closed */ + void (*ch_close_cb)(void); /* callback for when channel is closed */ + + char_u *ch_callback; /* function to call when a msg is not handled */ + char_u *ch_req_callback; /* function to call for current request */ + int ch_will_block; /* do not use callback right now */ + + int ch_json_mode; } channel_T; /* @@ -190,7 +196,7 @@ channel_gui_register(int idx) channel->ch_inputHandler = XtAppAddInput((XtAppContext)app_context, channel->ch_fd, (XtPointer)(XtInputReadMask + XtInputExceptMask), - messageFromNetbeans, (XtPointer)idx); + messageFromNetbeans, (XtPointer)(long)idx); # else # ifdef FEAT_GUI_GTK /* @@ -383,12 +389,152 @@ channel_open(char *hostname, int port_in } /* + * Set the json mode of channel "idx" to TRUE or FALSE. + */ + void +channel_set_json_mode(int idx, int json_mode) +{ + channels[idx].ch_json_mode = json_mode; +} + +/* + * Set the callback for channel "idx". + */ + void +channel_set_callback(int idx, char_u *callback) +{ + vim_free(channels[idx].ch_callback); + channels[idx].ch_callback = vim_strsave(callback); +} + +/* + * Set the callback for channel "idx" for the next response. + */ + void +channel_set_req_callback(int idx, char_u *callback) +{ + vim_free(channels[idx].ch_req_callback); + channels[idx].ch_req_callback = callback == NULL + ? NULL : vim_strsave(callback); +} + +/* + * Set the flag that the callback for channel "idx" should not be used now. + */ + void +channel_will_block(int idx) +{ + channels[idx].ch_will_block = TRUE; +} + +/* + * Decode JSON "msg", which must have the form "[nr, expr]". + * Put "expr" in "tv". + * Return OK or FAIL. + */ + int +channel_decode_json(char_u *msg, typval_T *tv) +{ + js_read_T reader; + typval_T listtv; + + reader.js_buf = msg; + reader.js_eof = TRUE; + reader.js_used = 0; + json_decode(&reader, &listtv); + /* TODO: use the sequence number */ + if (listtv.v_type == VAR_LIST + && listtv.vval.v_list->lv_len == 2 + && listtv.vval.v_list->lv_first->li_tv.v_type == VAR_NUMBER) + { + /* Move the item from the list and then change the type to avoid the + * item being freed. */ + *tv = listtv.vval.v_list->lv_last->li_tv; + listtv.vval.v_list->lv_last->li_tv.v_type = VAR_NUMBER; + list_unref(listtv.vval.v_list); + return OK; + } + + /* give error message? */ + clear_tv(&listtv); + return FAIL; +} + +/* + * Invoke the "callback" on channel "idx". + */ + static void +invoke_callback(int idx, char_u *callback) +{ + typval_T argv[3]; + typval_T rettv; + int dummy; + char_u *msg; + int ret = OK; + + argv[0].v_type = VAR_NUMBER; + argv[0].vval.v_number = idx; + + /* Concatenate everything into one buffer. + * TODO: only read what the callback will use. + * TODO: avoid multiple allocations. */ + while (channel_collapse(idx) == OK) + ; + msg = channel_get(idx); + + if (channels[idx].ch_json_mode) + ret = channel_decode_json(msg, &argv[1]); + else + { + argv[1].v_type = VAR_STRING; + argv[1].vval.v_string = msg; + } + + if (ret == OK) + { + call_func(callback, (int)STRLEN(callback), + &rettv, 2, argv, 0L, 0L, &dummy, TRUE, NULL); + /* If an echo command was used the cursor needs to be put back where + * it belongs. */ + setcursor(); + cursor_on(); + out_flush(); + } + vim_free(msg); +} + +/* + * Invoke a callback for channel "idx" if needed. + */ + static void +may_invoke_callback(int idx) +{ + if (channels[idx].ch_will_block) + return; + if (channel_peek(idx) == NULL) + return; + + if (channels[idx].ch_req_callback != NULL) + { + /* invoke the one-time callback */ + invoke_callback(idx, channels[idx].ch_req_callback); + channels[idx].ch_req_callback = NULL; + return; + } + + if (channels[idx].ch_callback != NULL) + /* invoke the channel callback */ + invoke_callback(idx, channels[idx].ch_callback); +} + +/* * Return TRUE when channel "idx" is open. + * Also returns FALSE or invalid "idx". */ int channel_is_open(int idx) { - return channels[idx].ch_fd >= 0; + return idx >= 0 && idx < channel_count && channels[idx].ch_fd >= 0; } /* @@ -407,6 +553,8 @@ channel_close(int idx) #ifdef FEAT_GUI channel_gui_unregister(idx); #endif + vim_free(channel->ch_callback); + channel->ch_callback = NULL; } } @@ -551,7 +699,57 @@ channel_clear(int idx) #define MAXMSGSIZE 4096 /* - * Read from channel "idx". The data is put in the read queue. + * Check for reading from "fd" with "timeout" msec. + * Return FAIL when there is nothing to read. + */ + static int +channel_wait(int fd, int timeout) +{ +#ifdef HAVE_SELECT + struct timeval tval; + fd_set rfds; + int ret; + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + tval.tv_sec = timeout / 1000; + tval.tv_usec = (timeout % 1000) * 1000; + for (;;) + { + ret = select(fd + 1, &rfds, NULL, NULL, &tval); +# ifdef EINTR + if (ret == -1 && errno == EINTR) + continue; +# endif + if (ret <= 0) + return FAIL; + break; + } +#else + struct pollfd fds; + + fds.fd = fd; + fds.events = POLLIN; + if (poll(&fds, 1, timeout) <= 0) + return FAIL; +#endif + return OK; +} + +/* + * Return a unique ID to be used in a message. + */ + int +channel_get_id() +{ + static int next_id = 1; + + return next_id++; +} + +/* + * Read from channel "idx" for as long as there is something to read. + * The data is put in the read queue. */ void channel_read(int idx) @@ -559,14 +757,6 @@ channel_read(int idx) static char_u *buf = NULL; int len = 0; int readlen = 0; -#ifdef HAVE_SELECT - struct timeval tval; - fd_set rfds; -#else -# ifdef HAVE_POLL - struct pollfd fds; -# endif -#endif channel_T *channel = &channels[idx]; if (channel->ch_fd < 0) @@ -588,21 +778,8 @@ channel_read(int idx) * MAXMSGSIZE long. */ for (;;) { -#ifdef HAVE_SELECT - FD_ZERO(&rfds); - FD_SET(channel->ch_fd, &rfds); - tval.tv_sec = 0; - tval.tv_usec = 0; - if (select(channel->ch_fd + 1, &rfds, NULL, NULL, &tval) <= 0) + if (channel_wait(channel->ch_fd, 0) == FAIL) break; -#else -# ifdef HAVE_POLL - fds.fd = channel->ch_fd; - fds.events = POLLIN; - if (poll(&fds, 1, 0) <= 0) - break; -# endif -#endif len = sock_read(channel->ch_fd, buf, MAXMSGSIZE); if (len <= 0) break; /* error or nothing more to read */ @@ -641,12 +818,44 @@ channel_read(int idx) } } + may_invoke_callback(idx); + #if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK) if (CH_HAS_GUI && gtk_main_level() > 0) gtk_main_quit(); #endif } +/* + * Read from channel "idx". Blocks until there is something to read or the + * timeout expires. + * Returns what was read in allocated memory. + * Returns NULL in case of error or timeout. + */ + char_u * +channel_read_block(int idx) +{ + if (channel_peek(idx) == NULL) + { + /* Wait for up to 2 seconds. + * TODO: use timeout set on the channel. */ + if (channel_wait(channels[idx].ch_fd, 2000) == FAIL) + { + channels[idx].ch_will_block = FALSE; + return NULL; + } + channel_read(idx); + } + + /* Concatenate everything into one buffer. + * TODO: avoid multiple allocations. */ + while (channel_collapse(idx) == OK) + ; + + channels[idx].ch_will_block = FALSE; + return channel_get(idx); +} + # if defined(FEAT_GUI_W32) || defined(PROTO) /* * Lookup the channel index from the socket. @@ -668,8 +877,9 @@ channel_socket2idx(sock_T fd) /* * Write "buf" (NUL terminated string) to channel "idx". * When "fun" is not NULL an error message might be given. + * Return FAIL or OK. */ - void + int channel_send(int idx, char_u *buf, char *fun) { channel_T *channel = &channels[idx]; @@ -683,8 +893,10 @@ channel_send(int idx, char_u *buf, char EMSG2("E630: %s(): write while not connected", fun); } channel->ch_error = TRUE; + return FAIL; } - else if (sock_write(channel->ch_fd, buf, len) != len) + + if (sock_write(channel->ch_fd, buf, len) != len) { if (!channel->ch_error && fun != NULL) { @@ -692,9 +904,11 @@ channel_send(int idx, char_u *buf, char EMSG2("E631: %s(): write failed", fun); } channel->ch_error = TRUE; + return FAIL; } - else - channel->ch_error = FALSE; + + channel->ch_error = FALSE; + return OK; } # if (defined(UNIX) && !defined(HAVE_SELECT)) || defined(PROTO)