diff src/channel.c @ 7788:192ae655ac91 v7.4.1191

commit https://github.com/vim/vim/commit/3b5f929b18492fec291d1ec95a91f54e5912c03b Author: Bram Moolenaar <Bram@vim.org> Date: Thu Jan 28 22:37:01 2016 +0100 patch 7.4.1191 Problem: The channel feature isn't working yet. Solution: Add the connect(), disconnect(), sendexpr() and sendraw() functions. Add initial documentation. Add a demo server.
author Christian Brabandt <cb@256bit.org>
date Thu, 28 Jan 2016 22:45:04 +0100
parents e09af43f98f7
children 2981a37cec61
line wrap: on
line diff
--- a/src/channel.c
+++ b/src/channel.c
@@ -77,11 +77,11 @@ struct readqueue
 typedef struct readqueue queue_T;
 
 typedef struct {
-    sock_T  ch_fd;	/* the socket, -1 for a closed channel */
-    int	    ch_idx;	/* used by channel_poll_setup() */
-    queue_T ch_head;	/* dummy node, header for circular queue */
+    sock_T    ch_fd;	/* the socket, -1 for a closed channel */
+    int	      ch_idx;	/* used by channel_poll_setup() */
+    queue_T   ch_head;	/* dummy node, header for circular queue */
 
-    int	    ch_error;	/* When TRUE an error was reported.  Avoids giving
+    int	      ch_error;	/* When TRUE an error was reported.  Avoids giving
 			 * pages full of error messages when the other side
 			 * has exited, only mention the first error until the
 			 * connection works again. */
@@ -89,13 +89,19 @@ typedef struct {
     XtInputId ch_inputHandler;  /* Cookie for input */
 #endif
 #ifdef FEAT_GUI_GTK
-    gint ch_inputHandler;	/* Cookie for input */
+    gint      ch_inputHandler;	/* Cookie for input */
 #endif
 #ifdef FEAT_GUI_W32
-    int  ch_inputHandler;	/* simply ret.value of WSAAsyncSelect() */
+    int       ch_inputHandler;	/* simply ret.value of WSAAsyncSelect() */
 #endif
 
-    void (*ch_close_cb)(void);	/* callback invoked when channel is closed */
+    void      (*ch_close_cb)(void); /* callback for when channel is closed */
+
+    char_u    *ch_callback;	/* function to call when a msg is not handled */
+    char_u    *ch_req_callback;	/* function to call for current request */
+    int	      ch_will_block;	/* do not use callback right now */
+
+    int	      ch_json_mode;
 } channel_T;
 
 /*
@@ -190,7 +196,7 @@ channel_gui_register(int idx)
 	channel->ch_inputHandler =
 	    XtAppAddInput((XtAppContext)app_context, channel->ch_fd,
 			 (XtPointer)(XtInputReadMask + XtInputExceptMask),
-					messageFromNetbeans, (XtPointer)idx);
+				   messageFromNetbeans, (XtPointer)(long)idx);
 # else
 #  ifdef FEAT_GUI_GTK
     /*
@@ -383,12 +389,152 @@ channel_open(char *hostname, int port_in
 }
 
 /*
+ * Set the json mode of channel "idx" to TRUE or FALSE.
+ */
+    void
+channel_set_json_mode(int idx, int json_mode)
+{
+    channels[idx].ch_json_mode = json_mode;
+}
+
+/*
+ * Set the callback for channel "idx".
+ */
+    void
+channel_set_callback(int idx, char_u *callback)
+{
+    vim_free(channels[idx].ch_callback);
+    channels[idx].ch_callback = vim_strsave(callback);
+}
+
+/*
+ * Set the callback for channel "idx" for the next response.
+ */
+    void
+channel_set_req_callback(int idx, char_u *callback)
+{
+    vim_free(channels[idx].ch_req_callback);
+    channels[idx].ch_req_callback = callback == NULL
+					       ? NULL : vim_strsave(callback);
+}
+
+/*
+ * Set the flag that the callback for channel "idx" should not be used now.
+ */
+    void
+channel_will_block(int idx)
+{
+    channels[idx].ch_will_block = TRUE;
+}
+
+/*
+ * Decode JSON "msg", which must have the form "[nr, expr]".
+ * Put "expr" in "tv".
+ * Return OK or FAIL.
+ */
+    int
+channel_decode_json(char_u *msg, typval_T *tv)
+{
+    js_read_T	reader;
+    typval_T	listtv;
+
+    reader.js_buf = msg;
+    reader.js_eof = TRUE;
+    reader.js_used = 0;
+    json_decode(&reader, &listtv);
+    /* TODO: use the sequence number */
+    if (listtv.v_type == VAR_LIST
+	  && listtv.vval.v_list->lv_len == 2
+	  && listtv.vval.v_list->lv_first->li_tv.v_type == VAR_NUMBER)
+    {
+	/* Move the item from the list and then change the type to avoid the
+	 * item being freed. */
+	*tv = listtv.vval.v_list->lv_last->li_tv;
+	listtv.vval.v_list->lv_last->li_tv.v_type = VAR_NUMBER;
+	list_unref(listtv.vval.v_list);
+	return OK;
+    }
+
+    /* give error message? */
+    clear_tv(&listtv);
+    return FAIL;
+}
+
+/*
+ * Invoke the "callback" on channel "idx".
+ */
+    static void
+invoke_callback(int idx, char_u *callback)
+{
+    typval_T	argv[3];
+    typval_T	rettv;
+    int		dummy;
+    char_u	*msg;
+    int		ret = OK;
+
+    argv[0].v_type = VAR_NUMBER;
+    argv[0].vval.v_number = idx;
+
+    /* Concatenate everything into one buffer.
+     * TODO: only read what the callback will use.
+     * TODO: avoid multiple allocations. */
+    while (channel_collapse(idx) == OK)
+	;
+    msg = channel_get(idx);
+
+    if (channels[idx].ch_json_mode)
+	ret = channel_decode_json(msg, &argv[1]);
+    else
+    {
+	argv[1].v_type = VAR_STRING;
+	argv[1].vval.v_string = msg;
+    }
+
+    if (ret == OK)
+    {
+	call_func(callback, (int)STRLEN(callback),
+				 &rettv, 2, argv, 0L, 0L, &dummy, TRUE, NULL);
+	/* If an echo command was used the cursor needs to be put back where
+	 * it belongs. */
+	setcursor();
+	cursor_on();
+	out_flush();
+    }
+    vim_free(msg);
+}
+
+/*
+ * Invoke a callback for channel "idx" if needed.
+ */
+    static void
+may_invoke_callback(int idx)
+{
+    if (channels[idx].ch_will_block)
+	return;
+    if (channel_peek(idx) == NULL)
+	return;
+
+    if (channels[idx].ch_req_callback != NULL)
+    {
+	/* invoke the one-time callback */
+	invoke_callback(idx, channels[idx].ch_req_callback);
+	channels[idx].ch_req_callback = NULL;
+	return;
+    }
+
+    if (channels[idx].ch_callback != NULL)
+	/* invoke the channel callback */
+	invoke_callback(idx, channels[idx].ch_callback);
+}
+
+/*
  * Return TRUE when channel "idx" is open.
+ * Also returns FALSE or invalid "idx".
  */
     int
 channel_is_open(int idx)
 {
-    return channels[idx].ch_fd >= 0;
+    return idx >= 0 && idx < channel_count && channels[idx].ch_fd >= 0;
 }
 
 /*
@@ -407,6 +553,8 @@ channel_close(int idx)
 #ifdef FEAT_GUI
 	channel_gui_unregister(idx);
 #endif
+	vim_free(channel->ch_callback);
+	channel->ch_callback = NULL;
     }
 }
 
@@ -551,7 +699,57 @@ channel_clear(int idx)
 #define MAXMSGSIZE 4096
 
 /*
- * Read from channel "idx".  The data is put in the read queue.
+ * Check for reading from "fd" with "timeout" msec.
+ * Return FAIL when there is nothing to read.
+ */
+    static int
+channel_wait(int fd, int timeout)
+{
+#ifdef HAVE_SELECT
+    struct timeval	tval;
+    fd_set		rfds;
+    int			ret;
+
+    FD_ZERO(&rfds);
+    FD_SET(fd, &rfds);
+    tval.tv_sec = timeout / 1000;
+    tval.tv_usec = (timeout % 1000) * 1000;
+    for (;;)
+    {
+	ret = select(fd + 1, &rfds, NULL, NULL, &tval);
+# ifdef EINTR
+	if (ret == -1 && errno == EINTR)
+	    continue;
+# endif
+	if (ret <= 0)
+	    return FAIL;
+	break;
+    }
+#else
+    struct pollfd	fds;
+
+    fds.fd = fd;
+    fds.events = POLLIN;
+    if (poll(&fds, 1, timeout) <= 0)
+	return FAIL;
+#endif
+    return OK;
+}
+
+/*
+ * Return a unique ID to be used in a message.
+ */
+    int
+channel_get_id()
+{
+    static int next_id = 1;
+
+    return next_id++;
+}
+
+/*
+ * Read from channel "idx" for as long as there is something to read.
+ * The data is put in the read queue.
  */
     void
 channel_read(int idx)
@@ -559,14 +757,6 @@ channel_read(int idx)
     static char_u	*buf = NULL;
     int			len = 0;
     int			readlen = 0;
-#ifdef HAVE_SELECT
-    struct timeval	tval;
-    fd_set		rfds;
-#else
-# ifdef HAVE_POLL
-    struct pollfd	fds;
-# endif
-#endif
     channel_T		*channel = &channels[idx];
 
     if (channel->ch_fd < 0)
@@ -588,21 +778,8 @@ channel_read(int idx)
      * MAXMSGSIZE long. */
     for (;;)
     {
-#ifdef HAVE_SELECT
-	FD_ZERO(&rfds);
-	FD_SET(channel->ch_fd, &rfds);
-	tval.tv_sec = 0;
-	tval.tv_usec = 0;
-	if (select(channel->ch_fd + 1, &rfds, NULL, NULL, &tval) <= 0)
+	if (channel_wait(channel->ch_fd, 0) == FAIL)
 	    break;
-#else
-# ifdef HAVE_POLL
-	fds.fd = channel->ch_fd;
-	fds.events = POLLIN;
-	if (poll(&fds, 1, 0) <= 0)
-	    break;
-# endif
-#endif
 	len = sock_read(channel->ch_fd, buf, MAXMSGSIZE);
 	if (len <= 0)
 	    break;	/* error or nothing more to read */
@@ -641,12 +818,44 @@ channel_read(int idx)
 	}
     }
 
+    may_invoke_callback(idx);
+
 #if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK)
     if (CH_HAS_GUI && gtk_main_level() > 0)
 	gtk_main_quit();
 #endif
 }
 
+/*
+ * Read from channel "idx".  Blocks until there is something to read or the
+ * timeout expires.
+ * Returns what was read in allocated memory.
+ * Returns NULL in case of error or timeout.
+ */
+    char_u *
+channel_read_block(int idx)
+{
+    if (channel_peek(idx) == NULL)
+    {
+	/* Wait for up to 2 seconds.
+	 * TODO: use timeout set on the channel. */
+	if (channel_wait(channels[idx].ch_fd, 2000) == FAIL)
+	{
+	    channels[idx].ch_will_block = FALSE;
+	    return NULL;
+	}
+	channel_read(idx);
+    }
+
+    /* Concatenate everything into one buffer.
+     * TODO: avoid multiple allocations. */
+    while (channel_collapse(idx) == OK)
+	;
+
+    channels[idx].ch_will_block = FALSE;
+    return channel_get(idx);
+}
+
 # if defined(FEAT_GUI_W32) || defined(PROTO)
 /*
  * Lookup the channel index from the socket.
@@ -668,8 +877,9 @@ channel_socket2idx(sock_T fd)
 /*
  * Write "buf" (NUL terminated string) to channel "idx".
  * When "fun" is not NULL an error message might be given.
+ * Return FAIL or OK.
  */
-    void
+    int
 channel_send(int idx, char_u *buf, char *fun)
 {
     channel_T	*channel = &channels[idx];
@@ -683,8 +893,10 @@ channel_send(int idx, char_u *buf, char 
 	    EMSG2("E630: %s(): write while not connected", fun);
 	}
 	channel->ch_error = TRUE;
+	return FAIL;
     }
-    else if (sock_write(channel->ch_fd, buf, len) != len)
+
+    if (sock_write(channel->ch_fd, buf, len) != len)
     {
 	if (!channel->ch_error && fun != NULL)
 	{
@@ -692,9 +904,11 @@ channel_send(int idx, char_u *buf, char 
 	    EMSG2("E631: %s(): write failed", fun);
 	}
 	channel->ch_error = TRUE;
+	return FAIL;
     }
-    else
-	channel->ch_error = FALSE;
+
+    channel->ch_error = FALSE;
+    return OK;
 }
 
 # if (defined(UNIX) && !defined(HAVE_SELECT)) || defined(PROTO)