aboutsummaryrefslogtreecommitdiff
path: root/kotlinx-coroutines-core/common/src/JobSupport.kt
blob: 5b516ae27fc65a273c37e2dc37ed209c0846ff4e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
@file:Suppress("DEPRECATION_ERROR")

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.js.*
import kotlin.jvm.*
import kotlin.native.concurrent.*

/**
 * A concrete implementation of [Job]. It is optionally a child to a parent job.
 *
 * This is an open class designed for extension by more specific classes that might augment the
 * state and mare store addition state information for completed jobs, like their result values.
 *
 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
 * @suppress **This is unstable API and it is subject to change.**
 */
@Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
    final override val key: CoroutineContext.Key<*> get() = Job

    /*
       === Internal states ===

       name       state class              public state  description
       ------     ------------             ------------  -----------
       EMPTY_N    EmptyNew               : New           no listeners
       EMPTY_A    EmptyActive            : Active        no listeners
       SINGLE     JobNode                : Active        a single listener
       SINGLE+    JobNode                : Active        a single listener + NodeList added as its next
       LIST_N     InactiveNodeList       : New           a list of listeners (promoted once, does not got back to EmptyNew)
       LIST_A     NodeList               : Active        a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
       COMPLETING Finishing              : Completing    has a list of listeners (promoted once from LIST_*)
       CANCELLING Finishing              : Cancelling    -- " --
       FINAL_C    Cancelled              : Cancelled     Cancelled (final state)
       FINAL_R    <any>                  : Completed     produced some result

       === Transitions ===

           New states      Active states       Inactive states

          +---------+       +---------+                          }
          | EMPTY_N | ----> | EMPTY_A | ----+                    } Empty states
          +---------+       +---------+     |                    }
               |  |           |     ^       |    +----------+
               |  |           |     |       +--> |  FINAL_* |
               |  |           V     |       |    +----------+
               |  |         +---------+     |                    }
               |  |         | SINGLE  | ----+                    } JobNode states
               |  |         +---------+     |                    }
               |  |              |          |                    }
               |  |              V          |                    }
               |  |         +---------+     |                    }
               |  +-------> | SINGLE+ | ----+                    }
               |            +---------+     |                    }
               |                 |          |
               V                 V          |
          +---------+       +---------+     |                    }
          | LIST_N  | ----> | LIST_A  | ----+                    } [Inactive]NodeList states
          +---------+       +---------+     |                    }
             |   |             |   |        |
             |   |    +--------+   |        |
             |   |    |            V        |
             |   |    |    +------------+   |   +------------+   }
             |   +-------> | COMPLETING | --+-- | CANCELLING |   } Finishing states
             |        |    +------------+       +------------+   }
             |        |         |                    ^
             |        |         |                    |
             +--------+---------+--------------------+


       This state machine and its transition matrix are optimized for the common case when a job is created in active
       state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
       successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
       state without going to COMPLETING state)

       Note that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`

       ---------- TIMELINE of state changes and notification in Job lifecycle ----------

       | The longest possible chain of events in shown, shorter versions cut-through intermediate states,
       |  while still performing all the notifications in this order.

         + Job object is created
       ## NEW: state == EMPTY_NEW | is InactiveNodeList
         + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
         ~ waits for start
         >> start / join / await invoked
       ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
         + onStartInternal / onStart (lazy coroutine is started)
         ~ active coroutine is working (or scheduled to execution)
         >> childCancelled / cancelImpl invoked
       ## CANCELLING: state is Finishing, state.rootCause != null
        ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
        ------ new children get immediately cancelled, but are still admitted to the list
         + onCancelling
         + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
         + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
         ~ waits for completion of coroutine body
         >> makeCompleting / makeCompletingOnce invoked
       ## COMPLETING: state is Finishing, state.isCompleting == true
        ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
         ~ waits for children
         >> last child completes
         - computes the final exception
       ## SEALED: state is Finishing, state.isSealed == true
        ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
         + cancelParent (final exception is communicated to the parent, parent incorporates it)
         + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
       ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
        ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
         + parentHandle.dispose
         + notifyCompletion (invoke all completion listeners)
         + onCompletionInternal / onCompleted / onCancelled

       ---------------------------------------------------------------------------------
     */

    // Note: use shared objects while we have no listeners
    private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)

    private val _parentHandle = atomic<ChildHandle?>(null)
    internal var parentHandle: ChildHandle?
        get() = _parentHandle.value
        set(value) { _parentHandle.value = value }

    // ------------ initialization ------------

    /**
     * Initializes parent job.
     * It shall be invoked at most once after construction after all other initialization.
     */
    internal fun initParentJobInternal(parent: Job?) {
        assert { parentHandle == null }
        if (parent == null) {
            parentHandle = NonDisposableHandle
            return
        }
        parent.start() // make sure the parent is started
        @Suppress("DEPRECATION")
        val handle = parent.attachChild(this)
        parentHandle = handle
        // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
        if (isCompleted) {
            handle.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    }

    // ------------ state query ------------
    /**
     * Returns current state of this job.
     * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
     * and should be [unboxed][unboxState] before returning to user code.
     */
    internal val state: Any? get() {
        _state.loop { state -> // helper loop on state (complete in-progress atomic operations)
            if (state !is OpDescriptor) return state
            state.perform(this)
        }
    }

    /**
     * @suppress **This is unstable API and it is subject to change.**
     */
    private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
        while (true) {
            block(state)
        }
    }

    public override val isActive: Boolean get() {
        val state = this.state
        return state is Incomplete && state.isActive
    }

    public final override val isCompleted: Boolean get() = state !is Incomplete

    public final override val isCancelled: Boolean get() {
        val state = this.state
        return state is CompletedExceptionally || (state is Finishing && state.isCancelling)
    }

    // ------------ state update ------------

    // Finalizes Finishing -> Completed (terminal state) transition.
    // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
    // Returns final state that was created and updated to
    private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
        /*
         * Note: proposed state can be Incomplete, e.g.
         * async {
         *     something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
         * }
         */
        assert { this.state === state } // consistency check -- it cannot change
        assert { !state.isSealed } // consistency check -- cannot be sealed yet
        assert { state.isCompleting } // consistency check -- must be marked as completing
        val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
        // Create the final exception and seal the state so that no more exceptions can be added
        var wasCancelling = false // KLUDGE: we cannot have contract for our own expect fun synchronized
        val finalException = synchronized(state) {
            wasCancelling = state.isCancelling
            val exceptions = state.sealLocked(proposedException)
            val finalCause = getFinalRootCause(state, exceptions)
            if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
            finalCause
        }
        // Create the final state object
        val finalState = when {
            // was not cancelled (no exception) -> use proposed update value
            finalException == null -> proposedUpdate
            // small optimization when we can used proposeUpdate object as is on cancellation
            finalException === proposedException -> proposedUpdate
            // cancelled job final state
            else -> CompletedExceptionally(finalException)
        }
        // Now handle the final exception
        if (finalException != null) {
            val handled = cancelParent(finalException) || handleJobException(finalException)
            if (handled) (finalState as CompletedExceptionally).makeHandled()
        }
        // Process state updates for the final state before the state of the Job is actually set to the final state
        // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
        if (!wasCancelling) onCancelling(finalException)
        onCompletionInternal(finalState)
        // Then CAS to completed state -> it must succeed
        val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
        assert { casSuccess }
        // And process all post-completion actions
        completeStateFinalization(state, finalState)
        return finalState
    }

    private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
        // A case of no exceptions
        if (exceptions.isEmpty()) {
            // materialize cancellation exception if it was not materialized yet
            if (state.isCancelling) return defaultCancellationException()
            return null
        }
        /*
         * 1) If we have non-CE, use it as root cause
         * 2) If our original cause was TCE, use *non-original* TCE because of the special nature of TCE
         *    * It is a CE, so it's not reported by children
         *    * The first instance (cancellation cause) is created by timeout coroutine and has no meaningful stacktrace
         *    * The potential second instance is thrown by withTimeout lexical block itself, then it has recovered stacktrace
         * 3) Just return the very first CE
         */
        val firstNonCancellation = exceptions.firstOrNull { it !is CancellationException }
        if (firstNonCancellation != null) return firstNonCancellation
        val first = exceptions[0]
        if (first is TimeoutCancellationException) {
            val detailedTimeoutException = exceptions.firstOrNull { it !== first && it is TimeoutCancellationException }
            if (detailedTimeoutException != null) return detailedTimeoutException
        }
        return first
    }

    private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) {
        if (exceptions.size <= 1) return // nothing more to do here
        val seenExceptions = identitySet<Throwable>(exceptions.size)
        /*
         * Note that root cause may be a recovered exception as well.
         * To avoid cycles we unwrap the root cause and check for self-suppression against unwrapped cause,
         * but add suppressed exceptions to the recovered root cause (as it is our final exception)
         */
        val unwrappedCause = unwrap(rootCause)
        for (exception in exceptions) {
            val unwrapped = unwrap(exception)
            if (unwrapped !== rootCause && unwrapped !== unwrappedCause &&
                unwrapped !is CancellationException && seenExceptions.add(unwrapped)) {
                rootCause.addSuppressedThrowable(unwrapped)
            }
        }
    }

    // fast-path method to finalize normally completed coroutines without children
    // returns true if complete, and afterCompletion(update) shall be called
    private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
        assert { state is Empty || state is JobNode } // only simple state without lists where children can concurrently add
        assert { update !is CompletedExceptionally } // only for normal completion
        if (!_state.compareAndSet(state, update.boxIncomplete())) return false
        onCancelling(null) // simple state is not a failure
        onCompletionInternal(update)
        completeStateFinalization(state, update)
        return true
    }

    // suppressed == true when any exceptions were suppressed while building the final completion cause
    private fun completeStateFinalization(state: Incomplete, update: Any?) {
        /*
         * Now the job in THE FINAL state. We need to properly handle the resulting state.
         * Order of various invocations here is important.
         *
         * 1) Unregister from parent job.
         */
        parentHandle?.let {
            it.dispose() // volatile read parentHandle _after_ state was updated
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
        val cause = (update as? CompletedExceptionally)?.cause
        /*
         * 2) Invoke completion handlers: .join(), callbacks etc.
         *    It's important to invoke them only AFTER exception handling and everything else, see #208
         */
        if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
            try {
                state.invoke(cause)
            } catch (ex: Throwable) {
                handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
            }
        } else {
            state.list?.notifyCompletion(cause)
        }
    }

    private fun notifyCancelling(list: NodeList, cause: Throwable) {
        // first cancel our own children
        onCancelling(cause)
        notifyHandlers<JobCancellingNode>(list, cause)
        // then cancel parent
        cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
    }

    /**
     * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
     * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
     *
     * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
     * may leak to the [CoroutineExceptionHandler].
     */
    private fun cancelParent(cause: Throwable): Boolean {
        // Is scoped coroutine -- don't propagate, will be rethrown
        if (isScopedCoroutine) return true

        /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
         * This allow parent to cancel its children (normally) without being cancelled itself, unless
         * child crashes and produce some other exception during its completion.
         */
        val isCancellation = cause is CancellationException
        val parent = parentHandle
        // No parent -- ignore CE, report other exceptions.
        if (parent === null || parent === NonDisposableHandle) {
            return isCancellation
        }

        // Notify parent but don't forget to check cancellation
        return parent.childCancelled(cause) || isCancellation
    }

    private fun NodeList.notifyCompletion(cause: Throwable?) =
        notifyHandlers<JobNode>(this, cause)

    private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
        var exception: Throwable? = null
        list.forEach<T> { node ->
            try {
                node.invoke(cause)
            } catch (ex: Throwable) {
                exception?.apply { addSuppressedThrowable(ex) } ?: run {
                    exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)
                }
            }
        }
        exception?.let { handleOnCompletionException(it) }
    }

    public final override fun start(): Boolean {
        loopOnState { state ->
            when (startInternal(state)) {
                FALSE -> return false
                TRUE -> return true
            }
        }
    }

    // returns: RETRY/FALSE/TRUE:
    //   FALSE when not new,
    //   TRUE  when started
    //   RETRY when need to retry
    private fun startInternal(state: Any?): Int {
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (state.isActive) return FALSE // already active
                if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
                onStartInternal()
                return TRUE
            }
            is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
                if (!_state.compareAndSet(state, state.list)) return RETRY
                onStartInternal()
                return TRUE
            }
            else -> return FALSE // not a new state
        }
    }

    /**
     * Override to provide the actual [start] action.
     * This function is invoked exactly once when non-active coroutine is [started][start].
     */
    internal open fun onStartInternal() {}

    public final override fun getCancellationException(): CancellationException =
        when (val state = this.state) {
            is Finishing -> state.rootCause?.toCancellationException("$classSimpleName is cancelling")
                ?: error("Job is still new or active: $this")
            is Incomplete -> error("Job is still new or active: $this")
            is CompletedExceptionally -> state.cause.toCancellationException()
            else -> JobCancellationException("$classSimpleName has completed normally", null, this)
        }

    protected fun Throwable.toCancellationException(message: String? = null): CancellationException =
        this as? CancellationException ?: defaultCancellationException(message, this)

    /**
     * Returns the cause that signals the completion of this job -- it returns the original
     * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
     * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
     * is being cancelled yet.
     */
    protected val completionCause: Throwable?
        get() = when (val state = state) {
            is Finishing -> state.rootCause
                ?: error("Job is still new or active: $this")
            is Incomplete -> error("Job is still new or active: $this")
            is CompletedExceptionally -> state.cause
            else -> null
        }

    /**
     * Returns `true` when [completionCause] exception was handled by parent coroutine.
     */
    protected val completionCauseHandled: Boolean
        get() = state.let { it is CompletedExceptionally && it.handled }

    @Suppress("OverridingDeprecatedMember")
    public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
        invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)

    public final override fun invokeOnCompletion(
        onCancelling: Boolean,
        invokeImmediately: Boolean,
        handler: CompletionHandler
    ): DisposableHandle {
        // Create node upfront -- for common cases it just initializes JobNode.job field,
        // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
        val node: JobNode = makeNode(handler, onCancelling)
        loopOnState { state ->
            when (state) {
                is Empty -> { // EMPTY_X state -- no completion handlers
                    if (state.isActive) {
                        // try move to SINGLE state
                        if (_state.compareAndSet(state, node)) return node
                    } else
                        promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
                }
                is Incomplete -> {
                    val list = state.list
                    if (list == null) { // SINGLE/SINGLE+
                        promoteSingleToNodeList(state as JobNode)
                    } else {
                        var rootCause: Throwable? = null
                        var handle: DisposableHandle = NonDisposableHandle
                        if (onCancelling && state is Finishing) {
                            synchronized(state) {
                                // check if we are installing cancellation handler on job that is being cancelled
                                rootCause = state.rootCause // != null if cancelling job
                                // We add node to the list in two cases --- either the job is not being cancelled
                                // or we are adding a child to a coroutine that is not completing yet
                                if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
                                    // Note: add node the list while holding lock on state (make sure it cannot change)
                                    if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                                    // just return node if we don't have to invoke handler (not cancelling yet)
                                    if (rootCause == null) return node
                                    // otherwise handler is invoked immediately out of the synchronized section & handle returned
                                    handle = node
                                }
                            }
                        }
                        if (rootCause != null) {
                            // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
                            if (invokeImmediately) handler.invokeIt(rootCause)
                            return handle
                        } else {
                            if (addLastAtomic(state, list, node)) return node
                        }
                    }
                }
                else -> { // is complete
                    // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                    // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                    if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                    return NonDisposableHandle
                }
            }
        }
    }

    private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode {
        val node = if (onCancelling) {
            (handler as? JobCancellingNode)
                ?: InvokeOnCancelling(handler)
        } else {
            (handler as? JobNode)
                ?.also { assert { it !is JobCancellingNode } }
                ?: InvokeOnCompletion(handler)
        }
        node.job = this
        return node
    }

    private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
        list.addLastIf(node) { this.state === expect }

    private fun promoteEmptyToNodeList(state: Empty) {
        // try to promote it to LIST state with the corresponding state
        val list = NodeList()
        val update = if (state.isActive) list else InactiveNodeList(list)
        _state.compareAndSet(state, update)
    }

    private fun promoteSingleToNodeList(state: JobNode) {
        // try to promote it to list (SINGLE+ state)
        state.addOneIfEmpty(NodeList())
        // it must be in SINGLE+ state or state has changed (node could have need removed from state)
        val list = state.nextNode // either our NodeList or somebody else won the race, updated state
        // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
        _state.compareAndSet(state, list)
    }

    public final override suspend fun join() {
        if (!joinInternal()) { // fast-path no wait
            coroutineContext.checkCompletion()
            return // do not suspend
        }
        return joinSuspend() // slow-path wait
    }

    private fun joinInternal(): Boolean {
        loopOnState { state ->
            if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
            if (startInternal(state) >= 0) return true // wait unless need to retry
        }
    }

    private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
        // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
        cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler))
    }

    public final override val onJoin: SelectClause0
        get() = this

    // registerSelectJoin
    public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
        // fast-path -- check state and select/return if needed
        loopOnState { state ->
            if (select.isSelected) return
            if (state !is Incomplete) {
                // already complete -- select result
                if (select.trySelect()) {
                    block.startCoroutineUnintercepted(select.completion)
                }
                return
            }
            if (startInternal(state) == 0) {
                // slow-path -- register waiter for completion
                select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(select, block).asHandler))
                return
            }
        }
    }

    /**
     * @suppress **This is unstable API and it is subject to change.**
     */
    internal fun removeNode(node: JobNode) {
        // remove logic depends on the state of the job
        loopOnState { state ->
            when (state) {
                is JobNode -> { // SINGE/SINGLE+ state -- one completion handler
                    if (state !== node) return // a different job node --> we were already removed
                    // try remove and revert back to empty state
                    if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
                }
                is Incomplete -> { // may have a list of completion handlers
                    // remove node from the list if there is a list
                    if (state.list != null) node.remove()
                    return
                }
                else -> return // it is complete and does not have any completion handlers
            }
        }
    }

    /**
     * Returns `true` for job that do not have "body block" to complete and should immediately go into
     * completing state and start waiting for children.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    internal open val onCancelComplete: Boolean get() = false

    // external cancel with cause, never invoked implicitly from internal machinery
    public override fun cancel(cause: CancellationException?) {
        cancelInternal(cause ?: defaultCancellationException())
    }

    protected open fun cancellationExceptionMessage(): String = "Job was cancelled"

    // HIDDEN in Job interface. Invoked only by legacy compiled code.
    // external cancel with (optional) cause, never invoked implicitly from internal machinery
    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Added since 1.2.0 for binary compatibility with versions <= 1.1.x")
    public override fun cancel(cause: Throwable?): Boolean {
        cancelInternal(cause?.toCancellationException() ?: defaultCancellationException())
        return true
    }

    // It is overridden in channel-linked implementation
    public open fun cancelInternal(cause: Throwable) {
        cancelImpl(cause)
    }

    // Parent is cancelling child
    public final override fun parentCancelled(parentJob: ParentJob) {
        cancelImpl(parentJob)
    }

    /**
     * Child was cancelled with a cause.
     * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
     * It is overridden in supervisor implementations to completely ignore any child cancellation.
     * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
     *
     * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
     * may leak to the [CoroutineExceptionHandler].
     */
    public open fun childCancelled(cause: Throwable): Boolean {
        if (cause is CancellationException) return true
        return cancelImpl(cause) && handlesException
    }

    /**
     * Makes this [Job] cancelled with a specified [cause].
     * It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
     */
    public fun cancelCoroutine(cause: Throwable?): Boolean = cancelImpl(cause)

    // cause is Throwable or ParentJob when cancelChild was invoked
    // returns true is exception was handled, false otherwise
    internal fun cancelImpl(cause: Any?): Boolean {
        var finalState: Any? = COMPLETING_ALREADY
        if (onCancelComplete) {
            // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
            // completing and had recorded exception
            finalState = cancelMakeCompleting(cause)
            if (finalState === COMPLETING_WAITING_CHILDREN) return true
        }
        if (finalState === COMPLETING_ALREADY) {
            finalState = makeCancelling(cause)
        }
        return when {
            finalState === COMPLETING_ALREADY -> true
            finalState === COMPLETING_WAITING_CHILDREN -> true
            finalState === TOO_LATE_TO_CANCEL -> false
            else -> {
                afterCompletion(finalState)
                true
            }
        }
    }

    // cause is Throwable or ParentJob when cancelChild was invoked
    // It contains a loop and never returns COMPLETING_RETRY, can return
    // COMPLETING_ALREADY -- if already completed/completing
    // COMPLETING_WAITING_CHILDREN -- if started waiting for children
    // final state -- when completed, for call to afterCompletion
    private fun cancelMakeCompleting(cause: Any?): Any? {
        loopOnState { state ->
            if (state !is Incomplete || state is Finishing && state.isCompleting) {
                // already completed/completing, do not even create exception to propose update
                return COMPLETING_ALREADY
            }
            val proposedUpdate = CompletedExceptionally(createCauseException(cause))
            val finalState = tryMakeCompleting(state, proposedUpdate)
            if (finalState !== COMPLETING_RETRY) return finalState
        }
    }

    @Suppress("NOTHING_TO_INLINE") // Save a stack frame
    internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) =
        JobCancellationException(message ?: cancellationExceptionMessage(), cause, this)

    override fun getChildJobCancellationCause(): CancellationException {
        // determine root cancellation cause of this job (why is it cancelling its children?)
        val state = this.state
        val rootCause = when (state) {
            is Finishing -> state.rootCause
            is CompletedExceptionally -> state.cause
            is Incomplete -> error("Cannot be cancelling child in this state: $state")
            else -> null // create exception with the below code on normal completion
        }
        return (rootCause as? CancellationException) ?: JobCancellationException("Parent job is ${stateString(state)}", rootCause, this)
    }

    // cause is Throwable or ParentJob when cancelChild was invoked
    private fun createCauseException(cause: Any?): Throwable = when (cause) {
        is Throwable? -> cause ?: defaultCancellationException()
        else -> (cause as ParentJob).getChildJobCancellationCause()
    }

    // transitions to Cancelling state
    // cause is Throwable or ParentJob when cancelChild was invoked
    // It contains a loop and never returns COMPLETING_RETRY, can return
    // COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
    // COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
    // TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
    // final state -- when completed, for call to afterCompletion
    private fun makeCancelling(cause: Any?): Any? {
        var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
        loopOnState { state ->
            when (state) {
                is Finishing -> { // already finishing -- collect exceptions
                    val notifyRootCause = synchronized(state) {
                        if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
                        // add exception, do nothing is parent is cancelling child that is already being cancelled
                        val wasCancelling = state.isCancelling // will notify if was not cancelling
                        // Materialize missing exception if it is the first exception (otherwise -- don't)
                        if (cause != null || !wasCancelling) {
                            val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                            state.addExceptionLocked(causeException)
                        }
                        // take cause for notification if was not in cancelling state before
                        state.rootCause.takeIf { !wasCancelling }
                    }
                    notifyRootCause?.let { notifyCancelling(state.list, it) }
                    return COMPLETING_ALREADY
                }
                is Incomplete -> {
                    // Not yet finishing -- try to make it cancelling
                    val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                    if (state.isActive) {
                        // active state becomes cancelling
                        if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
                    } else {
                        // non active state starts completing
                        val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
                        when {
                            finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
                            finalState === COMPLETING_RETRY -> return@loopOnState
                            else -> return finalState
                        }
                    }
                }
                else -> return TOO_LATE_TO_CANCEL // already complete
            }
        }
    }

    // Performs promotion of incomplete coroutine state to NodeList for the purpose of
    // converting coroutine state to Cancelling, returns null when need to retry
    private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?:
        when (state) {
            is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Cancelling state
            is JobNode -> {
                // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
                // correctly capture a reference to it
                promoteSingleToNodeList(state)
                null // retry
            }
            else -> error("State should have list: $state")
        }

    // try make new Cancelling state on the condition that we're still in the expected state
    private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
        assert { state !is Finishing } // only for non-finishing states
        assert { state.isActive } // only for active states
        // get state's list or else promote to list to correctly operate on child lists
        val list = getOrPromoteCancellingList(state) ?: return false
        // Create cancelling state (with rootCause!)
        val cancelling = Finishing(list, false, rootCause)
        if (!_state.compareAndSet(state, cancelling)) return false
        // Notify listeners
        notifyCancelling(list, rootCause)
        return true
    }

    /**
     * Completes this job. Used by [CompletableDeferred.complete] (and exceptionally)
     * and by [JobImpl.cancel]. It returns `false` on repeated invocation
     * (when this job is already completing).
     */
    internal fun makeCompleting(proposedUpdate: Any?): Boolean {
        loopOnState { state ->
            val finalState = tryMakeCompleting(state, proposedUpdate)
            when {
                finalState === COMPLETING_ALREADY -> return false
                finalState === COMPLETING_WAITING_CHILDREN -> return true
                finalState === COMPLETING_RETRY -> return@loopOnState
                else -> {
                    afterCompletion(finalState)
                    return true
                }
            }
        }
    } 

    /**
     * Completes this job. Used by [AbstractCoroutine.resume].
     * It throws [IllegalStateException] on repeated invocation (when this job is already completing).
     * Returns:
     * * [COMPLETING_WAITING_CHILDREN] if started waiting for children.
     * * Final state otherwise (caller should do [afterCompletion])
     */
    internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
        loopOnState { state ->
            val finalState = tryMakeCompleting(state, proposedUpdate)
            when {
                finalState === COMPLETING_ALREADY ->
                    throw IllegalStateException(
                        "Job $this is already complete or completing, " +
                            "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
                    )
                finalState === COMPLETING_RETRY -> return@loopOnState
                else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
            }
        }
    }

    // Returns one of COMPLETING symbols or final state:
    // COMPLETING_ALREADY -- when already complete or completing
    // COMPLETING_RETRY -- when need to retry due to interference
    // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
    // final state -- when completed, for call to afterCompletion
    private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
        if (state !is Incomplete)
            return COMPLETING_ALREADY
        /*
         * FAST PATH -- no children to wait for && simple state (no list) && not cancelling => can complete immediately
         * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
         * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
         * which may miss unhandled exception.
         */
        if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
            if (tryFinalizeSimpleState(state, proposedUpdate)) {
                // Completed successfully on fast path -- return updated state
                return proposedUpdate
            }
            return COMPLETING_RETRY
        }
        // The separate slow-path function to simplify profiling
        return tryMakeCompletingSlowPath(state, proposedUpdate)
    }

    // Returns one of COMPLETING symbols or final state:
    // COMPLETING_ALREADY -- when already complete or completing
    // COMPLETING_RETRY -- when need to retry due to interference
    // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
    // final state -- when completed, for call to afterCompletion
    private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
        // get state's list or else promote to list to correctly operate on child lists
        val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
        // promote to Finishing state if we are not in it yet
        // This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet
        // atomically transition to finishing & completing state
        val finishing = state as? Finishing ?: Finishing(list, false, null)
        // must synchronize updates to finishing state
        var notifyRootCause: Throwable? = null
        synchronized(finishing) {
            // check if this state is already completing
            if (finishing.isCompleting) return COMPLETING_ALREADY
            // mark as completing
            finishing.isCompleting = true
            // if we need to promote to finishing then atomically do it here.
            // We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap
            // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
            if (finishing !== state) {
                if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
            }
            // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
            assert { !finishing.isSealed } // cannot be sealed
            // add new proposed exception to the finishing state
            val wasCancelling = finishing.isCancelling
            (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
            // If it just becomes cancelling --> must process cancelling notifications
            notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
        }
        // process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
        notifyRootCause?.let { notifyCancelling(list, it) }
        // now wait for children
        val child = firstChild(state)
        if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
            return COMPLETING_WAITING_CHILDREN
        // otherwise -- we have not children left (all were already cancelled?)
        return finalizeFinishingState(finishing, proposedUpdate)
    }

    private val Any?.exceptionOrNull: Throwable?
        get() = (this as? CompletedExceptionally)?.cause

    private fun firstChild(state: Incomplete) =
        state as? ChildHandleNode ?: state.list?.nextChild()

    // return false when there is no more incomplete children to wait
    // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
    private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
        val handle = child.childJob.invokeOnCompletion(
            invokeImmediately = false,
            handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
        )
        if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
        val nextChild = child.nextChild() ?: return false
        return tryWaitForChild(state, nextChild, proposedUpdate)
    }

    // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
    private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
        assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
        // figure out if we need to wait for next child
        val waitChild = lastChild.nextChild()
        // try wait for next child
        if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
        // no more children to wait -- try update state
        val finalState = finalizeFinishingState(state, proposedUpdate)
        afterCompletion(finalState)
    }

    private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
        var cur = this
        while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
        while (true) {
            cur = cur.nextNode
            if (cur.isRemoved) continue
            if (cur is ChildHandleNode) return cur
            if (cur is NodeList) return null // checked all -- no more children
        }
    }

    public final override val children: Sequence<Job> get() = sequence {
        when (val state = this@JobSupport.state) {
            is ChildHandleNode -> yield(state.childJob)
            is Incomplete -> state.list?.let { list ->
                list.forEach<ChildHandleNode> { yield(it.childJob) }
            }
        }
    }

    @Suppress("OverridingDeprecatedMember")
    public final override fun attachChild(child: ChildJob): ChildHandle {
        /*
         * Note: This function attaches a special ChildHandleNode node object. This node object
         * is handled in a special way on completion on the coroutine (we wait for all of them) and
         * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
         * if the job is already cancelling. For cancelling state child is attached under state lock.
         * It's required to properly wait all children before completion and provide linearizable hierarchy view:
         * If child is attached when the job is already being cancelled, such child will receive immediate notification on
         * cancellation, but parent *will* wait for that child before completion and will handle its exception.
         */
        return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child).asHandler) as ChildHandle
    }

    /**
     * Override to process any exceptions that were encountered while invoking completion handlers
     * installed via [invokeOnCompletion].
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    internal open fun handleOnCompletionException(exception: Throwable) {
        throw exception
    }

    /**
     * This function is invoked once as soon as this job is being cancelled for any reason or completes,
     * similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
     *
     * The meaning of [cause] parameter:
     * * Cause is `null` when the job has completed normally.
     * * Cause is an instance of [CancellationException] when the job was cancelled _normally_.
     *   **It should not be treated as an error**. In particular, it should not be reported to error logs.
     * * Otherwise, the job had been cancelled or failed with exception.
     *
     * The specified [cause] is not the final cancellation cause of this job.
     * A job may produce other exceptions while it is failing and the final cause might be different.
     *
     * @suppress **This is unstable API and it is subject to change.*
     */
    protected open fun onCancelling(cause: Throwable?) {}

    /**
     * Returns `true` for scoped coroutines.
     * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
     * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
     * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
     */
    protected open val isScopedCoroutine: Boolean get() = false

    /**
     * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
     * A valid implementation of this getter should recursively check parent as well before returning `false`.
     *
     * The only instance of the [Job] that does not handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
     * @suppress **This is unstable API and it is subject to change.*
     */
    internal open val handlesException: Boolean get() = true

    /**
     * Handles the final job [exception] that was not handled by the parent coroutine.
     * Returns `true` if it handles exception (so handling at later stages is not needed).
     * It is designed to be overridden by launch-like coroutines
     * (`StandaloneCoroutine` and `ActorCoroutine`) that don't have a result type
     * that can represent exceptions.
     *
     * This method is invoked **exactly once** when the final exception of the job is determined
     * and before it becomes complete. At the moment of invocation the job and all its children are complete.
     */
    protected open fun handleJobException(exception: Throwable): Boolean = false

    /**
     * Override for completion actions that need to update some external object depending on job's state,
     * right before all the waiters for coroutine's completion are notified.
     *
     * @param state the final state.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected open fun onCompletionInternal(state: Any?) {}

    /**
     * Override for the very last action on job's completion to resume the rest of the code in
     * scoped coroutines. It is called when this job is externally completed in an unknown
     * context and thus should resume with a default mode.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected open fun afterCompletion(state: Any?) {}

    // for nicer debugging
    public override fun toString(): String =
        "${toDebugString()}@$hexAddress"

    @InternalCoroutinesApi
    public fun toDebugString(): String = "${nameString()}{${stateString(state)}}"

    /**
     * @suppress **This is unstable API and it is subject to change.**
     */
    internal open fun nameString(): String = classSimpleName

    private fun stateString(state: Any?): String = when (state) {
        is Finishing -> when {
            state.isCancelling -> "Cancelling"
            state.isCompleting -> "Completing"
            else -> "Active"
        }
        is Incomplete -> if (state.isActive) "Active" else "New"
        is CompletedExceptionally -> "Cancelled"
        else -> "Completed"
    }

    // Completing & Cancelling states,
    // All updates are guarded by synchronized(this), reads are volatile
    @Suppress("UNCHECKED_CAST")
    private class Finishing(
        override val list: NodeList,
        isCompleting: Boolean,
        rootCause: Throwable?
    ) : SynchronizedObject(), Incomplete {
        private val _isCompleting = atomic(isCompleting)
        var isCompleting: Boolean
            get() = _isCompleting.value
            set(value) { _isCompleting.value = value }

        private val _rootCause = atomic(rootCause)
        var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
            get() = _rootCause.value
            set(value) { _rootCause.value = value }

        private val _exceptionsHolder = atomic<Any?>(null)
        private var exceptionsHolder: Any? // Contains null | Throwable | ArrayList | SEALED
            get() = _exceptionsHolder.value
            set(value) { _exceptionsHolder.value = value }

        // Note: cannot be modified when sealed
        val isSealed: Boolean get() = exceptionsHolder === SEALED
        val isCancelling: Boolean get() = rootCause != null
        override val isActive: Boolean get() = rootCause == null // !isCancelling

        // Seals current state and returns list of exceptions
        // guarded by `synchronized(this)`
        fun sealLocked(proposedException: Throwable?): List<Throwable> {
            val list = when(val eh = exceptionsHolder) { // volatile read
                null -> allocateList()
                is Throwable -> allocateList().also { it.add(eh) }
                is ArrayList<*> -> eh as ArrayList<Throwable>
                else -> error("State is $eh") // already sealed -- cannot happen
            }
            val rootCause = this.rootCause // volatile read
            rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
            if (proposedException != null && proposedException != rootCause) list.add(proposedException)
            exceptionsHolder = SEALED
            return list
        }

        // guarded by `synchronized(this)`
        fun addExceptionLocked(exception: Throwable) {
            val rootCause = this.rootCause // volatile read
            if (rootCause == null) {
                this.rootCause = exception
                return
            }
            if (exception === rootCause) return // nothing to do
            when (val eh = exceptionsHolder) { // volatile read
                null -> exceptionsHolder = exception
                is Throwable -> {
                    if (exception === eh) return // nothing to do
                    exceptionsHolder = allocateList().apply {
                        add(eh)
                        add(exception)

                    }
                }
                is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
                else -> error("State is $eh") // already sealed -- cannot happen
            }
        }

        private fun allocateList() = ArrayList<Throwable>(4)

        override fun toString(): String =
            "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]"
    }

    private val Incomplete.isCancelling: Boolean
        get() = this is Finishing && isCancelling

    // Used by parent that is waiting for child completion
    private class ChildCompletion(
        private val parent: JobSupport,
        private val state: Finishing,
        private val child: ChildHandleNode,
        private val proposedUpdate: Any?
    ) : JobNode() {
        override fun invoke(cause: Throwable?) {
            parent.continueCompleting(state, child, proposedUpdate)
        }
    }

    private class AwaitContinuation<T>(
        delegate: Continuation<T>,
        private val job: JobSupport
    ) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
        override fun getContinuationCancellationCause(parent: Job): Throwable {
            val state = job.state
            /*
             * When the job we are waiting for had already completely completed exceptionally or
             * is failing, we shall use its root/completion cause for await's result.
             */
            if (state is Finishing) state.rootCause?.let { return it }
            if (state is CompletedExceptionally) return state.cause
            return parent.getCancellationException()
        }

        protected override fun nameString(): String =
            "AwaitContinuation"
    }

    /*
     * =================================================================================================
     * This is ready-to-use implementation for Deferred interface.
     * However, it is not type-safe. Conceptually it just exposes the value of the underlying
     * completed state as `Any?`
     * =================================================================================================
     */

    public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally

    public fun getCompletionExceptionOrNull(): Throwable? {
        val state = this.state
        check(state !is Incomplete) { "This job has not completed yet" }
        return state.exceptionOrNull
    }

    /**
     * @suppress **This is unstable API and it is subject to change.**
     */
    internal fun getCompletedInternal(): Any? {
        val state = this.state
        check(state !is Incomplete) { "This job has not completed yet" }
        if (state is CompletedExceptionally) throw state.cause
        return state.unboxState()
    }

    /**
     * @suppress **This is unstable API and it is subject to change.**
     */
    internal suspend fun awaitInternal(): Any? {
        // fast-path -- check state (avoid extra object creation)
        while (true) { // lock-free loop on state
            val state = this.state
            if (state !is Incomplete) {
                // already complete -- just return result
                if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                    recoverAndThrow(state.cause)
                }
                return state.unboxState()

            }
            if (startInternal(state) >= 0) break // break unless needs to retry
        }
        return awaitSuspend() // slow-path
    }

    private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
        /*
         * Custom code here, so that parent coroutine that is using await
         * on its child deferred (async) coroutine would throw the exception that this child had
         * thrown and not a JobCancellationException.
         */
        val cont = AwaitContinuation(uCont.intercepted(), this)
        cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
        cont.getResult()
    }

    /**
     * @suppress **This is unstable API and it is subject to change.**
     */
    // registerSelectAwaitInternal
    @Suppress("UNCHECKED_CAST")
    internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
        // fast-path -- check state and select/return if needed
        loopOnState { state ->
            if (select.isSelected) return
            if (state !is Incomplete) {
                // already complete -- select result
                if (select.trySelect()) {
                    if (state is CompletedExceptionally) {
                        select.resumeSelectWithException(state.cause)
                    }
                    else {
                        block.startCoroutineUnintercepted(state.unboxState() as T, select.completion)
                    }
                }
                return
            }
            if (startInternal(state) == 0) {
                // slow-path -- register waiter for completion
                select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(select, block).asHandler))
                return
            }
        }
    }

    /**
     * @suppress **This is unstable API and it is subject to change.**
     */
    @Suppress("UNCHECKED_CAST")
    internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
        val state = this.state
        // Note: await is non-atomic (can be cancelled while dispatched)
        if (state is CompletedExceptionally)
            select.resumeSelectWithException(state.cause)
        else
            block.startCoroutineCancellable(state.unboxState() as T, select.completion)
    }
}

/*
 * Class to represent object as the final state of the Job
 */
private class IncompleteStateBox(@JvmField val state: Incomplete)
internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this

// --------------- helper classes & constants for job implementation

@SharedImmutable
private val COMPLETING_ALREADY = Symbol("COMPLETING_ALREADY")
@JvmField
@SharedImmutable
internal val COMPLETING_WAITING_CHILDREN = Symbol("COMPLETING_WAITING_CHILDREN")
@SharedImmutable
private val COMPLETING_RETRY = Symbol("COMPLETING_RETRY")
@SharedImmutable
private val TOO_LATE_TO_CANCEL = Symbol("TOO_LATE_TO_CANCEL")

private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1

@SharedImmutable
private val SEALED = Symbol("SEALED")
@SharedImmutable
private val EMPTY_NEW = Empty(false)
@SharedImmutable
private val EMPTY_ACTIVE = Empty(true)

private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList? get() = null
    override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
}

internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
    init { initParentJobInternal(parent) }
    override val onCancelComplete get() = true
    /*
     * Check whether parent is able to handle exceptions as well.
     * With this check, an exception in that pattern will be handled once:
     * ```
     * launch {
     *     val child = Job(coroutineContext[Job])
     *     launch(child) { throw ... }
     * }
     * ```
     */
    override val handlesException: Boolean = handlesException()
    override fun complete() = makeCompleting(Unit)
    override fun completeExceptionally(exception: Throwable): Boolean =
        makeCompleting(CompletedExceptionally(exception))

    @JsName("handlesExceptionF")
    private fun handlesException(): Boolean {
        var parentJob = (parentHandle as? ChildHandleNode)?.job ?: return false
        while (true) {
            if (parentJob.handlesException) return true
            parentJob = (parentJob.parentHandle as? ChildHandleNode)?.job ?: return false
        }
    }
}

// -------- invokeOnCompletion nodes

internal interface Incomplete {
    val isActive: Boolean
    val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}

internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete {
    /**
     * Initialized by [JobSupport.makeNode].
     */
    lateinit var job: JobSupport
    override val isActive: Boolean get() = true
    override val list: NodeList? get() = null
    override fun dispose() = job.removeNode(this)
    override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
}

internal class NodeList : LockFreeLinkedListHead(), Incomplete {
    override val isActive: Boolean get() = true
    override val list: NodeList get() = this

    fun getString(state: String) = buildString {
        append("List{")
        append(state)
        append("}[")
        var first = true
        this@NodeList.forEach<JobNode> { node ->
            if (first) first = false else append(", ")
            append(node)
        }
        append("]")
    }

    override fun toString(): String =
        if (DEBUG) getString("Active") else super.toString()
}

internal class InactiveNodeList(
    override val list: NodeList
) : Incomplete {
    override val isActive: Boolean get() = false
    override fun toString(): String = if (DEBUG) list.getString("New") else super.toString()
}

private class InvokeOnCompletion(
    private val handler: CompletionHandler
) : JobNode()  {
    override fun invoke(cause: Throwable?) = handler.invoke(cause)
}

private class ResumeOnCompletion(
    private val continuation: Continuation<Unit>
) : JobNode() {
    override fun invoke(cause: Throwable?) = continuation.resume(Unit)
}

private class ResumeAwaitOnCompletion<T>(
    private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        val state = job.state
        assert { state !is Incomplete }
        if (state is CompletedExceptionally) {
            // Resume with with the corresponding exception to preserve it
            continuation.resumeWithException(state.cause)
        } else {
            // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
            @Suppress("UNCHECKED_CAST")
            continuation.resume(state.unboxState() as T)
        }
    }
}

internal class DisposeOnCompletion(
    private val handle: DisposableHandle
) : JobNode() {
    override fun invoke(cause: Throwable?) = handle.dispose()
}

private class SelectJoinOnCompletion<R>(
    private val select: SelectInstance<R>,
    private val block: suspend () -> R
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        if (select.trySelect())
            block.startCoroutineCancellable(select.completion)
    }
}

private class SelectAwaitOnCompletion<T, R>(
    private val select: SelectInstance<R>,
    private val block: suspend (T) -> R
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        if (select.trySelect())
            job.selectAwaitCompletion(select, block)
    }
}

// -------- invokeOnCancellation nodes

/**
 * Marker for node that shall be invoked on in _cancelling_ state.
 * **Note: may be invoked multiple times.**
 */
internal abstract class JobCancellingNode : JobNode()

private class InvokeOnCancelling(
    private val handler: CompletionHandler
) : JobCancellingNode()  {
    // delegate handler shall be invoked at most once, so here is an additional flag
    private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
    override fun invoke(cause: Throwable?) {
        if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
    }
}

internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

// Same as ChildHandleNode, but for cancellable continuation
internal class ChildContinuation(
    @JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode() {
    override fun invoke(cause: Throwable?) {
        child.parentCancelled(child.getContinuationCancellationCause(job))
    }
}