comparison src/channel.c @ 12154:71e10b81226d v8.0.0957

patch 8.0.0957: a terminal job can deadlock when sending many keys commit https://github.com/vim/vim/commit/97bd5e6527bf2b48acdd1550acba161e82a5bc99 Author: Bram Moolenaar <Bram@vim.org> Date: Fri Aug 18 20:50:30 2017 +0200 patch 8.0.0957: a terminal job can deadlock when sending many keys Problem: When term_sendkeys() sends many keys it may get stuck in writing to the job. Solution: Make the write non-blocking, buffer keys to be sent.
author Christian Brabandt <cb@256bit.org>
date Fri, 18 Aug 2017 21:00:05 +0200
parents 0a61213afdd2
children b3e39486880a
comparison
equal deleted inserted replaced
12153:7264e03147e1 12154:71e10b81226d
1371 #endif 1371 #endif
1372 return TRUE; 1372 return TRUE;
1373 } 1373 }
1374 1374
1375 /* 1375 /*
1376 * Write any lines to the input channel. 1376 * Write any buffer lines to the input channel.
1377 */ 1377 */
1378 static void 1378 static void
1379 channel_write_in(channel_T *channel) 1379 channel_write_in(channel_T *channel)
1380 { 1380 {
1381 chanpart_T *in_part = &channel->ch_part[PART_IN]; 1381 chanpart_T *in_part = &channel->ch_part[PART_IN];
1444 } 1444 }
1445 } 1445 }
1446 } 1446 }
1447 1447
1448 /* 1448 /*
1449 * Write any lines waiting to be written to "channel".
1450 */
1451 static void
1452 channel_write_input(channel_T *channel)
1453 {
1454 chanpart_T *in_part = &channel->ch_part[PART_IN];
1455
1456 if (in_part->ch_writeque.wq_next != NULL)
1457 channel_send(channel, PART_IN, (char_u *)"", 0, "channel_write_input");
1458 else if (in_part->ch_bufref.br_buf != NULL)
1459 {
1460 if (in_part->ch_buf_append)
1461 channel_write_new_lines(in_part->ch_bufref.br_buf);
1462 else
1463 channel_write_in(channel);
1464 }
1465 }
1466
1467 /*
1449 * Write any lines waiting to be written to a channel. 1468 * Write any lines waiting to be written to a channel.
1450 */ 1469 */
1451 void 1470 void
1452 channel_write_any_lines(void) 1471 channel_write_any_lines(void)
1453 { 1472 {
1454 channel_T *channel; 1473 channel_T *channel;
1455 1474
1456 for (channel = first_channel; channel != NULL; channel = channel->ch_next) 1475 for (channel = first_channel; channel != NULL; channel = channel->ch_next)
1457 { 1476 channel_write_input(channel);
1458 chanpart_T *in_part = &channel->ch_part[PART_IN];
1459
1460 if (in_part->ch_bufref.br_buf != NULL)
1461 {
1462 if (in_part->ch_buf_append)
1463 channel_write_new_lines(in_part->ch_bufref.br_buf);
1464 else
1465 channel_write_in(channel);
1466 }
1467 }
1468 } 1477 }
1469 1478
1470 /* 1479 /*
1471 * Write appended lines above the last one in "buf" to the channel. 1480 * Write appended lines above the last one in "buf" to the channel.
1472 */ 1481 */
2982 2991
2983 for (ch = first_channel; ch != NULL; ch = ch->ch_next) 2992 for (ch = first_channel; ch != NULL; ch = ch->ch_next)
2984 { 2993 {
2985 chanpart_T *in_part = &ch->ch_part[PART_IN]; 2994 chanpart_T *in_part = &ch->ch_part[PART_IN];
2986 2995
2987 if (in_part->ch_fd != INVALID_FD && in_part->ch_bufref.br_buf != NULL) 2996 if (in_part->ch_fd != INVALID_FD
2997 && (in_part->ch_bufref.br_buf != NULL
2998 || in_part->ch_writeque.wq_next != NULL))
2988 { 2999 {
2989 FD_SET((int)in_part->ch_fd, wfds); 3000 FD_SET((int)in_part->ch_fd, wfds);
2990 if ((int)in_part->ch_fd >= maxfd) 3001 if ((int)in_part->ch_fd >= maxfd)
2991 maxfd = (int)in_part->ch_fd + 1; 3002 maxfd = (int)in_part->ch_fd + 1;
2992 } 3003 }
3528 } 3539 }
3529 } 3540 }
3530 # endif 3541 # endif
3531 3542
3532 /* 3543 /*
3544 * Set "channel"/"part" to non-blocking.
3545 */
3546 void
3547 channel_set_nonblock(channel_T *channel, ch_part_T part)
3548 {
3549 chanpart_T *ch_part = &channel->ch_part[part];
3550 int fd = ch_part->ch_fd;
3551
3552 if (fd != INVALID_FD)
3553 {
3554 #ifdef _WIN32
3555 if (part == PART_SOCK)
3556 {
3557 u_long val = 1;
3558
3559 ioctlsocket(fd, FIONBIO, &val);
3560 }
3561 else
3562 #endif
3563 fcntl(fd, F_SETFL, O_NONBLOCK);
3564 ch_part->ch_nonblocking = TRUE;
3565 }
3566 }
3567
3568 /*
3533 * Write "buf" (NUL terminated string) to "channel"/"part". 3569 * Write "buf" (NUL terminated string) to "channel"/"part".
3534 * When "fun" is not NULL an error message might be given. 3570 * When "fun" is not NULL an error message might be given.
3535 * Return FAIL or OK. 3571 * Return FAIL or OK.
3536 */ 3572 */
3537 int 3573 int
3538 channel_send( 3574 channel_send(
3539 channel_T *channel, 3575 channel_T *channel,
3540 ch_part_T part, 3576 ch_part_T part,
3541 char_u *buf, 3577 char_u *buf_arg,
3542 int len, 3578 int len_arg,
3543 char *fun) 3579 char *fun)
3544 { 3580 {
3545 int res; 3581 int res;
3546 sock_T fd; 3582 sock_T fd;
3547 3583 chanpart_T *ch_part = &channel->ch_part[part];
3548 fd = channel->ch_part[part].ch_fd; 3584 int did_use_queue = FALSE;
3585
3586 fd = ch_part->ch_fd;
3549 if (fd == INVALID_FD) 3587 if (fd == INVALID_FD)
3550 { 3588 {
3551 if (!channel->ch_error && fun != NULL) 3589 if (!channel->ch_error && fun != NULL)
3552 { 3590 {
3553 ch_error(channel, "%s(): write while not connected", fun); 3591 ch_error(channel, "%s(): write while not connected", fun);
3559 3597
3560 if (log_fd != NULL) 3598 if (log_fd != NULL)
3561 { 3599 {
3562 ch_log_lead("SEND ", channel); 3600 ch_log_lead("SEND ", channel);
3563 fprintf(log_fd, "'"); 3601 fprintf(log_fd, "'");
3564 ignored = (int)fwrite(buf, len, 1, log_fd); 3602 ignored = (int)fwrite(buf_arg, len_arg, 1, log_fd);
3565 fprintf(log_fd, "'\n"); 3603 fprintf(log_fd, "'\n");
3566 fflush(log_fd); 3604 fflush(log_fd);
3567 did_log_msg = TRUE; 3605 did_log_msg = TRUE;
3568 } 3606 }
3569 3607
3570 if (part == PART_SOCK) 3608 for (;;)
3571 res = sock_write(fd, (char *)buf, len); 3609 {
3572 else 3610 writeq_T *wq = &ch_part->ch_writeque;
3573 res = fd_write(fd, (char *)buf, len); 3611 char_u *buf;
3574 if (res != len) 3612 int len;
3575 { 3613
3576 if (!channel->ch_error && fun != NULL) 3614 if (wq->wq_next != NULL)
3577 { 3615 {
3578 ch_error(channel, "%s(): write failed", fun); 3616 /* first write what was queued */
3579 EMSG2(_("E631: %s(): write failed"), fun); 3617 buf = wq->wq_next->wq_ga.ga_data;
3580 } 3618 len = wq->wq_next->wq_ga.ga_len;
3581 channel->ch_error = TRUE; 3619 did_use_queue = TRUE;
3582 return FAIL; 3620 }
3583 } 3621 else
3584 3622 {
3585 channel->ch_error = FALSE; 3623 if (len_arg == 0)
3586 return OK; 3624 /* nothing to write, called from channel_select_check() */
3625 return OK;
3626 buf = buf_arg;
3627 len = len_arg;
3628 }
3629
3630 if (part == PART_SOCK)
3631 res = sock_write(fd, (char *)buf, len);
3632 else
3633 res = fd_write(fd, (char *)buf, len);
3634 if (res < 0 && (errno == EWOULDBLOCK
3635 #ifdef EAGAIN
3636 || errno == EAGAIN
3637 #endif
3638 ))
3639 res = 0; /* nothing got written */
3640
3641 if (res >= 0 && ch_part->ch_nonblocking)
3642 {
3643 writeq_T *entry = wq->wq_next;
3644
3645 if (did_use_queue)
3646 ch_log(channel, "Sent %d bytes now", res);
3647 if (res == len)
3648 {
3649 /* Wrote all the buf[len] bytes. */
3650 if (entry != NULL)
3651 {
3652 /* Remove the entry from the write queue. */
3653 ga_clear(&entry->wq_ga);
3654 wq->wq_next = entry->wq_next;
3655 if (wq->wq_next == NULL)
3656 wq->wq_prev = NULL;
3657 else
3658 wq->wq_next->wq_prev = NULL;
3659 continue;
3660 }
3661 if (did_use_queue)
3662 ch_log(channel, "Write queue empty");
3663 }
3664 else
3665 {
3666 /* Wrote only buf[res] bytes, can't write more now. */
3667 if (entry != NULL)
3668 {
3669 if (res > 0)
3670 {
3671 /* Remove the bytes that were written. */
3672 mch_memmove(entry->wq_ga.ga_data,
3673 (char *)entry->wq_ga.ga_data + res,
3674 len - res);
3675 entry->wq_ga.ga_len -= res;
3676 }
3677 buf = buf_arg;
3678 len = len_arg;
3679 }
3680 else
3681 {
3682 buf += res;
3683 len -= res;
3684 }
3685 ch_log(channel, "Adding %d bytes to the write queue", len);
3686
3687 /* Append the not written bytes of the argument to the write
3688 * buffer. Limit entries to 4000 bytes. */
3689 if (wq->wq_prev != NULL
3690 && wq->wq_prev->wq_ga.ga_len + len < 4000)
3691 {
3692 writeq_T *last = wq->wq_prev;
3693
3694 /* append to the last entry */
3695 if (ga_grow(&last->wq_ga, len) == OK)
3696 {
3697 mch_memmove((char *)last->wq_ga.ga_data
3698 + last->wq_ga.ga_len,
3699 buf, len);
3700 last->wq_ga.ga_len += len;
3701 }
3702 }
3703 else
3704 {
3705 writeq_T *last = (writeq_T *)alloc((int)sizeof(writeq_T));
3706
3707 if (last != NULL)
3708 {
3709 ch_log(channel, "Creating new entry");
3710 last->wq_prev = wq->wq_prev;
3711 last->wq_next = NULL;
3712 if (wq->wq_prev == NULL)
3713 wq->wq_next = last;
3714 else
3715 wq->wq_prev->wq_next = last;
3716 wq->wq_prev = last;
3717 ga_init2(&last->wq_ga, 1, 1000);
3718 if (ga_grow(&last->wq_ga, len) == OK)
3719 {
3720 mch_memmove(last->wq_ga.ga_data, buf, len);
3721 last->wq_ga.ga_len = len;
3722 }
3723 }
3724 }
3725 }
3726 }
3727 else if (res != len)
3728 {
3729 if (!channel->ch_error && fun != NULL)
3730 {
3731 ch_error(channel, "%s(): write failed", fun);
3732 EMSG2(_("E631: %s(): write failed"), fun);
3733 }
3734 channel->ch_error = TRUE;
3735 return FAIL;
3736 }
3737
3738 channel->ch_error = FALSE;
3739 return OK;
3740 }
3587 } 3741 }
3588 3742
3589 /* 3743 /*
3590 * Common for "ch_sendexpr()" and "ch_sendraw()". 3744 * Common for "ch_sendexpr()" and "ch_sendraw()".
3591 * Returns the channel if the caller should read the response. 3745 * Returns the channel if the caller should read the response.
3871 4025
3872 in_part = &channel->ch_part[PART_IN]; 4026 in_part = &channel->ch_part[PART_IN];
3873 if (ret > 0 && in_part->ch_fd != INVALID_FD 4027 if (ret > 0 && in_part->ch_fd != INVALID_FD
3874 && FD_ISSET(in_part->ch_fd, wfds)) 4028 && FD_ISSET(in_part->ch_fd, wfds))
3875 { 4029 {
3876 if (in_part->ch_buf_append) 4030 channel_write_input(channel);
3877 {
3878 if (in_part->ch_bufref.br_buf != NULL)
3879 channel_write_new_lines(in_part->ch_bufref.br_buf);
3880 }
3881 else
3882 channel_write_in(channel);
3883 --ret; 4031 --ret;
3884 } 4032 }
3885 } 4033 }
3886 4034
3887 return ret; 4035 return ret;