view src/testdir/test_channel_lsp.py @ 28511:d7ca583e5772 v8.2.4780

patch 8.2.4780: parsing an LSP message fails when it is split Commit: https://github.com/vim/vim/commit/03cca297df5210f94be2246cfdb1ee9a30454bea Author: Yegappan Lakshmanan <yegappan@yahoo.com> Date: Mon Apr 18 14:07:46 2022 +0100 patch 8.2.4780: parsing an LSP message fails when it is split Problem: Parsing an LSP message fails when it is split. Solution: Collapse the received data before parsing. (Yegappan Lakshmanan, closes #10215)
author Bram Moolenaar <Bram@vim.org>
date Mon, 18 Apr 2022 15:15:04 +0200
parents 85b07a942518
children 6b1da12297e5
line wrap: on
line source

#!/usr/bin/env python
#
# Server that will accept connections from a Vim channel.
# Used by test_channel.vim to test LSP functionality.
#
# This requires Python 2.6 or later.

from __future__ import print_function
import json
import socket
import sys
import time
import threading

try:
    # Python 3
    import socketserver
except ImportError:
    # Python 2
    import SocketServer as socketserver

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def setup(self):
        self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    def send_lsp_msg(self, msgid, resp_dict):
        v = {'jsonrpc': '2.0', 'result': resp_dict}
        if msgid != -1:
            v['id'] = msgid
        s = json.dumps(v)
        resp = "Content-Length: " + str(len(s)) + "\r\n"
        resp += "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
        resp += "\r\n"
        resp += s
        if self.debug:
            with open("Xlspdebug.log", "a") as myfile:
                myfile.write("\n=> send\n" + resp)
        self.request.sendall(resp.encode('utf-8'))

    def send_wrong_payload(self):
        v = 'wrong-payload'
        s = json.dumps(v)
        resp = "Content-Length: " + str(len(s)) + "\r\n"
        resp += "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
        resp += "\r\n"
        resp += s
        self.request.sendall(resp.encode('utf-8'))

    def send_empty_header(self, msgid, resp_dict):
        v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
        s = json.dumps(v)
        resp = "\r\n"
        resp += s
        self.request.sendall(resp.encode('utf-8'))

    def send_empty_payload(self):
        resp = "Content-Length: 0\r\n"
        resp += "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
        resp += "\r\n"
        self.request.sendall(resp.encode('utf-8'))

    def send_extra_hdr_fields(self, msgid, resp_dict):
        # test for sending extra fields in the http header
        v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
        s = json.dumps(v)
        resp = "Host: abc.vim.org\r\n"
        resp += "User-Agent: Python\r\n"
        resp += "Accept-Language: en-US,en\r\n"
        resp += "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
        resp += "Content-Length: " + str(len(s)) + "\r\n"
        resp += "\r\n"
        resp += s
        self.request.sendall(resp.encode('utf-8'))

    def send_delayed_payload(self, msgid, resp_dict):
        # test for sending the hdr first and then after some delay, send the
        # payload
        v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
        s = json.dumps(v)
        resp = "Content-Length: " + str(len(s)) + "\r\n"
        resp += "\r\n"
        self.request.sendall(resp.encode('utf-8'))
        time.sleep(0.05)
        resp = s
        self.request.sendall(resp.encode('utf-8'))

    def send_hdr_without_len(self, msgid, resp_dict):
        # test for sending the http header without length
        v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
        s = json.dumps(v)
        resp = "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
        resp += "\r\n"
        resp += s
        self.request.sendall(resp.encode('utf-8'))

    def send_hdr_with_wrong_len(self, msgid, resp_dict):
        # test for sending the http header with wrong length
        v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
        s = json.dumps(v)
        resp = "Content-Length: 1000\r\n"
        resp += "\r\n"
        resp += s
        self.request.sendall(resp.encode('utf-8'))

    def send_hdr_with_negative_len(self, msgid, resp_dict):
        # test for sending the http header with negative length
        v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
        s = json.dumps(v)
        resp = "Content-Length: -1\r\n"
        resp += "\r\n"
        resp += s
        self.request.sendall(resp.encode('utf-8'))

    def do_ping(self, payload):
        time.sleep(0.2)
        self.send_lsp_msg(payload['id'], 'alive')

    def do_echo(self, payload):
        self.send_lsp_msg(-1, payload)

    def do_simple_rpc(self, payload):
        # test for a simple RPC request
        self.send_lsp_msg(payload['id'], 'simple-rpc')

    def do_rpc_with_notif(self, payload):
        # test for sending a notification before replying to a request message
        self.send_lsp_msg(-1, 'rpc-with-notif-notif')
        # sleep for some time to make sure the notification is delivered
        time.sleep(0.2)
        self.send_lsp_msg(payload['id'], 'rpc-with-notif-resp')

    def do_wrong_payload(self, payload):
        # test for sending a non dict payload
        self.send_wrong_payload()
        time.sleep(0.2)
        self.send_lsp_msg(-1, 'wrong-payload')

    def do_rpc_resp_incorrect_id(self, payload):
        self.send_lsp_msg(-1, 'rpc-resp-incorrect-id-1')
        self.send_lsp_msg(-1, 'rpc-resp-incorrect-id-2')
        self.send_lsp_msg(1, 'rpc-resp-incorrect-id-3')
        time.sleep(0.2)
        self.send_lsp_msg(payload['id'], 'rpc-resp-incorrect-id-4')

    def do_simple_notif(self, payload):
        # notification message test
        self.send_lsp_msg(-1, 'simple-notif')

    def do_multi_notif(self, payload):
        # send multiple notifications
        self.send_lsp_msg(-1, 'multi-notif1')
        self.send_lsp_msg(-1, 'multi-notif2')

    def do_msg_with_id(self, payload):
        self.send_lsp_msg(payload['id'], 'msg-with-id')

    def do_msg_specific_cb(self, payload):
        self.send_lsp_msg(payload['id'], 'msg-specifc-cb')

    def do_server_req(self, payload):
        self.send_lsp_msg(201, {'method': 'checkhealth', 'params': {'a': 20}})

    def do_extra_hdr_fields(self, payload):
        self.send_extra_hdr_fields(payload['id'], 'extra-hdr-fields')

    def do_delayad_payload(self, payload):
        self.send_delayed_payload(payload['id'], 'delayed-payload')

    def do_hdr_without_len(self, payload):
        self.send_hdr_without_len(payload['id'], 'hdr-without-len')

    def do_hdr_with_wrong_len(self, payload):
        self.send_hdr_with_wrong_len(payload['id'], 'hdr-with-wrong-len')

    def do_hdr_with_negative_len(self, payload):
        self.send_hdr_with_negative_len(payload['id'], 'hdr-with-negative-len')

    def do_empty_header(self, payload):
        self.send_empty_header(payload['id'], 'empty-header')

    def do_empty_payload(self, payload):
        self.send_empty_payload()

    def process_msg(self, msg):
        try:
            decoded = json.loads(msg)
            print("Decoded:")
            print(str(decoded))
            if 'method' in decoded:
                test_map = {
                        'ping': self.do_ping,
                        'echo': self.do_echo,
                        'simple-rpc': self.do_simple_rpc,
                        'rpc-with-notif': self.do_rpc_with_notif,
                        'wrong-payload': self.do_wrong_payload,
                        'rpc-resp-incorrect-id': self.do_rpc_resp_incorrect_id,
                        'simple-notif': self.do_simple_notif,
                        'multi-notif': self.do_multi_notif,
                        'msg-with-id': self.do_msg_with_id,
                        'msg-specifc-cb': self.do_msg_specific_cb,
                        'server-req': self.do_server_req,
                        'extra-hdr-fields': self.do_extra_hdr_fields,
                        'delayed-payload': self.do_delayad_payload,
                        'hdr-without-len': self.do_hdr_without_len,
                        'hdr-with-wrong-len': self.do_hdr_with_wrong_len,
                        'hdr-with-negative-len': self.do_hdr_with_negative_len,
                        'empty-header': self.do_empty_header,
                        'empty-payload': self.do_empty_payload
                        }
                if decoded['method'] in test_map:
                    test_map[decoded['method']](decoded)
                else:
                    print("Error: Unsupported method: " + decoded['method'])
            else:
                print("Error: 'method' field is not found")

        except ValueError:
            print("json decoding failed")

    def process_msgs(self, msgbuf):
        while True:
            sidx = msgbuf.find('Content-Length: ')
            if sidx == -1:
                return msgbuf
            sidx += 16
            eidx = msgbuf.find('\r\n')
            if eidx == -1:
                return msgbuf
            msglen = int(msgbuf[sidx:eidx])

            hdrend = msgbuf.find('\r\n\r\n')
            if hdrend == -1:
                return msgbuf

            # Remove the header
            msgbuf = msgbuf[hdrend + 4:]
            payload = msgbuf[:msglen]

            self.process_msg(payload)

            # Remove the processed message
            msgbuf = msgbuf[msglen:]

    def handle(self):
        print("=== socket opened ===")
        self.debug = False
        msgbuf = ''
        while True:
            try:
                received = self.request.recv(4096).decode('utf-8')
            except socket.error:
                print("=== socket error ===")
                break
            except IOError:
                print("=== socket closed ===")
                break
            if received == '':
                print("=== socket closed ===")
                break
            print("\nReceived:\n{0}".format(received))

            # Write the received lines into the file for debugging
            if self.debug:
                with open("Xlspdebug.log", "a") as myfile:
                    myfile.write("\n<= recv\n" + received)

            # Can receive more than one line in a response or a partial line.
            # Accumulate all the received characters and process one line at
            # a time.
            msgbuf += received
            msgbuf = self.process_msgs(msgbuf)

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass

def writePortInFile(port):
    # Write the port number in Xportnr, so that the test knows it.
    f = open("Xportnr", "w")
    f.write("{0}".format(port))
    f.close()

def main(host, port, server_class=ThreadedTCPServer):
    # Wait half a second before opening the port to test waittime in ch_open().
    # We do want to get the port number, get that first.  We cannot open the
    # socket, guess a port is free.
    if len(sys.argv) >= 2 and sys.argv[1] == 'delay':
        port = 13684
        writePortInFile(port)

        print("Wait for it...")
        time.sleep(0.5)

    server = server_class((host, port), ThreadedTCPRequestHandler)
    ip, port = server.server_address[0:2]

    # Start a thread with the server.  That thread will then start a new thread
    # for each connection.
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.start()

    writePortInFile(port)

    print("Listening on port {0}".format(port))

    # Main thread terminates, but the server continues running
    # until server.shutdown() is called.
    try:
        while server_thread.is_alive():
            server_thread.join(1)
    except (KeyboardInterrupt, SystemExit):
        server.shutdown()

if __name__ == "__main__":
    main("localhost", 0)