Mercurial > vim
view src/testdir/test_channel.py @ 19812:7cde7ea94dd3
Added tag v8.2.0462 for changeset d1d840cfd135d18f52fb7833ba130289825f1f2a
author | Bram Moolenaar <Bram@vim.org> |
---|---|
date | Fri, 27 Mar 2020 20:30:05 +0100 |
parents | 33a2277b8d4d |
children | e373843e2980 |
line wrap: on
line source
#!/usr/bin/python
#
# Server that will accept connections from a Vim channel.
# Used by test_channel.vim.
#
# This requires Python 2.6 or later.

from __future__ import print_function
import json
import socket
import sys
import time
import threading

try:
    # Python 3
    import socketserver
except ImportError:
    # Python 2
    import SocketServer as socketserver


# Requests that are handled by first pushing a raw message to the client and
# then replying.  Maps the request text to a tuple of:
#   (message pushed before the reply, reply text, seconds to sleep after push)
# An empty reply text means that no reply is sent.
_PUSH_THEN_REPLY = {
    # Need to wait for Vim to give up, otherwise it sometimes fails on OS X.
    'malformed1': ('["ex",":"]wrong!["ex","smi"]', "ok", 0.2),
    # Need to wait for Vim to give up, otherwise the double quote in the "ok"
    # response terminates the string.
    'malformed2': ('"unterminated string', "ok", 0.2),
    # Need to wait for Vim to give up, otherwise the ] in the "ok" response
    # terminates the list.
    'malformed3': ('["ex","missing ]"', "ok", 0.2),
    # Send two ex commands at the same time, before replying to the request.
    'make change': ('["ex","call append(\\"$\\",\\"added1\\")"]'
                    '["ex","call append(\\"$\\",\\"added2\\")"]', "ok", 0),
    'bad command': ('["ex","foo bar"]', "ok", 0),
    # Send a normal command.
    'do normal': ('["normal","G$s more\u001b"]', "ok", 0),
    # Send an eval request.  We ignore the response.
    'eval-works': ('["expr","\\"foo\\" . 123", -1]', "ok", 0),
    'eval-special': ('["expr","\\"foo\x7f\x10\x01bar\\"", -2]', "ok", 0),
    'eval-getline': ('["expr","getline(3)", -3]', "ok", 0),
    # Send an eval request that will fail.
    'eval-fails': ('["expr","xxx", -4]', "ok", 0),
    # Send an eval request that works but the result can't be encoded.
    'eval-error': ('["expr","function(\\"tr\\")", -5]', "ok", 0),
    # Send an eval request missing the third argument.
    'eval-bad': ('["expr","xxx"]', "ok", 0),
    # Send an expr request.
    'an expr': ('["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]',
                "ok", 0),
    'call-func': ('["call","MyFunction",[1,2,3], 0]', "ok", 0),
    'redraw': ('["redraw",""]', "ok", 0),
    'redraw!': ('["redraw","force"]', "ok", 0),
    'empty-request': ('[]', "ok", 0),
    'call me': ('[0,"we called you"]', "ok", 0),
    'call me again': ('[0,"we did call you"]', "", 0),
    'send zero': ('[0,"zero index"]', "sent zero", 0),
}


class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Handle one channel connection.

    Incoming data is split at newlines; each piece is decoded as a JSON list
    of [sequence number, text].  A piece that fails to decode is treated as
    [-1, ''].  A message with a non-negative number is a request that gets a
    [number, response] reply; a message with a negative number is stored as
    the last "eval" result.
    """

    def handle(self):
        """Serve the connection until it is closed or '!quit!' is received."""
        print("=== socket opened ===")
        # The last message received with a negative sequence number.  Starts
        # as None so that an early 'eval-result' request gets a null reply
        # instead of raising an error for an unset variable.
        self._last_eval = None
        while True:
            try:
                received = self.request.recv(4096).decode('utf-8')
            except socket.error:
                print("=== socket error ===")
                break
            except IOError:
                print("=== socket closed ===")
                break
            if received == '':
                print("=== socket closed ===")
                break
            print("received: {0}".format(received))

            # We may receive two messages at once.  Take the part up to the
            # newline, which should be after the matching "]".
            todo = received
            while todo != '':
                used, _, todo = todo.partition('\n')
                if used != received:
                    print("using: {0}".format(used))
                try:
                    decoded = json.loads(used)
                except ValueError:
                    print("json decoding failed")
                    decoded = [-1, '']

                # Send a response if the sequence number is not negative.
                if decoded[0] >= 0:
                    request = decoded[1]
                    if request == '!quit!':
                        # We're done: stop the server and drop whatever is
                        # left of the received data.
                        self.server.shutdown()
                        return
                    response = self._respond(request)
                    if response == "":
                        print("no response")
                    else:
                        self._send(json.dumps([decoded[0], response]))
                # Negative numbers are used for "eval" responses.
                elif decoded[0] < 0:
                    self._last_eval = decoded

    def _send(self, text):
        """Log *text* and send it to the client encoded as UTF-8."""
        print("sending: {0}".format(text))
        self.request.sendall(text.encode('utf-8'))

    def _respond(self, request):
        """Carry out *request* and return the value to reply with.

        Returning an empty string means that no reply is to be sent.
        """
        if request == 'hello!':
            # simply send back a string
            return "got it"
        if request in _PUSH_THEN_REPLY:
            cmd, response, delay = _PUSH_THEN_REPLY[request]
            self._send(cmd)
            if delay:
                time.sleep(delay)
            return response
        if request == 'split':
            # Send one ex command in two parts with a short pause in between.
            self._send('["ex","let ')
            time.sleep(0.01)
            self._send('g:split = 123"]')
            return "ok"
        if request.startswith("echo "):
            # send back the argument
            return request[5:]
        if request == 'eval-result':
            # Send back the last received eval result.
            return self._last_eval
        if request == 'close me':
            print("closing")
            self.request.close()
            return ""
        if request == 'wait a bit':
            time.sleep(0.2)
            return "waited"
        if request == '!crash!':
            # Crash on purpose: this raises ZeroDivisionError.
            42 / 0
        return "what?"


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each connection in its own thread."""
    pass


def writePortInFile(port):
    """Write the port number in Xportnr, so that the test knows it."""
    # "with" closes the file even when writing fails.
    with open("Xportnr", "w") as f:
        f.write("{0}".format(port))


def main():
    """Start the server and wait until it has been shut down.

    When the first command line argument is "delay" a fixed port number is
    written to Xportnr half a second before the port is actually opened.
    """
    host, port = "localhost", 0

    # Wait half a second before opening the port to test waittime in ch_open().
    # We do want to get the port number, get that first.  We cannot open the
    # socket, guess a port is free.
    if len(sys.argv) >= 2 and sys.argv[1] == 'delay':
        port = 13684
        writePortInFile(port)

        print("Wait for it...")
        time.sleep(0.5)

    server = ThreadedTCPServer((host, port), ThreadedTCPRequestHandler)
    port = server.server_address[1]

    # Start a thread with the server.  That thread will then start a new thread
    # for each connection.
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.start()

    writePortInFile(port)

    print("Listening on port {0}".format(port))

    # Main thread terminates, but the server continues running
    # until server.shutdown() is called.
    try:
        # Thread.isAlive() was removed in Python 3.9; is_alive() is available
        # since Python 2.6.
        while server_thread.is_alive():
            server_thread.join(1)
    except (KeyboardInterrupt, SystemExit):
        server.shutdown()


if __name__ == "__main__":
    main()