aboutsummaryrefslogtreecommitdiff
path: root/kotlinx-coroutines-core/native/src/Builders.kt
blob: a49ba6c53d3b47822ae3b3cf0e82fd08874eb9a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
@file:OptIn(ExperimentalContracts::class, ObsoleteWorkersApi::class)
package kotlinx.coroutines

import kotlinx.cinterop.*
import platform.posix.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*

/**
 * Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
 * This function should not be used from coroutine. It is designed to bridge regular blocking code
 * to libraries that are written in suspending style, to be used in `main` functions and in tests.
 *
 * The default [CoroutineDispatcher] for this builder in an implementation of [EventLoop] that processes continuations
 * in this blocked thread until the completion of this coroutine.
 * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
 *
 * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
 * the specified dispatcher while the current thread is blocked. If the specified dispatcher implements [EventLoop]
 * interface and this `runBlocking` invocation is performed from inside of the this event loop's thread, then
 * this event loop is processed using its [processNextEvent][EventLoop.processNextEvent] method until coroutine completes.
 *
 * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
 * this `runBlocking` invocation throws [InterruptedException].
 *
 * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
 *
 * @param context context of the coroutine. The default value is an implementation of [EventLoop].
 * @param block the coroutine code.
 */
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        // create or use private event loop if no dispatcher is specified
        eventLoop = ThreadLocalEventLoop.eventLoop
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        // See if context's interceptor is an event loop that we shall use (to support TestContext)
        // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    }
    val coroutine = BlockingCoroutine<T>(newContext, eventLoop)
    var completed = false
    ThreadLocalKeepAlive.addCheck { !completed }
    try {
        coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
        return coroutine.joinBlocking()
    } finally {
        completed = true
    }
}

@ThreadLocal
private object ThreadLocalKeepAlive {
    /** If any of these checks passes, this means this [Worker] is still used. */
    private var checks = mutableListOf<() -> Boolean>()

    /** Whether the worker currently tries to keep itself alive. */
    private var keepAliveLoopActive = false

    /** Adds another stopgap that must be passed before the [Worker] can be terminated. */
    fun addCheck(terminationForbidden: () -> Boolean) {
        checks.add(terminationForbidden)
        if (!keepAliveLoopActive) keepAlive()
    }

    /**
     * Send a ping to the worker to prevent it from terminating while this coroutine is running,
     * ensuring that continuations don't get dropped and forgotten.
     */
    private fun keepAlive() {
        // only keep the checks that still forbid the termination
        checks = checks.filter { it() }.toMutableList()
        // if there are no checks left, we no longer keep the worker alive, it can be terminated
        keepAliveLoopActive = checks.isNotEmpty()
        if (keepAliveLoopActive) {
            Worker.current.executeAfter(afterMicroseconds = 100_000) {
                keepAlive()
            }
        }
    }
}

private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {
    private val joinWorker = Worker.current

    override val isScopedCoroutine: Boolean get() = true

    override fun afterCompletion(state: Any?) {
        // wake up blocked thread
        if (joinWorker != Worker.current) {
            // Unpark waiting worker
            joinWorker.executeAfter(0L, {}) // send an empty task to unpark the waiting event loop
        }
    }

    @Suppress("UNCHECKED_CAST")
    fun joinBlocking(): T {
        try {
            eventLoop?.incrementUseCount()
            while (true) {
                var parkNanos: Long
                // Workaround for bug in BE optimizer that cannot eliminate boxing here
                if (eventLoop != null) {
                    parkNanos = eventLoop.processNextEvent()
                } else {
                    parkNanos = Long.MAX_VALUE
                }
                // note: processNextEvent may lose unpark flag, so check if completed before parking
                if (isCompleted) break
                joinWorker.park(parkNanos / 1000L, true)
            }
        } finally { // paranoia
            eventLoop?.decrementUseCount()
        }
        // now return result
        val state = state.unboxState()
        (state as? CompletedExceptionally)?.let { throw it.cause }
        return state as T
    }
}