view src/testdir/thread_util.py @ 28994:644b0f0541de v8.2.5019

patch 8.2.5019: cannot get the first screen column of a character Commit: https://github.com/vim/vim/commit/0f7a3e1de6f71e8e1423fe594890d6aa7f94e132 Author: LemonBoy <thatlemon@gmail.com> Date: Thu May 26 12:10:37 2022 +0100 patch 8.2.5019: cannot get the first screen column of a character Problem: Cannot get the first screen column of a character. Solution: Let virtcol() optionally return a list. (closes https://github.com/vim/vim/issues/10482, closes #7964)
author Bram Moolenaar <Bram@vim.org>
date Thu, 26 May 2022 13:15:04 +0200
parents 06e3c6bac36d
children
line wrap: on
line source

import platform

if platform.system() == 'Darwin':
    from ctypes import (
        CDLL,
        POINTER,
        Structure,
        byref,
        c_int,
        c_uint,
        c_uint32,
        c_void_p,
        sizeof
    )
    from ctypes.util import find_library

    class ThreadTimeConstraintPolicy(Structure):
        _fields_ = [
            ("period", c_uint32),
            ("computation", c_uint32),
            ("constraint", c_uint32),
            ("preemptible", c_uint)
        ]

    _libc = CDLL(find_library('c'))

    THREAD_TIME_CONSTRAINT_POLICY = c_uint(2)

    THREAD_TIME_CONSTRAINT_POLICY_COUNT = c_uint(
        int(sizeof(ThreadTimeConstraintPolicy) / sizeof(c_int)))

    _libc.pthread_self.restype = c_void_p

    _libc.pthread_mach_thread_np.restype = c_uint
    _libc.pthread_mach_thread_np.argtypes = [c_void_p]

    _libc.thread_policy_get.restype = c_int
    _libc.thread_policy_get.argtypes = [
        c_uint,
        c_uint,
        c_void_p,
        POINTER(c_uint),
        POINTER(c_uint)
    ]

    _libc.thread_policy_set.restype = c_int
    _libc.thread_policy_set.argtypes = [
        c_uint,
        c_uint,
        c_void_p,
        c_uint
    ]

    def _mach_thread_self():
        return _libc.pthread_mach_thread_np(_libc.pthread_self())

    def _get_time_constraint_policy(default=False):
        thread = _mach_thread_self()
        policy_info = ThreadTimeConstraintPolicy()
        policy_infoCnt = THREAD_TIME_CONSTRAINT_POLICY_COUNT
        get_default = c_uint(default)

        kret = _libc.thread_policy_get(
            thread,
            THREAD_TIME_CONSTRAINT_POLICY,
            byref(policy_info),
            byref(policy_infoCnt),
            byref(get_default))
        if kret != 0:
            return None
        return policy_info

    def _set_time_constraint_policy(policy_info):
        thread = _mach_thread_self()
        policy_infoCnt = THREAD_TIME_CONSTRAINT_POLICY_COUNT

        kret = _libc.thread_policy_set(
            thread,
            THREAD_TIME_CONSTRAINT_POLICY,
            byref(policy_info),
            policy_infoCnt)
        if kret != 0:
            raise OSError(kret)

    def set_high_priority():
        policy_info = _get_time_constraint_policy(default=True)
        if not policy_info:
            return
        policy_info.preemptible = c_uint(False)
        _set_time_constraint_policy(policy_info)