Mercurial > vim
view src/testdir/test_channel.py @ 7904:14a5de0990a5 v7.4.1248
commit https://github.com/vim/vim/commit/f92591f7f9fc78d2aced99befe444cb423b26df8
Author: Bram Moolenaar <Bram@vim.org>
Date: Wed Feb 3 20:22:32 2016 +0100
patch 7.4.1248
Problem: Can't reliably stop the channel test server. Can't start the
server if the python file is not executable.
Solution: Use "pkill" instead of "killall". Run the python file as an
argument instead of as an executable.
author | Christian Brabandt <cb@256bit.org> |
---|---|
date | Wed, 03 Feb 2016 20:30:04 +0100 |
parents | f12d6235a753 |
children | ea1fd8d750a6 |
line wrap: on
line source
#!/usr/bin/python
#
# Server that will accept connections from a Vim channel.
# Run this server and then in Vim you can open the channel:
#  :let handle = ch_open('localhost:8765', 'json')
# (This script binds to a free port; the actual port number is printed and
# written to the file "Xportnr".)
#
# Then Vim can send requests to the server:
#  :let response = ch_sendexpr(handle, 'hello!')
#
# See ":help channel-demo" in Vim.
#
# This requires Python 2.6 or later.

from __future__ import print_function
import json
import socket
import sys
import threading

try:
    # Python 3
    import socketserver
except ImportError:
    # Python 2
    import SocketServer as socketserver

# Socket of the connection that was opened last, None once that connection
# has been closed.
thesocket = None


class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Serve one channel connection, answering JSON encoded requests."""

    def handle(self):
        """Read [sequence number, value] messages until the socket closes.

        A message with a sequence number >= 0 gets a reply of the form
        [sequence number, response]:
          'hello!'       replies "got it"
          'make change'  first sends two "ex" commands, then replies "ok"
          '!quit!'       shuts the server down, no reply is sent
          anything else  replies "what?"
        Messages that cannot be decoded, or that do not have this shape, are
        reported on stdout and otherwise ignored.
        """
        global thesocket

        print("=== socket opened ===")
        # Always talk to this connection's own socket.  The global can be
        # replaced by another connection that is handled at the same time.
        conn = self.request
        thesocket = conn
        try:
            while True:
                try:
                    received = conn.recv(4096)
                except socket.error:
                    print("=== socket error ===")
                    break
                except IOError:
                    print("=== socket closed ===")
                    break
                if not received:
                    # An empty read means the other side closed the socket.
                    print("=== socket closed ===")
                    break

                try:
                    data = received.decode('utf-8')
                except UnicodeDecodeError:
                    print("utf-8 decoding failed")
                    continue
                print("received: {0}".format(data))

                try:
                    decoded = json.loads(data)
                except ValueError:
                    print("json decoding failed")
                    decoded = [-1, '']

                # Valid JSON is not necessarily a [number, value] pair.
                try:
                    seq, request = decoded[0], decoded[1]
                    reply_wanted = seq >= 0
                except (TypeError, IndexError, KeyError):
                    print("unexpected message format")
                    continue

                # Send a response if the sequence number is positive.
                # Negative numbers are used for "eval" responses.
                if not reply_wanted:
                    continue

                if request == 'hello!':
                    # simply send back a string
                    response = "got it"
                elif request == 'make change':
                    # Send two ex commands at the same time, before replying
                    # to the request.
                    cmd = '["ex","call append(\\"$\\",\\"added1\\")"]'
                    cmd += '["ex","call append(\\"$\\",\\"added2\\")"]'
                    print("sending: {0}".format(cmd))
                    conn.sendall(cmd.encode('utf-8'))
                    response = "ok"
                elif request == '!quit!':
                    # We're done.  sys.exit() would only end this handler
                    # thread; shutdown() makes serve_forever() return.  This
                    # relies on serve_forever() running in another thread.
                    self.server.shutdown()
                    break
                else:
                    response = "what?"

                encoded = json.dumps([seq, response])
                print("sending: {0}".format(encoded))
                conn.sendall(encoded.encode('utf-8'))
        finally:
            # Only clear the global when it still refers to this connection.
            if thesocket is conn:
                thesocket = None


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each connection in a thread of its own."""

    # Connections that are still open must not keep the process alive after
    # the server was shut down.
    daemon_threads = True


if __name__ == "__main__":
    HOST, PORT = "localhost", 0

    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
    ip, port = server.server_address[0:2]

    # Start a thread with the server -- that thread will then start one
    # more thread for each request
    server_thread = threading.Thread(target=server.serve_forever)

    # Exit the server thread when the main thread terminates
    server_thread.daemon = True
    server_thread.start()

    # Write the port number in Xportnr, so that the test knows it.
    with open("Xportnr", "w") as f:
        f.write("{0}".format(port))

    print("Listening on port {0}".format(port))

    # Block here until a '!quit!' request shuts the server down.  Joining
    # with a timeout keeps the main thread responsive to CTRL-C.
    try:
        while server_thread.is_alive():
            server_thread.join(1)
    except (KeyboardInterrupt, SystemExit):
        server.shutdown()
    server.server_close()