view src/testdir/test_channel.py @ 33461:6448d7b2ce20 v9.0.1984

patch 9.0.1984: CI: Test_open_delay*() fails on FreeBSD 14 Commit: https://github.com/vim/vim/commit/85ff0c1912f6d94ed94a6112517e96f64801ba56 Author: Christian Brabandt <cb@256bit.org> Date: Wed Oct 4 21:58:24 2023 +0200 patch 9.0.1984: CI: Test_open_delay*() fails on FreeBSD 14 Problem: CI: Test_open_delay*() fails on FreeBSD 14 Solution: Skip it on BSD Signed-off-by: Christian Brabandt <cb@256bit.org>
author Christian Brabandt <cb@256bit.org>
date Wed, 04 Oct 2023 22:15:03 +0200
parents 8492bbc9f533
children
line wrap: on
line source

#!/usr/bin/env python
#
# Server that will accept connections from a Vim channel.
# Used by test_channel.vim.
#
# This requires Python 2.6 or later.

from __future__ import print_function
import json
import socket
import sys
import time
import threading

try:
    # Python 3
    import socketserver
except ImportError:
    # Python 2
    import SocketServer as socketserver

class TestingRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print("=== socket opened ===")
        while True:
            try:
                received = self.request.recv(4096).decode('utf-8')
            except socket.error:
                print("=== socket error ===")
                break
            except IOError:
                print("=== socket closed ===")
                break
            if received == '':
                print("=== socket closed ===")
                break
            print("received: {0}".format(received))

            # We may receive two messages at once. Take the part up to the
            # newline, which should be after the matching "]".
            todo = received
            while todo != '':
                splitidx = todo.find('\n')
                if splitidx < 0:
                     used = todo
                     todo = ''
                else:
                     used = todo[:splitidx]
                     todo = todo[splitidx + 1:]
                if used != received:
                    print("using: {0}".format(used))

                try:
                    decoded = json.loads(used)
                except ValueError:
                    print("json decoding failed")
                    decoded = [-1, '']

                # Send a response if the sequence number is positive.
                if decoded[0] >= 0:
                    if decoded[1] == 'hello!':
                        # simply send back a string
                        response = "got it"
                    elif decoded[1] == 'malformed1':
                        cmd = '["ex",":"]wrong!["ex","smi"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                        # Need to wait for Vim to give up, otherwise it
                        # sometimes fails on OS X.
                        time.sleep(0.2)
                    elif decoded[1] == 'malformed2':
                        cmd = '"unterminated string'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                        # Need to wait for Vim to give up, otherwise the double
                        # quote in the "ok" response terminates the string.
                        time.sleep(0.2)
                    elif decoded[1] == 'malformed3':
                        cmd = '["ex","missing ]"'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                        # Need to wait for Vim to give up, otherwise the ]
                        # in the "ok" response terminates the list.
                        time.sleep(0.2)
                    elif decoded[1] == 'split':
                        cmd = '["ex","let '
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        time.sleep(0.01)
                        cmd = 'g:split = 123"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1].startswith("echo "):
                        # send back the argument
                        response = decoded[1][5:]
                    elif decoded[1] == 'make change':
                        # Send two ex commands at the same time, before
                        # replying to the request.
                        cmd = '["ex","call append(\\"$\\",\\"added1\\")"]'
                        cmd += '["ex","call append(\\"$\\",\\"added2\\")"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'echoerr':
                        cmd = '["ex","echoerr \\\"this is an error\\\""]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                        # Wait a bit, so that the "ex" command is handled
                        # before the "ch_evalexpr() returns.  Otherwise we are
                        # outside the try/catch when the "ex" command is
                        # handled.
                        time.sleep(0.02)
                    elif decoded[1] == 'bad command':
                        cmd = '["ex","foo bar"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'do normal':
                        # Send a normal command.
                        cmd = '["normal","G$s more\u001b"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-works':
                        # Send an eval request.  We ignore the response.
                        cmd = '["expr","\\"foo\\" . 123", -1]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-special':
                        # Send an eval request.  We ignore the response.
                        cmd = '["expr","\\"foo\x7f\x10\x01bar\\"", -2]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-getline':
                        # Send an eval request.  We ignore the response.
                        cmd = '["expr","getline(3)", -3]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-fails':
                        # Send an eval request that will fail.
                        cmd = '["expr","xxx", -4]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-error':
                        # Send an eval request that works but the result can't
                        # be encoded.
                        cmd = '["expr","function(\\"tr\\")", -5]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-bad':
                        # Send an eval request missing the third argument.
                        cmd = '["expr","xxx"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'an expr':
                        # Send an expr request.
                        cmd = '["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'call-func':
                        cmd = '["call","MyFunction",[1,2,3], 0]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'redraw':
                        cmd = '["redraw",""]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'redraw!':
                        cmd = '["redraw","force"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'empty-request':
                        cmd = '[]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-result':
                        # Send back the last received eval result.
                        response = last_eval
                    elif decoded[1] == 'call me':
                        cmd = '[0,"we called you"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'call me again':
                        cmd = '[0,"we did call you"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = ""
                    elif decoded[1] == 'send zero':
                        cmd = '[0,"zero index"]'
                        print("sending: {0}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "sent zero"
                    elif decoded[1] == 'close me':
                        print("closing")
                        self.request.close()
                        response = ""
                    elif decoded[1] == 'wait a bit':
                        time.sleep(0.2)
                        response = "waited"
                    elif decoded[1] == '!quit!':
                        # we're done
                        self.server.shutdown()
                        return
                    elif decoded[1] == '!crash!':
                        # Crash!
                        42 / 0
                    else:
                        response = "what?"

                    if response == "":
                        print("no response")
                    else:
                        encoded = json.dumps([decoded[0], response])
                        print("sending: {0}".format(encoded))
                        self.request.sendall(encoded.encode('utf-8'))

                # Negative numbers are used for "eval" responses.
                elif decoded[0] < 0:
                    last_eval = decoded

class ThreadedTCPRequestHandler(TestingRequestHandler):
    def setup(self):
        self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass

def writePortInFile(port):
    # Write the port number in Xportnr, so that the test knows it.
    f = open("Xportnr", "w")
    f.write("{0}".format(port))
    f.close()

def main(host, port, server_class=ThreadedTCPServer):
    # Wait half a second before opening the port to test waittime in ch_open().
    # We do want to get the port number, get that first.  We cannot open the
    # socket, guess a port is free.
    if len(sys.argv) >= 2 and sys.argv[1] == 'delay':
        port = 13684
        writePortInFile(port)

        print("Wait for it...")
        time.sleep(0.5)

    addrs = socket.getaddrinfo(host, port, 0, 0, socket.IPPROTO_TCP)
    # Each addr is a (family, type, proto, canonname, sockaddr) tuple
    sockaddr = addrs[0][4]
    server_class.address_family = addrs[0][0]

    server = server_class(sockaddr[0:2], ThreadedTCPRequestHandler)
    ip, port = server.server_address[0:2]

    # Start a thread with the server.  That thread will then start a new thread
    # for each connection.
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.start()

    writePortInFile(port)

    print("Listening on port {0}".format(port))

    # Main thread terminates, but the server continues running
    # until server.shutdown() is called.
    try:
        while server_thread.is_alive():
            server_thread.join(1)
    except (KeyboardInterrupt, SystemExit):
        server.shutdown()

if __name__ == "__main__":
    main("localhost", 0)