comparison src/channel.c @ 8031:ece323e2b57f v7.4.1310

commit https://github.com/vim/vim/commit/6463ca229cb9412581419497924c85fcbfc854ab
Author: Bram Moolenaar <Bram@vim.org>
Date: Sat Feb 13 17:04:46 2016 +0100
patch 7.4.1310
Problem: Jobs don't open a channel.
Solution: Create pipes and add them to the channel. Add ch_logfile(). Only Unix for now.
author Christian Brabandt <cb@256bit.org>
date Sat, 13 Feb 2016 17:15:05 +0100
parents b2cfa3416ba0
children 72324c2e890a
comparison
equal deleted inserted replaced
8030:05f57db9d8da 8031:ece323e2b57f
11 */ 11 */
12 12
13 #include "vim.h" 13 #include "vim.h"
14 14
15 #if defined(FEAT_CHANNEL) || defined(PROTO) 15 #if defined(FEAT_CHANNEL) || defined(PROTO)
16
17 /*
18 * Change the zero to 1 to enable debugging.
19 * This will write a file "channel_debug.log".
20 */
21 #if 0
22 # define CHERROR(fmt, arg) cherror(fmt, arg)
23 # define CHLOG(idx, send, buf) chlog(idx, send, buf)
24 # define CHFILE "channel_debug.log"
25
26 static void cherror(char *fmt, char *arg);
27 static void chlog(int send, char_u *buf);
28 #else
29 # define CHERROR(fmt, arg)
30 # define CHLOG(idx, send, buf)
31 #endif
32 16
33 /* TRUE when netbeans is running with a GUI. */ 17 /* TRUE when netbeans is running with a GUI. */
34 #ifdef FEAT_GUI 18 #ifdef FEAT_GUI
35 # define CH_HAS_GUI (gui.in_use || gui.starting) 19 # define CH_HAS_GUI (gui.in_use || gui.starting)
36 #endif 20 #endif
68 52
69 #ifdef FEAT_GUI_W32 53 #ifdef FEAT_GUI_W32
70 extern HWND s_hwnd; /* Gvim's Window handle */ 54 extern HWND s_hwnd; /* Gvim's Window handle */
71 #endif 55 #endif
72 56
73 struct readqueue
74 {
75 char_u *buffer;
76 struct readqueue *next;
77 struct readqueue *prev;
78 };
79 typedef struct readqueue readq_T;
80
81 struct jsonqueue
82 {
83 typval_T *value;
84 struct jsonqueue *next;
85 struct jsonqueue *prev;
86 };
87 typedef struct jsonqueue jsonq_T;
88
89 struct cbqueue
90 {
91 char_u *callback;
92 int seq_nr;
93 struct cbqueue *next;
94 struct cbqueue *prev;
95 };
96 typedef struct cbqueue cbq_T;
97
98 typedef struct {
99 sock_T ch_fd; /* the socket, -1 for a closed channel */
100 int ch_idx; /* used by channel_poll_setup() */
101 readq_T ch_head; /* dummy node, header for circular queue */
102
103 int ch_error; /* When TRUE an error was reported. Avoids giving
104 * pages full of error messages when the other side
105 * has exited, only mention the first error until the
106 * connection works again. */
107 #ifdef FEAT_GUI_X11
108 XtInputId ch_inputHandler; /* Cookie for input */
109 #endif
110 #ifdef FEAT_GUI_GTK
111 gint ch_inputHandler; /* Cookie for input */
112 #endif
113 #ifdef WIN32
114 int ch_inputHandler; /* simply ret.value of WSAAsyncSelect() */
115 #endif
116
117 void (*ch_close_cb)(void); /* callback for when channel is closed */
118
119 int ch_block_id; /* ID that channel_read_json_block() is
120 waiting for */
121 char_u *ch_callback; /* function to call when a msg is not handled */
122 cbq_T ch_cb_head; /* dummy node for pre-request callbacks */
123
124 ch_mode_T ch_mode;
125 jsonq_T ch_json_head; /* dummy node, header for circular queue */
126
127 int ch_timeout; /* request timeout in msec */
128 } channel_T;
129
130 /* 57 /*
131 * Information about all channels. 58 * Information about all channels.
132 * There can be gaps for closed channels, they will be reused later. 59 * There can be gaps for closed channels, they will be reused later.
133 */ 60 */
134 static channel_T *channels = NULL; 61 static channel_T *channels = NULL;
135 static int channel_count = 0; 62 static int channel_count = 0;
136 63
137 /* 64 /* Log file opened with ch_logfile(). */
138 * TODO: open debug file when desired. 65 static FILE *log_fd = NULL;
139 */ 66
140 FILE *debugfd = NULL; 67 void
68 ch_logfile(FILE *file)
69 {
70 if (log_fd != NULL)
71 fclose(log_fd);
72 log_fd = file;
73 if (log_fd != NULL)
74 fprintf(log_fd, "==== start log session ====\n");
75 }
76
77 static void
78 ch_log_lead(char *what, int ch_idx)
79 {
80 if (log_fd != NULL)
81 {
82 if (ch_idx >= 0)
83 fprintf(log_fd, "%son %d: ", what, ch_idx);
84 else
85 fprintf(log_fd, "%s: ", what);
86 }
87 }
88
89 static void
90 ch_log(int ch_idx, char *msg)
91 {
92 if (log_fd != NULL)
93 {
94 ch_log_lead("", ch_idx);
95 fputs(msg, log_fd);
96 fflush(log_fd);
97 }
98 }
99
100 static void
101 ch_logn(int ch_idx, char *msg, int nr)
102 {
103 if (log_fd != NULL)
104 {
105 ch_log_lead("", ch_idx);
106 fprintf(log_fd, msg, nr);
107 fflush(log_fd);
108 }
109 }
110
111 static void
112 ch_logs(int ch_idx, char *msg, char *name)
113 {
114 if (log_fd != NULL)
115 {
116 ch_log_lead("", ch_idx);
117 fprintf(log_fd, msg, name);
118 fflush(log_fd);
119 }
120 }
121
122 static void
123 ch_logsn(int ch_idx, char *msg, char *name, int nr)
124 {
125 if (log_fd != NULL)
126 {
127 ch_log_lead("", ch_idx);
128 fprintf(log_fd, msg, name, nr);
129 fflush(log_fd);
130 }
131 }
132
133 static void
134 ch_error(int ch_idx, char *msg)
135 {
136 if (log_fd != NULL)
137 {
138 ch_log_lead("ERR ", ch_idx);
139 fputs(msg, log_fd);
140 fflush(log_fd);
141 }
142 }
143
144 static void
145 ch_errorn(int ch_idx, char *msg, int nr)
146 {
147 if (log_fd != NULL)
148 {
149 ch_log_lead("ERR ", ch_idx);
150 fprintf(log_fd, msg, nr);
151 fflush(log_fd);
152 }
153 }
154
155 static void
156 ch_errors(int ch_idx, char *msg, char *arg)
157 {
158 if (log_fd != NULL)
159 {
160 ch_log_lead("ERR ", ch_idx);
161 fprintf(log_fd, msg, arg);
162 fflush(log_fd);
163 }
164 }
141 165
142 #ifdef _WIN32 166 #ifdef _WIN32
143 # undef PERROR 167 # undef PERROR
144 # define PERROR(msg) (void)emsg3((char_u *)"%s: %s", \ 168 # define PERROR(msg) (void)emsg3((char_u *)"%s: %s", \
145 (char_u *)msg, (char_u *)strerror_win32(errno)) 169 (char_u *)msg, (char_u *)strerror_win32(errno))
179 } 203 }
180 return msgbuf; 204 return msgbuf;
181 } 205 }
182 #endif 206 #endif
183 207
184 /* 208 static void
185 * Add a new channel slot, return the index. 209 init_channel(int ch_idx)
186 * The channel isn't actually used until ch_fd is set >= 0; 210 {
187 * Returns -1 if all channels are in use.
188 */
189 static int
190 add_channel(void)
191 {
192 int idx;
193 channel_T *ch; 211 channel_T *ch;
194 212
195 if (channels != NULL) 213 ch = &channels[ch_idx];
196 {
197 for (idx = 0; idx < channel_count; ++idx)
198 if (channels[idx].ch_fd < 0)
199 /* re-use a closed channel slot */
200 return idx;
201 if (channel_count == MAX_OPEN_CHANNELS)
202 return -1;
203 }
204 else
205 {
206 channels = (channel_T *)alloc((int)sizeof(channel_T)
207 * MAX_OPEN_CHANNELS);
208 if (channels == NULL)
209 return -1;
210 }
211
212 ch = &channels[channel_count];
213 (void)vim_memset(ch, 0, sizeof(channel_T)); 214 (void)vim_memset(ch, 0, sizeof(channel_T));
214 215
215 ch->ch_fd = (sock_T)-1; 216 ch->ch_sock = (sock_T)-1;
217 #ifdef CHANNEL_PIPES
218 ch->ch_in = -1;
219 ch->ch_out = -1;
220 ch->ch_err = -1;
221 #endif
216 #ifdef FEAT_GUI_X11 222 #ifdef FEAT_GUI_X11
217 ch->ch_inputHandler = (XtInputId)NULL; 223 ch->ch_inputHandler = (XtInputId)NULL;
218 #endif 224 #endif
219 #ifdef FEAT_GUI_GTK 225 #ifdef FEAT_GUI_GTK
220 ch->ch_inputHandler = 0; 226 ch->ch_inputHandler = 0;
229 ch->ch_cb_head.prev = &ch->ch_cb_head; 235 ch->ch_cb_head.prev = &ch->ch_cb_head;
230 ch->ch_json_head.next = &ch->ch_json_head; 236 ch->ch_json_head.next = &ch->ch_json_head;
231 ch->ch_json_head.prev = &ch->ch_json_head; 237 ch->ch_json_head.prev = &ch->ch_json_head;
232 238
233 ch->ch_timeout = 2000; 239 ch->ch_timeout = 2000;
234 240 }
241
242 /*
243 * Add a new channel slot, return the index.
244 * The channel isn't actually used until ch_sock is set >= 0;
245 * Returns -1 if all channels are in use.
246 */
247 int
248 add_channel(void)
249 {
250 int ch_idx;
251
252 if (channels != NULL)
253 {
254 for (ch_idx = 0; ch_idx < channel_count; ++ch_idx)
255 if (!channel_is_open(ch_idx))
256 {
257 /* re-use a closed channel slot */
258 init_channel(ch_idx);
259 ch_log(ch_idx, "Opening channel (used before)\n");
260 return ch_idx;
261 }
262 if (channel_count == MAX_OPEN_CHANNELS)
263 return -1;
264 }
265 else
266 {
267 channels = (channel_T *)alloc((int)sizeof(channel_T)
268 * MAX_OPEN_CHANNELS);
269 if (channels == NULL)
270 return -1;
271 }
272 init_channel(channel_count);
273 ch_log(channel_count, "Opening new channel\n");
235 return channel_count++; 274 return channel_count++;
236 } 275 }
237 276
238 #if defined(FEAT_GUI) || defined(PROTO) 277 #if defined(FEAT_GUI) || defined(PROTO)
239 /* 278 /*
243 static void 282 static void
244 messageFromNetbeans(XtPointer clientData, 283 messageFromNetbeans(XtPointer clientData,
245 int *unused1 UNUSED, 284 int *unused1 UNUSED,
246 XtInputId *unused2 UNUSED) 285 XtInputId *unused2 UNUSED)
247 { 286 {
248 channel_read((int)(long)clientData); 287 channel_read((int)(long)clientData, FALSE, "messageFromNetbeans");
249 } 288 }
250 #endif 289 #endif
251 290
252 #ifdef FEAT_GUI_GTK 291 #ifdef FEAT_GUI_GTK
253 static void 292 static void
254 messageFromNetbeans(gpointer clientData, 293 messageFromNetbeans(gpointer clientData,
255 gint unused1 UNUSED, 294 gint unused1 UNUSED,
256 GdkInputCondition unused2 UNUSED) 295 GdkInputCondition unused2 UNUSED)
257 { 296 {
258 channel_read((int)(long)clientData); 297 channel_read((int)(long)clientData, FALSE, "messageFromNetbeans");
259 } 298 }
260 #endif 299 #endif
261 300
262 static void 301 static void
263 channel_gui_register(int idx) 302 channel_gui_register(int ch_idx)
264 { 303 {
265 channel_T *channel = &channels[idx]; 304 channel_T *channel = &channels[ch_idx];
266 305
267 if (!CH_HAS_GUI) 306 if (!CH_HAS_GUI)
268 return; 307 return;
269 308
309 /* TODO: pipes */
270 # ifdef FEAT_GUI_X11 310 # ifdef FEAT_GUI_X11
271 /* tell notifier we are interested in being called 311 /* tell notifier we are interested in being called
272 * when there is input on the editor connection socket 312 * when there is input on the editor connection socket
273 */ 313 */
274 if (channel->ch_inputHandler == (XtInputId)NULL) 314 if (channel->ch_inputHandler == (XtInputId)NULL)
275 channel->ch_inputHandler = 315 channel->ch_inputHandler =
276 XtAppAddInput((XtAppContext)app_context, channel->ch_fd, 316 XtAppAddInput((XtAppContext)app_context, channel->ch_sock,
277 (XtPointer)(XtInputReadMask + XtInputExceptMask), 317 (XtPointer)(XtInputReadMask + XtInputExceptMask),
278 messageFromNetbeans, (XtPointer)(long)idx); 318 messageFromNetbeans, (XtPointer)(long)ch_idx);
279 # else 319 # else
280 # ifdef FEAT_GUI_GTK 320 # ifdef FEAT_GUI_GTK
281 /* 321 /*
282 * Tell gdk we are interested in being called when there 322 * Tell gdk we are interested in being called when there
283 * is input on the editor connection socket 323 * is input on the editor connection socket
284 */ 324 */
285 if (channel->ch_inputHandler == 0) 325 if (channel->ch_inputHandler == 0)
286 channel->ch_inputHandler = 326 channel->ch_inputHandler =
287 gdk_input_add((gint)channel->ch_fd, (GdkInputCondition) 327 gdk_input_add((gint)channel->ch_sock, (GdkInputCondition)
288 ((int)GDK_INPUT_READ + (int)GDK_INPUT_EXCEPTION), 328 ((int)GDK_INPUT_READ + (int)GDK_INPUT_EXCEPTION),
289 messageFromNetbeans, (gpointer)(long)idx); 329 messageFromNetbeans, (gpointer)(long)ch_idx);
290 # else 330 # else
291 # ifdef FEAT_GUI_W32 331 # ifdef FEAT_GUI_W32
292 /* 332 /*
293 * Tell Windows we are interested in receiving message when there 333 * Tell Windows we are interested in receiving message when there
294 * is input on the editor connection socket. 334 * is input on the editor connection socket.
295 */ 335 */
296 if (channel->ch_inputHandler == -1) 336 if (channel->ch_inputHandler == -1)
297 channel->ch_inputHandler = 337 channel->ch_inputHandler =
298 WSAAsyncSelect(channel->ch_fd, s_hwnd, WM_NETBEANS, FD_READ); 338 WSAAsyncSelect(channel->ch_sock, s_hwnd, WM_NETBEANS, FD_READ);
299 # endif 339 # endif
300 # endif 340 # endif
301 # endif 341 # endif
302 } 342 }
303 343
309 channel_gui_register_all(void) 349 channel_gui_register_all(void)
310 { 350 {
311 int i; 351 int i;
312 352
313 for (i = 0; i < channel_count; ++i) 353 for (i = 0; i < channel_count; ++i)
314 if (channels[i].ch_fd >= 0) 354 /* TODO: pipes */
355 if (channels[i].ch_sock >= 0)
315 channel_gui_register(i); 356 channel_gui_register(i);
316 } 357 }
317 358
318 static void 359 static void
319 channel_gui_unregister(int idx) 360 channel_gui_unregister(int ch_idx)
320 { 361 {
321 channel_T *channel = &channels[idx]; 362 channel_T *channel = &channels[ch_idx];
322 363
364 /* TODO: pipes */
323 # ifdef FEAT_GUI_X11 365 # ifdef FEAT_GUI_X11
324 if (channel->ch_inputHandler != (XtInputId)NULL) 366 if (channel->ch_inputHandler != (XtInputId)NULL)
325 { 367 {
326 XtRemoveInput(channel->ch_inputHandler); 368 XtRemoveInput(channel->ch_inputHandler);
327 channel->ch_inputHandler = (XtInputId)NULL; 369 channel->ch_inputHandler = (XtInputId)NULL;
335 } 377 }
336 # else 378 # else
337 # ifdef FEAT_GUI_W32 379 # ifdef FEAT_GUI_W32
338 if (channel->ch_inputHandler == 0) 380 if (channel->ch_inputHandler == 0)
339 { 381 {
340 WSAAsyncSelect(channel->ch_fd, s_hwnd, 0, 0); 382 WSAAsyncSelect(channel->ch_sock, s_hwnd, 0, 0);
341 channel->ch_inputHandler = -1; 383 channel->ch_inputHandler = -1;
342 } 384 }
343 # endif 385 # endif
344 # endif 386 # endif
345 # endif 387 # endif
346 } 388 }
347 389
348 #endif 390 #endif
349 391
350 /* 392 /*
351 * Open a channel to "hostname":"port". 393 * Open a socket channel to "hostname":"port".
352 * Returns the channel number for success. 394 * Returns the channel number for success.
353 * Returns a negative number for failure. 395 * Returns a negative number for failure.
354 */ 396 */
355 int 397 int
356 channel_open(char *hostname, int port_in, int waittime, void (*close_cb)(void)) 398 channel_open(char *hostname, int port_in, int waittime, void (*close_cb)(void))
362 u_short port = port_in; 404 u_short port = port_in;
363 u_long val = 1; 405 u_long val = 1;
364 #else 406 #else
365 int port = port_in; 407 int port = port_in;
366 #endif 408 #endif
367 int idx; 409 int ch_idx;
368 int ret; 410 int ret;
369 411
370 #ifdef WIN32 412 #ifdef WIN32
371 channel_init_winsock(); 413 channel_init_winsock();
372 #endif 414 #endif
373 415
374 idx = add_channel(); 416 ch_idx = add_channel();
375 if (idx < 0) 417 if (ch_idx < 0)
376 { 418 {
377 CHERROR("All channels are in use\n", ""); 419 ch_error(-1, "All channels are in use.\n");
378 EMSG(_("E897: All channels are in use")); 420 EMSG(_("E897: All channels are in use"));
379 return -1; 421 return -1;
380 } 422 }
381 423
382 if ((sd = (sock_T)socket(AF_INET, SOCK_STREAM, 0)) == (sock_T)-1) 424 if ((sd = (sock_T)socket(AF_INET, SOCK_STREAM, 0)) == (sock_T)-1)
383 { 425 {
384 CHERROR("error in socket() in channel_open()\n", ""); 426 ch_error(-1, "in socket() in channel_open().\n");
385 PERROR("E898: socket() in channel_open()"); 427 PERROR("E898: socket() in channel_open()");
386 return -1; 428 return -1;
387 } 429 }
388 430
389 /* Get the server internet address and put into addr structure */ 431 /* Get the server internet address and put into addr structure */
391 vim_memset((char *)&server, 0, sizeof(server)); 433 vim_memset((char *)&server, 0, sizeof(server));
392 server.sin_family = AF_INET; 434 server.sin_family = AF_INET;
393 server.sin_port = htons(port); 435 server.sin_port = htons(port);
394 if ((host = gethostbyname(hostname)) == NULL) 436 if ((host = gethostbyname(hostname)) == NULL)
395 { 437 {
396 CHERROR("error in gethostbyname() in channel_open()\n", ""); 438 ch_error(-1, "in gethostbyname() in channel_open()\n");
397 PERROR("E901: gethostbyname() in channel_open()"); 439 PERROR("E901: gethostbyname() in channel_open()");
398 sock_close(sd); 440 sock_close(sd);
399 return -1; 441 return -1;
400 } 442 }
401 memcpy((char *)&server.sin_addr, host->h_addr, host->h_length); 443 memcpy((char *)&server.sin_addr, host->h_addr, host->h_length);
410 fcntl(sd, F_SETFL, O_NONBLOCK) < 0 452 fcntl(sd, F_SETFL, O_NONBLOCK) < 0
411 #endif 453 #endif
412 ) 454 )
413 { 455 {
414 SOCK_ERRNO; 456 SOCK_ERRNO;
415 CHERROR("channel_open: Connect failed with errno %d\n", errno); 457 ch_errorn(-1, "channel_open: Connect failed with errno %d\n",
458 errno);
416 sock_close(sd); 459 sock_close(sd);
417 return -1; 460 return -1;
418 } 461 }
419 } 462 }
420 463
421 /* Try connecting to the server. */ 464 /* Try connecting to the server. */
465 ch_logsn(-1, "Connecting to %s port %d", hostname, port);
422 ret = connect(sd, (struct sockaddr *)&server, sizeof(server)); 466 ret = connect(sd, (struct sockaddr *)&server, sizeof(server));
423 SOCK_ERRNO; 467 SOCK_ERRNO;
424 if (ret < 0) 468 if (ret < 0)
425 { 469 {
426 if (errno != EWOULDBLOCK && errno != EINPROGRESS) 470 if (errno != EWOULDBLOCK
427 { 471 #ifdef EINPROGRESS
428 CHERROR("channel_open: Connect failed with errno %d\n", errno); 472 && errno != EINPROGRESS
429 CHERROR("Cannot connect to port\n", ""); 473 #endif
474 )
475 {
476 ch_errorn(-1, "channel_open: Connect failed with errno %d\n",
477 errno);
430 PERROR(_("E902: Cannot connect to port")); 478 PERROR(_("E902: Cannot connect to port"));
431 sock_close(sd); 479 sock_close(sd);
432 return -1; 480 return -1;
433 } 481 }
434 } 482 }
444 tv.tv_usec = (waittime % 1000) * 1000; 492 tv.tv_usec = (waittime % 1000) * 1000;
445 ret = select((int)sd + 1, NULL, &wfds, NULL, &tv); 493 ret = select((int)sd + 1, NULL, &wfds, NULL, &tv);
446 if (ret < 0) 494 if (ret < 0)
447 { 495 {
448 SOCK_ERRNO; 496 SOCK_ERRNO;
449 CHERROR("channel_open: Connect failed with errno %d\n", errno); 497 ch_errorn(-1, "channel_open: Connect failed with errno %d\n",
450 CHERROR("Cannot connect to port\n", ""); 498 errno);
451 PERROR(_("E902: Cannot connect to port")); 499 PERROR(_("E902: Cannot connect to port"));
452 sock_close(sd); 500 sock_close(sd);
453 return -1; 501 return -1;
454 } 502 }
455 if (!FD_ISSET(sd, &wfds)) 503 if (!FD_ISSET(sd, &wfds))
475 { 523 {
476 sock_close(sd); 524 sock_close(sd);
477 if ((sd = (sock_T)socket(AF_INET, SOCK_STREAM, 0)) == (sock_T)-1) 525 if ((sd = (sock_T)socket(AF_INET, SOCK_STREAM, 0)) == (sock_T)-1)
478 { 526 {
479 SOCK_ERRNO; 527 SOCK_ERRNO;
480 CHERROR("socket() retry in channel_open()\n", ""); 528 ch_log(-1, "socket() retry in channel_open()\n");
481 PERROR("E900: socket() retry in channel_open()"); 529 PERROR("E900: socket() retry in channel_open()");
482 return -1; 530 return -1;
483 } 531 }
484 if (connect(sd, (struct sockaddr *)&server, sizeof(server))) 532 if (connect(sd, (struct sockaddr *)&server, sizeof(server)))
485 { 533 {
488 536
489 SOCK_ERRNO; 537 SOCK_ERRNO;
490 while (retries-- && ((errno == ECONNREFUSED) 538 while (retries-- && ((errno == ECONNREFUSED)
491 || (errno == EINTR))) 539 || (errno == EINTR)))
492 { 540 {
493 CHERROR("retrying...\n", ""); 541 ch_log(-1, "retrying...\n");
494 mch_delay(3000L, TRUE); 542 mch_delay(3000L, TRUE);
495 ui_breakcheck(); 543 ui_breakcheck();
496 if (got_int) 544 if (got_int)
497 { 545 {
498 errno = EINTR; 546 errno = EINTR;
507 SOCK_ERRNO; 555 SOCK_ERRNO;
508 } 556 }
509 if (!success) 557 if (!success)
510 { 558 {
511 /* Get here when the server can't be found. */ 559 /* Get here when the server can't be found. */
512 CHERROR("Cannot connect to port after retry\n", ""); 560 ch_error(-1, "Cannot connect to port after retry\n");
513 PERROR(_("E899: Cannot connect to port after retry2")); 561 PERROR(_("E899: Cannot connect to port after retry2"));
514 sock_close(sd); 562 sock_close(sd);
515 return -1; 563 return -1;
516 } 564 }
517 } 565 }
518 } 566 }
519 567
520 channels[idx].ch_fd = sd; 568 channels[ch_idx].ch_sock = sd;
521 channels[idx].ch_close_cb = close_cb; 569 channels[ch_idx].ch_close_cb = close_cb;
522 570
523 #ifdef FEAT_GUI 571 #ifdef FEAT_GUI
524 channel_gui_register(idx); 572 channel_gui_register(ch_idx);
525 #endif 573 #endif
526 574
527 return idx; 575 return ch_idx;
528 } 576 }
529 577
530 /* 578 #if defined(CHANNEL_PIPES) || defined(PROTO)
531 * Set the json mode of channel "idx" to "ch_mode".
532 */
533 void 579 void
534 channel_set_json_mode(int idx, ch_mode_T ch_mode) 580 channel_set_pipes(int ch_idx, int in, int out, int err)
535 { 581 {
536 channels[idx].ch_mode = ch_mode; 582 channel_T *channel = &channels[ch_idx];
537 } 583
538 584 channel->ch_in = in;
539 /* 585 channel->ch_out = out;
540 * Set the read timeout of channel "idx". 586 channel->ch_err = err;
541 */ 587 }
588 #endif
589
542 void 590 void
543 channel_set_timeout(int idx, int timeout) 591 channel_set_job(int ch_idx, job_T *job)
544 { 592 {
545 channels[idx].ch_timeout = timeout; 593 channels[ch_idx].ch_job = job;
546 } 594 }
547 595
548 /* 596 /*
549 * Set the callback for channel "idx". 597 * Set the json mode of channel "ch_idx" to "ch_mode".
550 */ 598 */
551 void 599 void
552 channel_set_callback(int idx, char_u *callback) 600 channel_set_json_mode(int ch_idx, ch_mode_T ch_mode)
553 { 601 {
554 vim_free(channels[idx].ch_callback); 602 channels[ch_idx].ch_mode = ch_mode;
555 channels[idx].ch_callback = vim_strsave(callback); 603 }
556 } 604
557 605 /*
558 /* 606 * Set the read timeout of channel "ch_idx".
559 * Set the callback for channel "idx" for the response with "id".
560 */ 607 */
561 void 608 void
562 channel_set_req_callback(int idx, char_u *callback, int id) 609 channel_set_timeout(int ch_idx, int timeout)
563 { 610 {
564 cbq_T *cbhead = &channels[idx].ch_cb_head; 611 channels[ch_idx].ch_timeout = timeout;
612 }
613
614 /*
615 * Set the callback for channel "ch_idx".
616 */
617 void
618 channel_set_callback(int ch_idx, char_u *callback)
619 {
620 vim_free(channels[ch_idx].ch_callback);
621 channels[ch_idx].ch_callback = vim_strsave(callback);
622 }
623
624 /*
625 * Set the callback for channel "ch_idx" for the response with "id".
626 */
627 void
628 channel_set_req_callback(int ch_idx, char_u *callback, int id)
629 {
630 cbq_T *cbhead = &channels[ch_idx].ch_cb_head;
565 cbq_T *item = (cbq_T *)alloc((int)sizeof(cbq_T)); 631 cbq_T *item = (cbq_T *)alloc((int)sizeof(cbq_T));
566 632
567 if (item != NULL) 633 if (item != NULL)
568 { 634 {
569 item->callback = vim_strsave(callback); 635 item->callback = vim_strsave(callback);
574 item->prev->next = item; 640 item->prev->next = item;
575 } 641 }
576 } 642 }
577 643
578 /* 644 /*
579 * Invoke the "callback" on channel "idx". 645 * Invoke the "callback" on channel "ch_idx".
580 */ 646 */
581 static void 647 static void
582 invoke_callback(int idx, char_u *callback, typval_T *argv) 648 invoke_callback(int ch_idx, char_u *callback, typval_T *argv)
583 { 649 {
584 typval_T rettv; 650 typval_T rettv;
585 int dummy; 651 int dummy;
586 652
587 argv[0].v_type = VAR_NUMBER; 653 argv[0].v_type = VAR_NUMBER;
588 argv[0].vval.v_number = idx; 654 argv[0].vval.v_number = ch_idx;
589 655
590 call_func(callback, (int)STRLEN(callback), 656 call_func(callback, (int)STRLEN(callback),
591 &rettv, 2, argv, 0L, 0L, &dummy, TRUE, NULL); 657 &rettv, 2, argv, 0L, 0L, &dummy, TRUE, NULL);
592 /* If an echo command was used the cursor needs to be put back where 658 /* If an echo command was used the cursor needs to be put back where
593 * it belongs. */ 659 * it belongs. */
600 * Return the first buffer from the channel and remove it. 666 * Return the first buffer from the channel and remove it.
601 * The caller must free it. 667 * The caller must free it.
602 * Returns NULL if there is nothing. 668 * Returns NULL if there is nothing.
603 */ 669 */
604 char_u * 670 char_u *
605 channel_get(int idx) 671 channel_get(int ch_idx)
606 { 672 {
607 readq_T *head = &channels[idx].ch_head; 673 readq_T *head = &channels[ch_idx].ch_head;
608 readq_T *node; 674 readq_T *node;
609 char_u *p; 675 char_u *p;
610 676
611 if (head->next == head || head->next == NULL) 677 if (head->next == head || head->next == NULL)
612 return NULL; 678 return NULL;
621 687
622 /* 688 /*
623 * Returns the whole buffer contents concatenated. 689 * Returns the whole buffer contents concatenated.
624 */ 690 */
625 static char_u * 691 static char_u *
626 channel_get_all(int idx) 692 channel_get_all(int ch_idx)
627 { 693 {
628 /* Concatenate everything into one buffer. 694 /* Concatenate everything into one buffer.
629 * TODO: avoid multiple allocations. */ 695 * TODO: avoid multiple allocations. */
630 while (channel_collapse(idx) == OK) 696 while (channel_collapse(ch_idx) == OK)
631 ; 697 ;
632 return channel_get(idx); 698 return channel_get(ch_idx);
633 } 699 }
634 700
635 /* 701 /*
636 * Collapses the first and second buffer in the channel "idx". 702 * Collapses the first and second buffer in the channel "ch_idx".
637 * Returns FAIL if that is not possible. 703 * Returns FAIL if that is not possible.
638 */ 704 */
639 int 705 int
640 channel_collapse(int idx) 706 channel_collapse(int ch_idx)
641 { 707 {
642 readq_T *head = &channels[idx].ch_head; 708 readq_T *head = &channels[ch_idx].ch_head;
643 readq_T *node = head->next; 709 readq_T *node = head->next;
644 char_u *p; 710 char_u *p;
645 711
646 if (node == head || node == NULL || node->next == head) 712 if (node == head || node == NULL || node->next == head)
647 return FAIL; 713 return FAIL;
797 } 863 }
798 return FAIL; 864 return FAIL;
799 } 865 }
800 866
801 /* 867 /*
802 * Execute a command received over channel "idx". 868 * Execute a command received over channel "ch_idx".
803 * "cmd" is the command string, "arg2" the second argument. 869 * "cmd" is the command string, "arg2" the second argument.
804 * "arg3" is the third argument, NULL if missing. 870 * "arg3" is the third argument, NULL if missing.
805 */ 871 */
806 static void 872 static void
807 channel_exe_cmd(int idx, char_u *cmd, typval_T *arg2, typval_T *arg3) 873 channel_exe_cmd(int ch_idx, char_u *cmd, typval_T *arg2, typval_T *arg3)
808 { 874 {
809 char_u *arg; 875 char_u *arg;
810 876
811 if (arg2->v_type != VAR_STRING) 877 if (arg2->v_type != VAR_STRING)
812 { 878 {
860 else 926 else
861 { 927 {
862 typval_T *tv; 928 typval_T *tv;
863 typval_T err_tv; 929 typval_T err_tv;
864 char_u *json = NULL; 930 char_u *json = NULL;
865 channel_T *channel = &channels[idx]; 931 channel_T *channel = &channels[ch_idx];
866 int options = channel->ch_mode == MODE_JS ? JSON_JS : 0; 932 int options = channel->ch_mode == MODE_JS ? JSON_JS : 0;
867 933
868 /* Don't pollute the display with errors. */ 934 /* Don't pollute the display with errors. */
869 ++emsg_skip; 935 ++emsg_skip;
870 tv = eval_expr(arg, NULL); 936 tv = eval_expr(arg, NULL);
883 json = json_encode_nr_expr(arg3->vval.v_number, tv, 949 json = json_encode_nr_expr(arg3->vval.v_number, tv,
884 options); 950 options);
885 } 951 }
886 if (json != NULL) 952 if (json != NULL)
887 { 953 {
888 channel_send(idx, json, "eval"); 954 channel_send(ch_idx, json, "eval");
889 vim_free(json); 955 vim_free(json);
890 } 956 }
891 } 957 }
892 --emsg_skip; 958 --emsg_skip;
893 if (tv != &err_tv) 959 if (tv != &err_tv)
897 else if (p_verbose > 2) 963 else if (p_verbose > 2)
898 EMSG2("E905: received unknown command: %s", cmd); 964 EMSG2("E905: received unknown command: %s", cmd);
899 } 965 }
900 966
901 /* 967 /*
902 * Invoke a callback for channel "idx" if needed. 968 * Invoke a callback for channel "ch_idx" if needed.
903 * Return OK when a message was handled, there might be another one. 969 * Return OK when a message was handled, there might be another one.
904 */ 970 */
905 static int 971 static int
906 may_invoke_callback(int idx) 972 may_invoke_callback(int ch_idx)
907 { 973 {
908 char_u *msg = NULL; 974 char_u *msg = NULL;
909 typval_T *listtv = NULL; 975 typval_T *listtv = NULL;
910 list_T *list; 976 list_T *list;
911 typval_T *typetv; 977 typval_T *typetv;
912 typval_T argv[3]; 978 typval_T argv[3];
913 int seq_nr = -1; 979 int seq_nr = -1;
914 channel_T *channel = &channels[idx]; 980 channel_T *channel = &channels[ch_idx];
915 ch_mode_T ch_mode = channel->ch_mode; 981 ch_mode_T ch_mode = channel->ch_mode;
916 982
917 if (channel->ch_close_cb != NULL) 983 if (channel->ch_close_cb != NULL)
918 /* this channel is handled elsewhere (netbeans) */ 984 /* this channel is handled elsewhere (netbeans) */
919 return FALSE; 985 return FALSE;
920 986
921 if (ch_mode != MODE_RAW) 987 if (ch_mode != MODE_RAW)
922 { 988 {
923 /* Get any json message in the queue. */ 989 /* Get any json message in the queue. */
924 if (channel_get_json(idx, -1, &listtv) == FAIL) 990 if (channel_get_json(ch_idx, -1, &listtv) == FAIL)
925 { 991 {
926 /* Parse readahead, return when there is still no message. */ 992 /* Parse readahead, return when there is still no message. */
927 channel_parse_json(idx); 993 channel_parse_json(ch_idx);
928 if (channel_get_json(idx, -1, &listtv) == FAIL) 994 if (channel_get_json(ch_idx, -1, &listtv) == FAIL)
929 return FALSE; 995 return FALSE;
930 } 996 }
931 997
932 list = listtv->vval.v_list; 998 list = listtv->vval.v_list;
933 argv[1] = list->lv_first->li_next->li_tv; 999 argv[1] = list->lv_first->li_next->li_tv;
938 char_u *cmd = typetv->vval.v_string; 1004 char_u *cmd = typetv->vval.v_string;
939 1005
940 /* ["cmd", arg] or ["cmd", arg, arg] */ 1006 /* ["cmd", arg] or ["cmd", arg, arg] */
941 if (list->lv_len == 3) 1007 if (list->lv_len == 3)
942 arg3 = &list->lv_last->li_tv; 1008 arg3 = &list->lv_last->li_tv;
943 channel_exe_cmd(idx, cmd, &argv[1], arg3); 1009 ch_logs(ch_idx, "Executing %s command", (char *)cmd);
1010 channel_exe_cmd(ch_idx, cmd, &argv[1], arg3);
944 clear_tv(listtv); 1011 clear_tv(listtv);
945 return TRUE; 1012 return TRUE;
946 } 1013 }
947 1014
948 if (typetv->v_type != VAR_NUMBER) 1015 if (typetv->v_type != VAR_NUMBER)
949 { 1016 {
950 /* TODO: give error */ 1017 ch_error(ch_idx,
1018 "Dropping message with invalid sequence number type\n");
951 clear_tv(listtv); 1019 clear_tv(listtv);
952 return FALSE; 1020 return FALSE;
953 } 1021 }
954 seq_nr = typetv->vval.v_number; 1022 seq_nr = typetv->vval.v_number;
955 } 1023 }
956 else if (channel_peek(idx) == NULL) 1024 else if (channel_peek(ch_idx) == NULL)
957 { 1025 {
958 /* nothing to read on raw channel */ 1026 /* nothing to read on raw channel */
959 return FALSE; 1027 return FALSE;
960 } 1028 }
961 else 1029 else
962 { 1030 {
1031 /* If there is no callback, don't do anything. */
1032 if (channel->ch_callback == NULL)
1033 return FALSE;
1034
963 /* For a raw channel we don't know where the message ends, just get 1035 /* For a raw channel we don't know where the message ends, just get
964 * everything. */ 1036 * everything. */
965 msg = channel_get_all(idx); 1037 msg = channel_get_all(ch_idx);
966 argv[1].v_type = VAR_STRING; 1038 argv[1].v_type = VAR_STRING;
967 argv[1].vval.v_string = msg; 1039 argv[1].vval.v_string = msg;
968 } 1040 }
969 1041
970 if (seq_nr > 0) 1042 if (seq_nr > 0)
971 { 1043 {
972 cbq_T *cbhead = &channel->ch_cb_head; 1044 cbq_T *cbhead = &channel->ch_cb_head;
973 cbq_T *cbitem = cbhead->next; 1045 cbq_T *cbitem = cbhead->next;
1046 int done = FALSE;
974 1047
975 /* invoke the one-time callback with the matching nr */ 1048 /* invoke the one-time callback with the matching nr */
976 while (cbitem != cbhead) 1049 while (cbitem != cbhead)
977 { 1050 {
978 if (cbitem->seq_nr == seq_nr) 1051 if (cbitem->seq_nr == seq_nr)
979 { 1052 {
980 invoke_callback(idx, cbitem->callback, argv); 1053 ch_log(ch_idx, "Invoking one-time callback\n");
1054 invoke_callback(ch_idx, cbitem->callback, argv);
981 remove_cb_node(cbitem); 1055 remove_cb_node(cbitem);
1056 done = TRUE;
982 break; 1057 break;
983 } 1058 }
984 cbitem = cbitem->next; 1059 cbitem = cbitem->next;
985 } 1060 }
1061 if (!done)
1062 ch_log(ch_idx, "Dropping message without callback\n");
986 } 1063 }
987 else if (channel->ch_callback != NULL) 1064 else if (channel->ch_callback != NULL)
988 { 1065 {
989 /* invoke the channel callback */ 1066 /* invoke the channel callback */
990 invoke_callback(idx, channel->ch_callback, argv); 1067 ch_log(ch_idx, "Invoking channel callback\n");
991 } 1068 invoke_callback(ch_idx, channel->ch_callback, argv);
992 /* else: drop the message TODO: give error */ 1069 }
1070 else
1071 ch_log(ch_idx, "Dropping message\n");
993 1072
994 if (listtv != NULL) 1073 if (listtv != NULL)
995 clear_tv(listtv); 1074 clear_tv(listtv);
996 vim_free(msg); 1075 vim_free(msg);
997 1076
998 return TRUE; 1077 return TRUE;
999 } 1078 }
1000 1079
1001 /* 1080 /*
1002 * Return TRUE when channel "idx" is open. 1081 * Return TRUE when channel "ch_idx" is open for writing to.
1003 * Also returns FALSE or invalid "idx". 1082 * Also returns FALSE or invalid "ch_idx".
1004 */ 1083 */
1005 int 1084 int
1006 channel_is_open(int idx) 1085 channel_can_write_to(int ch_idx)
1007 { 1086 {
1008 return idx >= 0 && idx < channel_count && channels[idx].ch_fd >= 0; 1087 return ch_idx >= 0 && ch_idx < channel_count
1009 } 1088 && (channels[ch_idx].ch_sock >= 0
1010 1089 #ifdef CHANNEL_PIPES
1011 /* 1090 || channels[ch_idx].ch_in >= 0
1012 * Close channel "idx". 1091 #endif
1092 );
1093 }
1094
1095 /*
1096 * Return TRUE when channel "ch_idx" is open for reading or writing.
1097 * Also returns FALSE or invalid "ch_idx".
1098 */
1099 int
1100 channel_is_open(int ch_idx)
1101 {
1102 return ch_idx >= 0 && ch_idx < channel_count
1103 && (channels[ch_idx].ch_sock >= 0
1104 #ifdef CHANNEL_PIPES
1105 || channels[ch_idx].ch_in >= 0
1106 || channels[ch_idx].ch_out >= 0
1107 || channels[ch_idx].ch_err >= 0
1108 #endif
1109 );
1110 }
1111
1112 /*
1113 * Close channel "ch_idx".
1013 * This does not trigger the close callback. 1114 * This does not trigger the close callback.
1014 */ 1115 */
1015 void 1116 void
1016 channel_close(int idx) 1117 channel_close(int ch_idx)
1017 { 1118 {
1018 channel_T *channel = &channels[idx]; 1119 channel_T *channel = &channels[ch_idx];
1019 jsonq_T *jhead; 1120 jsonq_T *jhead;
1020 cbq_T *cbhead; 1121 cbq_T *cbhead;
1021 1122
1022 if (channel->ch_fd >= 0) 1123 if (channel->ch_sock >= 0)
1023 { 1124 {
1024 sock_close(channel->ch_fd); 1125 sock_close(channel->ch_sock);
1025 channel->ch_fd = -1; 1126 channel->ch_sock = -1;
1026 channel->ch_close_cb = NULL; 1127 channel->ch_close_cb = NULL;
1027 #ifdef FEAT_GUI 1128 #ifdef FEAT_GUI
1028 channel_gui_unregister(idx); 1129 channel_gui_unregister(ch_idx);
1029 #endif 1130 #endif
1030 vim_free(channel->ch_callback); 1131 vim_free(channel->ch_callback);
1031 channel->ch_callback = NULL; 1132 channel->ch_callback = NULL;
1032 channel->ch_timeout = 2000; 1133 channel->ch_timeout = 2000;
1033 1134
1034 while (channel_peek(idx) != NULL) 1135 while (channel_peek(ch_idx) != NULL)
1035 vim_free(channel_get(idx)); 1136 vim_free(channel_get(ch_idx));
1036 1137
1037 cbhead = &channel->ch_cb_head; 1138 cbhead = &channel->ch_cb_head;
1038 while (cbhead->next != cbhead) 1139 while (cbhead->next != cbhead)
1039 remove_cb_node(cbhead->next); 1140 remove_cb_node(cbhead->next);
1040 1141
1043 { 1144 {
1044 clear_tv(jhead->next->value); 1145 clear_tv(jhead->next->value);
1045 remove_json_node(jhead->next); 1146 remove_json_node(jhead->next);
1046 } 1147 }
1047 } 1148 }
1048 } 1149 #if defined(CHANNEL_PIPES)
1049 1150 if (channel->ch_in >= 0)
1050 /* 1151 {
1051 * Store "buf[len]" on channel "idx". 1152 close(channel->ch_in);
1153 channel->ch_in = -1;
1154 }
1155 if (channel->ch_out >= 0)
1156 {
1157 close(channel->ch_out);
1158 channel->ch_out = -1;
1159 }
1160 if (channel->ch_err >= 0)
1161 {
1162 close(channel->ch_err);
1163 channel->ch_err = -1;
1164 }
1165 #endif
1166 }
1167
1168 /*
1169 * Store "buf[len]" on channel "ch_idx".
1052 * Returns OK or FAIL. 1170 * Returns OK or FAIL.
1053 */ 1171 */
1054 int 1172 int
1055 channel_save(int idx, char_u *buf, int len) 1173 channel_save(int ch_idx, char_u *buf, int len)
1056 { 1174 {
1057 readq_T *node; 1175 readq_T *node;
1058 readq_T *head = &channels[idx].ch_head; 1176 readq_T *head = &channels[ch_idx].ch_head;
1059 1177
1060 node = (readq_T *)alloc(sizeof(readq_T)); 1178 node = (readq_T *)alloc(sizeof(readq_T));
1061 if (node == NULL) 1179 if (node == NULL)
1062 return FAIL; /* out of memory */ 1180 return FAIL; /* out of memory */
1063 node->buffer = alloc(len + 1); 1181 node->buffer = alloc(len + 1);
1073 node->next = head; 1191 node->next = head;
1074 node->prev = head->prev; 1192 node->prev = head->prev;
1075 head->prev->next = node; 1193 head->prev->next = node;
1076 head->prev = node; 1194 head->prev = node;
1077 1195
1078 if (debugfd != NULL) 1196 if (log_fd != NULL)
1079 { 1197 {
1080 fprintf(debugfd, "RECV on %d: ", idx); 1198 ch_log_lead("RECV ", ch_idx);
1081 if (fwrite(buf, len, 1, debugfd) != 1) 1199 fprintf(log_fd, "'");
1200 if (fwrite(buf, len, 1, log_fd) != 1)
1082 return FAIL; 1201 return FAIL;
1083 fprintf(debugfd, "\n"); 1202 fprintf(log_fd, "'\n");
1084 } 1203 }
1085 return OK; 1204 return OK;
1086 } 1205 }
1087 1206
1088 /* 1207 /*
1089 * Return the first buffer from the channel without removing it. 1208 * Return the first buffer from the channel without removing it.
1090 * Returns NULL if there is nothing. 1209 * Returns NULL if there is nothing.
1091 */ 1210 */
1092 char_u * 1211 char_u *
1093 channel_peek(int idx) 1212 channel_peek(int ch_idx)
1094 { 1213 {
1095 readq_T *head = &channels[idx].ch_head; 1214 readq_T *head = &channels[ch_idx].ch_head;
1096 1215
1097 if (head->next == head || head->next == NULL) 1216 if (head->next == head || head->next == NULL)
1098 return NULL; 1217 return NULL;
1099 return head->next->buffer; 1218 return head->next->buffer;
1100 } 1219 }
1101 1220
1102 /* 1221 /*
1103 * Clear the read buffer on channel "idx". 1222 * Clear the read buffer on channel "ch_idx".
1104 */ 1223 */
1105 void 1224 void
1106 channel_clear(int idx) 1225 channel_clear(int ch_idx)
1107 { 1226 {
1108 readq_T *head = &channels[idx].ch_head; 1227 readq_T *head = &channels[ch_idx].ch_head;
1109 readq_T *node = head->next; 1228 readq_T *node = head->next;
1110 readq_T *next; 1229 readq_T *next;
1111 1230
1112 while (node != NULL && node != head) 1231 while (node != NULL && node != head)
1113 { 1232 {
1134 * Check for reading from "fd" with "timeout" msec. 1253 * Check for reading from "fd" with "timeout" msec.
1135 * Return FAIL when there is nothing to read. 1254 * Return FAIL when there is nothing to read.
1136 * Always returns OK for FEAT_GUI_W32. 1255 * Always returns OK for FEAT_GUI_W32.
1137 */ 1256 */
1138 static int 1257 static int
1139 channel_wait(int fd, int timeout) 1258 channel_wait(int ch_idx, int fd, int timeout)
1140 { 1259 {
1141 #if defined(HAVE_SELECT) && !defined(FEAT_GUI_W32) 1260 #if defined(HAVE_SELECT) && !defined(FEAT_GUI_W32)
1142 struct timeval tval; 1261 struct timeval tval;
1143 fd_set rfds; 1262 fd_set rfds;
1144 int ret; 1263 int ret;
1145 1264
1265 if (timeout > 0)
1266 ch_logn(ch_idx, "Waiting for %d msec\n", timeout);
1146 FD_ZERO(&rfds); 1267 FD_ZERO(&rfds);
1147 FD_SET(fd, &rfds); 1268 FD_SET(fd, &rfds);
1148 tval.tv_sec = timeout / 1000; 1269 tval.tv_sec = timeout / 1000;
1149 tval.tv_usec = (timeout % 1000) * 1000; 1270 tval.tv_usec = (timeout % 1000) * 1000;
1150 for (;;) 1271 for (;;)
1153 # ifdef EINTR 1274 # ifdef EINTR
1154 if (ret == -1 && errno == EINTR) 1275 if (ret == -1 && errno == EINTR)
1155 continue; 1276 continue;
1156 # endif 1277 # endif
1157 if (ret <= 0) 1278 if (ret <= 0)
1279 {
1280 ch_log(ch_idx, "Nothing to read\n");
1158 return FAIL; 1281 return FAIL;
1282 }
1159 break; 1283 break;
1160 } 1284 }
1161 #else 1285 #else
1162 # ifdef HAVE_POLL 1286 # ifdef HAVE_POLL
1163 struct pollfd fds; 1287 struct pollfd fds;
1164 1288
1289 if (timeout > 0)
1290 ch_logn(ch_idx, "Waiting for %d msec\n", timeout);
1165 fds.fd = fd; 1291 fds.fd = fd;
1166 fds.events = POLLIN; 1292 fds.events = POLLIN;
1167 if (poll(&fds, 1, timeout) <= 0) 1293 if (poll(&fds, 1, timeout) <= 0)
1294 {
1295 ch_log(ch_idx, "Nothing to read\n");
1168 return FAIL; 1296 return FAIL;
1297 }
1169 # endif 1298 # endif
1170 #endif 1299 #endif
1171 return OK; 1300 return OK;
1172 } 1301 }
1173 1302
1181 1310
1182 return next_id++; 1311 return next_id++;
1183 } 1312 }
1184 1313
1185 /* 1314 /*
1186 * Read from channel "idx" for as long as there is something to read. 1315 * Get the file descriptor to read from, either the socket or stdout.
1316 */
1317 static int
1318 get_read_fd(int ch_idx, int use_stderr)
1319 {
1320 channel_T *channel = &channels[ch_idx];
1321
1322 if (channel->ch_sock >= 0)
1323 return channel->ch_sock;
1324 #if defined(CHANNEL_PIPES)
1325 if (!use_stderr && channel->ch_out >= 0)
1326 return channel->ch_out;
1327 if (use_stderr && channel->ch_err >= 0)
1328 return channel->ch_err;
1329 #endif
1330 ch_error(ch_idx, "channel_read() called while socket is closed\n");
1331 return -1;
1332 }
1333
1334 /*
1335 * Read from channel "ch_idx" for as long as there is something to read.
1187 * The data is put in the read queue. 1336 * The data is put in the read queue.
1188 */ 1337 */
1189 void 1338 void
1190 channel_read(int idx) 1339 channel_read(int ch_idx, int use_stderr, char *func)
1191 { 1340 {
1341 channel_T *channel = &channels[ch_idx];
1192 static char_u *buf = NULL; 1342 static char_u *buf = NULL;
1193 int len = 0; 1343 int len = 0;
1194 int readlen = 0; 1344 int readlen = 0;
1195 channel_T *channel = &channels[idx]; 1345 int fd;
1196 1346 int use_socket = FALSE;
1197 if (channel->ch_fd < 0) 1347
1198 { 1348 fd = get_read_fd(ch_idx, use_stderr);
1199 CHLOG(idx, FALSE, "channel_read() called while socket is closed\n"); 1349 if (fd < 0)
1200 return; 1350 return;
1201 } 1351 use_socket = channel->ch_sock >= 0;
1202 1352
1203 /* Allocate a buffer to read into. */ 1353 /* Allocate a buffer to read into. */
1204 if (buf == NULL) 1354 if (buf == NULL)
1205 { 1355 {
1206 buf = alloc(MAXMSGSIZE); 1356 buf = alloc(MAXMSGSIZE);
1211 /* Keep on reading for as long as there is something to read. 1361 /* Keep on reading for as long as there is something to read.
1212 * Use select() or poll() to avoid blocking on a message that is exactly 1362 * Use select() or poll() to avoid blocking on a message that is exactly
1213 * MAXMSGSIZE long. */ 1363 * MAXMSGSIZE long. */
1214 for (;;) 1364 for (;;)
1215 { 1365 {
1216 if (channel_wait(channel->ch_fd, 0) == FAIL) 1366 if (channel_wait(ch_idx, fd, 0) == FAIL)
1217 break; 1367 break;
1218 len = sock_read(channel->ch_fd, buf, MAXMSGSIZE); 1368 if (use_socket)
1369 len = sock_read(fd, buf, MAXMSGSIZE);
1370 else
1371 len = read(fd, buf, MAXMSGSIZE);
1219 if (len <= 0) 1372 if (len <= 0)
1220 break; /* error or nothing more to read */ 1373 break; /* error or nothing more to read */
1221 1374
1222 /* Store the read message in the queue. */ 1375 /* Store the read message in the queue. */
1223 channel_save(idx, buf, len); 1376 channel_save(ch_idx, buf, len);
1224 readlen += len; 1377 readlen += len;
1225 if (len < MAXMSGSIZE) 1378 if (len < MAXMSGSIZE)
1226 break; /* did read everything that's available */ 1379 break; /* did read everything that's available */
1227 } 1380 }
1228 #ifdef FEAT_GUI_W32 1381 #ifdef FEAT_GUI_W32
1229 if (len == SOCKET_ERROR) 1382 if (use_socket && len == SOCKET_ERROR)
1230 { 1383 {
1231 /* For Win32 GUI channel_wait() always returns OK and we handle the 1384 /* For Win32 GUI channel_wait() always returns OK and we handle the
1232 * situation that there is nothing to read here. 1385 * situation that there is nothing to read here.
1233 * TODO: how about a timeout? */ 1386 * TODO: how about a timeout? */
1234 if (WSAGetLastError() == WSAEWOULDBLOCK) 1387 if (WSAGetLastError() == WSAEWOULDBLOCK)
1247 * -> autocmd triggered while processing the netbeans cmd 1400 * -> autocmd triggered while processing the netbeans cmd
1248 * -> ui_breakcheck 1401 * -> ui_breakcheck
1249 * -> gui event loop or select loop 1402 * -> gui event loop or select loop
1250 * -> channel_read() 1403 * -> channel_read()
1251 */ 1404 */
1252 channel_save(idx, (char_u *)DETACH_MSG, (int)STRLEN(DETACH_MSG)); 1405 ch_errors(ch_idx, "%s(): Cannot read\n", func);
1253 1406 channel_save(ch_idx, (char_u *)DETACH_MSG, (int)STRLEN(DETACH_MSG));
1254 channel_close(idx); 1407
1255 if (channel->ch_close_cb != NULL) 1408 if (use_socket)
1256 (*channel->ch_close_cb)(); 1409 {
1410 channel_close(ch_idx);
1411 if (channel->ch_close_cb != NULL)
1412 (*channel->ch_close_cb)();
1413 }
1414 #if defined(CHANNEL_PIPES)
1415 else
1416 {
1417 close(fd);
1418 channel->ch_out = -1;
1419 }
1420 #endif
1257 1421
1258 if (len < 0) 1422 if (len < 0)
1259 { 1423 {
1260 /* Todo: which channel? */ 1424 ch_error(ch_idx, "channel_read(): cannot read from channel\n");
1261 CHERROR("%s(): cannot from channel\n", "channel_read");
1262 PERROR(_("E896: read from channel")); 1425 PERROR(_("E896: read from channel"));
1263 } 1426 }
1264 } 1427 }
1265 1428
1266 #if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK) 1429 #if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK)
1430 /* signal the main loop that there is something to read */
1267 if (CH_HAS_GUI && gtk_main_level() > 0) 1431 if (CH_HAS_GUI && gtk_main_level() > 0)
1268 gtk_main_quit(); 1432 gtk_main_quit();
1269 #endif 1433 #endif
1270 } 1434 }
1271 1435
1272 /* 1436 /*
1273 * Read from raw channel "idx". Blocks until there is something to read or 1437 * Read from raw channel "ch_idx". Blocks until there is something to read or
1274 * the timeout expires. 1438 * the timeout expires.
1275 * Returns what was read in allocated memory. 1439 * Returns what was read in allocated memory.
1276 * Returns NULL in case of error or timeout. 1440 * Returns NULL in case of error or timeout.
1277 */ 1441 */
1278 char_u * 1442 char_u *
1279 channel_read_block(int idx) 1443 channel_read_block(int ch_idx)
1280 { 1444 {
1281 if (channel_peek(idx) == NULL) 1445 ch_log(ch_idx, "Reading raw\n");
1282 { 1446 if (channel_peek(ch_idx) == NULL)
1447 {
1448 int fd = get_read_fd(ch_idx, FALSE);
1449
1450 ch_log(ch_idx, "No readahead\n");
1283 /* Wait for up to the channel timeout. */ 1451 /* Wait for up to the channel timeout. */
1284 if (channel_wait(channels[idx].ch_fd, channels[idx].ch_timeout) == FAIL) 1452 if (fd < 0 || channel_wait(ch_idx, fd,
1453 channels[ch_idx].ch_timeout) == FAIL)
1285 return NULL; 1454 return NULL;
1286 channel_read(idx); 1455 channel_read(ch_idx, FALSE, "channel_read_block");
1287 } 1456 }
1288 1457
1289 return channel_get_all(idx); 1458 /* TODO: only get the first message */
1459 ch_log(ch_idx, "Returning readahead\n");
1460 return channel_get_all(ch_idx);
1290 } 1461 }
1291 1462
1292 /* 1463 /*
1293 * Read one JSON message from channel "ch_idx" with ID "id" and store the 1464 * Read one JSON message from channel "ch_idx" with ID "id" and store the
1294 * result in "rettv". 1465 * result in "rettv".
1297 int 1468 int
1298 channel_read_json_block(int ch_idx, int id, typval_T **rettv) 1469 channel_read_json_block(int ch_idx, int id, typval_T **rettv)
1299 { 1470 {
1300 int more; 1471 int more;
1301 channel_T *channel = &channels[ch_idx]; 1472 channel_T *channel = &channels[ch_idx];
1302 1473 int fd;
1474
1475 ch_log(ch_idx, "Reading JSON\n");
1303 channel->ch_block_id = id; 1476 channel->ch_block_id = id;
1304 for (;;) 1477 for (;;)
1305 { 1478 {
1306 more = channel_parse_json(ch_idx); 1479 more = channel_parse_json(ch_idx);
1307 1480
1318 * messages may have arrived. */ 1491 * messages may have arrived. */
1319 if (channel_parse_messages()) 1492 if (channel_parse_messages())
1320 continue; 1493 continue;
1321 1494
1322 /* Wait for up to the channel timeout. */ 1495 /* Wait for up to the channel timeout. */
1323 if (channel->ch_fd < 0 || channel_wait(channel->ch_fd, 1496 fd = get_read_fd(ch_idx, FALSE);
1324 channel->ch_timeout) == FAIL) 1497 if (fd < 0 || channel_wait(ch_idx, fd, channel->ch_timeout) == FAIL)
1325 break; 1498 break;
1326 channel_read(ch_idx); 1499 channel_read(ch_idx, FALSE, "channel_read_json_block");
1327 } 1500 }
1328 } 1501 }
1329 channel->ch_block_id = 0; 1502 channel->ch_block_id = 0;
1330 return FAIL; 1503 return FAIL;
1331 } 1504 }
1334 /* 1507 /*
1335 * Lookup the channel index from the socket. 1508 * Lookup the channel index from the socket.
1336 * Returns -1 when the socket isn't found. 1509 * Returns -1 when the socket isn't found.
1337 */ 1510 */
1338 int 1511 int
1339 channel_socket2idx(sock_T fd) 1512 channel_fd2idx(sock_T fd)
1340 { 1513 {
1341 int i; 1514 int i;
1342 1515
1343 if (fd >= 0) 1516 if (fd >= 0)
1344 for (i = 0; i < channel_count; ++i) 1517 for (i = 0; i < channel_count; ++i)
1345 if (channels[i].ch_fd == fd) 1518 if (channels[i].ch_sock == fd
1519 # if defined(CHANNEL_PIPES)
1520 || channels[i].ch_out == fd
1521 || channels[i].ch_err == fd
1522 # endif
1523 )
1346 return i; 1524 return i;
1347 return -1; 1525 return -1;
1348 } 1526 }
1349 # endif 1527 # endif
1350 1528
1351 /* 1529 /*
1352 * Write "buf" (NUL terminated string) to channel "idx". 1530 * Write "buf" (NUL terminated string) to channel "ch_idx".
1353 * When "fun" is not NULL an error message might be given. 1531 * When "fun" is not NULL an error message might be given.
1354 * Return FAIL or OK. 1532 * Return FAIL or OK.
1355 */ 1533 */
1356 int 1534 int
1357 channel_send(int idx, char_u *buf, char *fun) 1535 channel_send(int ch_idx, char_u *buf, char *fun)
1358 { 1536 {
1359 channel_T *channel = &channels[idx]; 1537 channel_T *channel = &channels[ch_idx];
1360 int len = (int)STRLEN(buf); 1538 int len = (int)STRLEN(buf);
1361 1539 int res;
1362 if (channel->ch_fd < 0) 1540 int fd;
1541 int use_socket = FALSE;
1542
1543 if (channel->ch_sock >= 0)
1544 {
1545 fd = channel->ch_sock;
1546 use_socket = TRUE;
1547 }
1548 #if defined(CHANNEL_PIPES)
1549 else if (channel->ch_in >= 0)
1550 fd = channel->ch_in;
1551 #endif
1552 if (fd < 0)
1363 { 1553 {
1364 if (!channel->ch_error && fun != NULL) 1554 if (!channel->ch_error && fun != NULL)
1365 { 1555 {
1366 CHERROR(" %s(): write while not connected\n", fun); 1556 ch_errors(ch_idx, "%s(): write while not connected\n", fun);
1367 EMSG2("E630: %s(): write while not connected", fun); 1557 EMSG2("E630: %s(): write while not connected", fun);
1368 } 1558 }
1369 channel->ch_error = TRUE; 1559 channel->ch_error = TRUE;
1370 return FAIL; 1560 return FAIL;
1371 } 1561 }
1372 1562
1373 if (sock_write(channel->ch_fd, buf, len) != len) 1563 if (log_fd != NULL)
1564 {
1565 ch_log_lead("SEND ", ch_idx);
1566 fprintf(log_fd, "'");
1567 ignored = fwrite(buf, len, 1, log_fd);
1568 fprintf(log_fd, "'\n");
1569 fflush(log_fd);
1570 }
1571
1572 if (use_socket)
1573 res = sock_write(fd, buf, len);
1574 else
1575 res = write(fd, buf, len);
1576 if (res != len)
1374 { 1577 {
1375 if (!channel->ch_error && fun != NULL) 1578 if (!channel->ch_error && fun != NULL)
1376 { 1579 {
1377 CHERROR(" %s(): write failed\n", fun); 1580 ch_errors(ch_idx, "%s(): write failed\n", fun);
1378 EMSG2("E631: %s(): write failed", fun); 1581 EMSG2("E631: %s(): write failed", fun);
1379 } 1582 }
1380 channel->ch_error = TRUE; 1583 channel->ch_error = TRUE;
1381 return FAIL; 1584 return FAIL;
1382 } 1585 }
1397 int nfd = nfd_in; 1600 int nfd = nfd_in;
1398 int i; 1601 int i;
1399 struct pollfd *fds = fds_in; 1602 struct pollfd *fds = fds_in;
1400 1603
1401 for (i = 0; i < channel_count; ++i) 1604 for (i = 0; i < channel_count; ++i)
1402 if (channels[i].ch_fd >= 0) 1605 {
1403 { 1606 if (channels[i].ch_sock >= 0)
1404 channels[i].ch_idx = nfd; 1607 {
1405 fds[nfd].fd = channels[i].ch_fd; 1608 channels[i].ch_sock_idx = nfd;
1609 fds[nfd].fd = channels[i].ch_sock;
1406 fds[nfd].events = POLLIN; 1610 fds[nfd].events = POLLIN;
1407 nfd++; 1611 nfd++;
1408 } 1612 }
1409 else 1613 else
1410 channels[i].ch_idx = -1; 1614 channels[i].ch_sock_idx = -1;
1615
1616 # ifdef CHANNEL_PIPES
1617 if (channels[i].ch_out >= 0)
1618 {
1619 channels[i].ch_out_idx = nfd;
1620 fds[nfd].fd = channels[i].ch_out;
1621 fds[nfd].events = POLLIN;
1622 nfd++;
1623 }
1624 else
1625 channels[i].ch_out_idx = -1;
1626
1627 if (channels[i].ch_err >= 0)
1628 {
1629 channels[i].ch_err_idx = nfd;
1630 fds[nfd].fd = channels[i].ch_err;
1631 fds[nfd].events = POLLIN;
1632 nfd++;
1633 }
1634 else
1635 channels[i].ch_err_idx = -1;
1636 # endif
1637 }
1411 1638
1412 return nfd; 1639 return nfd;
1413 } 1640 }
1414 1641
1415 /* 1642 /*
1421 int ret = ret_in; 1648 int ret = ret_in;
1422 int i; 1649 int i;
1423 struct pollfd *fds = fds_in; 1650 struct pollfd *fds = fds_in;
1424 1651
1425 for (i = 0; i < channel_count; ++i) 1652 for (i = 0; i < channel_count; ++i)
1426 if (ret > 0 && channels[i].ch_idx != -1 1653 {
1427 && fds[channels[i].ch_idx].revents & POLLIN) 1654 if (ret > 0 && channels[i].ch_sock_idx != -1
1428 { 1655 && fds[channels[i].ch_sock_idx].revents & POLLIN)
1429 channel_read(i); 1656 {
1657 channel_read(i, FALSE, "channel_poll_check");
1430 --ret; 1658 --ret;
1431 } 1659 }
1660 # ifdef CHANNEL_PIPES
1661 if (ret > 0 && channels[i].ch_out_idx != -1
1662 && fds[channels[i].ch_out_idx].revents & POLLIN)
1663 {
1664 channel_read(i, FALSE, "channel_poll_check");
1665 --ret;
1666 }
1667 if (ret > 0 && channels[i].ch_err_idx != -1
1668 && fds[channels[i].ch_err_idx].revents & POLLIN)
1669 {
1670 channel_read(i, TRUE, "channel_poll_check");
1671 --ret;
1672 }
1673 # endif
1674 }
1432 1675
1433 return ret; 1676 return ret;
1434 } 1677 }
1435 # endif /* UNIX && !HAVE_SELECT */ 1678 # endif /* UNIX && !HAVE_SELECT */
1436 1679
1444 int maxfd = maxfd_in; 1687 int maxfd = maxfd_in;
1445 int i; 1688 int i;
1446 fd_set *rfds = rfds_in; 1689 fd_set *rfds = rfds_in;
1447 1690
1448 for (i = 0; i < channel_count; ++i) 1691 for (i = 0; i < channel_count; ++i)
1449 if (channels[i].ch_fd >= 0) 1692 {
1450 { 1693 if (channels[i].ch_sock >= 0)
1451 FD_SET(channels[i].ch_fd, rfds); 1694 {
1452 if (maxfd < channels[i].ch_fd) 1695 FD_SET(channels[i].ch_sock, rfds);
1453 maxfd = channels[i].ch_fd; 1696 if (maxfd < channels[i].ch_sock)
1454 } 1697 maxfd = channels[i].ch_sock;
1698 }
1699 # ifdef CHANNEL_PIPES
1700 if (channels[i].ch_out >= 0)
1701 {
1702 FD_SET(channels[i].ch_out, rfds);
1703 if (maxfd < channels[i].ch_out)
1704 maxfd = channels[i].ch_out;
1705 }
1706 if (channels[i].ch_err >= 0)
1707 {
1708 FD_SET(channels[i].ch_err, rfds);
1709 if (maxfd < channels[i].ch_err)
1710 maxfd = channels[i].ch_err;
1711 }
1712 # endif
1713 }
1455 1714
1456 return maxfd; 1715 return maxfd;
1457 } 1716 }
1458 1717
1459 /* 1718 /*
1465 int ret = ret_in; 1724 int ret = ret_in;
1466 int i; 1725 int i;
1467 fd_set *rfds = rfds_in; 1726 fd_set *rfds = rfds_in;
1468 1727
1469 for (i = 0; i < channel_count; ++i) 1728 for (i = 0; i < channel_count; ++i)
1470 if (ret > 0 && channels[i].ch_fd >= 0 1729 {
1471 && FD_ISSET(channels[i].ch_fd, rfds)) 1730 if (ret > 0 && channels[i].ch_sock >= 0
1472 { 1731 && FD_ISSET(channels[i].ch_sock, rfds))
1473 channel_read(i); 1732 {
1733 channel_read(i, FALSE, "channel_select_check");
1474 --ret; 1734 --ret;
1475 } 1735 }
1736 # ifdef CHANNEL_PIPES
1737 if (ret > 0 && channels[i].ch_out >= 0
1738 && FD_ISSET(channels[i].ch_out, rfds))
1739 {
1740 channel_read(i, FALSE, "channel_select_check");
1741 --ret;
1742 }
1743 if (ret > 0 && channels[i].ch_err >= 0
1744 && FD_ISSET(channels[i].ch_err, rfds))
1745 {
1746 channel_read(i, TRUE, "channel_select_check");
1747 --ret;
1748 }
1749 # endif
1750 }
1476 1751
1477 return ret; 1752 return ret;
1478 } 1753 }
1479 # endif /* !FEAT_GUI_W32 && HAVE_SELECT */ 1754 # endif /* !FEAT_GUI_W32 && HAVE_SELECT */
1480 1755
1526 } 1801 }
1527 return abort; 1802 return abort;
1528 } 1803 }
1529 1804
1530 /* 1805 /*
1531 * Return the mode of channel "idx". 1806 * Return the mode of channel "ch_idx".
1532 * If "idx" is invalid returns MODE_JSON. 1807 * If "ch_idx" is invalid returns MODE_JSON.
1533 */ 1808 */
1534 ch_mode_T 1809 ch_mode_T
1535 channel_get_mode(int idx) 1810 channel_get_mode(int ch_idx)
1536 { 1811 {
1537 if (idx < 0 || idx >= channel_count) 1812 if (ch_idx < 0 || ch_idx >= channel_count)
1538 return MODE_JSON; 1813 return MODE_JSON;
1539 return channels[idx].ch_mode; 1814 return channels[ch_idx].ch_mode;
1540 } 1815 }
1541 1816
1542 #endif /* FEAT_CHANNEL */ 1817 #endif /* FEAT_CHANNEL */