view src/testdir/thread_util.py @ 29247:5f314b2ed494 v8.2.5142

patch 8.2.5142: startup test fails if there is a status bar Commit: https://github.com/vim/vim/commit/fa04eae5a5b9394079bde2d37ce6f9f8a5567d48 Author: Bram Moolenaar <Bram@vim.org> Date: Tue Jun 21 14:38:40 2022 +0100 patch 8.2.5142: startup test fails if there is a status bar Problem: Startup test fails if there is a status bar at the top of the screen. (Ernie Rael) Solution: Use a larger vertical offset in the test.
author Bram Moolenaar <Bram@vim.org>
date Tue, 21 Jun 2022 18:45:07 +0200
parents 06e3c6bac36d
children
line wrap: on
line source

import platform

if platform.system() == 'Darwin':
    from ctypes import (
        CDLL,
        POINTER,
        Structure,
        byref,
        c_int,
        c_uint,
        c_uint32,
        c_void_p,
        sizeof
    )
    from ctypes.util import find_library

    class ThreadTimeConstraintPolicy(Structure):
        _fields_ = [
            ("period", c_uint32),
            ("computation", c_uint32),
            ("constraint", c_uint32),
            ("preemptible", c_uint)
        ]

    _libc = CDLL(find_library('c'))

    THREAD_TIME_CONSTRAINT_POLICY = c_uint(2)

    THREAD_TIME_CONSTRAINT_POLICY_COUNT = c_uint(
        int(sizeof(ThreadTimeConstraintPolicy) / sizeof(c_int)))

    _libc.pthread_self.restype = c_void_p

    _libc.pthread_mach_thread_np.restype = c_uint
    _libc.pthread_mach_thread_np.argtypes = [c_void_p]

    _libc.thread_policy_get.restype = c_int
    _libc.thread_policy_get.argtypes = [
        c_uint,
        c_uint,
        c_void_p,
        POINTER(c_uint),
        POINTER(c_uint)
    ]

    _libc.thread_policy_set.restype = c_int
    _libc.thread_policy_set.argtypes = [
        c_uint,
        c_uint,
        c_void_p,
        c_uint
    ]

    def _mach_thread_self():
        return _libc.pthread_mach_thread_np(_libc.pthread_self())

    def _get_time_constraint_policy(default=False):
        thread = _mach_thread_self()
        policy_info = ThreadTimeConstraintPolicy()
        policy_infoCnt = THREAD_TIME_CONSTRAINT_POLICY_COUNT
        get_default = c_uint(default)

        kret = _libc.thread_policy_get(
            thread,
            THREAD_TIME_CONSTRAINT_POLICY,
            byref(policy_info),
            byref(policy_infoCnt),
            byref(get_default))
        if kret != 0:
            return None
        return policy_info

    def _set_time_constraint_policy(policy_info):
        thread = _mach_thread_self()
        policy_infoCnt = THREAD_TIME_CONSTRAINT_POLICY_COUNT

        kret = _libc.thread_policy_set(
            thread,
            THREAD_TIME_CONSTRAINT_POLICY,
            byref(policy_info),
            policy_infoCnt)
        if kret != 0:
            raise OSError(kret)

    def set_high_priority():
        policy_info = _get_time_constraint_policy(default=True)
        if not policy_info:
            return
        policy_info.preemptible = c_uint(False)
        _set_time_constraint_policy(policy_info)