view src/testdir/thread_util.py @ 35074:918b5b6ed9a4 default tip

Added tag v9.1.0380 for changeset 80240c27c4ffb28ca15ee5b439d2605e8648b02e
author Christian Brabandt <cb@256bit.org>
date Sun, 28 Apr 2024 16:30:13 +0200
parents 06e3c6bac36d
children
line wrap: on
line source

import platform

if platform.system() == 'Darwin':
    from ctypes import (
        CDLL,
        POINTER,
        Structure,
        byref,
        c_int,
        c_uint,
        c_uint32,
        c_void_p,
        sizeof
    )
    from ctypes.util import find_library

    class ThreadTimeConstraintPolicy(Structure):
        _fields_ = [
            ("period", c_uint32),
            ("computation", c_uint32),
            ("constraint", c_uint32),
            ("preemptible", c_uint)
        ]

    _libc = CDLL(find_library('c'))

    THREAD_TIME_CONSTRAINT_POLICY = c_uint(2)

    THREAD_TIME_CONSTRAINT_POLICY_COUNT = c_uint(
        int(sizeof(ThreadTimeConstraintPolicy) / sizeof(c_int)))

    _libc.pthread_self.restype = c_void_p

    _libc.pthread_mach_thread_np.restype = c_uint
    _libc.pthread_mach_thread_np.argtypes = [c_void_p]

    _libc.thread_policy_get.restype = c_int
    _libc.thread_policy_get.argtypes = [
        c_uint,
        c_uint,
        c_void_p,
        POINTER(c_uint),
        POINTER(c_uint)
    ]

    _libc.thread_policy_set.restype = c_int
    _libc.thread_policy_set.argtypes = [
        c_uint,
        c_uint,
        c_void_p,
        c_uint
    ]

    def _mach_thread_self():
        return _libc.pthread_mach_thread_np(_libc.pthread_self())

    def _get_time_constraint_policy(default=False):
        thread = _mach_thread_self()
        policy_info = ThreadTimeConstraintPolicy()
        policy_infoCnt = THREAD_TIME_CONSTRAINT_POLICY_COUNT
        get_default = c_uint(default)

        kret = _libc.thread_policy_get(
            thread,
            THREAD_TIME_CONSTRAINT_POLICY,
            byref(policy_info),
            byref(policy_infoCnt),
            byref(get_default))
        if kret != 0:
            return None
        return policy_info

    def _set_time_constraint_policy(policy_info):
        thread = _mach_thread_self()
        policy_infoCnt = THREAD_TIME_CONSTRAINT_POLICY_COUNT

        kret = _libc.thread_policy_set(
            thread,
            THREAD_TIME_CONSTRAINT_POLICY,
            byref(policy_info),
            policy_infoCnt)
        if kret != 0:
            raise OSError(kret)

    def set_high_priority():
        policy_info = _get_time_constraint_policy(default=True)
        if not policy_info:
            return
        policy_info.preemptible = c_uint(False)
        _set_time_constraint_policy(policy_info)