File Coverage

Future.xs
Criterion Covered Total %
statement 400 406 98.5
branch 267 372 71.7
condition n/a
subroutine n/a
pod n/a
total 667 778 85.7


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "EVAPI.h"
7              
8             #define IS_PVCV(sv) (sv && SvROK(sv) && SvTYPE(SvRV(sv)) == SVt_PVCV)
9              
10             typedef struct {
11             AV *tasks;
12             SV *final_cb;
13             I32 remaining;
14             CV **cvs;
15             I32 num_cvs;
16             CV *shared_cv; /* for unsafe mode */
17             int *is_freed_ptr;
18             } parallel_ctx;
19              
20             typedef struct {
21             AV *tasks;
22             SV *final_cb;
23             I32 current_idx;
24             int running;
25             int delayed;
26             I32 total_tasks;
27             int unsafe;
28             CV *current_cv;
29             int *is_freed_ptr;
30             } series_ctx;
31              
32             typedef struct {
33             AV *tasks;
34             SV *final_cb;
35             I32 remaining;
36             I32 current_idx;
37             I32 total_tasks;
38             I32 limit;
39             I32 active;
40             int unsafe;
41             int running;
42             int delayed;
43             CV **cvs;
44             I32 num_cvs;
45             CV *shared_cv;
46             int *is_freed_ptr;
47             } plimit_ctx;
48              
49 23           static void parallel_cleanup(pTHX_ parallel_ctx **ctx_ptr) {
50 23 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
51 23           parallel_ctx *ctx = *ctx_ptr;
52 23           *ctx_ptr = NULL;
53              
54 23 100         if (ctx->is_freed_ptr) {
55 20           *(ctx->is_freed_ptr) = 1;
56             }
57              
58 23 100         if (ctx->shared_cv) {
59 3           CvXSUBANY(ctx->shared_cv).any_ptr = NULL;
60 3           SvREFCNT_dec((SV*)ctx->shared_cv);
61             }
62 23 100         if (ctx->cvs) {
63             I32 i;
64 6053 100         for (i = 0; i < ctx->num_cvs; i++) {
65 6033 100         if (ctx->cvs[i]) {
66 6023           CvXSUBANY(ctx->cvs[i]).any_ptr = NULL;
67 6023           SvREFCNT_dec((SV*)ctx->cvs[i]);
68             }
69             }
70 20           Safefree(ctx->cvs);
71             }
72 23 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
73 23 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
74 23           Safefree(ctx);
75             }
76              
77 6125           static void parallel_task_done(pTHX_ CV *cv) {
78 6125           dXSARGS;
79 6125 50         SvREFCNT_inc_simple_void(cv);
80 6125           sv_2mortal((SV*)cv);
81 6125           parallel_ctx *ctx = (parallel_ctx *)CvXSUBANY(cv).any_ptr;
82 6125 100         if (!ctx) {
83 2           XSRETURN_EMPTY;
84             }
85              
86 6123 100         if (ctx->cvs) {
87 6019           CvXSUBANY(cv).any_ptr = NULL;
88             }
89              
90 6123 100         if (--ctx->remaining <= 0) {
91 16           SV *cb = ctx->final_cb;
92 30 50         if (IS_PVCV(cb)) {
    100          
    50          
93 15           dSP;
94 15           ENTER;
95 15           SAVETMPS;
96 15           SvREFCNT_inc(cb);
97 15           sv_2mortal(cb);
98 15 50         PUSHMARK(SP);
99 15           PUTBACK;
100              
101 15           parallel_cleanup(aTHX_ &ctx);
102              
103 15           call_sv(cb, G_DISCARD | G_VOID);
104              
105 14 50         FREETMPS;
106 14           LEAVE;
107             } else {
108 1           parallel_cleanup(aTHX_ &ctx);
109             }
110             }
111 6122           XSRETURN_EMPTY;
112             }
113              
114 30           static void series_cleanup(pTHX_ series_ctx **ctx_ptr) {
115 30 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
116 30           series_ctx *ctx = *ctx_ptr;
117 30           *ctx_ptr = NULL;
118              
119 30 100         if (ctx->is_freed_ptr) {
120 26           *(ctx->is_freed_ptr) = 1;
121             }
122              
123 30 100         if (ctx->current_cv) {
124 8           CvXSUBANY(ctx->current_cv).any_ptr = NULL;
125 8           SvREFCNT_dec((SV*)ctx->current_cv);
126             }
127 30 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
128 30 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
129 30           Safefree(ctx);
130             }
131              
132             static void _series_next(pTHX_ series_ctx **ctx_ptr);
133              
134 5038           static void series_next_cb(pTHX_ CV *cv) {
135 5038           dXSARGS;
136 5038 50         SvREFCNT_inc_simple_void(cv);
137 5038           sv_2mortal((SV*)cv);
138 5038           series_ctx *ctx = (series_ctx *)CvXSUBANY(cv).any_ptr;
139 5038 100         if (!ctx) {
140 3           XSRETURN_EMPTY;
141             }
142              
143 5035 100         if (!ctx->unsafe) {
144 5030           CvXSUBANY(cv).any_ptr = NULL;
145 5030 50         if (ctx->current_cv == cv) {
146 5030           ctx->current_cv = NULL;
147 5030           SvREFCNT_dec((SV*)cv);
148             }
149             }
150              
151 5035 100         if (items > 0 && SvTRUE(ST(0))) {
    50          
152 3           ctx->current_idx = ctx->total_tasks;
153             }
154 5035           _series_next(aTHX_ &ctx);
155 5035           XSRETURN_EMPTY;
156             }
157              
158 5066           static void _series_next(pTHX_ series_ctx **ctx_ptr) {
159 5066 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
160 5066           series_ctx *ctx = *ctx_ptr;
161              
162 5066 100         if (ctx->running) {
163 5027           ctx->delayed = 1;
164 5027           return;
165             }
166              
167 39           ctx->running = 1;
168 39           ctx->delayed = 1;
169              
170 5086 100         while (ctx->delayed) {
171 5077           ctx->delayed = 0;
172 5077 50         SV **task_ary = (AvREAL(ctx->tasks) && !SvMAGICAL(ctx->tasks)) ? AvARRAY(ctx->tasks) : NULL;
    100          
173 5077 100         if (ctx->current_idx >= ctx->total_tasks) {
174 25           SV *cb = ctx->final_cb;
175 47 50         if (IS_PVCV(cb)) {
    100          
    50          
176 24           dSP;
177 24           ENTER;
178 24           SAVETMPS;
179 24           SvREFCNT_inc(cb);
180 24           sv_2mortal(cb);
181 24 50         PUSHMARK(SP);
182 24           PUTBACK;
183 24           series_cleanup(aTHX_ ctx_ptr);
184 24           call_sv(cb, G_DISCARD | G_VOID);
185 22 50         FREETMPS;
186 22           LEAVE;
187             } else {
188 1           series_cleanup(aTHX_ ctx_ptr);
189             }
190 23           return;
191             }
192              
193 5049 50         SV **fetch_ptr = (task_ary && ctx->current_idx <= AvFILL(ctx->tasks))
    50          
194 5049           ? &task_ary[ctx->current_idx]
195 10101 100         : av_fetch(ctx->tasks, ctx->current_idx, 0);
196 5052 50         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
197              
198 5052           ctx->current_idx++;
199              
200 10088 100         if (IS_PVCV(task_sv)) {
    100          
    50          
201 5041 100         if (!ctx->unsafe) {
202 5036 50         if (ctx->current_cv) {
203 0           CvXSUBANY(ctx->current_cv).any_ptr = NULL;
204 0           SvREFCNT_dec((SV*)ctx->current_cv);
205             }
206 5036           CV *cv = newXS(NULL, series_next_cb, __FILE__);
207 5036           CvXSUBANY(cv).any_ptr = ctx;
208 5036           ctx->current_cv = (CV*)SvREFCNT_inc((SV*)cv);
209 5 100         } else if (!ctx->current_cv) {
210 3           ctx->current_cv = newXS(NULL, series_next_cb, __FILE__);
211 3           CvXSUBANY(ctx->current_cv).any_ptr = ctx;
212             }
213              
214 5041           SV *next_rv = NULL;
215 5041           dSP;
216 5041           ENTER;
217 5041           SAVETMPS;
218 5041 100         if (!ctx->unsafe) {
219 5036           next_rv = sv_2mortal(newRV_noinc((SV*)ctx->current_cv));
220             } else {
221 5           next_rv = sv_2mortal(newRV_inc((SV*)ctx->current_cv));
222             }
223 5041 50         PUSHMARK(SP);
224 5041 50         XPUSHs(next_rv);
225 5041           PUTBACK;
226            
227 5041 100         U32 flags = G_DISCARD | (ctx->unsafe ? 0 : G_EVAL);
228 5041           call_sv(task_sv, flags);
229              
230 5041 100         if (!ctx->unsafe) {
231 5036           SPAGAIN;
232 5036 50         if (SvTRUE(ERRSV)) {
    100          
233 5 50         SV *err = sv_mortalcopy(ERRSV);
234 5           series_cleanup(aTHX_ ctx_ptr);
235 5           croak_sv(err);
236             }
237             }
238              
239 5036 50         FREETMPS;
240 5036           LEAVE;
241 5036 50         if (!*ctx_ptr) return;
242             } else {
243 11           ctx->delayed = 1;
244             }
245             }
246              
247 9 50         if (*ctx_ptr) (*ctx_ptr)->running = 0;
248             }
249              
250 24           static void plimit_cleanup(pTHX_ plimit_ctx **ctx_ptr) {
251 24 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
252 24           plimit_ctx *ctx = *ctx_ptr;
253 24           *ctx_ptr = NULL;
254              
255 24 100         if (ctx->is_freed_ptr) {
256 21           *(ctx->is_freed_ptr) = 1;
257             }
258              
259 24 100         if (ctx->shared_cv) {
260 3           CvXSUBANY(ctx->shared_cv).any_ptr = NULL;
261 3           SvREFCNT_dec((SV*)ctx->shared_cv);
262             }
263 24 100         if (ctx->cvs) {
264             I32 i;
265 1070 100         for (i = 0; i < ctx->num_cvs; i++) {
266 1049 100         if (ctx->cvs[i]) {
267 1037           CvXSUBANY(ctx->cvs[i]).any_ptr = NULL;
268 1037           SvREFCNT_dec((SV*)ctx->cvs[i]);
269             }
270             }
271 21           Safefree(ctx->cvs);
272             }
273 24 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
274 24 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
275 24           Safefree(ctx);
276             }
277              
278             static void _plimit_dispatch(pTHX_ plimit_ctx **ctx_ptr);
279              
280 1140           static void plimit_task_done(pTHX_ CV *cv) {
281 1140           dXSARGS;
282 1140 50         SvREFCNT_inc_simple_void(cv);
283 1140           sv_2mortal((SV*)cv);
284 1140           plimit_ctx *ctx = (plimit_ctx *)CvXSUBANY(cv).any_ptr;
285 1140 100         if (!ctx) {
286 2           XSRETURN_EMPTY;
287             }
288              
289 1138 100         if (ctx->cvs) {
290 1033           CvXSUBANY(cv).any_ptr = NULL;
291             }
292              
293 1138           ctx->active--;
294              
295 1138 100         if (--ctx->remaining <= 0) {
296 16           SV *cb = ctx->final_cb;
297 30 50         if (IS_PVCV(cb)) {
    100          
    50          
298 15           dSP;
299 15           ENTER;
300 15           SAVETMPS;
301 15           SvREFCNT_inc(cb);
302 15           sv_2mortal(cb);
303 15 50         PUSHMARK(SP);
304 15           PUTBACK;
305              
306 15           plimit_cleanup(aTHX_ &ctx);
307              
308 15           call_sv(cb, G_DISCARD | G_VOID);
309              
310 14 50         FREETMPS;
311 14           LEAVE;
312             } else {
313 1           plimit_cleanup(aTHX_ &ctx);
314             }
315             } else {
316 1122           _plimit_dispatch(aTHX_ &ctx);
317             }
318 1137           XSRETURN_EMPTY;
319             }
320              
321 1146           static void _plimit_dispatch(pTHX_ plimit_ctx **ctx_ptr) {
322 2256 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
323 1146           plimit_ctx *ctx = *ctx_ptr;
324              
325 1146 100         if (ctx->running) {
326 1110           ctx->delayed = 1;
327 1110           return;
328             }
329              
330 36           int is_freed = 0;
331 36           int *old_is_freed_ptr = ctx->is_freed_ptr;
332 36           ctx->is_freed_ptr = &is_freed;
333              
334 36           ctx->running = 1;
335 36           ctx->delayed = 1;
336              
337 51 100         while (ctx->delayed) {
338 36           ctx->delayed = 0;
339              
340 1169 50         while (!is_freed && ctx->active < ctx->limit && ctx->current_idx < ctx->total_tasks) {
    100          
    100          
341 1154 50         SV **task_ary = (AvREAL(ctx->tasks) && !SvMAGICAL(ctx->tasks)) ? AvARRAY(ctx->tasks) : NULL;
    100          
342 1151 50         SV **fetch_ptr = (task_ary && ctx->current_idx <= AvFILL(ctx->tasks))
    50          
343 1151           ? &task_ary[ctx->current_idx]
344 2305 100         : av_fetch(ctx->tasks, ctx->current_idx, 0);
345 1154 50         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
346              
347 1154           ctx->current_idx++;
348              
349 2279 100         if (IS_PVCV(task_sv)) {
    100          
    50          
350 1142           SV *done_rv = NULL;
351 1142           CV *cv = NULL;
352 1142           dSP;
353              
354 1142 100         if (!ctx->unsafe) {
355 1037           cv = newXS(NULL, plimit_task_done, __FILE__);
356 1037           CvXSUBANY(cv).any_ptr = ctx;
357 1037           ctx->cvs[ctx->current_idx - 1] = (CV*)SvREFCNT_inc((SV*)cv);
358             }
359              
360 1142           ctx->active++;
361 1142           int task_unsafe = ctx->unsafe;
362              
363 1142           ENTER;
364 1142           SAVETMPS;
365 1142 100         if (!ctx->unsafe) {
366 1037           done_rv = sv_2mortal(newRV_noinc((SV*)cv));
367             } else {
368 105           done_rv = sv_2mortal(newRV_inc((SV*)ctx->shared_cv));
369             }
370 1142 50         PUSHMARK(SP);
371 1142 50         XPUSHs(done_rv);
372 1142           PUTBACK;
373              
374 1142 100         U32 flags = G_DISCARD | (task_unsafe ? 0 : G_EVAL);
375 1142           call_sv(task_sv, flags);
376              
377 1142 100         if (!task_unsafe) {
378 1037           SPAGAIN;
379 1037 50         if (SvTRUE(ERRSV)) {
    100          
380 5 50         SV *err = sv_mortalcopy(ERRSV);
381 5 100         if (!is_freed) {
382 4           plimit_cleanup(aTHX_ ctx_ptr);
383             }
384 5 50         if (old_is_freed_ptr) *old_is_freed_ptr = 1;
385 5           croak_sv(err);
386             }
387             }
388              
389 1137 50         FREETMPS;
390 1137           LEAVE;
391 1137 100         if (is_freed) goto done;
392             } else {
393 12 100         if (--ctx->remaining <= 0) {
394 4           SV *cb = ctx->final_cb;
395 7 50         if (IS_PVCV(cb)) {
    50          
    50          
396 4           dSP;
397 4           ENTER;
398 4           SAVETMPS;
399 4           SvREFCNT_inc(cb);
400 4           sv_2mortal(cb);
401 4 50         PUSHMARK(SP);
402 4           PUTBACK;
403 4           plimit_cleanup(aTHX_ ctx_ptr);
404 4 50         if (old_is_freed_ptr) *old_is_freed_ptr = 1;
405 4           call_sv(cb, G_DISCARD | G_VOID);
406 3 50         FREETMPS;
407 3           LEAVE;
408             } else {
409 0           plimit_cleanup(aTHX_ ctx_ptr);
410 0 0         if (old_is_freed_ptr) *old_is_freed_ptr = 1;
411             }
412 3           goto done;
413             }
414 8           ctx->delayed = 1;
415             }
416             }
417             }
418              
419 15 50         if (!is_freed) {
420 15           ctx->is_freed_ptr = old_is_freed_ptr;
421 15           ctx->running = 0;
422             }
423 0           done:
424 30 100         if (is_freed && old_is_freed_ptr) {
    50          
425 15           *old_is_freed_ptr = 1;
426             }
427             }
428              
429             MODULE = EV::Future PACKAGE = EV::Future
430              
431             PROTOTYPES: DISABLE
432              
433             BOOT:
434 6 50         I_EV_API ("EV::Future");
    50          
    50          
435              
436             void
437             parallel(tasks, final_cb, ...)
438             AV *tasks
439             SV *final_cb
440             CODE:
441 25           int unsafe = 0;
442 25 100         if (items > 2 && SvTRUE(ST(2))) unsafe = 1;
    50          
443              
444 25           I32 len = av_len(tasks) + 1;
445 25 100         if (len <= 0) {
446 1 50         if (IS_PVCV(final_cb)) {
    50          
    50          
447 1           dSP;
448 1           ENTER;
449 1           SAVETMPS;
450 1 50         PUSHMARK(SP);
451 1           PUTBACK;
452 1           call_sv(final_cb, G_DISCARD | G_VOID);
453 1 50         FREETMPS;
454 1           LEAVE;
455             }
456 1           return;
457             }
458              
459 24           int is_freed = 0;
460             parallel_ctx *ctx;
461 24           Newx(ctx, 1, parallel_ctx);
462 24           ctx->is_freed_ptr = &is_freed;
463 24           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
464 24           ctx->final_cb = SvREFCNT_inc(final_cb);
465 24           ctx->remaining = len;
466 24           ctx->cvs = NULL;
467 24           ctx->num_cvs = 0;
468 24           ctx->shared_cv = NULL;
469              
470 24           SV *done_rv = NULL;
471 24 100         if (unsafe) {
472 3           ctx->shared_cv = newXS(NULL, parallel_task_done, __FILE__);
473 3           CvXSUBANY(ctx->shared_cv).any_ptr = ctx;
474 3           done_rv = sv_2mortal(newRV_inc((SV*)ctx->shared_cv));
475             } else {
476 21           ctx->num_cvs = len;
477 21           Newxz(ctx->cvs, len, CV*);
478             }
479              
480 24           dSP;
481              
482             I32 i;
483 24 100         U32 flags = G_DISCARD | (unsafe ? 0 : G_EVAL);
484              
485 6155 100         for (i = 0; i < len; i++) {
486 6139 50         if (is_freed) break;
487              
488 6139 50         SV **task_ary = (AvREAL(tasks) && !SvMAGICAL(tasks)) ? AvARRAY(tasks) : NULL;
    100          
489 6136 50         SV **fetch_ptr = (task_ary && i <= AvFILL(tasks))
    100          
490 6135           ? &task_ary[i]
491 12275 100         : av_fetch(tasks, i, 0);
492 6139 100         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
493              
494 12263 100         if (IS_PVCV(task_sv)) {
    100          
    50          
495 6128           CV *cv = NULL;
496 6128 100         if (!unsafe) {
497 6024           cv = newXS(NULL, parallel_task_done, __FILE__);
498 6024           CvXSUBANY(cv).any_ptr = ctx;
499 6024           ctx->cvs[i] = (CV*)SvREFCNT_inc((SV*)cv);
500             }
501              
502 6128           ENTER;
503 6128           SAVETMPS;
504 6128 100         if (!unsafe) {
505 6024           done_rv = sv_2mortal(newRV_noinc((SV*)cv));
506             }
507 6128 50         PUSHMARK(SP);
508 6128 50         XPUSHs(done_rv);
509 6128           PUTBACK;
510              
511 6128           call_sv(task_sv, flags);
512 6128 100         if (!unsafe) {
513 6024           SPAGAIN;
514 6024 50         if (SvTRUE(ERRSV)) {
    100          
515 4 50         SV *err = sv_mortalcopy(ERRSV);
516 4 100         if (!is_freed) {
517 3           parallel_cleanup(aTHX_ &ctx);
518             }
519 4           croak_sv(err);
520             }
521             }
522            
523 6124 100         FREETMPS;
524 6124           LEAVE;
525             } else {
526 11 100         if (--ctx->remaining <= 0) {
527 4           SV *cb = ctx->final_cb;
528 4 50         if (IS_PVCV(cb)) {
    50          
    50          
529 4           ENTER;
530 4           SAVETMPS;
531 4           SvREFCNT_inc(cb);
532 4           sv_2mortal(cb);
533 4 50         PUSHMARK(SP);
534 4           PUTBACK;
535 4           parallel_cleanup(aTHX_ &ctx);
536 4           call_sv(cb, G_DISCARD | G_VOID);
537 3 50         FREETMPS;
538 3           LEAVE;
539             } else {
540 0           parallel_cleanup(aTHX_ &ctx);
541             }
542 3           break;
543             }
544             }
545             }
546 19 100         if (!is_freed && ctx) {
    50          
547 4           ctx->is_freed_ptr = NULL;
548             }
549              
550             void
551             series(tasks, final_cb, ...)
552             AV *tasks
553             SV *final_cb
554             CODE:
555 32           int unsafe = 0;
556 32 100         if (items > 2 && SvTRUE(ST(2))) unsafe = 1;
    50          
557              
558 32           I32 len = av_len(tasks) + 1;
559 32 100         if (len <= 0) {
560 1 50         if (IS_PVCV(final_cb)) {
    50          
    50          
561 1           dSP;
562 1           ENTER;
563 1           SAVETMPS;
564 1 50         PUSHMARK(SP);
565 1           PUTBACK;
566 1           call_sv(final_cb, G_DISCARD | G_VOID);
567 1 50         FREETMPS;
568 1           LEAVE;
569             }
570 1           return;
571             }
572              
573 31           int is_freed = 0;
574             series_ctx *ctx;
575 31           Newx(ctx, 1, series_ctx);
576 31           ctx->is_freed_ptr = &is_freed;
577 31           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
578 31           ctx->final_cb = SvREFCNT_inc(final_cb);
579 31           ctx->current_idx = 0;
580 31           ctx->running = 0;
581 31           ctx->delayed = 0;
582 31           ctx->total_tasks = len;
583 31           ctx->unsafe = unsafe;
584 31           ctx->current_cv = NULL;
585              
586 31           _series_next(aTHX_ &ctx);
587 24 100         if (!is_freed && ctx) {
    50          
588 5           ctx->is_freed_ptr = NULL;
589             }
590              
591             void
592             parallel_limit(tasks, limit, final_cb, ...)
593             AV *tasks
594             I32 limit
595             SV *final_cb
596             CODE:
597 25           int unsafe = 0;
598 25 100         if (items > 3 && SvTRUE(ST(3))) unsafe = 1;
    50          
599              
600 25           I32 len = av_len(tasks) + 1;
601 25 100         if (len <= 0) {
602 1 50         if (IS_PVCV(final_cb)) {
    50          
    50          
603 1           dSP;
604 1           ENTER;
605 1           SAVETMPS;
606 1 50         PUSHMARK(SP);
607 1           PUTBACK;
608 1           call_sv(final_cb, G_DISCARD | G_VOID);
609 1 50         FREETMPS;
610 1           LEAVE;
611             }
612 1           return;
613             }
614              
615 24 100         if (limit < 1) limit = 1;
616 24 100         if (limit > len) limit = len;
617              
618 24           int is_freed = 0;
619             plimit_ctx *ctx;
620 24           Newx(ctx, 1, plimit_ctx);
621 24           ctx->is_freed_ptr = &is_freed;
622 24           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
623 24           ctx->final_cb = SvREFCNT_inc(final_cb);
624 24           ctx->remaining = len;
625 24           ctx->current_idx = 0;
626 24           ctx->total_tasks = len;
627 24           ctx->limit = limit;
628 24           ctx->active = 0;
629 24           ctx->unsafe = unsafe;
630 24           ctx->running = 0;
631 24           ctx->delayed = 0;
632 24           ctx->cvs = NULL;
633 24           ctx->num_cvs = 0;
634 24           ctx->shared_cv = NULL;
635              
636 24 100         if (unsafe) {
637 3           ctx->shared_cv = newXS(NULL, plimit_task_done, __FILE__);
638 3           CvXSUBANY(ctx->shared_cv).any_ptr = ctx;
639             } else {
640 21           ctx->num_cvs = len;
641 21           Newxz(ctx->cvs, len, CV*);
642             }
643              
644 24           _plimit_dispatch(aTHX_ &ctx);
645 18 100         if (!is_freed && ctx) {
    50          
646 3           ctx->is_freed_ptr = NULL;
647             }