comparison src/channel.c @ 8151:aa845d10c6fb v7.4.1369

commit https://github.com/vim/vim/commit/42d38a2db17e70312d073095257555c27a5f9443 Author: Bram Moolenaar <Bram@vim.org> Date: Sat Feb 20 18:18:59 2016 +0100 patch 7.4.1369 Problem: Channels don't have a queue for stderr. Solution: Have a queue for each part of the channel.
author Christian Brabandt <cb@256bit.org>
date Sat, 20 Feb 2016 18:30:04 +0100
parents 8abef552ece7
children 6ee6fb27dcea
comparison
equal deleted inserted replaced
8150:85b476dac933 8151:aa845d10c6fb
272 * Returns NULL if out of memory. 272 * Returns NULL if out of memory.
273 */ 273 */
274 channel_T * 274 channel_T *
275 add_channel(void) 275 add_channel(void)
276 { 276 {
277 int which; 277 int part;
278 channel_T *channel = (channel_T *)alloc_clear((int)sizeof(channel_T)); 278 channel_T *channel = (channel_T *)alloc_clear((int)sizeof(channel_T));
279 279
280 if (channel == NULL) 280 if (channel == NULL)
281 return NULL; 281 return NULL;
282 282
283 channel->ch_id = next_ch_id++; 283 channel->ch_id = next_ch_id++;
284 ch_log(channel, "Created channel"); 284 ch_log(channel, "Created channel");
285 285
286 #ifdef CHANNEL_PIPES 286 #ifdef CHANNEL_PIPES
287 for (which = CHAN_SOCK; which <= CHAN_IN; ++which) 287 for (part = PART_SOCK; part <= PART_IN; ++part)
288 #else 288 #else
289 which = CHAN_SOCK; 289 part = PART_SOCK;
290 #endif 290 #endif
291 { 291 {
292 channel->ch_pfd[which].ch_fd = CHAN_FD_INVALID; 292 channel->ch_part[part].ch_fd = INVALID_FD;
293 #ifdef FEAT_GUI_X11 293 #ifdef FEAT_GUI_X11
294 channel->ch_pfd[which].ch_inputHandler = (XtInputId)NULL; 294 channel->ch_part[part].ch_inputHandler = (XtInputId)NULL;
295 #endif 295 #endif
296 #ifdef FEAT_GUI_GTK 296 #ifdef FEAT_GUI_GTK
297 channel->ch_pfd[which].ch_inputHandler = 0; 297 channel->ch_part[part].ch_inputHandler = 0;
298 #endif 298 #endif
299 #ifdef FEAT_GUI_W32 299 #ifdef FEAT_GUI_W32
300 channel->ch_pfd[which].ch_inputHandler = -1; 300 channel->ch_part[part].ch_inputHandler = -1;
301 #endif 301 #endif
302 } 302 channel->ch_part[part].ch_timeout = 2000;
303 303 }
304 channel->ch_timeout = 2000;
305 304
306 if (first_channel != NULL) 305 if (first_channel != NULL)
307 { 306 {
308 first_channel->ch_prev = channel; 307 first_channel->ch_prev = channel;
309 channel->ch_next = first_channel; 308 channel->ch_next = first_channel;
347 346
348 #if defined(FEAT_GUI_X11) || defined(FEAT_GUI_GTK) 347 #if defined(FEAT_GUI_X11) || defined(FEAT_GUI_GTK)
349 static void 348 static void
350 channel_read_netbeans(int id) 349 channel_read_netbeans(int id)
351 { 350 {
352 channel_T *channel = channel_from_id(id); 351 channel_T *channel = channel_from_id(id);
352 int part;
353 353
354 if (channel == NULL) 354 if (channel == NULL)
355 ch_errorn(NULL, "Channel %d not found", id); 355 ch_errorn(NULL, "Channel %d not found", id);
356 else 356 else
357 channel_read(channel, -1, "messageFromNetbeans"); 357 {
358 /* TODO: check stderr */
359 if (channel->CH_SOCK_FD != INVALID_FD)
360 part = PART_SOCK;
361 else
362 part = PART_OUT;
363 channel_read(channel, part, "messageFromNetbeans");
364 }
358 } 365 }
359 #endif 366 #endif
360 367
361 /* 368 /*
362 * Read a command from netbeans. 369 * Read a command from netbeans.
370 * TODO: instead of channel ID use the FD.
363 */ 371 */
364 #ifdef FEAT_GUI_X11 372 #ifdef FEAT_GUI_X11
365 static void 373 static void
366 messageFromNetbeans(XtPointer clientData, 374 messageFromNetbeans(XtPointer clientData,
367 int *unused1 UNUSED, 375 int *unused1 UNUSED,
380 channel_read_netbeans((int)(long)clientData); 388 channel_read_netbeans((int)(long)clientData);
381 } 389 }
382 #endif 390 #endif
383 391
384 static void 392 static void
385 channel_gui_register_one(channel_T *channel, int which) 393 channel_gui_register_one(channel_T *channel, int part)
386 { 394 {
387 # ifdef FEAT_GUI_X11 395 # ifdef FEAT_GUI_X11
388 /* Tell notifier we are interested in being called 396 /* Tell notifier we are interested in being called
389 * when there is input on the editor connection socket. */ 397 * when there is input on the editor connection socket. */
390 if (channel->ch_pfd[which].ch_inputHandler == (XtInputId)NULL) 398 if (channel->ch_part[part].ch_inputHandler == (XtInputId)NULL)
391 channel->ch_pfd[which].ch_inputHandler = XtAppAddInput( 399 channel->ch_part[part].ch_inputHandler = XtAppAddInput(
392 (XtAppContext)app_context, 400 (XtAppContext)app_context,
393 channel->ch_pfd[which].ch_fd, 401 channel->ch_part[part].ch_fd,
394 (XtPointer)(XtInputReadMask + XtInputExceptMask), 402 (XtPointer)(XtInputReadMask + XtInputExceptMask),
395 messageFromNetbeans, 403 messageFromNetbeans,
396 (XtPointer)(long)channel->ch_id); 404 (XtPointer)(long)channel->ch_id);
397 # else 405 # else
398 # ifdef FEAT_GUI_GTK 406 # ifdef FEAT_GUI_GTK
399 /* Tell gdk we are interested in being called when there 407 /* Tell gdk we are interested in being called when there
400 * is input on the editor connection socket. */ 408 * is input on the editor connection socket. */
401 if (channel->ch_pfd[which].ch_inputHandler == 0) 409 if (channel->ch_part[part].ch_inputHandler == 0)
402 channel->ch_pfd[which].ch_inputHandler = gdk_input_add( 410 channel->ch_part[part].ch_inputHandler = gdk_input_add(
403 (gint)channel->ch_pfd[which].ch_fd, 411 (gint)channel->ch_part[part].ch_fd,
404 (GdkInputCondition) 412 (GdkInputCondition)
405 ((int)GDK_INPUT_READ + (int)GDK_INPUT_EXCEPTION), 413 ((int)GDK_INPUT_READ + (int)GDK_INPUT_EXCEPTION),
406 messageFromNetbeans, 414 messageFromNetbeans,
407 (gpointer)(long)channel->ch_id); 415 (gpointer)(long)channel->ch_id);
408 # else 416 # else
409 # ifdef FEAT_GUI_W32 417 # ifdef FEAT_GUI_W32
410 /* Tell Windows we are interested in receiving message when there 418 /* Tell Windows we are interested in receiving message when there
411 * is input on the editor connection socket. */ 419 * is input on the editor connection socket. */
412 if (channel->ch_pfd[which].ch_inputHandler == -1) 420 if (channel->ch_part[part].ch_inputHandler == -1)
413 channel->ch_pfd[which].ch_inputHandler = WSAAsyncSelect( 421 channel->ch_part[part].ch_inputHandler = WSAAsyncSelect(
414 channel->ch_pfd[which].ch_fd, 422 channel->ch_part[part].ch_fd,
415 s_hwnd, WM_NETBEANS, FD_READ); 423 s_hwnd, WM_NETBEANS, FD_READ);
416 # endif 424 # endif
417 # endif 425 # endif
418 # endif 426 # endif
419 } 427 }
422 channel_gui_register(channel_T *channel) 430 channel_gui_register(channel_T *channel)
423 { 431 {
424 if (!CH_HAS_GUI) 432 if (!CH_HAS_GUI)
425 return; 433 return;
426 434
427 if (channel->CH_SOCK != CHAN_FD_INVALID) 435 if (channel->CH_SOCK_FD != INVALID_FD)
428 channel_gui_register_one(channel, CHAN_SOCK); 436 channel_gui_register_one(channel, PART_SOCK);
429 # ifdef CHANNEL_PIPES 437 # ifdef CHANNEL_PIPES
430 if (channel->CH_OUT != CHAN_FD_INVALID) 438 if (channel->CH_OUT_FD != INVALID_FD)
431 channel_gui_register_one(channel, CHAN_OUT); 439 channel_gui_register_one(channel, PART_OUT);
432 if (channel->CH_ERR != CHAN_FD_INVALID) 440 if (channel->CH_ERR_FD != INVALID_FD)
433 channel_gui_register_one(channel, CHAN_ERR); 441 channel_gui_register_one(channel, PART_ERR);
434 # endif 442 # endif
435 } 443 }
436 444
437 /* 445 /*
438 * Register any of our file descriptors with the GUI event handling system. 446 * Register any of our file descriptors with the GUI event handling system.
448 } 456 }
449 457
450 static void 458 static void
451 channel_gui_unregister(channel_T *channel) 459 channel_gui_unregister(channel_T *channel)
452 { 460 {
453 int which; 461 int part;
454 462
455 #ifdef CHANNEL_PIPES 463 #ifdef CHANNEL_PIPES
456 for (which = CHAN_SOCK; which < CHAN_IN; ++which) 464 for (part = PART_SOCK; part < PART_IN; ++part)
457 #else 465 #else
458 which = CHAN_SOCK; 466 part = PART_SOCK;
459 #endif 467 #endif
460 { 468 {
461 # ifdef FEAT_GUI_X11 469 # ifdef FEAT_GUI_X11
462 if (channel->ch_pfd[which].ch_inputHandler != (XtInputId)NULL) 470 if (channel->ch_part[part].ch_inputHandler != (XtInputId)NULL)
463 { 471 {
464 XtRemoveInput(channel->ch_pfd[which].ch_inputHandler); 472 XtRemoveInput(channel->ch_part[part].ch_inputHandler);
465 channel->ch_pfd[which].ch_inputHandler = (XtInputId)NULL; 473 channel->ch_part[part].ch_inputHandler = (XtInputId)NULL;
466 } 474 }
467 # else 475 # else
468 # ifdef FEAT_GUI_GTK 476 # ifdef FEAT_GUI_GTK
469 if (channel->ch_pfd[which].ch_inputHandler != 0) 477 if (channel->ch_part[part].ch_inputHandler != 0)
470 { 478 {
471 gdk_input_remove(channel->ch_pfd[which].ch_inputHandler); 479 gdk_input_remove(channel->ch_part[part].ch_inputHandler);
472 channel->ch_pfd[which].ch_inputHandler = 0; 480 channel->ch_part[part].ch_inputHandler = 0;
473 } 481 }
474 # else 482 # else
475 # ifdef FEAT_GUI_W32 483 # ifdef FEAT_GUI_W32
476 if (channel->ch_pfd[which].ch_inputHandler == 0) 484 if (channel->ch_part[part].ch_inputHandler == 0)
477 { 485 {
478 WSAAsyncSelect(channel->ch_pfd[which].ch_fd, s_hwnd, 0, 0); 486 WSAAsyncSelect(channel->ch_part[part].ch_fd, s_hwnd, 0, 0);
479 channel->ch_pfd[which].ch_inputHandler = -1; 487 channel->ch_part[part].ch_inputHandler = -1;
480 } 488 }
481 # endif 489 # endif
482 # endif 490 # endif
483 # endif 491 # endif
484 } 492 }
719 #else 727 #else
720 (void)fcntl(sd, F_SETFL, 0); 728 (void)fcntl(sd, F_SETFL, 0);
721 #endif 729 #endif
722 } 730 }
723 731
724 channel->CH_SOCK = (sock_T)sd; 732 channel->CH_SOCK_FD = (sock_T)sd;
725 channel->ch_close_cb = close_cb; 733 channel->ch_close_cb = close_cb;
726 734
727 #ifdef FEAT_GUI 735 #ifdef FEAT_GUI
728 channel_gui_register(channel); 736 channel_gui_register(channel);
729 #endif 737 #endif
733 741
734 #if defined(CHANNEL_PIPES) || defined(PROTO) 742 #if defined(CHANNEL_PIPES) || defined(PROTO)
735 void 743 void
736 channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err) 744 channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err)
737 { 745 {
738 channel->CH_IN = in; 746 channel->CH_IN_FD = in;
739 channel->CH_OUT = out; 747 channel->CH_OUT_FD = out;
740 channel->CH_ERR = err; 748 channel->CH_ERR_FD = err;
741 } 749 }
742 #endif 750 #endif
743 751
744 void 752 void
745 channel_set_job(channel_T *channel, job_T *job) 753 channel_set_job(channel_T *channel, job_T *job)
751 * Set various properties from an "options" argument. 759 * Set various properties from an "options" argument.
752 */ 760 */
753 void 761 void
754 channel_set_options(channel_T *channel, jobopt_T *options) 762 channel_set_options(channel_T *channel, jobopt_T *options)
755 { 763 {
764 int part;
765
756 if (options->jo_set & JO_MODE) 766 if (options->jo_set & JO_MODE)
757 channel->ch_mode = options->jo_mode; 767 for (part = PART_SOCK; part <= PART_IN; ++part)
768 channel->ch_part[part].ch_mode = options->jo_mode;
758 if (options->jo_set & JO_TIMEOUT) 769 if (options->jo_set & JO_TIMEOUT)
759 channel->ch_timeout = options->jo_timeout; 770 for (part = PART_SOCK; part <= PART_IN; ++part)
771 channel->ch_part[part].ch_timeout = options->jo_timeout;
760 772
761 if (options->jo_set & JO_CALLBACK) 773 if (options->jo_set & JO_CALLBACK)
762 { 774 {
763 vim_free(channel->ch_callback); 775 vim_free(channel->ch_callback);
764 if (options->jo_callback != NULL && *options->jo_callback != NUL) 776 if (options->jo_callback != NULL && *options->jo_callback != NUL)
767 channel->ch_callback = NULL; 779 channel->ch_callback = NULL;
768 } 780 }
769 } 781 }
770 782
771 /* 783 /*
772 * Set the callback for channel "channel" for the response with "id". 784 * Set the callback for "channel"/"part" for the response with "id".
773 */ 785 */
774 void 786 void
775 channel_set_req_callback(channel_T *channel, char_u *callback, int id) 787 channel_set_req_callback(
776 { 788 channel_T *channel,
777 cbq_T *head = &channel->ch_cb_head; 789 int part,
790 char_u *callback,
791 int id)
792 {
793 cbq_T *head = &channel->ch_part[part].ch_cb_head;
778 cbq_T *item = (cbq_T *)alloc((int)sizeof(cbq_T)); 794 cbq_T *item = (cbq_T *)alloc((int)sizeof(cbq_T));
779 795
780 if (item != NULL) 796 if (item != NULL)
781 { 797 {
782 item->cq_callback = vim_strsave(callback); 798 item->cq_callback = vim_strsave(callback);
811 cursor_on(); 827 cursor_on();
812 out_flush(); 828 out_flush();
813 } 829 }
814 830
815 /* 831 /*
816 * Return the first buffer from the channel and remove it. 832 * Return the first buffer from channel "channel"/"part" and remove it.
817 * The caller must free it. 833 * The caller must free it.
818 * Returns NULL if there is nothing. 834 * Returns NULL if there is nothing.
819 */ 835 */
820 char_u * 836 char_u *
821 channel_get(channel_T *channel) 837 channel_get(channel_T *channel, int part)
822 { 838 {
823 readq_T *head = &channel->ch_head; 839 readq_T *head = &channel->ch_part[part].ch_head;
824 readq_T *node = head->rq_next; 840 readq_T *node = head->rq_next;
825 char_u *p; 841 char_u *p;
826 842
827 if (node == NULL) 843 if (node == NULL)
828 return NULL; 844 return NULL;
836 vim_free(node); 852 vim_free(node);
837 return p; 853 return p;
838 } 854 }
839 855
840 /* 856 /*
841 * Returns the whole buffer contents concatenated. 857 * Returns the whole buffer contents concatenated for "channel"/"part".
842 */ 858 */
843 static char_u * 859 static char_u *
844 channel_get_all(channel_T *channel) 860 channel_get_all(channel_T *channel, int part)
845 { 861 {
846 /* Concatenate everything into one buffer. 862 /* Concatenate everything into one buffer.
847 * TODO: avoid multiple allocations. */ 863 * TODO: avoid multiple allocations. */
848 while (channel_collapse(channel) == OK) 864 while (channel_collapse(channel, part) == OK)
849 ; 865 ;
850 return channel_get(channel); 866 return channel_get(channel, part);
851 } 867 }
852 868
853 /* 869 /*
854 * Collapses the first and second buffer in the channel "channel". 870 * Collapses the first and second buffer for "channel"/"part".
855 * Returns FAIL if that is not possible. 871 * Returns FAIL if that is not possible.
856 */ 872 */
857 int 873 int
858 channel_collapse(channel_T *channel) 874 channel_collapse(channel_T *channel, int part)
859 { 875 {
860 readq_T *head = &channel->ch_head; 876 readq_T *head = &channel->ch_part[part].ch_head;
861 readq_T *node = head->rq_next; 877 readq_T *node = head->rq_next;
862 char_u *p; 878 char_u *p;
863 879
864 if (node == NULL || node->rq_next == NULL) 880 if (node == NULL || node->rq_next == NULL)
865 return FAIL; 881 return FAIL;
880 vim_free(node); 896 vim_free(node);
881 return OK; 897 return OK;
882 } 898 }
883 899
884 /* 900 /*
885 * Use the read buffer of channel "channel" and parse a JSON messages that is 901 * Store "buf[len]" on "channel"/"part".
902 * Returns OK or FAIL.
903 */
904 static int
905 channel_save(channel_T *channel, int part, char_u *buf, int len)
906 {
907 readq_T *node;
908 readq_T *head = &channel->ch_part[part].ch_head;
909 char_u *p;
910 int i;
911
912 node = (readq_T *)alloc(sizeof(readq_T));
913 if (node == NULL)
914 return FAIL; /* out of memory */
915 node->rq_buffer = alloc(len + 1);
916 if (node->rq_buffer == NULL)
917 {
918 vim_free(node);
919 return FAIL; /* out of memory */
920 }
921
922 if (channel->ch_part[part].ch_mode == MODE_NL)
923 {
924 /* Drop any CR before a NL. */
925 p = node->rq_buffer;
926 for (i = 0; i < len; ++i)
927 if (buf[i] != CAR || i + 1 >= len || buf[i + 1] != NL)
928 *p++ = buf[i];
929 *p = NUL;
930 }
931 else
932 {
933 mch_memmove(node->rq_buffer, buf, len);
934 node->rq_buffer[len] = NUL;
935 }
936
937 /* append node to the tail of the queue */
938 node->rq_next = NULL;
939 node->rq_prev = head->rq_prev;
940 if (head->rq_prev == NULL)
941 head->rq_next = node;
942 else
943 head->rq_prev->rq_next = node;
944 head->rq_prev = node;
945
946 if (log_fd != NULL)
947 {
948 ch_log_lead("RECV ", channel);
949 fprintf(log_fd, "'");
950 if (fwrite(buf, len, 1, log_fd) != 1)
951 return FAIL;
952 fprintf(log_fd, "'\n");
953 }
954 return OK;
955 }
956
957 /*
958 * Use the read buffer of "channel"/"part" and parse a JSON messages that is
886 * complete. The messages are added to the queue. 959 * complete. The messages are added to the queue.
887 * Return TRUE if there is more to read. 960 * Return TRUE if there is more to read.
888 */ 961 */
889 static int 962 static int
890 channel_parse_json(channel_T *channel) 963 channel_parse_json(channel_T *channel, int part)
891 { 964 {
892 js_read_T reader; 965 js_read_T reader;
893 typval_T listtv; 966 typval_T listtv;
894 jsonq_T *item; 967 jsonq_T *item;
895 jsonq_T *head = &channel->ch_json_head; 968 jsonq_T *head = &channel->ch_part[part].ch_json_head;
896 int ret; 969 int ret;
897 970
898 if (channel_peek(channel) == NULL) 971 if (channel_peek(channel, part) == NULL)
899 return FALSE; 972 return FALSE;
900 973
901 /* TODO: make reader work properly */ 974 /* TODO: make reader work properly */
902 /* reader.js_buf = channel_peek(channel); */ 975 /* reader.js_buf = channel_peek(channel, part); */
903 reader.js_buf = channel_get_all(channel); 976 reader.js_buf = channel_get_all(channel, part);
904 reader.js_used = 0; 977 reader.js_used = 0;
905 reader.js_fill = NULL; 978 reader.js_fill = NULL;
906 /* reader.js_fill = channel_fill; */ 979 /* reader.js_fill = channel_fill; */
907 reader.js_cookie = channel; 980 reader.js_cookie = channel;
908 ret = json_decode(&reader, &listtv, 981 ret = json_decode(&reader, &listtv,
909 channel->ch_mode == MODE_JS ? JSON_JS : 0); 982 channel->ch_part[part].ch_mode == MODE_JS ? JSON_JS : 0);
910 if (ret == OK) 983 if (ret == OK)
911 { 984 {
912 /* Only accept the response when it is a list with at least two 985 /* Only accept the response when it is a list with at least two
913 * items. */ 986 * items. */
914 if (listtv.v_type != VAR_LIST || listtv.vval.v_list->lv_len < 2) 987 if (listtv.v_type != VAR_LIST || listtv.vval.v_list->lv_len < 2)
946 1019
947 /* Put the unread part back into the channel. 1020 /* Put the unread part back into the channel.
948 * TODO: insert in front */ 1021 * TODO: insert in front */
949 if (reader.js_buf[reader.js_used] != NUL) 1022 if (reader.js_buf[reader.js_used] != NUL)
950 { 1023 {
951 channel_save(channel, reader.js_buf + reader.js_used, 1024 channel_save(channel, part, reader.js_buf + reader.js_used,
952 (int)(reader.js_end - reader.js_buf) - reader.js_used); 1025 (int)(reader.js_end - reader.js_buf) - reader.js_used);
953 ret = TRUE; 1026 ret = TRUE;
954 } 1027 }
955 else 1028 else
956 ret = FALSE; 1029 ret = FALSE;
1000 * with id ch_block_id. 1073 * with id ch_block_id.
1001 * Return OK when found and return the value in "rettv". 1074 * Return OK when found and return the value in "rettv".
1002 * Return FAIL otherwise. 1075 * Return FAIL otherwise.
1003 */ 1076 */
1004 static int 1077 static int
1005 channel_get_json(channel_T *channel, int id, typval_T **rettv) 1078 channel_get_json(channel_T *channel, int part, int id, typval_T **rettv)
1006 { 1079 {
1007 jsonq_T *head = &channel->ch_json_head; 1080 jsonq_T *head = &channel->ch_part[part].ch_json_head;
1008 jsonq_T *item = head->jq_next; 1081 jsonq_T *item = head->jq_next;
1009 1082
1010 while (item != NULL) 1083 while (item != NULL)
1011 { 1084 {
1012 list_T *l = item->jq_value->vval.v_list; 1085 list_T *l = item->jq_value->vval.v_list;
1013 typval_T *tv = &l->lv_first->li_tv; 1086 typval_T *tv = &l->lv_first->li_tv;
1014 1087
1015 if ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id) 1088 if ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id)
1016 || (id <= 0 && (tv->v_type != VAR_NUMBER 1089 || (id <= 0 && (tv->v_type != VAR_NUMBER
1017 || tv->vval.v_number == 0 1090 || tv->vval.v_number == 0
1018 || tv->vval.v_number != channel->ch_block_id))) 1091 || tv->vval.v_number != channel->ch_part[part].ch_block_id)))
1019 { 1092 {
1020 *rettv = item->jq_value; 1093 *rettv = item->jq_value;
1021 remove_json_node(head, item); 1094 remove_json_node(head, item);
1022 return OK; 1095 return OK;
1023 } 1096 }
1025 } 1098 }
1026 return FAIL; 1099 return FAIL;
1027 } 1100 }
1028 1101
1029 /* 1102 /*
1030 * Execute a command received over channel "channel". 1103 * Execute a command received over "channel"/"part"
1031 * "cmd" is the command string, "arg2" the second argument. 1104 * "cmd" is the command string, "arg2" the second argument.
1032 * "arg3" is the third argument, NULL if missing. 1105 * "arg3" is the third argument, NULL if missing.
1033 */ 1106 */
1034 static void 1107 static void
1035 channel_exe_cmd(channel_T *channel, char_u *cmd, typval_T *arg2, typval_T *arg3) 1108 channel_exe_cmd(
1109 channel_T *channel,
1110 int part,
1111 char_u *cmd,
1112 typval_T *arg2,
1113 typval_T *arg3)
1036 { 1114 {
1037 char_u *arg; 1115 char_u *arg;
1038 1116
1039 if (arg2->v_type != VAR_STRING) 1117 if (arg2->v_type != VAR_STRING)
1040 { 1118 {
1088 else 1166 else
1089 { 1167 {
1090 typval_T *tv; 1168 typval_T *tv;
1091 typval_T err_tv; 1169 typval_T err_tv;
1092 char_u *json = NULL; 1170 char_u *json = NULL;
1093 int options = channel->ch_mode == MODE_JS ? JSON_JS : 0; 1171 int options = channel->ch_part[part].ch_mode == MODE_JS
1172 ? JSON_JS : 0;
1094 1173
1095 /* Don't pollute the display with errors. */ 1174 /* Don't pollute the display with errors. */
1096 ++emsg_skip; 1175 ++emsg_skip;
1097 tv = eval_expr(arg, NULL); 1176 tv = eval_expr(arg, NULL);
1098 if (is_eval) 1177 if (is_eval)
1112 json = json_encode_nr_expr(arg3->vval.v_number, tv, 1191 json = json_encode_nr_expr(arg3->vval.v_number, tv,
1113 options); 1192 options);
1114 } 1193 }
1115 if (json != NULL) 1194 if (json != NULL)
1116 { 1195 {
1117 channel_send(channel, json, "eval"); 1196 channel_send(channel, part, json, "eval");
1118 vim_free(json); 1197 vim_free(json);
1119 } 1198 }
1120 } 1199 }
1121 --emsg_skip; 1200 --emsg_skip;
1122 if (tv != &err_tv) 1201 if (tv != &err_tv)
1126 else if (p_verbose > 2) 1205 else if (p_verbose > 2)
1127 EMSG2("E905: received unknown command: %s", cmd); 1206 EMSG2("E905: received unknown command: %s", cmd);
1128 } 1207 }
1129 1208
1130 /* 1209 /*
1131 * Invoke a callback for channel "channel" if needed. 1210 * Invoke a callback for "channel"/"part" if needed.
1132 * TODO: add "which" argument, read stderr.
1133 * Return TRUE when a message was handled, there might be another one. 1211 * Return TRUE when a message was handled, there might be another one.
1134 */ 1212 */
1135 static int 1213 static int
1136 may_invoke_callback(channel_T *channel) 1214 may_invoke_callback(channel_T *channel, int part)
1137 { 1215 {
1138 char_u *msg = NULL; 1216 char_u *msg = NULL;
1139 typval_T *listtv = NULL; 1217 typval_T *listtv = NULL;
1140 list_T *list; 1218 list_T *list;
1141 typval_T *typetv; 1219 typval_T *typetv;
1142 typval_T argv[3]; 1220 typval_T argv[3];
1143 int seq_nr = -1; 1221 int seq_nr = -1;
1144 ch_mode_T ch_mode = channel->ch_mode; 1222 ch_mode_T ch_mode = channel->ch_part[part].ch_mode;
1223 char_u *callback = NULL;
1145 1224
1146 if (channel->ch_close_cb != NULL) 1225 if (channel->ch_close_cb != NULL)
1147 /* this channel is handled elsewhere (netbeans) */ 1226 /* this channel is handled elsewhere (netbeans) */
1148 return FALSE; 1227 return FALSE;
1149 1228
1229 if (channel->ch_part[part].ch_callback != NULL)
1230 callback = channel->ch_part[part].ch_callback;
1231 else
1232 callback = channel->ch_callback;
1233
1150 if (ch_mode == MODE_JSON || ch_mode == MODE_JS) 1234 if (ch_mode == MODE_JSON || ch_mode == MODE_JS)
1151 { 1235 {
1152 /* Get any json message in the queue. */ 1236 /* Get any json message in the queue. */
1153 if (channel_get_json(channel, -1, &listtv) == FAIL) 1237 if (channel_get_json(channel, part, -1, &listtv) == FAIL)
1154 { 1238 {
1155 /* Parse readahead, return when there is still no message. */ 1239 /* Parse readahead, return when there is still no message. */
1156 channel_parse_json(channel); 1240 channel_parse_json(channel, part);
1157 if (channel_get_json(channel, -1, &listtv) == FAIL) 1241 if (channel_get_json(channel, part, -1, &listtv) == FAIL)
1158 return FALSE; 1242 return FALSE;
1159 } 1243 }
1160 1244
1161 list = listtv->vval.v_list; 1245 list = listtv->vval.v_list;
1162 argv[1] = list->lv_first->li_next->li_tv; 1246 argv[1] = list->lv_first->li_next->li_tv;
1168 1252
1169 /* ["cmd", arg] or ["cmd", arg, arg] */ 1253 /* ["cmd", arg] or ["cmd", arg, arg] */
1170 if (list->lv_len == 3) 1254 if (list->lv_len == 3)
1171 arg3 = &list->lv_last->li_tv; 1255 arg3 = &list->lv_last->li_tv;
1172 ch_logs(channel, "Executing %s command", (char *)cmd); 1256 ch_logs(channel, "Executing %s command", (char *)cmd);
1173 channel_exe_cmd(channel, cmd, &argv[1], arg3); 1257 channel_exe_cmd(channel, part, cmd, &argv[1], arg3);
1174 free_tv(listtv); 1258 free_tv(listtv);
1175 return TRUE; 1259 return TRUE;
1176 } 1260 }
1177 1261
1178 if (typetv->v_type != VAR_NUMBER) 1262 if (typetv->v_type != VAR_NUMBER)
1182 free_tv(listtv); 1266 free_tv(listtv);
1183 return FALSE; 1267 return FALSE;
1184 } 1268 }
1185 seq_nr = typetv->vval.v_number; 1269 seq_nr = typetv->vval.v_number;
1186 } 1270 }
1187 else if (channel_peek(channel) == NULL) 1271 else if (channel_peek(channel, part) == NULL)
1188 { 1272 {
1189 /* nothing to read on RAW or NL channel */ 1273 /* nothing to read on RAW or NL channel */
1190 return FALSE; 1274 return FALSE;
1191 } 1275 }
1192 else 1276 else
1193 { 1277 {
1194 /* If there is no callback drop the message. */ 1278 /* If there is no callback drop the message. */
1195 if (channel->ch_callback == NULL) 1279 if (callback == NULL)
1196 { 1280 {
1197 while ((msg = channel_get(channel)) != NULL) 1281 while ((msg = channel_get(channel, part)) != NULL)
1198 vim_free(msg); 1282 vim_free(msg);
1199 return FALSE; 1283 return FALSE;
1200 } 1284 }
1201 1285
1202 if (ch_mode == MODE_NL) 1286 if (ch_mode == MODE_NL)
1206 1290
1207 /* See if we have a message ending in NL in the first buffer. If 1291 /* See if we have a message ending in NL in the first buffer. If
1208 * not try to concatenate the first and the second buffer. */ 1292 * not try to concatenate the first and the second buffer. */
1209 while (TRUE) 1293 while (TRUE)
1210 { 1294 {
1211 buf = channel_peek(channel); 1295 buf = channel_peek(channel, part);
1212 nl = vim_strchr(buf, NL); 1296 nl = vim_strchr(buf, NL);
1213 if (nl != NULL) 1297 if (nl != NULL)
1214 break; 1298 break;
1215 if (channel_collapse(channel) == FAIL) 1299 if (channel_collapse(channel, part) == FAIL)
1216 return FALSE; /* incomplete message */ 1300 return FALSE; /* incomplete message */
1217 } 1301 }
1218 if (nl[1] == NUL) 1302 if (nl[1] == NUL)
1219 /* get the whole buffer */ 1303 /* get the whole buffer */
1220 msg = channel_get(channel); 1304 msg = channel_get(channel, part);
1221 else 1305 else
1222 { 1306 {
1223 /* Copy the message into allocated memory and remove it from 1307 /* Copy the message into allocated memory and remove it from
1224 * the buffer. */ 1308 * the buffer. */
1225 msg = vim_strnsave(buf, (int)(nl - buf)); 1309 msg = vim_strnsave(buf, (int)(nl - buf));
1227 } 1311 }
1228 } 1312 }
1229 else 1313 else
1230 /* For a raw channel we don't know where the message ends, just 1314 /* For a raw channel we don't know where the message ends, just
1231 * get everything we have. */ 1315 * get everything we have. */
1232 msg = channel_get_all(channel); 1316 msg = channel_get_all(channel, part);
1233 1317
1234 argv[1].v_type = VAR_STRING; 1318 argv[1].v_type = VAR_STRING;
1235 argv[1].vval.v_string = msg; 1319 argv[1].vval.v_string = msg;
1236 } 1320 }
1237 1321
1238 if (seq_nr > 0) 1322 if (seq_nr > 0)
1239 { 1323 {
1240 cbq_T *head = &channel->ch_cb_head; 1324 cbq_T *head = &channel->ch_part[part].ch_cb_head;
1241 cbq_T *item = head->cq_next; 1325 cbq_T *item = head->cq_next;
1242 int done = FALSE; 1326 int done = FALSE;
1243 1327
1244 /* invoke the one-time callback with the matching nr */ 1328 /* invoke the one-time callback with the matching nr */
1245 while (item != NULL) 1329 while (item != NULL)
1259 item = item->cq_next; 1343 item = item->cq_next;
1260 } 1344 }
1261 if (!done) 1345 if (!done)
1262 ch_log(channel, "Dropping message without callback"); 1346 ch_log(channel, "Dropping message without callback");
1263 } 1347 }
1264 else if (channel->ch_callback != NULL) 1348 else if (callback != NULL)
1265 { 1349 {
1266 /* invoke the channel callback */ 1350 /* invoke the channel callback */
1267 ch_log(channel, "Invoking channel callback"); 1351 ch_log(channel, "Invoking channel callback");
1268 invoke_callback(channel, channel->ch_callback, argv); 1352 invoke_callback(channel, callback, argv);
1269 } 1353 }
1270 else 1354 else
1271 ch_log(channel, "Dropping message"); 1355 ch_log(channel, "Dropping message");
1272 1356
1273 if (listtv != NULL) 1357 if (listtv != NULL)
1282 * Also returns FALSE or invalid "channel". 1366 * Also returns FALSE or invalid "channel".
1283 */ 1367 */
1284 int 1368 int
1285 channel_can_write_to(channel_T *channel) 1369 channel_can_write_to(channel_T *channel)
1286 { 1370 {
1287 return channel != NULL && (channel->CH_SOCK != CHAN_FD_INVALID 1371 return channel != NULL && (channel->CH_SOCK_FD != INVALID_FD
1288 #ifdef CHANNEL_PIPES 1372 #ifdef CHANNEL_PIPES
1289 || channel->CH_IN != CHAN_FD_INVALID 1373 || channel->CH_IN_FD != INVALID_FD
1290 #endif 1374 #endif
1291 ); 1375 );
1292 } 1376 }
1293 1377
1294 /* 1378 /*
1296 * Also returns FALSE for invalid "channel". 1380 * Also returns FALSE for invalid "channel".
1297 */ 1381 */
1298 int 1382 int
1299 channel_is_open(channel_T *channel) 1383 channel_is_open(channel_T *channel)
1300 { 1384 {
1301 return channel != NULL && (channel->CH_SOCK != CHAN_FD_INVALID 1385 return channel != NULL && (channel->CH_SOCK_FD != INVALID_FD
1302 #ifdef CHANNEL_PIPES 1386 #ifdef CHANNEL_PIPES
1303 || channel->CH_IN != CHAN_FD_INVALID 1387 || channel->CH_IN_FD != INVALID_FD
1304 || channel->CH_OUT != CHAN_FD_INVALID 1388 || channel->CH_OUT_FD != INVALID_FD
1305 || channel->CH_ERR != CHAN_FD_INVALID 1389 || channel->CH_ERR_FD != INVALID_FD
1306 #endif 1390 #endif
1307 ); 1391 );
1308 } 1392 }
1309 1393
1310 /* 1394 /*
1331 1415
1332 #ifdef FEAT_GUI 1416 #ifdef FEAT_GUI
1333 channel_gui_unregister(channel); 1417 channel_gui_unregister(channel);
1334 #endif 1418 #endif
1335 1419
1336 if (channel->CH_SOCK != CHAN_FD_INVALID) 1420 if (channel->CH_SOCK_FD != INVALID_FD)
1337 { 1421 {
1338 sock_close(channel->CH_SOCK); 1422 sock_close(channel->CH_SOCK_FD);
1339 channel->CH_SOCK = CHAN_FD_INVALID; 1423 channel->CH_SOCK_FD = INVALID_FD;
1340 } 1424 }
1341 #if defined(CHANNEL_PIPES) 1425 #if defined(CHANNEL_PIPES)
1342 if (channel->CH_IN != CHAN_FD_INVALID) 1426 if (channel->CH_IN_FD != INVALID_FD)
1343 { 1427 {
1344 fd_close(channel->CH_IN); 1428 fd_close(channel->CH_IN_FD);
1345 channel->CH_IN = CHAN_FD_INVALID; 1429 channel->CH_IN_FD = INVALID_FD;
1346 } 1430 }
1347 if (channel->CH_OUT != CHAN_FD_INVALID) 1431 if (channel->CH_OUT_FD != INVALID_FD)
1348 { 1432 {
1349 fd_close(channel->CH_OUT); 1433 fd_close(channel->CH_OUT_FD);
1350 channel->CH_OUT = CHAN_FD_INVALID; 1434 channel->CH_OUT_FD = INVALID_FD;
1351 } 1435 }
1352 if (channel->CH_ERR != CHAN_FD_INVALID) 1436 if (channel->CH_ERR_FD != INVALID_FD)
1353 { 1437 {
1354 fd_close(channel->CH_ERR); 1438 fd_close(channel->CH_ERR_FD);
1355 channel->CH_ERR = CHAN_FD_INVALID; 1439 channel->CH_ERR_FD = INVALID_FD;
1356 } 1440 }
1357 #endif 1441 #endif
1358 1442
1359 channel->ch_close_cb = NULL; 1443 channel->ch_close_cb = NULL;
1360 channel_clear(channel); 1444 channel_clear(channel);
1361 } 1445 }
1362 1446
1363 /* 1447 /*
1364 * Store "buf[len]" on channel "channel". 1448 * Return the first buffer from "channel"/"part" without removing it.
1365 * Returns OK or FAIL.
1366 */
1367 int
1368 channel_save(channel_T *channel, char_u *buf, int len)
1369 {
1370 readq_T *node;
1371 readq_T *head = &channel->ch_head;
1372 char_u *p;
1373 int i;
1374
1375 node = (readq_T *)alloc(sizeof(readq_T));
1376 if (node == NULL)
1377 return FAIL; /* out of memory */
1378 node->rq_buffer = alloc(len + 1);
1379 if (node->rq_buffer == NULL)
1380 {
1381 vim_free(node);
1382 return FAIL; /* out of memory */
1383 }
1384
1385 if (channel->ch_mode == MODE_NL)
1386 {
1387 /* Drop any CR before a NL. */
1388 p = node->rq_buffer;
1389 for (i = 0; i < len; ++i)
1390 if (buf[i] != CAR || i + 1 >= len || buf[i + 1] != NL)
1391 *p++ = buf[i];
1392 *p = NUL;
1393 }
1394 else
1395 {
1396 mch_memmove(node->rq_buffer, buf, len);
1397 node->rq_buffer[len] = NUL;
1398 }
1399
1400 /* append node to the tail of the queue */
1401 node->rq_next = NULL;
1402 node->rq_prev = head->rq_prev;
1403 if (head->rq_prev == NULL)
1404 head->rq_next = node;
1405 else
1406 head->rq_prev->rq_next = node;
1407 head->rq_prev = node;
1408
1409 if (log_fd != NULL)
1410 {
1411 ch_log_lead("RECV ", channel);
1412 fprintf(log_fd, "'");
1413 if (fwrite(buf, len, 1, log_fd) != 1)
1414 return FAIL;
1415 fprintf(log_fd, "'\n");
1416 }
1417 return OK;
1418 }
1419
1420 /*
1421 * Return the first buffer from the channel without removing it.
1422 * Returns NULL if there is nothing. 1449 * Returns NULL if there is nothing.
1423 */ 1450 */
1424 char_u * 1451 char_u *
1425 channel_peek(channel_T *channel) 1452 channel_peek(channel_T *channel, int part)
1426 { 1453 {
1427 readq_T *head = &channel->ch_head; 1454 readq_T *head = &channel->ch_part[part].ch_head;
1428 1455
1429 if (head->rq_next == NULL) 1456 if (head->rq_next == NULL)
1430 return NULL; 1457 return NULL;
1431 return head->rq_next->rq_buffer; 1458 return head->rq_next->rq_buffer;
1432 } 1459 }
1433 1460
1434 /* 1461 /*
1435 * Clear the read buffer on channel "channel". 1462 * Clear the read buffer on "channel"/"part".
1436 */ 1463 */
1437 void 1464 static void
1438 channel_clear(channel_T *channel) 1465 channel_clear_one(channel_T *channel, int part)
1439 { 1466 {
1440 jsonq_T *json_head = &channel->ch_json_head; 1467 jsonq_T *json_head = &channel->ch_part[part].ch_json_head;
1441 cbq_T *cb_head = &channel->ch_cb_head; 1468 cbq_T *cb_head = &channel->ch_part[part].ch_cb_head;
1442 1469
1443 while (channel_peek(channel) != NULL) 1470 while (channel_peek(channel, part) != NULL)
1444 vim_free(channel_get(channel)); 1471 vim_free(channel_get(channel, part));
1445 1472
1446 while (cb_head->cq_next != NULL) 1473 while (cb_head->cq_next != NULL)
1447 { 1474 {
1448 cbq_T *node = cb_head->cq_next; 1475 cbq_T *node = cb_head->cq_next;
1449 1476
1456 { 1483 {
1457 free_tv(json_head->jq_next->jq_value); 1484 free_tv(json_head->jq_next->jq_value);
1458 remove_json_node(json_head, json_head->jq_next); 1485 remove_json_node(json_head, json_head->jq_next);
1459 } 1486 }
1460 1487
1488 vim_free(channel->ch_part[part].ch_callback);
1489 channel->ch_part[part].ch_callback = NULL;
1490 }
1491
1492 /*
1493 * Clear all the read buffers on "channel".
1494 */
1495 void
1496 channel_clear(channel_T *channel)
1497 {
1498 channel_clear_one(channel, PART_SOCK);
1499 #ifdef CHANNEL_PIPES
1500 channel_clear_one(channel, PART_OUT);
1501 channel_clear_one(channel, PART_ERR);
1502 #endif
1461 vim_free(channel->ch_callback); 1503 vim_free(channel->ch_callback);
1462 channel->ch_callback = NULL; 1504 channel->ch_callback = NULL;
1463 } 1505 }
1464 1506
1465 #if defined(EXITFREE) || defined(PROTO) 1507 #if defined(EXITFREE) || defined(PROTO)
1490 { 1532 {
1491 if (timeout > 0) 1533 if (timeout > 0)
1492 ch_logn(channel, "Waiting for up to %d msec", timeout); 1534 ch_logn(channel, "Waiting for up to %d msec", timeout);
1493 1535
1494 # ifdef WIN32 1536 # ifdef WIN32
1495 if (fd != channel->CH_SOCK) 1537 if (fd != channel->CH_SOCK_FD)
1496 { 1538 {
1497 DWORD nread; 1539 DWORD nread;
1498 int diff; 1540 int diff;
1499 DWORD deadline = GetTickCount() + timeout; 1541 DWORD deadline = GetTickCount() + timeout;
1500 1542
1565 1607
1566 return next_id++; 1608 return next_id++;
1567 } 1609 }
1568 1610
1569 /* 1611 /*
1570 * Get the file descriptor to read from, either the socket or stdout.
1571 * TODO: should have a way to read stderr.
1572 */
1573 static sock_T
1574 get_read_fd(channel_T *channel)
1575 {
1576 if (channel->CH_SOCK != CHAN_FD_INVALID)
1577 return channel->CH_SOCK;
1578 #if defined(CHANNEL_PIPES)
1579 if (channel->CH_OUT != CHAN_FD_INVALID)
1580 return channel->CH_OUT;
1581 #endif
1582 ch_error(channel, "channel_read() called while socket is closed");
1583 return CHAN_FD_INVALID;
1584 }
1585
1586 /*
1587 * Read from channel "channel" for as long as there is something to read. 1612 * Read from channel "channel" for as long as there is something to read.
1588 * "which" is CHAN_SOCK, CHAN_OUT or CHAN_ERR. When -1 use CHAN_SOCK or 1613 * "part" is PART_SOCK, PART_OUT or PART_ERR.
1589 * CHAN_OUT, the one that is open.
1590 * The data is put in the read queue. 1614 * The data is put in the read queue.
1591 */ 1615 */
1592 void 1616 void
1593 channel_read(channel_T *channel, int which, char *func) 1617 channel_read(channel_T *channel, int part, char *func)
1594 { 1618 {
1595 static char_u *buf = NULL; 1619 static char_u *buf = NULL;
1596 int len = 0; 1620 int len = 0;
1597 int readlen = 0; 1621 int readlen = 0;
1598 sock_T fd; 1622 sock_T fd;
1599 int use_socket = FALSE; 1623 int use_socket = FALSE;
1600 1624
1601 if (which < 0) 1625 fd = channel->ch_part[part].ch_fd;
1602 fd = get_read_fd(channel); 1626 if (fd == INVALID_FD)
1603 else 1627 {
1604 fd = channel->ch_pfd[which].ch_fd; 1628 ch_error(channel, "channel_read() called while socket is closed");
1605 if (fd == CHAN_FD_INVALID)
1606 return; 1629 return;
1607 use_socket = fd == channel->CH_SOCK; 1630 }
1631 use_socket = fd == channel->CH_SOCK_FD;
1608 1632
1609 /* Allocate a buffer to read into. */ 1633 /* Allocate a buffer to read into. */
1610 if (buf == NULL) 1634 if (buf == NULL)
1611 { 1635 {
1612 buf = alloc(MAXMSGSIZE); 1636 buf = alloc(MAXMSGSIZE);
1627 len = fd_read(fd, (char *)buf, MAXMSGSIZE); 1651 len = fd_read(fd, (char *)buf, MAXMSGSIZE);
1628 if (len <= 0) 1652 if (len <= 0)
1629 break; /* error or nothing more to read */ 1653 break; /* error or nothing more to read */
1630 1654
1631 /* Store the read message in the queue. */ 1655 /* Store the read message in the queue. */
1632 channel_save(channel, buf, len); 1656 channel_save(channel, part, buf, len);
1633 readlen += len; 1657 readlen += len;
1634 if (len < MAXMSGSIZE) 1658 if (len < MAXMSGSIZE)
1635 break; /* did read everything that's available */ 1659 break; /* did read everything that's available */
1636 } 1660 }
1637 #ifdef FEAT_GUI_W32 1661 #ifdef FEAT_GUI_W32
1658 * -> ui_breakcheck 1682 * -> ui_breakcheck
1659 * -> gui event loop or select loop 1683 * -> gui event loop or select loop
1660 * -> channel_read() 1684 * -> channel_read()
1661 */ 1685 */
1662 ch_errors(channel, "%s(): Cannot read", func); 1686 ch_errors(channel, "%s(): Cannot read", func);
1663 channel_save(channel, (char_u *)DETACH_MSG, (int)STRLEN(DETACH_MSG)); 1687 channel_save(channel, part,
1688 (char_u *)DETACH_MSG, (int)STRLEN(DETACH_MSG));
1664 1689
1665 /* TODO: When reading from stdout is not possible, should we try to 1690 /* TODO: When reading from stdout is not possible, should we try to
1666 * keep stdin and stderr open? Probably not, assume the other side 1691 * keep stdin and stderr open? Probably not, assume the other side
1667 * has died. */ 1692 * has died. */
1668 channel_close(channel); 1693 channel_close(channel);
1682 gtk_main_quit(); 1707 gtk_main_quit();
1683 #endif 1708 #endif
1684 } 1709 }
1685 1710
1686 /* 1711 /*
1687 * Read from RAW or NL channel "channel". Blocks until there is something to 1712 * Read from RAW or NL "channel"/"part". Blocks until there is something to
1688 * read or the timeout expires. 1713 * read or the timeout expires.
1689 * TODO: add "which" argument and read from stderr.
1690 * Returns what was read in allocated memory. 1714 * Returns what was read in allocated memory.
1691 * Returns NULL in case of error or timeout. 1715 * Returns NULL in case of error or timeout.
1692 */ 1716 */
1693 char_u * 1717 char_u *
1694 channel_read_block(channel_T *channel) 1718 channel_read_block(channel_T *channel, int part)
1695 { 1719 {
1696 char_u *buf; 1720 char_u *buf;
1697 char_u *msg; 1721 char_u *msg;
1698 ch_mode_T mode = channel->ch_mode; 1722 ch_mode_T mode = channel->ch_part[part].ch_mode;
1699 sock_T fd = get_read_fd(channel); 1723 int timeout = channel->ch_part[part].ch_timeout;
1724 sock_T fd = channel->ch_part[part].ch_fd;
1700 char_u *nl; 1725 char_u *nl;
1701 1726
1702 ch_logsn(channel, "Blocking %s read, timeout: %d msec", 1727 ch_logsn(channel, "Blocking %s read, timeout: %d msec",
1703 mode == MODE_RAW ? "RAW" : "NL", channel->ch_timeout); 1728 mode == MODE_RAW ? "RAW" : "NL", timeout);
1704 1729
1705 while (TRUE) 1730 while (TRUE)
1706 { 1731 {
1707 buf = channel_peek(channel); 1732 buf = channel_peek(channel, part);
1708 if (buf != NULL && (mode == MODE_RAW 1733 if (buf != NULL && (mode == MODE_RAW
1709 || (mode == MODE_NL && vim_strchr(buf, NL) != NULL))) 1734 || (mode == MODE_NL && vim_strchr(buf, NL) != NULL)))
1710 break; 1735 break;
1711 if (buf != NULL && channel_collapse(channel) == OK) 1736 if (buf != NULL && channel_collapse(channel, part) == OK)
1712 continue; 1737 continue;
1713 1738
1714 /* Wait for up to the channel timeout. */ 1739 /* Wait for up to the channel timeout. */
1715 if (fd == CHAN_FD_INVALID 1740 if (fd == INVALID_FD
1716 || channel_wait(channel, fd, channel->ch_timeout) == FAIL) 1741 || channel_wait(channel, fd, timeout) == FAIL)
1717 return NULL; 1742 return NULL;
1718 channel_read(channel, -1, "channel_read_block"); 1743 channel_read(channel, part, "channel_read_block");
1719 } 1744 }
1720 1745
1721 if (mode == MODE_RAW) 1746 if (mode == MODE_RAW)
1722 { 1747 {
1723 msg = channel_get_all(channel); 1748 msg = channel_get_all(channel, part);
1724 } 1749 }
1725 else 1750 else
1726 { 1751 {
1727 nl = vim_strchr(buf, NL); 1752 nl = vim_strchr(buf, NL);
1728 if (nl[1] == NUL) 1753 if (nl[1] == NUL)
1729 { 1754 {
1730 /* get the whole buffer */ 1755 /* get the whole buffer */
1731 msg = channel_get(channel); 1756 msg = channel_get(channel, part);
1732 *nl = NUL; 1757 *nl = NUL;
1733 } 1758 }
1734 else 1759 else
1735 { 1760 {
1736 /* Copy the message into allocated memory and remove it from the 1761 /* Copy the message into allocated memory and remove it from the
1743 ch_logn(channel, "Returning %d bytes", (int)STRLEN(msg)); 1768 ch_logn(channel, "Returning %d bytes", (int)STRLEN(msg));
1744 return msg; 1769 return msg;
1745 } 1770 }
1746 1771
1747 /* 1772 /*
1748 * Read one JSON message with ID "id" from channel "channel" and store the 1773 * Read one JSON message with ID "id" from "channel"/"part" and store the
1749 * result in "rettv". 1774 * result in "rettv".
1750 * Blocks until the message is received or the timeout is reached. 1775 * Blocks until the message is received or the timeout is reached.
1751 */ 1776 */
1752 int 1777 int
1753 channel_read_json_block(channel_T *channel, int id, typval_T **rettv) 1778 channel_read_json_block(channel_T *channel, int part, int id, typval_T **rettv)
1754 { 1779 {
1755 int more; 1780 int more;
1756 sock_T fd; 1781 sock_T fd;
1757 1782
1758 ch_log(channel, "Reading JSON"); 1783 ch_log(channel, "Reading JSON");
1759 channel->ch_block_id = id; 1784 channel->ch_part[part].ch_block_id = id;
1760 for (;;) 1785 for (;;)
1761 { 1786 {
1762 more = channel_parse_json(channel); 1787 more = channel_parse_json(channel, part);
1763 1788
1764 /* search for messsage "id" */ 1789 /* search for messsage "id" */
1765 if (channel_get_json(channel, id, rettv) == OK) 1790 if (channel_get_json(channel, part, id, rettv) == OK)
1766 { 1791 {
1767 channel->ch_block_id = 0; 1792 channel->ch_part[part].ch_block_id = 0;
1768 return OK; 1793 return OK;
1769 } 1794 }
1770 1795
1771 if (!more) 1796 if (!more)
1772 { 1797 {
1774 * messages may have arrived. */ 1799 * messages may have arrived. */
1775 if (channel_parse_messages()) 1800 if (channel_parse_messages())
1776 continue; 1801 continue;
1777 1802
1778 /* Wait for up to the channel timeout. */ 1803 /* Wait for up to the channel timeout. */
1779 fd = get_read_fd(channel); 1804 fd = channel->ch_part[part].ch_fd;
1780 if (fd == CHAN_FD_INVALID 1805 if (fd == INVALID_FD || channel_wait(channel, fd,
1781 || channel_wait(channel, fd, channel->ch_timeout) == FAIL) 1806 channel->ch_part[part].ch_timeout) == FAIL)
1782 break; 1807 break;
1783 channel_read(channel, -1, "channel_read_json_block"); 1808 channel_read(channel, part, "channel_read_json_block");
1784 } 1809 }
1785 } 1810 }
1786 channel->ch_block_id = 0; 1811 channel->ch_part[part].ch_block_id = 0;
1787 return FAIL; 1812 return FAIL;
1788 } 1813 }
1789 1814
1790 # if defined(WIN32) || defined(PROTO) 1815 # if defined(WIN32) || defined(PROTO)
1791 /* 1816 /*
1792 * Lookup the channel from the socket. Set "which" to the fd index. 1817 * Lookup the channel from the socket. Set "part" to the fd index.
1793 * Returns NULL when the socket isn't found. 1818 * Returns NULL when the socket isn't found.
1794 */ 1819 */
1795 channel_T * 1820 channel_T *
1796 channel_fd2channel(sock_T fd, int *whichp) 1821 channel_fd2channel(sock_T fd, int *part)
1797 { 1822 {
1798 channel_T *channel; 1823 channel_T *channel;
1799 int i; 1824 int part;
1800 1825
1801 if (fd != CHAN_FD_INVALID) 1826 if (fd != INVALID_FD)
1802 for (channel = first_channel; channel != NULL; 1827 for (channel = first_channel; channel != NULL;
1803 channel = channel->ch_next) 1828 channel = channel->ch_next)
1804 { 1829 {
1805 # ifdef CHANNEL_PIPES 1830 # ifdef CHANNEL_PIPES
1806 for (i = CHAN_SOCK; i < CHAN_IN; ++i) 1831 for (part = PART_SOCK; part < PART_IN; ++part)
1807 # else 1832 # else
1808 i = CHAN_SOCK; 1833 part = PART_SOCK;
1809 # endif 1834 # endif
1810 if (channel->ch_pfd[i].ch_fd == fd) 1835 if (channel->ch_part[part].ch_fd == fd)
1811 { 1836 {
1812 *whichp = i; 1837 *part = part;
1813 return channel; 1838 return channel;
1814 } 1839 }
1815 } 1840 }
1816 return NULL; 1841 return NULL;
1817 } 1842 }
1818 1843
1819 void 1844 void
1820 channel_handle_events(void) 1845 channel_handle_events(void)
1821 { 1846 {
1822 channel_T *channel; 1847 channel_T *channel;
1823 int which; 1848 int part;
1824 static int loop = 0; 1849 static int loop = 0;
1825 1850
1826 /* Skip heavily polling */ 1851 /* Skip heavily polling */
1827 if (loop++ % 2) 1852 if (loop++ % 2)
1828 return; 1853 return;
1829 1854
1830 for (channel = first_channel; channel != NULL; channel = channel->ch_next) 1855 for (channel = first_channel; channel != NULL; channel = channel->ch_next)
1831 { 1856 {
1832 # ifdef FEAT_GUI_W32 1857 # ifdef FEAT_GUI_W32
1833 /* only check the pipes */ 1858 /* only check the pipes */
1834 for (which = CHAN_OUT; which < CHAN_ERR; ++which) 1859 for (part = PART_OUT; part <= PART_ERR; ++part)
1835 # else 1860 # else
1836 # ifdef CHANNEL_PIPES 1861 # ifdef CHANNEL_PIPES
1837 /* check the socket and pipes */ 1862 /* check the socket and pipes */
1838 for (which = CHAN_SOCK; which < CHAN_ERR; ++which) 1863 for (part = PART_SOCK; part <= PART_ERR; ++part)
1839 # else 1864 # else
1840 /* only check the socket */ 1865 /* only check the socket */
1841 which = CHAN_SOCK; 1866 part = PART_SOCK;
1842 # endif 1867 # endif
1843 # endif 1868 # endif
1844 channel_read(channel, which, "channel_handle_events"); 1869 channel_read(channel, part, "channel_handle_events");
1845 } 1870 }
1846 } 1871 }
1847 # endif 1872 # endif
1848 1873
1849 /* 1874 /*
1850 * Write "buf" (NUL terminated string) to channel "channel". 1875 * Write "buf" (NUL terminated string) to "channel"/"part".
1851 * When "fun" is not NULL an error message might be given. 1876 * When "fun" is not NULL an error message might be given.
1852 * Return FAIL or OK. 1877 * Return FAIL or OK.
1853 */ 1878 */
1854 int 1879 int
1855 channel_send(channel_T *channel, char_u *buf, char *fun) 1880 channel_send(channel_T *channel, int part, char_u *buf, char *fun)
1856 { 1881 {
1857 int len = (int)STRLEN(buf); 1882 int len = (int)STRLEN(buf);
1858 int res; 1883 int res;
1859 sock_T fd = CHAN_FD_INVALID; 1884 sock_T fd;
1860 int use_socket = FALSE; 1885
1861 1886 fd = channel->ch_part[part].ch_fd;
1862 if (channel->CH_SOCK != CHAN_FD_INVALID) 1887 if (fd == INVALID_FD)
1863 {
1864 fd = channel->CH_SOCK;
1865 use_socket = TRUE;
1866 }
1867 #if defined(CHANNEL_PIPES)
1868 else if (channel->CH_IN != CHAN_FD_INVALID)
1869 fd = channel->CH_IN;
1870 #endif
1871 if (fd == CHAN_FD_INVALID)
1872 { 1888 {
1873 if (!channel->ch_error && fun != NULL) 1889 if (!channel->ch_error && fun != NULL)
1874 { 1890 {
1875 ch_errors(channel, "%s(): write while not connected", fun); 1891 ch_errors(channel, "%s(): write while not connected", fun);
1876 EMSG2("E630: %s(): write while not connected", fun); 1892 EMSG2("E630: %s(): write while not connected", fun);
1886 ignored = (int)fwrite(buf, len, 1, log_fd); 1902 ignored = (int)fwrite(buf, len, 1, log_fd);
1887 fprintf(log_fd, "'\n"); 1903 fprintf(log_fd, "'\n");
1888 fflush(log_fd); 1904 fflush(log_fd);
1889 } 1905 }
1890 1906
1891 if (use_socket) 1907 if (part == PART_SOCK)
1892 res = sock_write(fd, (char *)buf, len); 1908 res = sock_write(fd, (char *)buf, len);
1893 else 1909 else
1894 res = fd_write(fd, (char *)buf, len); 1910 res = fd_write(fd, (char *)buf, len);
1895 if (res != len) 1911 if (res != len)
1896 { 1912 {
1917 channel_poll_setup(int nfd_in, void *fds_in) 1933 channel_poll_setup(int nfd_in, void *fds_in)
1918 { 1934 {
1919 int nfd = nfd_in; 1935 int nfd = nfd_in;
1920 channel_T *channel; 1936 channel_T *channel;
1921 struct pollfd *fds = fds_in; 1937 struct pollfd *fds = fds_in;
1922 int which; 1938 int part;
1923 1939
1924 for (channel = first_channel; channel != NULL; channel = channel->ch_next) 1940 for (channel = first_channel; channel != NULL; channel = channel->ch_next)
1925 { 1941 {
1926 # ifdef CHANNEL_PIPES 1942 # ifdef CHANNEL_PIPES
1927 for (which = CHAN_SOCK; which < CHAN_IN; ++which) 1943 for (part = PART_SOCK; part < PART_IN; ++part)
1928 # else 1944 # else
1929 which = CHAN_SOCK; 1945 part = PART_SOCK;
1930 # endif 1946 # endif
1931 { 1947 {
1932 if (channel->ch_pfd[which].ch_fd != CHAN_FD_INVALID) 1948 if (channel->ch_part[part].ch_fd != INVALID_FD)
1933 { 1949 {
1934 channel->ch_pfd[which].ch_poll_idx = nfd; 1950 channel->ch_part[part].ch_poll_idx = nfd;
1935 fds[nfd].fd = channel->ch_pfd[which].ch_fd; 1951 fds[nfd].fd = channel->ch_part[part].ch_fd;
1936 fds[nfd].events = POLLIN; 1952 fds[nfd].events = POLLIN;
1937 nfd++; 1953 nfd++;
1938 } 1954 }
1939 else 1955 else
1940 channel->ch_pfd[which].ch_poll_idx = -1; 1956 channel->ch_part[part].ch_poll_idx = -1;
1941 } 1957 }
1942 } 1958 }
1943 1959
1944 return nfd; 1960 return nfd;
1945 } 1961 }
1951 channel_poll_check(int ret_in, void *fds_in) 1967 channel_poll_check(int ret_in, void *fds_in)
1952 { 1968 {
1953 int ret = ret_in; 1969 int ret = ret_in;
1954 channel_T *channel; 1970 channel_T *channel;
1955 struct pollfd *fds = fds_in; 1971 struct pollfd *fds = fds_in;
1956 int which; 1972 int part;
1957 1973
1958 for (channel = first_channel; channel != NULL; channel = channel->ch_next) 1974 for (channel = first_channel; channel != NULL; channel = channel->ch_next)
1959 { 1975 {
1960 # ifdef CHANNEL_PIPES 1976 # ifdef CHANNEL_PIPES
1961 for (which = CHAN_SOCK; which < CH_IN; ++which) 1977 for (part = PART_SOCK; part < PART_IN; ++part)
1962 # else 1978 # else
1963 which = CHAN_SOCK; 1979 part = PART_SOCK;
1964 # endif 1980 # endif
1965 { 1981 {
1966 int idx = channel->ch_pfd[which].ch_poll_idx; 1982 int idx = channel->ch_part[part].ch_poll_idx;
1967 1983
1968 if (ret > 0 && idx != -1 && fds[idx].revents & POLLIN) 1984 if (ret > 0 && idx != -1 && fds[idx].revents & POLLIN)
1969 { 1985 {
1970 channel_read(channel, which, "channel_poll_check"); 1986 channel_read(channel, part, "channel_poll_check");
1971 --ret; 1987 --ret;
1972 } 1988 }
1973 } 1989 }
1974 } 1990 }
1975 1991
1985 channel_select_setup(int maxfd_in, void *rfds_in) 2001 channel_select_setup(int maxfd_in, void *rfds_in)
1986 { 2002 {
1987 int maxfd = maxfd_in; 2003 int maxfd = maxfd_in;
1988 channel_T *channel; 2004 channel_T *channel;
1989 fd_set *rfds = rfds_in; 2005 fd_set *rfds = rfds_in;
1990 int which; 2006 int part;
1991 2007
1992 for (channel = first_channel; channel != NULL; channel = channel->ch_next) 2008 for (channel = first_channel; channel != NULL; channel = channel->ch_next)
1993 { 2009 {
1994 # ifdef CHANNEL_PIPES 2010 # ifdef CHANNEL_PIPES
1995 for (which = CHAN_SOCK; which < CHAN_IN; ++which) 2011 for (part = PART_SOCK; part < PART_IN; ++part)
1996 # else 2012 # else
1997 which = CHAN_SOCK; 2013 part = PART_SOCK;
1998 # endif 2014 # endif
1999 { 2015 {
2000 sock_T fd = channel->ch_pfd[which].ch_fd; 2016 sock_T fd = channel->ch_part[part].ch_fd;
2001 2017
2002 if (fd != CHAN_FD_INVALID) 2018 if (fd != INVALID_FD)
2003 { 2019 {
2004 FD_SET((int)fd, rfds); 2020 FD_SET((int)fd, rfds);
2005 if (maxfd < (int)fd) 2021 if (maxfd < (int)fd)
2006 maxfd = (int)fd; 2022 maxfd = (int)fd;
2007 } 2023 }
2018 channel_select_check(int ret_in, void *rfds_in) 2034 channel_select_check(int ret_in, void *rfds_in)
2019 { 2035 {
2020 int ret = ret_in; 2036 int ret = ret_in;
2021 channel_T *channel; 2037 channel_T *channel;
2022 fd_set *rfds = rfds_in; 2038 fd_set *rfds = rfds_in;
2023 int which; 2039 int part;
2024 2040
2025 for (channel = first_channel; channel != NULL; channel = channel->ch_next) 2041 for (channel = first_channel; channel != NULL; channel = channel->ch_next)
2026 { 2042 {
2027 # ifdef CHANNEL_PIPES 2043 # ifdef CHANNEL_PIPES
2028 for (which = CHAN_SOCK; which < CHAN_IN; ++which) 2044 for (part = PART_SOCK; part < PART_IN; ++part)
2029 # else 2045 # else
2030 which = CHAN_SOCK; 2046 part = PART_SOCK;
2031 # endif 2047 # endif
2032 { 2048 {
2033 sock_T fd = channel->ch_pfd[which].ch_fd; 2049 sock_T fd = channel->ch_part[part].ch_fd;
2034 2050
2035 if (ret > 0 && fd != CHAN_FD_INVALID && FD_ISSET(fd, rfds)) 2051 if (ret > 0 && fd != INVALID_FD && FD_ISSET(fd, rfds))
2036 { 2052 {
2037 channel_read(channel, which, "channel_select_check"); 2053 channel_read(channel, part, "channel_select_check");
2038 --ret; 2054 --ret;
2039 } 2055 }
2040 } 2056 }
2041 } 2057 }
2042 2058
2053 channel_parse_messages(void) 2069 channel_parse_messages(void)
2054 { 2070 {
2055 channel_T *channel = first_channel; 2071 channel_T *channel = first_channel;
2056 int ret = FALSE; 2072 int ret = FALSE;
2057 int r; 2073 int r;
2074 int part = PART_SOCK;
2058 2075
2059 while (channel != NULL) 2076 while (channel != NULL)
2060 { 2077 {
2061 /* Increase the refcount, in case the handler causes the channel to be 2078 if (channel->ch_part[part].ch_fd != INVALID_FD)
2062 * unreferenced or closed. */ 2079 {
2063 ++channel->ch_refcount; 2080 /* Increase the refcount, in case the handler causes the channel
2064 r = may_invoke_callback(channel); 2081 * to be unreferenced or closed. */
2065 if (channel_unref(channel)) 2082 ++channel->ch_refcount;
2066 /* channel was freed, start over */ 2083 r = may_invoke_callback(channel, part);
2067 channel = first_channel; 2084 if (r == OK)
2068 2085 ret = TRUE;
2069 if (r == OK) 2086 if (channel_unref(channel) || r == OK)
2070 { 2087 {
2071 channel = first_channel; /* something was done, start over */ 2088 /* channel was freed or something was done, start over */
2072 ret = TRUE; 2089 channel = first_channel;
2073 } 2090 part = PART_SOCK;
2091 continue;
2092 }
2093 }
2094 #ifdef CHANNEL_PIPES
2095 if (part < PART_ERR)
2096 ++part;
2074 else 2097 else
2098 #endif
2099 {
2075 channel = channel->ch_next; 2100 channel = channel->ch_next;
2101 part = PART_SOCK;
2102 }
2076 } 2103 }
2077 return ret; 2104 return ret;
2078 } 2105 }
2079 2106
2080 /* 2107 /*
2083 int 2110 int
2084 set_ref_in_channel(int copyID) 2111 set_ref_in_channel(int copyID)
2085 { 2112 {
2086 int abort = FALSE; 2113 int abort = FALSE;
2087 channel_T *channel; 2114 channel_T *channel;
2115 int part;
2088 2116
2089 for (channel = first_channel; channel != NULL; channel = channel->ch_next) 2117 for (channel = first_channel; channel != NULL; channel = channel->ch_next)
2090 { 2118 {
2091 jsonq_T *head = &channel->ch_json_head; 2119 #ifdef CHANNEL_PIPES
2092 jsonq_T *item = head->jq_next; 2120 for (part = PART_SOCK; part < PART_IN; ++part)
2093 2121 #else
2094 while (item != NULL) 2122 part = PART_SOCK;
2095 { 2123 #endif
2096 list_T *l = item->jq_value->vval.v_list; 2124 {
2097 2125 jsonq_T *head = &channel->ch_part[part].ch_json_head;
2098 if (l->lv_copyID != copyID) 2126 jsonq_T *item = head->jq_next;
2127
2128 while (item != NULL)
2099 { 2129 {
2100 l->lv_copyID = copyID; 2130 list_T *l = item->jq_value->vval.v_list;
2101 abort = abort || set_ref_in_list(l, copyID, NULL); 2131
2132 if (l->lv_copyID != copyID)
2133 {
2134 l->lv_copyID = copyID;
2135 abort = abort || set_ref_in_list(l, copyID, NULL);
2136 }
2137 item = item->jq_next;
2102 } 2138 }
2103 item = item->jq_next;
2104 } 2139 }
2105 } 2140 }
2106 return abort; 2141 return abort;
2107 } 2142 }
2108 2143
2109 /* 2144 /*
2110 * Return the mode of channel "channel". 2145 * Return the "part" to write to for "channel".
2146 */
2147 int
2148 channel_part_send(channel_T *channel)
2149 {
2150 #ifdef CHANNEL_PIPES
2151 if (channel->CH_SOCK_FD == INVALID_FD)
2152 return PART_IN;
2153 #endif
2154 return PART_SOCK;
2155 }
2156
2157 /*
2158 * Return the default "part" to read from for "channel".
2159 */
2160 int
2161 channel_part_read(channel_T *channel)
2162 {
2163 #ifdef CHANNEL_PIPES
2164 if (channel->CH_SOCK_FD == INVALID_FD)
2165 return PART_OUT;
2166 #endif
2167 return PART_SOCK;
2168 }
2169
2170 /*
2171 * Return the mode of "channel"/"part"
2111 * If "channel" is invalid returns MODE_JSON. 2172 * If "channel" is invalid returns MODE_JSON.
2112 */ 2173 */
2113 ch_mode_T 2174 ch_mode_T
2114 channel_get_mode(channel_T *channel) 2175 channel_get_mode(channel_T *channel, int part)
2115 { 2176 {
2116 if (channel == NULL) 2177 if (channel == NULL)
2117 return MODE_JSON; 2178 return MODE_JSON;
2118 return channel->ch_mode; 2179 return channel->ch_part[part].ch_mode;
2119 } 2180 }
2120 2181
2121 #endif /* FEAT_CHANNEL */ 2182 #endif /* FEAT_CHANNEL */