File Coverage

XS.xs
Criterion Covered Total %
statement 492 685 71.8
branch 223 374 59.6
condition n/a
subroutine n/a
pod n/a
total 715 1059 67.5


line stmt bran cond sub pod time code
1             #include "easyxs/easyxs.h"
2              
3             #include
4              
5             #define MY_CXT_KEY "Promise::XS::_guts" XS_VERSION
6              
7             #define BASE_CLASS "Promise::XS"
8              
9             #define PROMISE_CLASS "Promise::XS::Promise"
10             #define PROMISE_CLASS_TYPE Promise__XS__Promise
11              
12             #define DEFERRED_CLASS "Promise::XS::Deferred"
13             #define DEFERRED_CLASS_TYPE Promise__XS__Deferred
14              
15             #define CONVERTER_CR_NAME "_convert_to_our_promise"
16              
17             #ifdef PL_phase
18             #define PXS_IS_GLOBAL_DESTRUCTION PL_phase == PERL_PHASE_DESTRUCT
19             #else
20             #define PXS_IS_GLOBAL_DESTRUCTION PL_dirty
21             #endif
22              
23             #define RESULT_IS_RESOLVED(result) (result->state == XSPR_RESULT_RESOLVED)
24             #define RESULT_IS_REJECTED(result) (result->state == XSPR_RESULT_REJECTED)
25              
26             #define UNUSED(x) (void)(x)
27              
28             #define DEBUG_AWAITABLE 0
29             #if DEBUG_AWAITABLE
30             # define _DO_DEBUG_AWAITABLE() fprintf(stderr, "# %s\n", __func__)
31             #else
32             # define _DO_DEBUG_AWAITABLE()
33             #endif
34              
35             #define _MAX_RECURSION 254
36              
37             /* We could look here at the full stack depth
38             (PL_stack_sp - PL_stack_base), but we only really care about
39             our *own* recursion, not the overall Perl stack.
40             */
41              
42             #define _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION \
43             dMY_CXT; \
44             if (MY_CXT.callback_depth > _MAX_RECURSION) { \
45             croak("Exceeded %u callbacks; infinite recursion detected!", _MAX_RECURSION); \
46             }
47              
48             typedef enum {
49             _DEFER_NONE = 0,
50             _DEFER_ANYEVENT,
51             _DEFER_IOASYNC,
52             _DEFER_MOJO,
53             } event_system_t;
54              
55             typedef struct xspr_callback_s xspr_callback_t;
56             typedef struct xspr_promise_s xspr_promise_t;
57             typedef struct xspr_result_s xspr_result_t;
58             typedef struct xspr_callback_queue_s xspr_callback_queue_t;
59             typedef struct pxs_all_state_s pxs_all_state_t;
60              
61             typedef enum {
62             XSPR_STATE_NONE,
63             XSPR_STATE_PENDING,
64             XSPR_STATE_FINISHED,
65             } xspr_promise_state_t;
66              
67             typedef enum {
68             XSPR_RESULT_NONE,
69             XSPR_RESULT_RESOLVED,
70             XSPR_RESULT_REJECTED,
71             XSPR_RESULT_BOTH
72             } xspr_result_state_t;
73              
74             typedef enum {
75             // from then() or catch()
76             XSPR_CALLBACK_PERL,
77              
78             // from finally()
79             XSPR_CALLBACK_FINALLY,
80              
81              
82             // from a promise returned from a then() or catch() callback
83             XSPR_CALLBACK_CHAIN,
84              
85             // from a promise returned from a finally() callback
86             XSPR_CALLBACK_FINALLY_CHAIN,
87              
88             // from all()
89             XSPR_CALLBACK_ALL
90             } xspr_callback_type_t;
91              
92             struct xspr_callback_s {
93             xspr_callback_type_t type;
94              
95             union {
96             struct {
97             SV* on_resolve;
98             SV* on_reject;
99             xspr_promise_t* next;
100             } perl;
101              
102             struct {
103             SV* on_finally;
104             xspr_promise_t* next;
105             } finally;
106              
107             xspr_promise_t* chain;
108              
109             struct {
110             xspr_result_t* original_result;
111             xspr_promise_t* chain_promise;
112             } finally_chain;
113              
114             struct {
115             pxs_all_state_t* state;
116             unsigned index;
117             } all;
118             };
119             };
120              
121             struct pxs_all_state_s {
122             xspr_promise_t* output;
123             unsigned total;
124             unsigned remaining;
125             bool done;
126             SV** results; /* array[total] of AV refs, one per input */
127             int refs;
128             };
129              
130             struct xspr_result_s {
131             xspr_result_state_t state;
132             SV** results;
133             int count;
134             int refs;
135             bool rejection_should_warn;
136             };
137              
138             struct xspr_promise_s {
139             xspr_promise_state_t state;
140             pid_t detect_leak_pid;
141             int refs;
142             union {
143             struct {
144             xspr_callback_t** callbacks;
145             int callbacks_count;
146             } pending;
147             struct {
148             xspr_result_t *result;
149             } finished;
150             };
151              
152             /* For async/await: */
153             SV* on_ready_immediate;
154             SV* self_sv_ref;
155             };
156              
157             struct xspr_callback_queue_s {
158             xspr_promise_t* origin;
159             xspr_callback_t* callback;
160             xspr_callback_queue_t* next;
161             };
162              
163             xspr_callback_t* xspr_callback_new_perl(pTHX_ SV* on_resolve, SV* on_reject, xspr_promise_t* next);
164             xspr_callback_t* xspr_callback_new_chain(pTHX_ xspr_promise_t* chain);
165             xspr_callback_t* xspr_callback_new_finally_chain(pTHX_ xspr_result_t* original_result, xspr_promise_t* next_promise);
166             void xspr_callback_process(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin);
167             void xspr_callback_free(pTHX_ xspr_callback_t* callback);
168              
169             static xspr_callback_t* xspr_callback_new_all(pTHX_ pxs_all_state_t* state, unsigned index);
170             static void pxs_all_state_decref(pTHX_ pxs_all_state_t* state);
171             static void pxs_all_state_finish_resolved(pTHX_ pxs_all_state_t* state);
172              
173             /* Guard used by SAVEDESTRUCTOR_X to free partially-initialised all() state
174             on exception (croak/die). Both pointers must be set to NULL once they
175             are owned by normal reference-counting so the destructor becomes a no-op. */
176             typedef struct {
177             xspr_promise_t* output;
178             pxs_all_state_t* state;
179             } pxs_all_guard_t;
180              
181             typedef struct {
182             xspr_callback_t* callback;
183             } pxs_callback_guard_t;
184              
185             xspr_promise_t* xspr_promise_new(pTHX);
186             void xspr_promise_then(pTHX_ xspr_promise_t* promise, xspr_callback_t* callback);
187             void xspr_promise_finish(pTHX_ xspr_promise_t* promise, xspr_result_t *result);
188             void xspr_promise_incref(pTHX_ xspr_promise_t* promise);
189             void xspr_promise_decref(pTHX_ xspr_promise_t* promise);
190              
191             xspr_result_t* xspr_result_new(pTHX_ xspr_result_state_t state, unsigned count);
192             xspr_result_t* pxs_result_clone(pTHX_ xspr_result_t* old);
193             xspr_result_t* xspr_result_from_error(pTHX_ const char *error);
194             void xspr_result_incref(pTHX_ xspr_result_t* result);
195             void xspr_result_decref(pTHX_ xspr_result_t* result);
196              
197             xspr_result_t* xspr_invoke_perl(pTHX_ SV* perl_fn, SV** inputs, unsigned input_count);
198             xspr_promise_t* xspr_promise_from_sv(pTHX_ SV* input);
199              
200              
201             typedef struct {
202             xspr_callback_queue_t* queue_head;
203             xspr_callback_queue_t* queue_tail;
204             int in_flush;
205             int backend_scheduled;
206             unsigned char callback_depth;
207             #ifdef USE_ITHREADS
208             tTHX owner;
209             #endif
210             SV* pxs_flush_cr;
211             HV* pxs_base_stash;
212             HV* pxs_promise_stash;
213             HV* pxs_deferred_stash;
214             SV* deferral_cr;
215             SV* deferral_arg;
216             event_system_t event_system;
217             SV* stop_cr;
218             } my_cxt_t;
219              
220             typedef struct {
221             xspr_promise_t* promise;
222             } DEFERRED_CLASS_TYPE;
223              
224             typedef struct {
225             xspr_promise_t* promise;
226             } PROMISE_CLASS_TYPE;
227              
228             //----------------------------------------------------------------------
229              
230             START_MY_CXT
231              
232             /* Process a single callback */
233 1616           void xspr_callback_process(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin)
234             {
235 1616 50         ASSUME(origin->state == XSPR_STATE_FINISHED);
236              
237 1616 100         if (callback->type == XSPR_CALLBACK_CHAIN) {
238 765           xspr_promise_finish(aTHX_ callback->chain, origin->finished.result);
239              
240 851 100         } else if (callback->type == XSPR_CALLBACK_FINALLY_CHAIN) {
241 2           xspr_promise_finish(aTHX_
242             callback->finally_chain.chain_promise,
243 2 100         RESULT_IS_REJECTED(origin->finished.result) ? origin->finished.result : callback->finally_chain.original_result
244             );
245              
246 1681 100         } else if (callback->type == XSPR_CALLBACK_PERL || callback->type == XSPR_CALLBACK_FINALLY) {
    100          
247             SV* callback_fn;
248             xspr_promise_t* next_promise;
249              
250 832 100         if (callback->type == XSPR_CALLBACK_FINALLY) {
251 11           callback_fn = callback->finally.on_finally;
252 11           next_promise = callback->finally.next;
253              
254             /* A finally() “catches” its parent promise, even as it
255             rethrows any failure from it. */
256 11 50         if (callback_fn && SvOK(callback_fn)) {
    50          
257 11           origin->finished.result->rejection_should_warn = false;
258             }
259             } else {
260 821           next_promise = callback->perl.next;
261              
262 821 100         if (RESULT_IS_RESOLVED(origin->finished.result)) {
263 796           callback_fn = callback->perl.on_resolve;
264 25 50         } else if (RESULT_IS_REJECTED(origin->finished.result)) {
265 25           callback_fn = callback->perl.on_reject;
266              
267 25 50         if (callback_fn && SvOK(callback_fn)) {
    50          
268 25           origin->finished.result->rejection_should_warn = false;
269             }
270              
271             } else {
272 0           callback_fn = NULL; /* Be quiet, bad compiler! */
273 0           ASSUME(0);
274             }
275             }
276              
277 832 50         if (callback_fn != NULL) {
278             xspr_result_t* callback_result;
279              
280 832 100         if (callback->type == XSPR_CALLBACK_FINALLY) {
281 11           callback_result = xspr_invoke_perl(aTHX_ callback_fn, NULL, 0);
282             }
283             else {
284 821           callback_result = xspr_invoke_perl(aTHX_
285             callback_fn,
286 821           origin->finished.result->results,
287 821           origin->finished.result->count
288             );
289             }
290              
291 832 100         if (next_promise == NULL) {
292 41 100         if (callback->type == XSPR_CALLBACK_FINALLY && RESULT_IS_RESOLVED(callback_result) && RESULT_IS_REJECTED(origin->finished.result)) {
    50          
    50          
293              
294             /* This handles the case where finally() is called in
295             void context and the parent promise rejects. In this
296             case we need an unhandled-rejection warning right
297             away since, given the absence of a next_promise,
298             by definition we have an unhandled rejection.
299             */
300 1           xspr_result_decref(aTHX_ callback_result);
301 1           callback_result = pxs_result_clone( aTHX_ origin->finished.result );
302             }
303             }
304             else {
305 791           bool finish_promise = true;
306              
307 791 100         if (callback_result->count > 0 && callback_result->state == XSPR_RESULT_RESOLVED) {
    100          
308 778           xspr_promise_t* promise = xspr_promise_from_sv(aTHX_ callback_result->results[0]);
309              
310 778 100         if (promise != NULL) {
311              
312 769 100         if (callback_result->count > 1) {
313 2           warn( BASE_CLASS ": %d extra response(s) returned after promise! Treating promise like normal return.", callback_result->count - 1 );
314             }
315 767 50         else if (promise == next_promise) {
316 0           finish_promise = false;
317              
318             /* This is an extreme corner case the A+ spec made us implement: we need to reject
319             * cases where the promise created from then() is passed back to its own callback */
320 0           xspr_result_t* chain_error = xspr_result_from_error(aTHX_ "TypeError");
321 0           xspr_promise_finish(aTHX_ next_promise, chain_error);
322              
323 0           xspr_result_decref(aTHX_ chain_error);
324             }
325             else {
326 767           finish_promise = false;
327              
328             /* Fairly normal case: we returned a promise from the callback */
329             xspr_callback_t* chainback;
330              
331 767 100         if (callback->type == XSPR_CALLBACK_FINALLY) {
332 2           chainback = xspr_callback_new_finally_chain(aTHX_ origin->finished.result, next_promise);
333             }
334             else {
335 765           chainback = xspr_callback_new_chain(aTHX_ next_promise);
336             }
337              
338 767           xspr_promise_then(aTHX_ promise, chainback);
339             }
340              
341 769           xspr_promise_decref(aTHX_ promise);
342             }
343             }
344              
345 791 100         if (finish_promise) {
346             xspr_result_t* final_result;
347 24           bool final_result_needs_decref = false;;
348              
349 24 100         if ((callback->type == XSPR_CALLBACK_FINALLY) && RESULT_IS_RESOLVED(callback_result)) {
    100          
350 7           final_result = origin->finished.result;
351              
352 7 100         if (RESULT_IS_REJECTED(final_result)) {
353              
354             // If finally()’s callback succeeds, it takes
355             // on the resolution status of the “parent”
356             // promise. If that promise rejected, then,
357             // the finally’s promise also rejects. Notably,
358             // the finally’s promise should STILL trigger
359             // an unhandled-rejection warning, even if the
360             // parent’s rejection is eventually handled.
361 4           final_result = pxs_result_clone(aTHX_ final_result);
362 4           final_result_needs_decref = true;
363             }
364             }
365             else {
366 17           final_result = callback_result;
367             }
368              
369 24           xspr_promise_finish(aTHX_ next_promise, final_result);
370              
371 24 100         if (final_result_needs_decref) {
372 4           xspr_result_decref(aTHX_ final_result);
373             }
374             }
375             }
376              
377 832           xspr_result_decref(aTHX_ callback_result);
378              
379 0 0         } else if (next_promise) {
380             /* No callback, so we're just passing the result along. */
381 0           xspr_result_t* result = origin->finished.result;
382 0           xspr_promise_finish(aTHX_ next_promise, result);
383             }
384              
385 17 50         } else if (callback->type == XSPR_CALLBACK_ALL) {
386 17           pxs_all_state_t* state = callback->all.state;
387 17           unsigned index = callback->all.index;
388              
389 17 100         if (state->done) {
390             /* Already finished - suppress unhandled rejection warnings */
391 4 100         if (RESULT_IS_REJECTED(origin->finished.result)) {
392 3           origin->finished.result->rejection_should_warn = false;
393             }
394 13 100         } else if (RESULT_IS_RESOLVED(origin->finished.result)) {
395             /* Store results for this index as an AV ref */
396 8           AV* av = newAV();
397             unsigned i;
398 20 100         for (i = 0; i < (unsigned)origin->finished.result->count; i++) {
399 12           av_push(av, SvREFCNT_inc(origin->finished.result->results[i]));
400             }
401 8           state->results[index] = newRV_noinc((SV*)av);
402              
403 8           state->remaining--;
404 8 100         if (state->remaining == 0) {
405 4           state->done = true;
406 4           pxs_all_state_finish_resolved(aTHX_ state);
407             }
408             } else {
409 5 50         ASSUME(RESULT_IS_REJECTED(origin->finished.result));
410             /* First rejection - clone the result for output so that output
411             has its own independent rejection_should_warn flag. The user
412             must still .catch() the all() promise; if they don't, the
413             cloned result will warn. Mark the source result as handled
414             since all() is consuming it. */
415 5           state->done = true;
416 5           xspr_result_t* rejection = pxs_result_clone(aTHX_ origin->finished.result);
417 5           origin->finished.result->rejection_should_warn = false;
418 5           xspr_promise_finish(aTHX_ state->output, rejection);
419 5           xspr_result_decref(aTHX_ rejection);
420             }
421              
422             } else {
423 0           ASSUME(0);
424             }
425 1616           }
426              
427             /* Frees the xspr_callback_t structure */
428 1623           void xspr_callback_free(pTHX_ xspr_callback_t *callback)
429             {
430 1623 100         if (callback->type == XSPR_CALLBACK_CHAIN) {
431 765           xspr_promise_decref(aTHX_ callback->chain);
432              
433 858 100         } else if (callback->type == XSPR_CALLBACK_PERL) {
434 824           SvREFCNT_dec(callback->perl.on_resolve);
435 824           SvREFCNT_dec(callback->perl.on_reject);
436 824 100         if (callback->perl.next != NULL)
437 784           xspr_promise_decref(aTHX_ callback->perl.next);
438              
439 34 100         } else if (callback->type == XSPR_CALLBACK_FINALLY) {
440 14           SvREFCNT_dec(callback->finally.on_finally);
441 14 100         if (callback->finally.next != NULL)
442 13           xspr_promise_decref(aTHX_ callback->finally.next);
443              
444 20 100         } else if (callback->type == XSPR_CALLBACK_FINALLY_CHAIN) {
445 2           xspr_promise_decref(aTHX_ callback->finally_chain.chain_promise);
446 2           xspr_result_decref(aTHX_ callback->finally_chain.original_result);
447              
448 18 50         } else if (callback->type == XSPR_CALLBACK_ALL) {
449 18           pxs_all_state_decref(aTHX_ callback->all.state);
450              
451             } else {
452 0           ASSUME(0);
453             }
454              
455 1623           Safefree(callback);
456 1623           }
457              
458             /* Process the queue until it's empty */
459 0           void xspr_queue_flush(pTHX)
460             {
461             dMY_CXT;
462              
463 0 0         if (MY_CXT.in_flush) {
464             /* XXX: is there a reasonable way to trigger this? */
465 0           warn("Rejecting request to flush promises queue: already processing");
466 0           return;
467             }
468 0           MY_CXT.in_flush = 1;
469              
470 0 0         while (MY_CXT.queue_head != NULL) {
471             /* Save some typing... */
472 0           xspr_callback_queue_t *cur = MY_CXT.queue_head;
473              
474             /* Process the callback. This could trigger some Perl code, meaning we
475             * could end up with additional queue entries after this */
476 0           xspr_callback_process(aTHX_ cur->callback, cur->origin);
477              
478             /* Free-ing the callback structure could theoretically trigger DESTROY subs,
479             * enqueueing new callbacks, so we can't assume the loop ends here! */
480 0           MY_CXT.queue_head = cur->next;
481 0 0         if (cur->next == NULL) {
482 0           MY_CXT.queue_tail = NULL;
483             }
484              
485             /* Destroy the structure */
486 0           xspr_callback_free(aTHX_ cur->callback);
487 0           xspr_promise_decref(aTHX_ cur->origin);
488 0           Safefree(cur);
489             }
490              
491 0           MY_CXT.in_flush = 0;
492 0           MY_CXT.backend_scheduled = 0;
493             }
494              
495             /* Add a callback invocation into the queue for the given origin promise.
496             * Takes ownership of the callback structure */
497 0           void xspr_queue_add(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin)
498             {
499             dMY_CXT;
500              
501             xspr_callback_queue_t* entry;
502 0           Newxz(entry, 1, xspr_callback_queue_t);
503 0           entry->origin = origin;
504 0           xspr_promise_incref(aTHX_ entry->origin);
505 0           entry->callback = callback;
506              
507 0 0         if (MY_CXT.queue_head == NULL) {
508 0 0         ASSUME(MY_CXT.queue_tail == NULL);
509             /* Empty queue, so now it's just us */
510 0           MY_CXT.queue_head = entry;
511 0           MY_CXT.queue_tail = entry;
512              
513             } else {
514 0 0         ASSUME(MY_CXT.queue_tail != NULL);
515             /* Existing queue, add to the tail */
516 0           MY_CXT.queue_tail->next = entry;
517 0           MY_CXT.queue_tail = entry;
518             }
519 0           }
520              
521 0           void _call_with_1_or_2_args( pTHX_ SV* cb, SV* maybe_arg0, SV* arg1 ) {
522             // --- Almost all copy-paste from “perlcall” … blegh!
523 0           dSP;
524              
525 0           ENTER;
526 0           SAVETMPS;
527              
528 0 0         PUSHMARK(SP);
529              
530 0 0         if (maybe_arg0) {
531 0 0         EXTEND(SP, 2);
532 0           PUSHs(maybe_arg0);
533             }
534             else {
535 0 0         EXTEND(SP, 1);
536             }
537              
538 0           PUSHs( arg1 );
539 0           PUTBACK;
540              
541 0           call_sv(cb, G_VOID);
542              
543 0 0         FREETMPS;
544 0           LEAVE;
545              
546 0           return;
547             }
548              
549 11           void _call_pv_with_args( pTHX_ const char* subname, SV** args, unsigned argscount )
550             {
551             // --- Almost all copy-paste from “perlcall” … blegh!
552 11           dSP;
553              
554 11           ENTER;
555 11           SAVETMPS;
556              
557 11 50         PUSHMARK(SP);
558 11 50         EXTEND(SP, argscount);
559              
560             unsigned i;
561 22 100         for (i=0; i
562 11           PUSHs(args[i]);
563             }
564              
565 11           PUTBACK;
566              
567 11           call_pv(subname, G_VOID);
568              
569 11 50         FREETMPS;
570 11           LEAVE;
571              
572 11           return;
573             }
574              
575 0           void xspr_queue_maybe_schedule(pTHX)
576             {
577             dMY_CXT;
578 0 0         if (MY_CXT.queue_head == NULL || MY_CXT.backend_scheduled || MY_CXT.in_flush) {
    0          
    0          
579 0           return;
580             }
581              
582 0           MY_CXT.backend_scheduled = 1;
583             /* We trust our backends to be sane, so little guarding against errors here */
584              
585 0 0         if (!MY_CXT.pxs_flush_cr) {
586 0           HV *stash = gv_stashpv(DEFERRED_CLASS, 0);
587 0           GV* method_gv = gv_fetchmethod_autoload(stash, "___flush", FALSE);
588 0 0         if (method_gv != NULL && isGV(method_gv) && GvCV(method_gv) != NULL) {
    0          
    0          
589 0           MY_CXT.pxs_flush_cr = newRV_inc( (SV*)GvCV(method_gv) );
590             }
591             else {
592 0           ASSUME(0);
593             }
594             }
595              
596 0           _call_with_1_or_2_args(aTHX_ MY_CXT.deferral_cr, MY_CXT.deferral_arg, MY_CXT.pxs_flush_cr);
597             }
598              
599             /* Invoke the user's perl code. We need to be really sure this doesn't return early via croak/next/etc. */
600 833           xspr_result_t* xspr_invoke_perl(pTHX_ SV* perl_fn, SV** inputs, unsigned input_count)
601             {
602 833           dSP;
603             unsigned count, i;
604             xspr_result_t* result;
605              
606 833 50         if (!SvROK(perl_fn)) {
607 0           return xspr_result_from_error(aTHX_ "promise callbacks need to be a CODE reference");
608             }
609              
610 833           ENTER;
611 833           SAVETMPS;
612              
613 833 50         PUSHMARK(SP);
614 833 50         EXTEND(SP, input_count);
615 1671 100         for (i = 0; i < input_count; i++) {
616 838           PUSHs(inputs[i]);
617             }
618 833           PUTBACK;
619              
620             /* Clear $_ so that callbacks don't end up talking to each other by accident */
621 833           SAVE_DEFSV;
622 833           DEFSV_set(sv_newmortal());
623              
624 833           count = call_sv(perl_fn, G_EVAL | G_ARRAY);
625              
626 833           SPAGAIN;
627              
628 833 50         if (SvTRUE(ERRSV)) {
    100          
629 6           result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, 1);
630 6 50         result->results[0] = newSVsv(ERRSV);
631             } else {
632 827           result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, count);
633 1642 100         for (i = 0; i < count; i++) {
634 815           result->results[count-i-1] = SvREFCNT_inc(POPs);
635             }
636             }
637 833           PUTBACK;
638              
639 833 50         FREETMPS;
640 833           LEAVE;
641              
642 833           return result;
643             }
644              
645             /* Increments the ref count for xspr_result_t */
646 1629           void xspr_result_incref(pTHX_ xspr_result_t* result)
647             {
648 1629           result->refs++;
649 1629           }
650              
651             /* Decrements the ref count for the xspr_result_t, freeing the structure if needed */
652 3307           void xspr_result_decref(pTHX_ xspr_result_t* result)
653             {
654 3307 100         if (--(result->refs) == 0) {
655 1678 100         if (RESULT_IS_REJECTED(result) && result->rejection_should_warn) {
    100          
656 11           SV* warn_args[result->count];
657              
658             // Dupe the results to warn about:
659 11           Copy(result->results, warn_args, result->count, SV*);
660              
661 11           _call_pv_with_args(aTHX_ "Promise::XS::Promise::_warn_unhandled", warn_args, result->count);
662             }
663              
664             unsigned i;
665 3360 100         for (i = 0; i < result->count; i++) {
666 1682           SvREFCNT_dec(result->results[i]);
667             }
668 1678           Safefree(result->results);
669 1678           Safefree(result);
670             }
671 3307           }
672              
673 1616           void xspr_immediate_process(pTHX_ xspr_callback_t* callback, xspr_promise_t* promise)
674             {
675             dMY_CXT;
676              
677 1616           MY_CXT.callback_depth++;
678              
679 1616           xspr_callback_process(aTHX_ callback, promise);
680              
681 1616           MY_CXT.callback_depth--;
682              
683             /* Destroy the structure */
684 1616           xspr_callback_free(aTHX_ callback);
685 1616           }
686              
687             #define _XSPR_FREE_ON_READY_IMMEDIATE(promise) \
688             SvREFCNT_dec(SvRV(promise->on_ready_immediate)); \
689             SvREFCNT_dec(promise->on_ready_immediate);
690              
691             /* Transitions a promise from pending to finished, using the given result */
692 1629           void xspr_promise_finish(pTHX_ xspr_promise_t* promise, xspr_result_t* result)
693             {
694             dMY_CXT;
695              
696 1629 50         ASSUME(promise->state == XSPR_STATE_PENDING);
697 1629           xspr_callback_t** pending_callbacks = promise->pending.callbacks;
698 1629           int count = promise->pending.callbacks_count;
699              
700 1629           promise->state = XSPR_STATE_FINISHED;
701 1629           promise->finished.result = result;
702 1629           xspr_result_incref(aTHX_ promise->finished.result);
703              
704             /* fprintf(stderr, "finishing p=%p (%d callbacks)\n", promise, count); */
705              
706             /* For async/await: */
707 1629 50         if (promise->on_ready_immediate != NULL) {
708 0           xspr_invoke_perl(aTHX_ promise->on_ready_immediate, NULL, 0);
709              
710 0           _XSPR_FREE_ON_READY_IMMEDIATE(promise);
711 0           promise->on_ready_immediate = NULL;
712             }
713              
714             unsigned i;
715 1657 100         for (i = 0; i < count; i++) {
716              
717             // If any of this promise’s callbacks has an on_reject, then
718             // the promise’s result is rejection-handled.
719 28 100         if (pending_callbacks[i]->type == XSPR_CALLBACK_PERL && RESULT_IS_REJECTED(result) && result->rejection_should_warn) {
    100          
    100          
720 6           SV* on_reject = pending_callbacks[i]->perl.on_reject;
721 6 50         if (on_reject && SvOK(on_reject)) {
    50          
722 6           result->rejection_should_warn = false;
723             }
724             }
725              
726 28 50         if (MY_CXT.deferral_cr) {
727 0           xspr_queue_add(aTHX_ pending_callbacks[i], promise);
728             }
729             else {
730 28           xspr_immediate_process(aTHX_ pending_callbacks[i], promise);
731             }
732             }
733              
734 1629 50         if (promise->self_sv_ref != NULL) {
735              
736             // After we set self_sv_ref, Future::AsyncAwait manipulates
737             // things a bit such that WEAKREF is set on the reference and
738             // the referent’s refcount is decremented. Thus, we can forgo
739             // the reference-count decrement here. We still check for the
740             // WEAKREF flag, though, just in case something changed.
741             //
742 0 0         if (!SvWEAKREF(promise->self_sv_ref)) {
743 0           SvREFCNT_dec(SvRV(promise->self_sv_ref));
744             }
745              
746 0           SvREFCNT_dec(promise->self_sv_ref);
747 0           promise->self_sv_ref = NULL;
748             }
749              
750 1629 50         if (MY_CXT.deferral_cr) {
751 0           xspr_queue_maybe_schedule(aTHX);
752             }
753              
754 1629           Safefree(pending_callbacks);
755 1629           }
756              
757             /* Create a new xspr_result_t object with the given number of item slots */
758 1678           xspr_result_t* xspr_result_new(pTHX_ xspr_result_state_t state, unsigned count)
759             {
760             xspr_result_t* result;
761 1678           Newxz(result, 1, xspr_result_t);
762 1678           Newxz(result->results, count, SV*);
763 1678           result->rejection_should_warn = true;
764 1678           result->state = state;
765 1678           result->refs = 1;
766 1678           result->count = count;
767 1678           return result;
768             }
769              
770 12           xspr_result_t* pxs_result_clone(pTHX_ xspr_result_t* old)
771             {
772 12           xspr_result_t* new = xspr_result_new(aTHX_ old->state, old->count);
773              
774             unsigned i;
775 26 100         for (i=0; icount; i++) {
776 14           new->results[i] = SvREFCNT_inc( old->results[i] );
777             }
778              
779 12           return new;
780             }
781              
782 0           xspr_result_t* xspr_result_from_error(pTHX_ const char *error)
783             {
784 0           xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, 1);
785 0           result->results[0] = newSVpv(error, 0);
786 0           return result;
787             }
788              
789             /* Increments the ref count for xspr_promise_t */
790 2408           void xspr_promise_incref(pTHX_ xspr_promise_t* promise)
791             {
792 2408           (promise->refs)++;
793 2408           }
794              
795             /* Decrements the ref count for the xspr_promise_t, freeing the structure if needed */
796 4053           void xspr_promise_decref(pTHX_ xspr_promise_t *promise)
797             {
798 4053 100         if (--(promise->refs) == 0) {
799 1645 100         if (promise->state == XSPR_STATE_PENDING) {
800             /* XXX: is this a bad thing we should warn for? */
801 16           int count = promise->pending.callbacks_count;
802 16           xspr_callback_t **callbacks = promise->pending.callbacks;
803             int i;
804 23 100         for (i = 0; i < count; i++) {
805 7           xspr_callback_free(aTHX_ callbacks[i]);
806             }
807 16           Safefree(callbacks);
808              
809 1629 50         } else if (promise->state == XSPR_STATE_FINISHED) {
810 1629           xspr_result_decref(aTHX_ promise->finished.result);
811              
812             } else {
813 0           ASSUME(0);
814             }
815              
816 1645 50         if (promise->on_ready_immediate != NULL) {
817 0           _XSPR_FREE_ON_READY_IMMEDIATE(promise);
818             }
819              
820 1645           Safefree(promise);
821             }
822 4053           }
823              
824             /* Creates a new promise. It's that simple. */
825 1645           xspr_promise_t* xspr_promise_new(pTHX)
826             {
827             xspr_promise_t* promise;
828 1645           Newxz(promise, 1, xspr_promise_t);
829              
830 1645           *promise = (xspr_promise_t) {
831             .refs = 1,
832             .state = XSPR_STATE_PENDING,
833             };
834              
835 1645           return promise;
836             }
837              
838 824           xspr_callback_t* xspr_callback_new_perl(pTHX_ SV* on_resolve, SV* on_reject, xspr_promise_t* next)
839             {
840             xspr_callback_t* callback;
841 824           Newxz(callback, 1, xspr_callback_t);
842 824           callback->type = XSPR_CALLBACK_PERL;
843 824 100         if (SvOK(on_resolve))
844 799           callback->perl.on_resolve = newSVsv(on_resolve);
845 824 100         if (SvOK(on_reject))
846 36           callback->perl.on_reject = newSVsv(on_reject);
847 824           callback->perl.next = next;
848 824 100         if (next)
849 784           xspr_promise_incref(aTHX_ callback->perl.next);
850 824           return callback;
851             }
852              
853 14           xspr_callback_t* xspr_callback_new_finally(pTHX_ SV* on_finally, xspr_promise_t* next)
854             {
855             xspr_callback_t* callback;
856 14           Newxz(callback, 1, xspr_callback_t);
857 14           callback->type = XSPR_CALLBACK_FINALLY;
858 14 50         if (SvOK(on_finally))
859 14           callback->finally.on_finally = newSVsv(on_finally);
860 14           callback->finally.next = next;
861 14 100         if (next)
862 13           xspr_promise_incref(aTHX_ callback->finally.next);
863 14           return callback;
864             }
865              
866 765           xspr_callback_t* xspr_callback_new_chain(pTHX_ xspr_promise_t* chain)
867             {
868             xspr_callback_t* callback;
869 765           Newxz(callback, 1, xspr_callback_t);
870 765           callback->type = XSPR_CALLBACK_CHAIN;
871 765           callback->chain = chain;
872 765           xspr_promise_incref(aTHX_ chain);
873 765           return callback;
874             }
875              
876 2           xspr_callback_t* xspr_callback_new_finally_chain(pTHX_ xspr_result_t* original_result, xspr_promise_t* next_promise)
877             {
878             xspr_callback_t* callback;
879 2           Newxz(callback, 1, xspr_callback_t);
880 2           callback->type = XSPR_CALLBACK_FINALLY_CHAIN;
881              
882             /*
883             callback->finally_chain.original_result = original_result;
884             xspr_result_incref(aTHX_ original_result);
885             */
886 2           callback->finally_chain.original_result = pxs_result_clone(aTHX_ original_result);
887              
888 2           callback->finally_chain.chain_promise = next_promise;
889 2           xspr_promise_incref(aTHX_ next_promise);
890              
891 2           return callback;
892             }
893              
894 6           static void pxs_all_state_finish_resolved(pTHX_ pxs_all_state_t* state)
895             {
896 6           xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, state->total);
897             unsigned i;
898              
899 18 100         for (i = 0; i < state->total; i++) {
900 12           result->results[i] = SvREFCNT_inc(state->results[i]);
901             }
902              
903 6           xspr_promise_finish(aTHX_ state->output, result);
904 6           xspr_result_decref(aTHX_ result);
905 6           }
906              
907 29           static void pxs_all_state_decref(pTHX_ pxs_all_state_t* state)
908             {
909 29           state->refs--;
910              
911 29 100         if (state->refs == 0) {
912 11           xspr_promise_decref(aTHX_ state->output);
913             unsigned i;
914 33 100         for (i = 0; i < state->total; i++) {
915 22 100         if (state->results[i]) SvREFCNT_dec(state->results[i]);
916             }
917 11           Safefree(state->results);
918 11           Safefree(state);
919             }
920 29           }
921              
922 11           static void _pxs_all_guard_cleanup(pTHX_ void* vp)
923             {
924 11           pxs_all_guard_t* g = (pxs_all_guard_t*)vp;
925             /* Only non-NULL when an exception aborted all() before ref-counting
926             took ownership. Free in reverse-allocation order. */
927 11 50         if (g->state) pxs_all_state_decref(aTHX_ g->state);
928 11 50         if (g->output) xspr_promise_decref(aTHX_ g->output);
929 11           Safefree(g);
930 11           }
931              
932 18           static void _pxs_callback_guard_cleanup(pTHX_ void* vp)
933             {
934 18           pxs_callback_guard_t* g = (pxs_callback_guard_t*)vp;
935 18 50         if (g->callback) xspr_callback_free(aTHX_ g->callback);
936 18           Safefree(g);
937 18           }
938              
939 18           static xspr_callback_t* xspr_callback_new_all(pTHX_ pxs_all_state_t* state, unsigned index)
940             {
941             xspr_callback_t* callback;
942 18           Newxz(callback, 1, xspr_callback_t);
943 18           callback->type = XSPR_CALLBACK_ALL;
944 18           callback->all.state = state;
945 18           callback->all.index = index;
946 18           return callback;
947             }
948              
949             /* Adds a then to the promise. Takes ownership of the callback */
950 1623           void xspr_promise_then(pTHX_ xspr_promise_t* promise, xspr_callback_t* callback)
951             {
952             dMY_CXT;
953              
954 1623 100         if (promise->state == XSPR_STATE_PENDING) {
955 35           promise->pending.callbacks_count++;
956 35           Renew(promise->pending.callbacks, promise->pending.callbacks_count, xspr_callback_t*);
957 35           promise->pending.callbacks[promise->pending.callbacks_count-1] = callback;
958              
959 1588 50         } else if (promise->state == XSPR_STATE_FINISHED) {
960              
961 1588 50         if (MY_CXT.deferral_cr) {
962 0           xspr_queue_add(aTHX_ callback, promise);
963 0           xspr_queue_maybe_schedule(aTHX);
964             }
965             else {
966 1588           xspr_immediate_process(aTHX_ callback, promise);
967             }
968             } else {
969 0           ASSUME(0);
970             }
971 1623           }
972              
973             /* Returns a promise if the given SV is a thenable. Ownership handed to the caller! */
974 800           xspr_promise_t* xspr_promise_from_sv(pTHX_ SV* input)
975             {
976 800 50         if (input == NULL || !sv_isobject(input)) {
    100          
977 13           return NULL;
978             }
979              
980             /* If we got one of our own promises: great, not much to do here! */
981 787 100         if (sv_derived_from(input, PROMISE_CLASS)) {
982 786           IV tmp = SvIV((SV*)SvRV(input));
983 786           PROMISE_CLASS_TYPE* promise = INT2PTR(PROMISE_CLASS_TYPE*, tmp);
984 786           xspr_promise_incref(aTHX_ promise->promise);
985 786           return promise->promise;
986             }
987              
988             /* Maybe we got another type of promise. Let's convert it */
989 1           GV* method_gv = gv_fetchmethod_autoload(SvSTASH(SvRV(input)), "then", FALSE);
990 1 50         if (method_gv != NULL && isGV(method_gv) && GvCV(method_gv) != NULL) {
    50          
    50          
991              
992 1           CV* converter_cv = get_cv(BASE_CLASS "::" CONVERTER_CR_NAME, 0);
993 1 50         if (!converter_cv) croak("Need " CONVERTER_CR_NAME "!");
994              
995 1           SV* converter_svcv = newRV_inc((SV*) converter_cv);
996 1           sv_2mortal(converter_svcv);
997              
998 1           xspr_result_t* new_result = xspr_invoke_perl(aTHX_ converter_svcv, &input, 1);
999 1 50         if (new_result->state == XSPR_RESULT_RESOLVED &&
1000 1 50         new_result->results != NULL &&
1001 1 50         new_result->count == 1 &&
1002 2           SvROK(new_result->results[0]) &&
1003 1           sv_derived_from(new_result->results[0], PROMISE_CLASS)) {
1004             /* This is expected: our conversion function returned us one of our own promises */
1005 1           IV tmp = SvIV((SV*)SvRV(new_result->results[0]));
1006 1           PROMISE_CLASS_TYPE* new_promise = INT2PTR(PROMISE_CLASS_TYPE*, tmp);
1007              
1008 1           xspr_promise_t* promise = new_promise->promise;
1009 1           xspr_promise_incref(aTHX_ promise);
1010              
1011 1           xspr_result_decref(aTHX_ new_result);
1012 1           return promise;
1013              
1014             } else {
1015 0           xspr_promise_t* promise = xspr_promise_new(aTHX);
1016 0           xspr_promise_finish(aTHX_ promise, new_result);
1017 0           xspr_result_decref(aTHX_ new_result);
1018 0           return promise;
1019             }
1020             }
1021              
1022             /* We didn't get a promise. */
1023 0           return NULL;
1024             }
1025              
1026 144           DEFERRED_CLASS_TYPE* _get_deferred_from_sv(pTHX_ SV *self_sv) {
1027 144           SV *referent = SvRV(self_sv);
1028 144           return INT2PTR(DEFERRED_CLASS_TYPE*, SvUV(referent));
1029             }
1030              
1031 2477           PROMISE_CLASS_TYPE* _get_promise_from_sv(pTHX_ SV *self_sv) {
1032 2477           SV *referent = SvRV(self_sv);
1033 2477           return INT2PTR(PROMISE_CLASS_TYPE*, SvUV(referent));
1034             }
1035              
1036 894           SV* _ptr_to_svrv(pTHX_ void* ptr, HV* stash) {
1037 894           SV* referent = newSVuv( PTR2UV(ptr) );
1038 894           SV* retval = newRV_noinc(referent);
1039 894           sv_bless(retval, stash);
1040              
1041 894           return retval;
1042             }
1043              
1044 1645           static inline xspr_promise_t* create_promise(pTHX) {
1045             dMY_CXT;
1046              
1047 1645           xspr_promise_t* promise = xspr_promise_new(aTHX);
1048              
1049 1645           SV *detect_leak_perl = NULL;
1050              
1051 1645           SV** dml_svgv = hv_fetchs( MY_CXT.pxs_base_stash, "DETECT_MEMORY_LEAKS", 0 );
1052              
1053 1645 50         if (dml_svgv) {
1054 1645           detect_leak_perl = GvSV(*dml_svgv);
1055             }
1056              
1057 1645 50         promise->detect_leak_pid = detect_leak_perl && SvTRUE(detect_leak_perl) ? getpid() : 0;
    100          
1058              
1059 1645           return promise;
1060             }
1061              
1062             /* Many promises are just thrown away after the final callback, no need to allocate a next promise for those */
1063 838           static inline xspr_promise_t* create_next_promise_if_needed(pTHX_ SV* original, SV** stack_ptr) {
1064 838 100         if (GIMME_V != G_VOID) {
1065             PROMISE_CLASS_TYPE* next_promise;
1066 797           Newxz(next_promise, 1, PROMISE_CLASS_TYPE);
1067              
1068 797           xspr_promise_t* next = create_promise(aTHX);
1069 797           next_promise->promise = next;
1070              
1071 797           *stack_ptr = sv_newmortal();
1072              
1073             // This would be simpler, but let’s facilitate subclassing.
1074             // sv_setref_pv(*stack_ptr, PROMISE_CLASS, (void*)next_promise);
1075              
1076 797           sv_setref_pv(*stack_ptr, NULL, (void*)next_promise);
1077 797           sv_bless(*stack_ptr, SvSTASH(SvRV(original)));
1078              
1079 797           return next;
1080             }
1081              
1082 41           return NULL;
1083             }
1084              
1085 1691           static inline void _warn_on_destroy_if_needed(pTHX_ xspr_promise_t* promise, SV* self_sv) {
1086 1691 100         if (promise->detect_leak_pid && PXS_IS_GLOBAL_DESTRUCTION && promise->detect_leak_pid == getpid()) {
    100          
    100          
1087 2           warn( "======================================================================\nXXXXXX - %s survived until global destruction; memory leak likely!\n======================================================================\n", SvPV_nolen(self_sv) );
1088             }
1089 1691           }
1090              
1091 6           static inline void _warn_weird_reject_if_needed( pTHX_ SV* self_sv, const char* funcname, I32 my_items ) {
1092              
1093 6           char *pkgname = NULL;
1094              
1095 6 100         HV *stash = (self_sv == NULL) ? NULL : SvSTASH( SvRV(self_sv) );
1096              
1097 6 100         if (stash != NULL) {
1098 3 50         pkgname = HvNAME(stash);
    50          
    50          
    0          
    50          
    50          
1099             }
1100              
1101 6 100         if (pkgname == NULL) pkgname = DEFERRED_CLASS;
1102              
1103 6 100         if (my_items == 0) {
1104 2           warn( "%s: Empty call to %s()", pkgname, funcname );
1105             }
1106             else {
1107 4           warn( "%s: %s() called with only uninitialized values (%" IVdf ")", pkgname, funcname, (IV) my_items);
1108             }
1109 6           }
1110              
1111 797           static inline void _resolve_promise(pTHX_ xspr_promise_t* promise_p, SV** args, I32 argslen) {
1112 797           xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, argslen);
1113              
1114             unsigned i;
1115 1601 100         for (i = 0; i < argslen; i++) {
1116 804           result->results[i] = newSVsv(args[i]);
1117             }
1118              
1119 797           xspr_promise_finish(aTHX_ promise_p, result);
1120 797           xspr_result_decref(aTHX_ result);
1121 797           }
1122              
1123 30           static inline void _reject_promise(pTHX_ SV* self_sv, xspr_promise_t* promise_p, SV** args, I32 argslen) {
1124 30           xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, argslen);
1125              
1126 30           bool has_defined = false;
1127              
1128             unsigned i;
1129 61 100         for (i = 0; i < argslen; i++) {
1130 31           result->results[i] = newSVsv(args[i]);
1131              
1132 31 100         if (!has_defined && SvOK(result->results[i])) {
    100          
1133 24           has_defined = true;
1134             }
1135             }
1136              
1137 30 100         if (!has_defined) {
1138 6 100         const char* funcname = (self_sv == NULL) ? "rejected" : "reject";
1139              
1140 6           _warn_weird_reject_if_needed( aTHX_ self_sv, funcname, argslen );
1141             }
1142              
1143 30           xspr_promise_finish(aTHX_ promise_p, result);
1144 30           xspr_result_decref(aTHX_ result);
1145 30           }
1146              
1147 842           SV* _promise_to_sv(pTHX_ xspr_promise_t* promise_p) {
1148             dMY_CXT;
1149              
1150             PROMISE_CLASS_TYPE* promise_ptr;
1151 842           Newxz(promise_ptr, 1, PROMISE_CLASS_TYPE);
1152 842           promise_ptr->promise = promise_p;
1153              
1154 842           return _ptr_to_svrv(aTHX_ promise_ptr, MY_CXT.pxs_promise_stash);
1155             }
1156              
1157             /* When Future::AsyncAwait creates a promise/future it does NOT
1158             hold a strong reference to that object. Consequently, we have to
1159             ensure that the object lasts until we’re done with it. So introduce
1160             a (temporary!) circular reference. */
1161             #define _IMMORTALIZE_PROMISE_SV(promise_sv, promise_p) \
1162             do { \
1163             /* fprintf(stderr, "making immortal: sv=%p p=%p\n", promise_sv, promise_p); */ \
1164             promise_p->self_sv_ref = promise_sv; \
1165             SvREFCNT_inc(promise_sv); \
1166             SvREFCNT_inc(SvRV(promise_sv)); \
1167             } while (0)
1168              
1169 772           static inline SV* _create_preresolved_promise(pTHX_ SV** args, I32 argslen, bool immortalize) {
1170 772           xspr_promise_t* promise_p = create_promise(aTHX);
1171              
1172 772           _resolve_promise(aTHX_ promise_p, args, argslen);
1173              
1174 772           SV* promise_sv = _promise_to_sv(aTHX_ promise_p);
1175              
1176 772 50         if (immortalize) _IMMORTALIZE_PROMISE_SV(promise_sv, promise_p);
1177              
1178 772           return promise_sv;
1179             }
1180              
1181 11           static inline SV* _create_prerejected_promise(pTHX_ SV** args, I32 argslen, bool immortalize) {
1182 11           xspr_promise_t* promise_p = create_promise(aTHX);
1183              
1184 11           _reject_promise(aTHX_ NULL, promise_p, args, argslen);
1185              
1186 11           SV* promise_sv = _promise_to_sv(aTHX_ promise_p);
1187              
1188 11 50         if (immortalize) _IMMORTALIZE_PROMISE_SV(promise_sv, promise_p);
1189              
1190 11           return promise_sv;
1191             }
1192              
1193             //----------------------------------------------------------------------
1194 0           static SV* _get_nothing_cr_arg (pTHX) {
1195 0           return SvREFCNT_inc( get_sv("Promise::XS::Deferred::_NOTHING_CR", 0) );
1196             }
1197              
1198 0           static void _anyevent_wait_promise (pTHX_ SV* promise_sv) {
1199 0           SV* condvar = exs_call_method_scalar(
1200             sv_2mortal( newSVpvs("AnyEvent") ),
1201             "condvar",
1202             NULL
1203             );
1204              
1205 0           SV* catch_args[] = {
1206 0           _get_nothing_cr_arg(aTHX),
1207             NULL,
1208             };
1209              
1210 0           SV* caught = exs_call_method_scalar(
1211             promise_sv,
1212             "catch",
1213             catch_args
1214             );
1215              
1216 0           SV* finally_args[] = {
1217 0           SvREFCNT_inc(condvar),
1218             NULL,
1219             };
1220              
1221 0 0         exs_call_method_void(
1222             caught,
1223             "finally",
1224             finally_args
1225             );
1226              
1227 0           sv_2mortal(caught);
1228              
1229 0 0         exs_call_method_void(
1230             condvar,
1231             "recv",
1232             NULL
1233             );
1234              
1235 0           sv_2mortal(condvar);
1236 0           }
1237              
1238 0           static void _ioasync_wait_promise (pTHX_ SV* promise_sv, SV* loop_sv, SV* stop_cr) {
1239 0           SV* catch_args[] = {
1240 0           _get_nothing_cr_arg(aTHX),
1241             NULL,
1242             };
1243              
1244 0           SV* caught = exs_call_method_scalar(
1245             promise_sv,
1246             "catch",
1247             catch_args
1248             );
1249              
1250 0           SV* finally_args[] = { SvREFCNT_inc(stop_cr), NULL };
1251              
1252 0 0         exs_call_method_void(
1253             caught,
1254             "finally",
1255             finally_args
1256             );
1257              
1258 0           sv_2mortal(caught);
1259              
1260 0 0         exs_call_method_void(
1261             loop_sv,
1262             "run",
1263             NULL
1264             );
1265 0           }
1266              
1267 0           static void _mojo_wait_promise(pTHX_ SV* promise_sv, SV* stop_cr) {
1268 0           SV* catch_args[] = {
1269 0           _get_nothing_cr_arg(aTHX),
1270             NULL,
1271             };
1272              
1273 0           SV* caught = exs_call_method_scalar(
1274             promise_sv,
1275             "catch",
1276             catch_args
1277             );
1278              
1279 0           SV* finally_args[] = { SvREFCNT_inc(stop_cr), NULL };
1280              
1281 0 0         exs_call_method_void(
1282             caught,
1283             "finally",
1284             finally_args
1285             );
1286              
1287 0           sv_2mortal(caught);
1288              
1289 0 0         exs_call_method_void(
1290             sv_2mortal( newSVpvs("Mojo::IOLoop") ),
1291             "start",
1292             NULL
1293             );
1294 0           }
1295              
1296             //----------------------------------------------------------------------
1297              
1298             MODULE = Promise::XS PACKAGE = Promise::XS
1299              
1300             BOOT:
1301             {
1302             MY_CXT_INIT;
1303             #ifdef USE_ITHREADS
1304             MY_CXT.owner = aTHX;
1305             #endif
1306 28           MY_CXT.queue_head = NULL;
1307 28           MY_CXT.queue_tail = NULL;
1308 28           MY_CXT.in_flush = 0;
1309 28           MY_CXT.backend_scheduled = 0;
1310 28           MY_CXT.callback_depth = 0;
1311              
1312 28           MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
1313 28           MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
1314 28           MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);
1315              
1316 28           MY_CXT.deferral_cr = NULL;
1317 28           MY_CXT.deferral_arg = NULL;
1318 28           MY_CXT.event_system = _DEFER_NONE;
1319 28           MY_CXT.stop_cr = NULL;
1320 28           MY_CXT.pxs_flush_cr = NULL;
1321             }
1322              
1323             # In some old thread-multi perls sv_dup_inc() wasn’t defined.
1324              
1325             #if defined(USE_ITHREADS) && defined(sv_dup_inc)
1326              
1327             # ithreads would seem to be a very bad idea in Promise-based code,
1328             # but anyway ..
1329              
1330             void
1331             CLONE(...)
1332             PPCODE:
1333              
1334             SV* pxs_flush_cr = NULL;
1335             SV* deferral_cr = NULL;
1336             event_system_t event_system;
1337             SV* deferral_arg = NULL;
1338             SV* stop_cr = NULL;
1339              
1340             {
1341             dMY_CXT;
1342              
1343             CLONE_PARAMS params = {NULL, 0, MY_CXT.owner};
1344              
1345             if ( MY_CXT.pxs_flush_cr ) {
1346             pxs_flush_cr = sv_dup_inc( MY_CXT.pxs_flush_cr, ¶ms );
1347             }
1348              
1349             if ( MY_CXT.deferral_cr ) {
1350             deferral_cr = sv_dup_inc( MY_CXT.deferral_cr, ¶ms );
1351             }
1352              
1353             if ( MY_CXT.deferral_arg ) {
1354             deferral_arg = sv_dup_inc( MY_CXT.deferral_arg, ¶ms );
1355             }
1356              
1357             event_system = MY_CXT.event_system;
1358              
1359             if ( MY_CXT.stop_cr ) {
1360             stop_cr = sv_dup_inc( MY_CXT.stop_cr, ¶ms );
1361             }
1362             }
1363              
1364             {
1365             MY_CXT_CLONE;
1366             MY_CXT.owner = aTHX;
1367              
1368             // Clone SVs
1369             MY_CXT.pxs_flush_cr = pxs_flush_cr;
1370             MY_CXT.deferral_cr = deferral_cr;
1371             MY_CXT.deferral_arg = deferral_arg;
1372             MY_CXT.event_system = event_system;
1373             MY_CXT.stop_cr = stop_cr;
1374              
1375             // Clone HVs
1376             MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
1377             MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
1378             MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);
1379             }
1380              
1381             XSRETURN_UNDEF;
1382              
1383             #endif /* USE_ITHREADS && defined(sv_dup_inc) */
1384              
1385             SV *
1386             resolved(...)
1387             CODE:
1388 771           RETVAL = _create_preresolved_promise(aTHX_ &(ST(0)), items, false);
1389             OUTPUT:
1390             RETVAL
1391              
1392             SV *
1393             rejected(...)
1394             CODE:
1395 11           RETVAL = _create_prerejected_promise(aTHX_ &(ST(0)), items, false);
1396             OUTPUT:
1397             RETVAL
1398              
1399             #----------------------------------------------------------------------
1400              
1401             MODULE = Promise::XS PACKAGE = Promise::XS::Deferred
1402              
1403             PROTOTYPES: DISABLE
1404              
1405             BOOT:
1406 28           newCONSTSUB( gv_stashpvs(BASE_CLASS "::Deferred", FALSE), "_DEFER_ANYEVENT", newSVuv(_DEFER_ANYEVENT));
1407 28           newCONSTSUB( gv_stashpvs(BASE_CLASS "::Deferred", FALSE), "_DEFER_IOASYNC", newSVuv(_DEFER_IOASYNC));
1408 28           newCONSTSUB( gv_stashpvs(BASE_CLASS "::Deferred", FALSE), "_DEFER_MOJO", newSVuv(_DEFER_MOJO));
1409              
1410             SV *
1411             create()
1412             CODE:
1413             dMY_CXT;
1414              
1415             DEFERRED_CLASS_TYPE* deferred_ptr;
1416 52           Newxz(deferred_ptr, 1, DEFERRED_CLASS_TYPE);
1417              
1418 52           xspr_promise_t* promise = create_promise(aTHX);
1419              
1420 52           deferred_ptr->promise = promise;
1421              
1422 52           RETVAL = _ptr_to_svrv(aTHX_ deferred_ptr, MY_CXT.pxs_deferred_stash);
1423             OUTPUT:
1424             RETVAL
1425              
1426             void
1427             ___set_deferral_generic(SV* deferral_cr, SV* deferral_arg, UV event_system, SV* stop_cr=NULL)
1428             CODE:
1429             dMY_CXT;
1430              
1431             // deferral_cr = SvRV(deferral_cr);
1432              
1433 0 0         if (MY_CXT.deferral_cr) {
1434 0           SvREFCNT_dec(MY_CXT.deferral_cr);
1435             }
1436              
1437 0 0         if (MY_CXT.deferral_arg) {
1438 0           SvREFCNT_dec(MY_CXT.deferral_arg);
1439             }
1440              
1441 0 0         if (MY_CXT.stop_cr) {
1442 0           SvREFCNT_dec(MY_CXT.stop_cr);
1443             }
1444              
1445 0           MY_CXT.deferral_cr = SvREFCNT_inc(deferral_cr);
1446              
1447 0 0         MY_CXT.deferral_arg = SvOK(deferral_arg) ? SvREFCNT_inc(deferral_arg) : NULL;
1448              
1449 0           MY_CXT.event_system = event_system;
1450              
1451 0 0         MY_CXT.stop_cr = stop_cr ? SvREFCNT_inc(stop_cr) : NULL;
1452              
1453             # We don’t care if there are args or not.
1454             void
1455             ___flush(...)
1456             CODE:
1457             UNUSED(items);
1458 0           xspr_queue_flush(aTHX);
1459              
1460             SV*
1461             promise(SV* self_sv)
1462             CODE:
1463 46           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1464              
1465 46           xspr_promise_incref(aTHX_ self->promise);
1466              
1467 46           RETVAL = _promise_to_sv(aTHX_ self->promise);
1468             OUTPUT:
1469             RETVAL
1470              
1471             SV*
1472             resolve(SV *self_sv, ...)
1473             CODE:
1474 25           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1475              
1476 25 50         if (self->promise->state != XSPR_STATE_PENDING) {
1477 0           croak("Cannot resolve deferred: not pending");
1478             }
1479              
1480 25           _resolve_promise(aTHX_ self->promise, &(ST(1)), items - 1);
1481              
1482 25 100         if (GIMME_V == G_VOID) {
1483 23           RETVAL = NULL;
1484             }
1485             else {
1486 2           SvREFCNT_inc(self_sv);
1487 2           RETVAL = self_sv;
1488             }
1489             OUTPUT:
1490             RETVAL
1491              
1492             SV*
1493             reject(SV *self_sv, ...)
1494             CODE:
1495 19           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1496              
1497 19 50         if (self->promise->state != XSPR_STATE_PENDING) {
1498 0           croak("Cannot reject deferred: not pending");
1499             }
1500              
1501 19           _reject_promise(aTHX_ self_sv, self->promise, &(ST(1)), items - 1);
1502              
1503 19 50         if (GIMME_V == G_VOID) {
1504 19           RETVAL = NULL;
1505             }
1506             else {
1507 0           SvREFCNT_inc(self_sv);
1508 0           RETVAL = self_sv;
1509             }
1510             OUTPUT:
1511             RETVAL
1512              
1513             bool
1514             is_pending(SV *self_sv)
1515             CODE:
1516 0           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1517              
1518 0 0         RETVAL = (self->promise->state == XSPR_STATE_PENDING);
1519             OUTPUT:
1520             RETVAL
1521              
1522             SV*
1523             clear_unhandled_rejection(SV *self_sv)
1524             CODE:
1525 2           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1526              
1527 2 50         if (self->promise->state == XSPR_STATE_FINISHED) {
1528 2           self->promise->finished.result->rejection_should_warn = false;
1529             }
1530              
1531 2 50         if (GIMME_V == G_VOID) {
1532 2           RETVAL = NULL;
1533             }
1534             else {
1535 0           SvREFCNT_inc(self_sv);
1536 0           RETVAL = self_sv;
1537             }
1538             OUTPUT:
1539             RETVAL
1540              
1541             void
1542             DESTROY(SV *self_sv)
1543             CODE:
1544 52           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1545              
1546 52           _warn_on_destroy_if_needed(aTHX_ self->promise, self_sv);
1547              
1548 52           xspr_promise_decref(aTHX_ self->promise);
1549 52           Safefree(self);
1550              
1551             # ----------------------------------------------------------------------
1552              
1553             MODULE = Promise::XS PACKAGE = Promise::XS::Promise
1554              
1555             PROTOTYPES: DISABLE
1556              
1557             SV*
1558             all(...)
1559             CODE:
1560             /* items includes the class/self as the first argument;
1561             the actual promises start at ST(1). */
1562 12           unsigned count = (unsigned)(items - 1);
1563              
1564 12 100         if (count == 0) {
1565 1           RETVAL = _create_preresolved_promise(aTHX_ NULL, 0, false);
1566             } else {
1567             /* Guard freed by SAVEDESTRUCTOR_X on both normal and exceptional
1568             exits. Pointers are nulled out once ref-counting takes over,
1569             making the destructor a no-op on the happy path. */
1570             pxs_all_guard_t* guard;
1571 11           Newxz(guard, 1, pxs_all_guard_t);
1572 11           SAVEDESTRUCTOR_X(_pxs_all_guard_cleanup, guard);
1573              
1574 11           xspr_promise_t* output = create_promise(aTHX);
1575              
1576             pxs_all_state_t* state;
1577 11           Newxz(state, 1, pxs_all_state_t);
1578 11           *guard = (pxs_all_guard_t) {
1579             .output = output,
1580             .state = state,
1581             };
1582 11           *state = (pxs_all_state_t) {
1583             .output = output,
1584             .total = count,
1585             .remaining = count,
1586             .refs = 1,
1587             };
1588 11           xspr_promise_incref(aTHX_ output); /* state holds one ref */
1589 11           Newxz(state->results, count, SV*);
1590              
1591             unsigned i;
1592 33 100         for (i = 0; i < count; i++) {
1593 22           SV* input_sv = ST(i + 1);
1594 22           xspr_promise_t* input_promise = xspr_promise_from_sv(aTHX_ input_sv);
1595              
1596 22 100         if (input_promise == NULL) {
1597             /* Plain scalar - treat as already resolved */
1598 4           AV* av = newAV();
1599 4           av_push(av, newSVsv(input_sv));
1600 4           state->results[i] = newRV_noinc((SV*)av);
1601              
1602 4           state->remaining--;
1603              
1604 4 100         if (state->remaining == 0 && !state->done) {
    50          
1605 2           state->done = true;
1606 2           pxs_all_state_finish_resolved(aTHX_ state);
1607             }
1608             } else {
1609             /* Keep the callback guarded until xspr_promise_then()
1610             successfully takes ownership. */
1611             pxs_callback_guard_t* callback_guard;
1612 18           Newxz(callback_guard, 1, pxs_callback_guard_t);
1613 18           SAVEDESTRUCTOR_X(_pxs_callback_guard_cleanup, callback_guard);
1614              
1615 18           xspr_callback_t* callback = xspr_callback_new_all(aTHX_ state, i);
1616 18           callback_guard->callback = callback;
1617 18           state->refs++; /* callback now owns this ref */
1618 18           xspr_promise_then(aTHX_ input_promise, callback);
1619 18           callback_guard->callback = NULL;
1620 18           xspr_promise_decref(aTHX_ input_promise);
1621             }
1622             }
1623              
1624             /* Release our initial ref; state is kept alive by callbacks */
1625 11           pxs_all_state_decref(aTHX_ state);
1626 11           guard->state = NULL; /* ref-counting owns state now */
1627              
1628             /* _promise_to_sv takes ownership of the refs=1 from create_promise */
1629 11           RETVAL = _promise_to_sv(aTHX_ output);
1630 11           guard->output = NULL; /* RETVAL's DESTROY owns output now */
1631             }
1632             OUTPUT:
1633             RETVAL
1634              
1635             void
1636             then(SV* self_sv, SV* on_resolve = NULL, SV* on_reject = NULL)
1637             PPCODE:
1638 802 100         _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;
1639              
1640 799           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1641              
1642             xspr_promise_t* next;
1643              
1644 799 50         if (on_resolve == NULL) on_resolve = &PL_sv_undef;
1645 799 100         if (on_reject == NULL) on_reject = &PL_sv_undef;
1646              
1647 799           next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));
1648              
1649 799           xspr_callback_t* callback = xspr_callback_new_perl(aTHX_ on_resolve, on_reject, next);
1650 799           xspr_promise_then(aTHX_ self->promise, callback);
1651              
1652 799           XSRETURN(next ? 1 : 0);
1653              
1654             void
1655             catch(SV* self_sv, SV* on_reject)
1656             PPCODE:
1657 25 50         _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;
1658              
1659 25           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1660              
1661 25           xspr_promise_t* next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));
1662              
1663 25           xspr_callback_t* callback = xspr_callback_new_perl(aTHX_ &PL_sv_undef, on_reject, next);
1664 25           xspr_promise_then(aTHX_ self->promise, callback);
1665              
1666 25           XSRETURN(next ? 1 : 0);
1667              
1668             void
1669             finally(SV* self_sv, SV* on_finally)
1670             PPCODE:
1671 14 50         _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;
1672              
1673 14           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1674              
1675 14           xspr_promise_t* next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));
1676              
1677 14           xspr_callback_t* callback = xspr_callback_new_finally(aTHX_ on_finally, next);
1678 14           xspr_promise_then(aTHX_ self->promise, callback);
1679              
1680 14           XSRETURN(next ? 1 : 0);
1681              
1682             void
1683             DESTROY(SV* self_sv)
1684             CODE:
1685 1639           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1686             /* fprintf(stderr, "DESTROYing sv=%p, p=%p\n", self_sv, self->promise); */
1687              
1688 1639           _warn_on_destroy_if_needed(aTHX_ self->promise, self_sv);
1689              
1690 1639           xspr_promise_decref(aTHX_ self->promise);
1691 1639           Safefree(self);
1692              
1693             # ----------------------------------------------------------------------
1694             # Future::AsyncAwait interface:
1695             # ----------------------------------------------------------------------
1696              
1697             SV*
1698             AWAIT_NEW_DONE(...)
1699             CODE:
1700             _DO_DEBUG_AWAITABLE();
1701             UNUSED(items);
1702 0           RETVAL = _create_preresolved_promise(aTHX_ &(ST(1)), items - 1, true);
1703             OUTPUT:
1704             RETVAL
1705              
1706             SV*
1707             AWAIT_NEW_FAIL(...)
1708             CODE:
1709             _DO_DEBUG_AWAITABLE();
1710             UNUSED(items);
1711 0           RETVAL = _create_prerejected_promise(aTHX_ &(ST(1)), items - 1, true);
1712             OUTPUT:
1713             RETVAL
1714              
1715             SV*
1716             AWAIT_CLONE(...)
1717             CODE:
1718             _DO_DEBUG_AWAITABLE();
1719             UNUSED(items);
1720              
1721 2           xspr_promise_t* promise_p = create_promise(aTHX);
1722              
1723 2           RETVAL = _promise_to_sv(aTHX_ promise_p);
1724              
1725 2           _IMMORTALIZE_PROMISE_SV(RETVAL, promise_p);
1726              
1727             if (DEBUG_AWAITABLE) {
1728             fprintf(stderr, "# SvREFCNT(RETVAL)=%d\n", SvREFCNT(RETVAL));
1729             fprintf(stderr, "# SvREFCNT(SvRV(RETVAL))=%d\n", SvREFCNT(SvRV(RETVAL)));
1730             sv_dump(RETVAL);
1731             }
1732             OUTPUT:
1733             RETVAL
1734              
1735             void
1736             AWAIT_DONE(SV* self_sv, ...)
1737             CODE:
1738             _DO_DEBUG_AWAITABLE();
1739 0           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1740 0           _resolve_promise(aTHX_ self->promise, &ST(1), items - 1);
1741              
1742             void
1743             AWAIT_FAIL(SV* self_sv, ...)
1744             CODE:
1745             _DO_DEBUG_AWAITABLE();
1746 0           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1747 0           _reject_promise(aTHX_ NULL, self->promise, &ST(1), items - 1);
1748              
1749             bool
1750             AWAIT_IS_READY(SV *self_sv)
1751             CODE:
1752             _DO_DEBUG_AWAITABLE();
1753 0           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1754              
1755 0 0         RETVAL = (self->promise->state != XSPR_STATE_PENDING);
1756             OUTPUT:
1757             RETVAL
1758              
1759             void
1760             AWAIT_GET(SV *self_sv)
1761             PPCODE:
1762             _DO_DEBUG_AWAITABLE();
1763 0           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1764              
1765 0 0         ASSUME(self->promise->state == XSPR_STATE_FINISHED);
1766              
1767 0           SV** results = self->promise->finished.result->results;
1768 0           int result_count = self->promise->finished.result->count;
1769              
1770 0 0         if (RESULT_IS_RESOLVED(self->promise->finished.result)) {
1771             int i;
1772              
1773 0 0         if (!result_count) XSRETURN_EMPTY;
1774              
1775 0           switch (GIMME_V) {
1776              
1777 0           case G_ARRAY:
1778 0 0         EXTEND(SP, result_count);
    0          
1779              
1780 0 0         for (i=0; i
1781 0           PUSHs( sv_2mortal( newSVsv(results[i]) ) );
1782             }
1783              
1784 0           XSRETURN(result_count);
1785              
1786 0           case G_SCALAR:
1787 0 0         EXTEND(SP, 1);
1788 0           PUSHs( sv_2mortal( newSVsv(results[0]) ) );
1789 0           XSRETURN(1);
1790              
1791 0           case G_VOID:
1792 0           XSRETURN_EMPTY;
1793              
1794 0           default:
1795 0           ASSUME(0);
1796             }
1797             }
1798             else {
1799             SV* err;
1800 0 0         if (result_count) {
1801 0           err = sv_2mortal( newSVsv( results[0] ) );
1802             }
1803             else {
1804 0           err = &PL_sv_undef;
1805             }
1806              
1807 0           croak_sv(err);
1808             }
1809              
1810             void
1811             AWAIT_CHAIN_CANCEL(...)
1812             CODE:
1813             _DO_DEBUG_AWAITABLE();
1814             UNUSED(items);
1815              
1816             void
1817             AWAIT_ON_CANCEL(...)
1818             CODE:
1819             _DO_DEBUG_AWAITABLE();
1820             UNUSED(items);
1821              
1822             UV
1823             AWAIT_IS_CANCELLED(...)
1824             CODE:
1825             _DO_DEBUG_AWAITABLE();
1826             UNUSED(items);
1827 0 0         RETVAL = 0;
1828             OUTPUT:
1829             RETVAL
1830              
1831             void
1832             AWAIT_ON_READY(SV *self_sv, SV* coderef)
1833             CODE:
1834             _DO_DEBUG_AWAITABLE();
1835 0           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1836              
1837 0           self->promise->on_ready_immediate = coderef;
1838 0           SvREFCNT_inc(coderef);
1839 0           SvREFCNT_inc(SvRV(coderef));
1840              
1841             void
1842             AWAIT_WAIT(SV* self_sv)
1843             PPCODE:
1844             _DO_DEBUG_AWAITABLE();
1845             dMY_CXT;
1846              
1847 0           switch (MY_CXT.event_system) {
1848 0           case _DEFER_ANYEVENT:
1849 0           _anyevent_wait_promise(aTHX_ self_sv);
1850 0           break;
1851              
1852 0           case _DEFER_IOASYNC:
1853 0           _ioasync_wait_promise(aTHX_ self_sv, MY_CXT.deferral_arg, MY_CXT.stop_cr);
1854 0           break;
1855              
1856 0           case _DEFER_MOJO:
1857 0           _mojo_wait_promise(aTHX_ self_sv, MY_CXT.stop_cr);
1858 0           break;
1859              
1860 0           default:
1861 0           croak(BASE_CLASS ": No event loop set up! Did you forget to call use_event()?");
1862             }
1863              
1864 0 0         PUSHMARK(SP);
1865              
1866 0           int count = call_method("AWAIT_GET", GIMME_V);
1867 0           XSRETURN(count);