comparison src/channel.c @ 8655:1eb302bf2475 v7.4.1617

commit https://github.com/vim/vim/commit/ba61ac0d61f46de7d29c64bb0de6d25c2e378be0 Author: Bram Moolenaar <Bram@vim.org> Date: Sun Mar 20 16:40:37 2016 +0100 patch 7.4.1617 Problem: When a JSON message is split it isn't decoded. Solution: Wait a short time for the rest of the message to arrive.
author Christian Brabandt <cb@256bit.org>
date Sun, 20 Mar 2016 16:45:04 +0100
parents d80edead9675
children c70eea7a7677
comparison
equal deleted inserted replaced
8654:12ff4422343e 8655:1eb302bf2475
1360 /* 1360 /*
1361 * Store "buf[len]" on "channel"/"part". 1361 * Store "buf[len]" on "channel"/"part".
1362 * Returns OK or FAIL. 1362 * Returns OK or FAIL.
1363 */ 1363 */
1364 static int 1364 static int
1365 channel_save(channel_T *channel, int part, char_u *buf, int len) 1365 channel_save(channel_T *channel, int part, char_u *buf, int len, char *lead)
1366 { 1366 {
1367 readq_T *node; 1367 readq_T *node;
1368 readq_T *head = &channel->ch_part[part].ch_head; 1368 readq_T *head = &channel->ch_part[part].ch_head;
1369 char_u *p; 1369 char_u *p;
1370 int i; 1370 int i;
1401 head->rq_next = node; 1401 head->rq_next = node;
1402 else 1402 else
1403 head->rq_prev->rq_next = node; 1403 head->rq_prev->rq_next = node;
1404 head->rq_prev = node; 1404 head->rq_prev = node;
1405 1405
1406 if (log_fd != NULL) 1406 if (log_fd != NULL && lead != NULL)
1407 { 1407 {
1408 ch_log_lead("RECV ", channel); 1408 ch_log_lead(lead, channel);
1409 fprintf(log_fd, "'"); 1409 fprintf(log_fd, "'");
1410 if (fwrite(buf, len, 1, log_fd) != 1) 1410 if (fwrite(buf, len, 1, log_fd) != 1)
1411 return FAIL; 1411 return FAIL;
1412 fprintf(log_fd, "'\n"); 1412 fprintf(log_fd, "'\n");
1413 } 1413 }
1414 return OK; 1414 return OK;
1415 } 1415 }
1416 1416
1417 /* 1417 /*
1418 * Use the read buffer of "channel"/"part" and parse a JSON messages that is 1418 * Use the read buffer of "channel"/"part" and parse a JSON message that is
1419 * complete. The messages are added to the queue. 1419 * complete. The messages are added to the queue.
1420 * Return TRUE if there is more to read. 1420 * Return TRUE if there is more to read.
1421 */ 1421 */
1422 static int 1422 static int
1423 channel_parse_json(channel_T *channel, int part) 1423 channel_parse_json(channel_T *channel, int part)
1424 { 1424 {
1425 js_read_T reader; 1425 js_read_T reader;
1426 typval_T listtv; 1426 typval_T listtv;
1427 jsonq_T *item; 1427 jsonq_T *item;
1428 jsonq_T *head = &channel->ch_part[part].ch_json_head; 1428 chanpart_T *chanpart = &channel->ch_part[part];
1429 jsonq_T *head = &chanpart->ch_json_head;
1430 int status;
1429 int ret; 1431 int ret;
1430 1432
1431 if (channel_peek(channel, part) == NULL) 1433 if (channel_peek(channel, part) == NULL)
1432 return FALSE; 1434 return FALSE;
1433 1435
1436 reader.js_buf = channel_get_all(channel, part); 1438 reader.js_buf = channel_get_all(channel, part);
1437 reader.js_used = 0; 1439 reader.js_used = 0;
1438 reader.js_fill = NULL; 1440 reader.js_fill = NULL;
1439 /* reader.js_fill = channel_fill; */ 1441 /* reader.js_fill = channel_fill; */
1440 reader.js_cookie = channel; 1442 reader.js_cookie = channel;
1441 ret = json_decode(&reader, &listtv, 1443
1442 channel->ch_part[part].ch_mode == MODE_JS ? JSON_JS : 0); 1444 /* When a message is incomplete we wait for a short while for more to
1443 if (ret == OK) 1445 * arrive. After the delay drop the input, otherwise a truncated string
1446 * or list will make us hang. */
1447 status = json_decode(&reader, &listtv,
1448 chanpart->ch_mode == MODE_JS ? JSON_JS : 0);
1449 if (status == OK)
1444 { 1450 {
1445 /* Only accept the response when it is a list with at least two 1451 /* Only accept the response when it is a list with at least two
1446 * items. */ 1452 * items. */
1447 if (listtv.v_type != VAR_LIST || listtv.vval.v_list->lv_len < 2) 1453 if (listtv.v_type != VAR_LIST || listtv.vval.v_list->lv_len < 2)
1448 { 1454 {
1449 /* TODO: give error */ 1455 if (listtv.v_type != VAR_LIST)
1456 ch_error(channel, "Did not receive a list, discarding");
1457 else
1458 ch_errorn(channel, "Expected list with two items, got %d",
1459 listtv.vval.v_list->lv_len);
1450 clear_tv(&listtv); 1460 clear_tv(&listtv);
1451 } 1461 }
1452 else 1462 else
1453 { 1463 {
1454 item = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T)); 1464 item = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T));
1475 } 1485 }
1476 } 1486 }
1477 } 1487 }
1478 } 1488 }
1479 1489
1480 /* Put the unread part back into the channel. 1490 if (status == OK)
1481 * TODO: insert in front */ 1491 chanpart->ch_waiting = FALSE;
1482 if (reader.js_buf[reader.js_used] != NUL) 1492 else if (status == MAYBE)
1483 { 1493 {
1484 if (ret == FAIL) 1494 if (!chanpart->ch_waiting)
1485 { 1495 {
1486 ch_error(channel, "Decoding failed - discarding input"); 1496 /* First time encountering incomplete message, set a deadline of
1487 ret = FALSE; 1497 * 100 msec. */
1498 ch_log(channel, "Incomplete message - wait for more");
1499 reader.js_used = 0;
1500 chanpart->ch_waiting = TRUE;
1501 #ifdef WIN32
1502 chanpart->ch_deadline = GetTickCount() + 100L;
1503 #else
1504 gettimeofday(&chanpart->ch_deadline, NULL);
1505 chanpart->ch_deadline.tv_usec += 100 * 1000;
1506 if (chanpart->ch_deadline.tv_usec > 1000 * 1000)
1507 {
1508 chanpart->ch_deadline.tv_usec -= 1000 * 1000;
1509 ++chanpart->ch_deadline.tv_sec;
1510 }
1511 #endif
1488 } 1512 }
1489 else 1513 else
1490 { 1514 {
1491 channel_save(channel, part, reader.js_buf + reader.js_used, 1515 int timeout;
1492 (int)(reader.js_end - reader.js_buf) - reader.js_used); 1516 #ifdef WIN32
1493 ret = TRUE; 1517 timeout = GetTickCount() > chanpart->ch_deadline;
1494 } 1518 #else
1519 {
1520 struct timeval now_tv;
1521
1522 gettimeofday(&now_tv, NULL);
1523 timeout = now_tv.tv_sec > chanpart->ch_deadline.tv_sec
1524 || (now_tv.tv_sec == chanpart->ch_deadline.tv_sec
1525 && now_tv.tv_usec > chanpart->ch_deadline.tv_usec);
1526 }
1527 #endif
1528 if (timeout)
1529 {
1530 status = FAIL;
1531 chanpart->ch_waiting = FALSE;
1532 }
1533 else
1534 {
1535 reader.js_used = 0;
1536 ch_log(channel, "still waiting on incomplete message");
1537 }
1538 }
1539 }
1540
1541 if (status == FAIL)
1542 {
1543 ch_error(channel, "Decoding failed - discarding input");
1544 ret = FALSE;
1545 chanpart->ch_waiting = FALSE;
1546 }
1547 else if (reader.js_buf[reader.js_used] != NUL)
1548 {
1549 /* Put the unread part back into the channel.
1550 * TODO: insert in front */
1551 channel_save(channel, part, reader.js_buf + reader.js_used,
1552 (int)(reader.js_end - reader.js_buf) - reader.js_used, NULL);
1553 ret = status == MAYBE ? FALSE: TRUE;
1495 } 1554 }
1496 else 1555 else
1497 ret = FALSE; 1556 ret = FALSE;
1498 1557
1499 vim_free(reader.js_buf); 1558 vim_free(reader.js_buf);
1557 || (id <= 0 && (tv->v_type != VAR_NUMBER 1616 || (id <= 0 && (tv->v_type != VAR_NUMBER
1558 || tv->vval.v_number == 0 1617 || tv->vval.v_number == 0
1559 || tv->vval.v_number != channel->ch_part[part].ch_block_id))) 1618 || tv->vval.v_number != channel->ch_part[part].ch_block_id)))
1560 { 1619 {
1561 *rettv = item->jq_value; 1620 *rettv = item->jq_value;
1621 if (tv->v_type == VAR_NUMBER)
1622 ch_logn(channel, "Getting JSON message %d", tv->vval.v_number);
1562 remove_json_node(head, item); 1623 remove_json_node(head, item);
1563 return OK; 1624 return OK;
1564 } 1625 }
1565 item = item->jq_next; 1626 item = item->jq_next;
1566 } 1627 }
2287 len = fd_read(fd, (char *)buf, MAXMSGSIZE); 2348 len = fd_read(fd, (char *)buf, MAXMSGSIZE);
2288 if (len <= 0) 2349 if (len <= 0)
2289 break; /* error or nothing more to read */ 2350 break; /* error or nothing more to read */
2290 2351
2291 /* Store the read message in the queue. */ 2352 /* Store the read message in the queue. */
2292 channel_save(channel, part, buf, len); 2353 channel_save(channel, part, buf, len, "RECV ");
2293 readlen += len; 2354 readlen += len;
2294 if (len < MAXMSGSIZE) 2355 if (len < MAXMSGSIZE)
2295 break; /* did read everything that's available */ 2356 break; /* did read everything that's available */
2296 } 2357 }
2297 2358
2314 * Don't send "DETACH" for a JS or JSON channel. 2375 * Don't send "DETACH" for a JS or JSON channel.
2315 */ 2376 */
2316 if (channel->ch_part[part].ch_mode == MODE_RAW 2377 if (channel->ch_part[part].ch_mode == MODE_RAW
2317 || channel->ch_part[part].ch_mode == MODE_NL) 2378 || channel->ch_part[part].ch_mode == MODE_NL)
2318 channel_save(channel, part, (char_u *)DETACH_MSG_RAW, 2379 channel_save(channel, part, (char_u *)DETACH_MSG_RAW,
2319 (int)STRLEN(DETACH_MSG_RAW)); 2380 (int)STRLEN(DETACH_MSG_RAW), "PUT ");
2320 2381
2321 /* TODO: When reading from stdout is not possible, should we try to 2382 /* TODO: When reading from stdout is not possible, should we try to
2322 * keep stdin and stderr open? Probably not, assume the other side 2383 * keep stdin and stderr open? Probably not, assume the other side
2323 * has died. */ 2384 * has died. */
2324 channel_close(channel, TRUE); 2385 channel_close(channel, TRUE);
2359 break; 2420 break;
2360 if (buf != NULL && channel_collapse(channel, part) == OK) 2421 if (buf != NULL && channel_collapse(channel, part) == OK)
2361 continue; 2422 continue;
2362 2423
2363 /* Wait for up to the channel timeout. */ 2424 /* Wait for up to the channel timeout. */
2364 if (fd == INVALID_FD 2425 if (fd == INVALID_FD)
2365 || channel_wait(channel, fd, timeout) == FAIL)
2366 return NULL; 2426 return NULL;
2427 if (channel_wait(channel, fd, timeout) == FAIL)
2428 {
2429 ch_log(channel, "Timed out");
2430 return NULL;
2431 }
2367 channel_read(channel, part, "channel_read_block"); 2432 channel_read(channel, part, "channel_read_block");
2368 } 2433 }
2369 2434
2370 if (mode == MODE_RAW) 2435 if (mode == MODE_RAW)
2371 { 2436 {
2401 */ 2466 */
2402 int 2467 int
2403 channel_read_json_block( 2468 channel_read_json_block(
2404 channel_T *channel, 2469 channel_T *channel,
2405 int part, 2470 int part,
2406 int timeout, 2471 int timeout_arg,
2407 int id, 2472 int id,
2408 typval_T **rettv) 2473 typval_T **rettv)
2409 { 2474 {
2410 int more; 2475 int more;
2411 sock_T fd; 2476 sock_T fd;
2477 int timeout;
2478 chanpart_T *chanpart = &channel->ch_part[part];
2412 2479
2413 ch_log(channel, "Reading JSON"); 2480 ch_log(channel, "Reading JSON");
2414 if (id != -1) 2481 if (id != -1)
2415 channel->ch_part[part].ch_block_id = id; 2482 chanpart->ch_block_id = id;
2416 for (;;) 2483 for (;;)
2417 { 2484 {
2418 more = channel_parse_json(channel, part); 2485 more = channel_parse_json(channel, part);
2419 2486
2420 /* search for messsage "id" */ 2487 /* search for messsage "id" */
2421 if (channel_get_json(channel, part, id, rettv) == OK) 2488 if (channel_get_json(channel, part, id, rettv) == OK)
2422 { 2489 {
2423 channel->ch_part[part].ch_block_id = 0; 2490 chanpart->ch_block_id = 0;
2424 return OK; 2491 return OK;
2425 } 2492 }
2426 2493
2427 if (!more) 2494 if (!more)
2428 { 2495 {
2429 /* Handle any other messages in the queue. If done some more 2496 /* Handle any other messages in the queue. If done some more
2430 * messages may have arrived. */ 2497 * messages may have arrived. */
2431 if (channel_parse_messages()) 2498 if (channel_parse_messages())
2432 continue; 2499 continue;
2433 2500
2434 /* Wait for up to the timeout. */ 2501 /* Wait for up to the timeout. If there was an incomplete message
2435 fd = channel->ch_part[part].ch_fd; 2502 * use the deadline for that. */
2503 timeout = timeout_arg;
2504 if (chanpart->ch_waiting)
2505 {
2506 #ifdef WIN32
2507 timeout = chanpart->ch_deadline - GetTickCount() + 1;
2508 #else
2509 {
2510 struct timeval now_tv;
2511
2512 gettimeofday(&now_tv, NULL);
2513 timeout = (chanpart->ch_deadline.tv_sec
2514 - now_tv.tv_sec) * 1000
2515 + (chanpart->ch_deadline.tv_usec
2516 - now_tv.tv_usec) / 1000
2517 + 1;
2518 }
2519 #endif
2520 if (timeout < 0)
2521 {
2522 /* Something went wrong, channel_parse_json() didn't
2523 * discard message. Cancel waiting. */
2524 chanpart->ch_waiting = FALSE;
2525 timeout = timeout_arg;
2526 }
2527 else if (timeout > timeout_arg)
2528 timeout = timeout_arg;
2529 }
2530 fd = chanpart->ch_fd;
2436 if (fd == INVALID_FD || channel_wait(channel, fd, timeout) == FAIL) 2531 if (fd == INVALID_FD || channel_wait(channel, fd, timeout) == FAIL)
2437 break; 2532 {
2438 channel_read(channel, part, "channel_read_json_block"); 2533 if (timeout == timeout_arg)
2439 } 2534 {
2440 } 2535 if (fd != INVALID_FD)
2441 channel->ch_part[part].ch_block_id = 0; 2536 ch_log(channel, "Timed out");
2537 break;
2538 }
2539 }
2540 else
2541 channel_read(channel, part, "channel_read_json_block");
2542 }
2543 }
2544 chanpart->ch_block_id = 0;
2442 return FAIL; 2545 return FAIL;
2443 } 2546 }
2444 2547
2445 /* 2548 /*
2446 * Common for ch_read() and ch_readraw(). 2549 * Common for ch_read() and ch_readraw().