# HG changeset patch # User Christian Brabandt # Date 1459186205 -7200 # Node ID f8707ec9efe4a4e6f1fffd123d2caab9084c632e # Parent e43830c12eb2cb8313e6301a745f497c3c5149b1 commit https://github.com/vim/vim/commit/8b877ac38e96424a08a8b8eb713ef4b3cf0064be Author: Bram Moolenaar Date: Mon Mar 28 19:16:20 2016 +0200 patch 7.4.1669 Problem: When writing buffer lines to a pipe Vim may block. Solution: Avoid blocking, write more lines later. diff --git a/src/channel.c b/src/channel.c --- a/src/channel.c +++ b/src/channel.c @@ -973,6 +973,7 @@ channel_set_job(channel_T *channel, job_ /* Special mode: send last-but-one line when appending a line * to the buffer. */ in_part->ch_buffer->b_write_to_channel = TRUE; + in_part->ch_buf_append = TRUE; in_part->ch_buf_top = in_part->ch_buffer->b_ml.ml_line_count + 1; } @@ -1047,6 +1048,8 @@ channel_set_options(channel_T *channel, channel->ch_part[PART_OUT].ch_timeout = opt->jo_out_timeout; if (opt->jo_set & JO_ERR_TIMEOUT) channel->ch_part[PART_ERR].ch_timeout = opt->jo_err_timeout; + if (opt->jo_set & JO_BLOCK_WRITE) + channel->ch_part[PART_IN].ch_block_write = 1; if (opt->jo_set & JO_CALLBACK) { @@ -1193,9 +1196,78 @@ write_buf_line(buf_T *buf, linenr_T lnum } /* + * Return TRUE if "channel" can be written to. + * Returns FALSE if the input is closed or the write would block. + */ + static int +can_write_buf_line(channel_T *channel) +{ + chanpart_T *in_part = &channel->ch_part[PART_IN]; + + if (in_part->ch_fd == INVALID_FD) + return FALSE; /* pipe was closed */ + + /* for testing: block every other attempt to write */ + if (in_part->ch_block_write == 1) + in_part->ch_block_write = -1; + else if (in_part->ch_block_write == -1) + in_part->ch_block_write = 1; + + /* TODO: Win32 implementation, probably using WaitForMultipleObjects() */ +#ifndef WIN32 + { +# if defined(HAVE_SELECT) + struct timeval tval; + fd_set wfds; + int ret; + + FD_ZERO(&wfds); + FD_SET((int)in_part->ch_fd, &wfds); + tval.tv_sec = 0; + tval.tv_usec = 0; + for (;;) + { + ret = select((int)in_part->ch_fd + 1, NULL, &wfds, NULL, &tval); +# ifdef EINTR + SOCK_ERRNO; + if (ret == -1 && errno == EINTR) + continue; +# endif + if (ret <= 0 || in_part->ch_block_write == 1) + { + if (ret > 0) + ch_log(channel, "FAKED Input not ready for writing"); + else + ch_log(channel, "Input not ready for writing"); + return FALSE; + } + break; + } +# else + struct pollfd fds; + + fds.fd = in_part->ch_fd; + fds.events = POLLOUT; + if (poll(&fds, 1, 0) <= 0) + { + ch_log(channel, "Input not ready for writing"); + return FALSE; + } + if (in_part->ch_block_write == 1) + { + ch_log(channel, "FAKED Input not ready for writing"); + return FALSE; + } +# endif + } +#endif + return TRUE; +} + +/* * Write any lines to the input channel. */ - void + static void channel_write_in(channel_T *channel) { chanpart_T *in_part = &channel->ch_part[PART_IN]; @@ -1203,8 +1275,8 @@ channel_write_in(channel_T *channel) buf_T *buf = in_part->ch_buffer; int written = 0; - if (buf == NULL) - return; + if (buf == NULL || in_part->ch_buf_append) + return; /* no buffer or using appending */ if (!buf_valid(buf) || buf->b_ml.ml_mfp == NULL) { /* buffer was wiped out or unloaded */ @@ -1215,10 +1287,8 @@ channel_write_in(channel_T *channel) for (lnum = in_part->ch_buf_top; lnum <= in_part->ch_buf_bot && lnum <= buf->b_ml.ml_line_count; ++lnum) { - if (in_part->ch_fd == INVALID_FD) - /* pipe was closed */ - return; - /* TODO: check if channel can be written to, do not block on write */ + if (!can_write_buf_line(channel)) + break; write_buf_line(buf, lnum, channel); ++written; } @@ -1229,6 +1299,37 @@ channel_write_in(channel_T *channel) ch_logn(channel, "written %d lines to channel", written); in_part->ch_buf_top = lnum; + if (lnum > buf->b_ml.ml_line_count) + { + /* Writing is done, no longer need the buffer. */ + in_part->ch_buffer = NULL; + ch_log(channel, "Finished writing all lines to channel"); + } + else + ch_logn(channel, "Still %d more lines to write", + buf->b_ml.ml_line_count - lnum + 1); +} + +/* + * Write any lines waiting to be written to a channel. + */ + void +channel_write_any_lines() +{ + channel_T *channel; + + for (channel = first_channel; channel != NULL; channel = channel->ch_next) + { + chanpart_T *in_part = &channel->ch_part[PART_IN]; + + if (in_part->ch_buffer != NULL) + { + if (in_part->ch_buf_append) + channel_write_new_lines(in_part->ch_buffer); + else + channel_write_in(channel); + } + } } /* @@ -1248,15 +1349,16 @@ channel_write_new_lines(buf_T *buf) linenr_T lnum; int written = 0; - if (in_part->ch_buffer == buf) + if (in_part->ch_buffer == buf && in_part->ch_buf_append) { if (in_part->ch_fd == INVALID_FD) - /* pipe was closed */ - continue; + continue; /* pipe was closed */ found_one = TRUE; for (lnum = in_part->ch_buf_bot; lnum < buf->b_ml.ml_line_count; ++lnum) { + if (!can_write_buf_line(channel)) + break; write_buf_line(buf, lnum, channel); ++written; } @@ -1265,6 +1367,9 @@ channel_write_new_lines(buf_T *buf) ch_logn(channel, "written line %d to channel", (int)lnum - 1); else if (written > 1) ch_logn(channel, "written %d lines to channel", written); + if (lnum < buf->b_ml.ml_line_count) + ch_logn(channel, "Still %d more lines to write", + buf->b_ml.ml_line_count - lnum); in_part->ch_buf_bot = lnum; } @@ -2379,6 +2484,57 @@ channel_free_all(void) /* Buffer size for reading incoming messages. */ #define MAXMSGSIZE 4096 +#if defined(HAVE_SELECT) +/* + * Add write fds where we are waiting for writing to be possible. + */ + static int +channel_fill_wfds(int maxfd_arg, fd_set *wfds) +{ + int maxfd = maxfd_arg; + channel_T *ch; + + for (ch = first_channel; ch != NULL; ch = ch->ch_next) + { + chanpart_T *in_part = &ch->ch_part[PART_IN]; + + if (in_part->ch_fd != INVALID_FD && in_part->ch_buffer != NULL) + { + FD_SET((int)in_part->ch_fd, wfds); + if ((int)in_part->ch_fd >= maxfd) + maxfd = (int)in_part->ch_fd + 1; + } + } + return maxfd; +} +#else +/* + * Add write fds where we are waiting for writing to be possible. + */ + static int +channel_fill_poll_write(int nfd_in, struct pollfd *fds) +{ + int nfd = nfd_in; + channel_T *ch; + + for (ch = first_channel; ch != NULL; ch = ch->ch_next) + { + chanpart_T *in_part = &ch->ch_part[PART_IN]; + + if (in_part->ch_fd != INVALID_FD && in_part->ch_buffer != NULL) + { + in_part->ch_poll_idx = nfd; + fds[nfd].fd = in_part->ch_fd; + fds[nfd].events = POLLOUT; + ++nfd; + } + else + in_part->ch_poll_idx = -1; + } + return nfd; +} +#endif + /* * Check for reading from "fd" with "timeout" msec. * Return FAIL when there is nothing to read. @@ -2403,6 +2559,10 @@ channel_wait(channel_T *channel, sock_T if (PeekNamedPipe((HANDLE)fd, NULL, 0, NULL, &nread, NULL) && nread > 0) return OK; + + /* perhaps write some buffer lines */ + channel_write_any_lines(); + sleep_time = deadline - GetTickCount(); if (sleep_time <= 0) break; @@ -2422,31 +2582,56 @@ channel_wait(channel_T *channel, sock_T #if defined(HAVE_SELECT) struct timeval tval; fd_set rfds; - int ret; - - FD_ZERO(&rfds); - FD_SET((int)fd, &rfds); + fd_set wfds; + int ret; + int maxfd; + tval.tv_sec = timeout / 1000; tval.tv_usec = (timeout % 1000) * 1000; for (;;) { - ret = select((int)fd + 1, &rfds, NULL, NULL, &tval); + FD_ZERO(&rfds); + FD_SET((int)fd, &rfds); + + /* Write lines to a pipe when a pipe can be written to. Need to + * set this every time, some buffers may be done. */ + maxfd = (int)fd + 1; + FD_ZERO(&wfds); + maxfd = channel_fill_wfds(maxfd, &wfds); + + ret = select(maxfd, &rfds, &wfds, NULL, &tval); # ifdef EINTR SOCK_ERRNO; if (ret == -1 && errno == EINTR) continue; # endif if (ret > 0) - return OK; + { + if (FD_ISSET(fd, &rfds)) + return OK; + channel_write_any_lines(); + continue; + } break; } #else - struct pollfd fds; - - fds.fd = fd; - fds.events = POLLIN; - if (poll(&fds, 1, timeout) > 0) - return OK; + for (;;) + { + struct pollfd fds[MAX_OPEN_CHANNELS + 1]; + int nfd = 1; + + fds[0].fd = fd; + fds[0].events = POLLIN; + nfd = channel_fill_poll_write(nfd, fds); + if (poll(fds, nfd, timeout) > 0) + { + if (fds[0].revents & POLLIN) + return OK; + channel_write_any_lines(); + continue; + } + break; + } #endif } return FAIL; @@ -3010,10 +3195,12 @@ channel_poll_setup(int nfd_in, void *fds { for (part = PART_SOCK; part < PART_IN; ++part) { - if (channel->ch_part[part].ch_fd != INVALID_FD) + chanpart_T *ch_part = &channel->ch_part[part]; + + if (ch_part->ch_fd != INVALID_FD) { - channel->ch_part[part].ch_poll_idx = nfd; - fds[nfd].fd = channel->ch_part[part].ch_fd; + ch_part->ch_poll_idx = nfd; + fds[nfd].fd = ch_part->ch_fd; fds[nfd].events = POLLIN; nfd++; } @@ -3022,6 +3209,8 @@ channel_poll_setup(int nfd_in, void *fds } } + nfd = channel_fill_poll_write(nfd, fds); + return nfd; } @@ -3035,19 +3224,35 @@ channel_poll_check(int ret_in, void *fds channel_T *channel; struct pollfd *fds = fds_in; int part; + int idx; + chanpart_T *in_part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { for (part = PART_SOCK; part < PART_IN; ++part) { - int idx = channel->ch_part[part].ch_poll_idx; - - if (ret > 0 && idx != -1 && fds[idx].revents & POLLIN) + idx = channel->ch_part[part].ch_poll_idx; + + if (ret > 0 && idx != -1 && (fds[idx].revents & POLLIN)) { channel_read(channel, part, "channel_poll_check"); --ret; } } + + in_part = &channel->ch_part[PART_IN]; + idx = in_part->ch_poll_idx; + if (ret > 0 && idx != -1 && (fds[idx].revents & POLLOUT)) + { + if (in_part->ch_buf_append) + { + if (in_part->ch_buffer != NULL) + channel_write_new_lines(in_part->ch_buffer); + } + else + channel_write_in(channel); + --ret; + } } return ret; @@ -3056,14 +3261,15 @@ channel_poll_check(int ret_in, void *fds # if (!defined(WIN32) && defined(HAVE_SELECT)) || defined(PROTO) /* - * The type of "rfds" is hidden to avoid problems with the function proto. + * The "fd_set" type is hidden to avoid problems with the function proto. */ int -channel_select_setup(int maxfd_in, void *rfds_in) +channel_select_setup(int maxfd_in, void *rfds_in, void *wfds_in) { int maxfd = maxfd_in; channel_T *channel; fd_set *rfds = rfds_in; + fd_set *wfds = wfds_in; int part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) @@ -3081,19 +3287,23 @@ channel_select_setup(int maxfd_in, void } } + maxfd = channel_fill_wfds(maxfd, wfds); + return maxfd; } /* - * The type of "rfds" is hidden to avoid problems with the function proto. + * The "fd_set" type is hidden to avoid problems with the function proto. */ int -channel_select_check(int ret_in, void *rfds_in) +channel_select_check(int ret_in, void *rfds_in, void *wfds_in) { int ret = ret_in; channel_T *channel; fd_set *rfds = rfds_in; + fd_set *wfds = wfds_in; int part; + chanpart_T *in_part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { @@ -3107,6 +3317,20 @@ channel_select_check(int ret_in, void *r --ret; } } + + in_part = &channel->ch_part[PART_IN]; + if (ret > 0 && in_part->ch_fd != INVALID_FD + && FD_ISSET(in_part->ch_fd, wfds)) + { + if (in_part->ch_buf_append) + { + if (in_part->ch_buffer != NULL) + channel_write_new_lines(in_part->ch_buffer); + } + else + channel_write_in(channel); + --ret; + } } return ret; @@ -3608,6 +3832,13 @@ get_job_options(typval_T *tv, jobopt_T * return FAIL; } } + else if (STRCMP(hi->hi_key, "block_write") == 0) + { + if (!(supported & JO_BLOCK_WRITE)) + break; + opt->jo_set |= JO_BLOCK_WRITE; + opt->jo_block_write = get_tv_number(item); + } else break; --todo; @@ -3827,8 +4058,8 @@ job_start(typval_T *argvars) clear_job_options(&opt); opt.jo_mode = MODE_NL; if (get_job_options(&argvars[1], &opt, - JO_MODE_ALL + JO_CB_ALL + JO_TIMEOUT_ALL - + JO_STOPONEXIT + JO_EXIT_CB + JO_OUT_IO) == FAIL) + JO_MODE_ALL + JO_CB_ALL + JO_TIMEOUT_ALL + JO_STOPONEXIT + + JO_EXIT_CB + JO_OUT_IO + JO_BLOCK_WRITE) == FAIL) return job; /* Check that when io is "file" that there is a file name. */ diff --git a/src/misc2.c b/src/misc2.c --- a/src/misc2.c +++ b/src/misc2.c @@ -6230,6 +6230,9 @@ parse_queued_messages(void) netbeans_parse_messages(); # endif # ifdef FEAT_JOB_CHANNEL + /* Write any buffer lines still to be written. */ + channel_write_any_lines(); + /* Process the messages queued on channels. */ channel_parse_messages(); # endif diff --git a/src/os_unix.c b/src/os_unix.c --- a/src/os_unix.c +++ b/src/os_unix.c @@ -5539,7 +5539,8 @@ RealWaitForChar(int fd, long msec, int * # endif #endif #ifndef HAVE_SELECT - struct pollfd fds[6 + MAX_OPEN_CHANNELS]; + /* each channel may use in, out and err */ + struct pollfd fds[6 + 3 * MAX_OPEN_CHANNELS]; int nfd; # ifdef FEAT_XCLIPBOARD int xterm_idx = -1; @@ -5652,7 +5653,7 @@ RealWaitForChar(int fd, long msec, int * struct timeval tv; struct timeval *tvp; - fd_set rfds, efds; + fd_set rfds, wfds, efds; int maxfd; long towait = msec; @@ -5685,6 +5686,7 @@ RealWaitForChar(int fd, long msec, int * */ select_eintr: FD_ZERO(&rfds); + FD_ZERO(&wfds); FD_ZERO(&efds); FD_SET(fd, &rfds); # if !defined(__QNX__) && !defined(__CYGWIN32__) @@ -5725,10 +5727,10 @@ select_eintr: } # endif # ifdef FEAT_JOB_CHANNEL - maxfd = channel_select_setup(maxfd, &rfds); -# endif - - ret = select(maxfd + 1, &rfds, NULL, &efds, tvp); + maxfd = channel_select_setup(maxfd, &rfds, &wfds); +# endif + + ret = select(maxfd + 1, &rfds, &wfds, &efds, tvp); result = ret > 0 && FD_ISSET(fd, &rfds); if (result) --ret; @@ -5810,7 +5812,7 @@ select_eintr: # endif #ifdef FEAT_JOB_CHANNEL if (ret > 0) - ret = channel_select_check(ret, &rfds); + ret = channel_select_check(ret, &rfds, &wfds); #endif #endif /* HAVE_SELECT */ diff --git a/src/proto/channel.pro b/src/proto/channel.pro --- a/src/proto/channel.pro +++ b/src/proto/channel.pro @@ -13,7 +13,7 @@ void channel_set_pipes(channel_T *channe void channel_set_job(channel_T *channel, job_T *job, jobopt_T *options); void channel_set_options(channel_T *channel, jobopt_T *opt); void channel_set_req_callback(channel_T *channel, int part, char_u *callback, partial_T *partial, int id); -void channel_write_in(channel_T *channel); +void channel_write_any_lines(void); void channel_write_new_lines(buf_T *buf); char_u *channel_get(channel_T *channel, int part); int channel_collapse(channel_T *channel, int part); @@ -37,8 +37,8 @@ void ch_expr_common(typval_T *argvars, t void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval); int channel_poll_setup(int nfd_in, void *fds_in); int channel_poll_check(int ret_in, void *fds_in); -int channel_select_setup(int maxfd_in, void *rfds_in); -int channel_select_check(int ret_in, void *rfds_in); +int channel_select_setup(int maxfd_in, void *rfds_in, void *wfds_in); +int channel_select_check(int ret_in, void *rfds_in, void *wfds_in); int channel_parse_messages(void); int set_ref_in_channel(int copyID); int channel_part_send(channel_T *channel); diff --git a/src/structs.h b/src/structs.h --- a/src/structs.h +++ b/src/structs.h @@ -1383,12 +1383,15 @@ typedef struct { #else struct timeval ch_deadline; #endif + int ch_block_write; /* for testing: 0 when not used, -1 when write + * does not block, 1 simulate blocking */ cbq_T ch_cb_head; /* dummy node for per-request callbacks */ char_u *ch_callback; /* call when a msg is not handled */ partial_T *ch_partial; buf_T *ch_buffer; /* buffer to read from or write to */ + int ch_buf_append; /* write appended lines instead top-bot */ linenr_T ch_buf_top; /* next line to send */ linenr_T ch_buf_bot; /* last line to send */ } chanpart_T; @@ -1457,7 +1460,8 @@ struct channel_S { #define JO_ERR_BUF 0x2000000 /* "err_buf" (JO_OUT_BUF << 1) */ #define JO_IN_BUF 0x4000000 /* "in_buf" (JO_OUT_BUF << 2) */ #define JO_CHANNEL 0x8000000 /* "channel" */ -#define JO_ALL 0xfffffff +#define JO_BLOCK_WRITE 0x10000000 /* "block_write" */ +#define JO_ALL 0x7fffffff #define JO_MODE_ALL (JO_MODE + JO_IN_MODE + JO_OUT_MODE + JO_ERR_MODE) #define JO_CB_ALL \ @@ -1499,6 +1503,7 @@ typedef struct int jo_timeout; int jo_out_timeout; int jo_err_timeout; + int jo_block_write; /* for testing only */ int jo_part; int jo_id; char_u jo_soe_buf[NUMBUFLEN]; diff --git a/src/testdir/test_channel.vim b/src/testdir/test_channel.vim --- a/src/testdir/test_channel.vim +++ b/src/testdir/test_channel.vim @@ -791,7 +791,7 @@ func Run_test_pipe_from_buffer(use_name) sp pipe-input call setline(1, ['echo one', 'echo two', 'echo three']) - let options = {'in_io': 'buffer'} + let options = {'in_io': 'buffer', 'block_write': 1} if a:use_name let options['in_name'] = 'pipe-input' else @@ -885,7 +885,8 @@ func Test_pipe_io_two_buffers() let job = job_start(s:python . " test_channel_pipe.py", \ {'in_io': 'buffer', 'in_name': 'pipe-input', 'in_top': 0, - \ 'out_io': 'buffer', 'out_name': 'pipe-output'}) + \ 'out_io': 'buffer', 'out_name': 'pipe-output', + \ 'block_write': 1}) call assert_equal("run", job_status(job)) try exe "normal Gaecho hello\" @@ -920,7 +921,8 @@ func Test_pipe_io_one_buffer() let job = job_start(s:python . " test_channel_pipe.py", \ {'in_io': 'buffer', 'in_name': 'pipe-io', 'in_top': 0, - \ 'out_io': 'buffer', 'out_name': 'pipe-io'}) + \ 'out_io': 'buffer', 'out_name': 'pipe-io', + \ 'block_write': 1}) call assert_equal("run", job_status(job)) try exe "normal Goecho hello\" diff --git a/src/version.c b/src/version.c --- a/src/version.c +++ b/src/version.c @@ -749,6 +749,8 @@ static char *(features[]) = static int included_patches[] = { /* Add new patch number below this line */ /**/ + 1669, +/**/ 1668, /**/ 1667, diff --git a/src/vim.h b/src/vim.h --- a/src/vim.h +++ b/src/vim.h @@ -493,13 +493,11 @@ typedef unsigned long u8char_T; /* l #ifndef HAVE_SELECT # ifdef HAVE_SYS_POLL_H # include -# define HAVE_POLL # elif defined(WIN32) # define HAVE_SELECT # else # ifdef HAVE_POLL_H # include -# define HAVE_POLL # endif # endif #endif