summaryrefslogtreecommitdiff
path: root/src/thread_parker/windows/waitaddress.rs
blob: ef6cb44e3db43316c5f26162c9c627d39222c827 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use core::{
    mem,
    sync::atomic::{AtomicUsize, Ordering},
};
use std::{ffi, time::Instant};
use windows_sys::Win32::{
    Foundation::{GetLastError, BOOL, ERROR_TIMEOUT},
    System::{
        LibraryLoader::{GetModuleHandleA, GetProcAddress},
        WindowsProgramming::INFINITE,
    },
};

#[allow(non_snake_case)]
pub struct WaitAddress {
    WaitOnAddress: extern "system" fn(
        Address: *mut ffi::c_void,
        CompareAddress: *mut ffi::c_void,
        AddressSize: usize,
        dwMilliseconds: u32,
    ) -> BOOL,
    WakeByAddressSingle: extern "system" fn(Address: *mut ffi::c_void),
}

impl WaitAddress {
    #[allow(non_snake_case)]
    pub fn create() -> Option<WaitAddress> {
        // MSDN claims that that WaitOnAddress and WakeByAddressSingle are
        // located in kernel32.dll, but they are lying...
        let synch_dll = unsafe { GetModuleHandleA(b"api-ms-win-core-synch-l1-2-0.dll\0".as_ptr()) };
        if synch_dll == 0 {
            return None;
        }

        let WaitOnAddress = unsafe { GetProcAddress(synch_dll, b"WaitOnAddress\0".as_ptr())? };
        let WakeByAddressSingle =
            unsafe { GetProcAddress(synch_dll, b"WakeByAddressSingle\0".as_ptr())? };

        Some(WaitAddress {
            WaitOnAddress: unsafe { mem::transmute(WaitOnAddress) },
            WakeByAddressSingle: unsafe { mem::transmute(WakeByAddressSingle) },
        })
    }

    #[inline]
    pub fn prepare_park(&'static self, key: &AtomicUsize) {
        key.store(1, Ordering::Relaxed);
    }

    #[inline]
    pub fn timed_out(&'static self, key: &AtomicUsize) -> bool {
        key.load(Ordering::Relaxed) != 0
    }

    #[inline]
    pub fn park(&'static self, key: &AtomicUsize) {
        while key.load(Ordering::Acquire) != 0 {
            let r = self.wait_on_address(key, INFINITE);
            debug_assert!(r == true.into());
        }
    }

    #[inline]
    pub fn park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool {
        while key.load(Ordering::Acquire) != 0 {
            let now = Instant::now();
            if timeout <= now {
                return false;
            }
            let diff = timeout - now;
            let timeout = diff
                .as_secs()
                .checked_mul(1000)
                .and_then(|x| x.checked_add((diff.subsec_nanos() as u64 + 999999) / 1000000))
                .map(|ms| {
                    if ms > std::u32::MAX as u64 {
                        INFINITE
                    } else {
                        ms as u32
                    }
                })
                .unwrap_or(INFINITE);
            if self.wait_on_address(key, timeout) == false.into() {
                debug_assert_eq!(unsafe { GetLastError() }, ERROR_TIMEOUT);
            }
        }
        true
    }

    #[inline]
    pub fn unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle {
        // We don't need to lock anything, just clear the state
        key.store(0, Ordering::Release);

        UnparkHandle {
            key: key,
            waitaddress: self,
        }
    }

    #[inline]
    fn wait_on_address(&'static self, key: &AtomicUsize, timeout: u32) -> BOOL {
        let cmp = 1usize;
        (self.WaitOnAddress)(
            key as *const _ as *mut ffi::c_void,
            &cmp as *const _ as *mut ffi::c_void,
            mem::size_of::<usize>(),
            timeout,
        )
    }
}

// Handle for a thread that is about to be unparked. We need to mark the thread
// as unparked while holding the queue lock, but we delay the actual unparking
// until after the queue lock is released.
pub struct UnparkHandle {
    key: *const AtomicUsize,
    waitaddress: &'static WaitAddress,
}

impl UnparkHandle {
    // Wakes up the parked thread. This should be called after the queue lock is
    // released to avoid blocking the queue for too long.
    #[inline]
    pub fn unpark(self) {
        (self.waitaddress.WakeByAddressSingle)(self.key as *mut ffi::c_void);
    }
}