File Coverage

Future.xs
Criterion Covered Total %
statement 419 425 98.5
branch 260 362 71.8
condition n/a
subroutine n/a
pod n/a
total 679 787 86.2


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "EVAPI.h"
7              
8             #define IS_PVCV(sv) (sv && SvROK(sv) && SvTYPE(SvRV(sv)) == SVt_PVCV)
9              
10             typedef struct {
11             AV *tasks;
12             SV *final_cb;
13             I32 remaining;
14             CV **cvs;
15             I32 num_cvs;
16             CV *shared_cv; /* for unsafe mode */
17             int *is_freed_ptr;
18             } parallel_ctx;
19              
20             /* Guard registered with SAVEDESTRUCTOR_X so that, on any unwind (including
21             uncaught croak from unsafe-mode tasks), we clear ctx->is_freed_ptr and free
22             the heap-allocated is_freed flag. Prevents writes to a dead stack slot when
23             async tasks complete after the XSUB has unwound. */
24             typedef struct {
25             int **target_field; /* &ctx->is_freed_ptr */
26             int *is_freed;
27             } ifp_guard;
28              
29 82           static void ifp_guard_destroy(pTHX_ void *p) {
30 82           ifp_guard *g = (ifp_guard *)p;
31 82 100         if (!*(g->is_freed)) {
32             /* cleanup didn't run — ctx still alive, safe to clear field */
33 15           *(g->target_field) = NULL;
34             }
35 82           Safefree(g->is_freed);
36 82           Safefree(g);
37 82           }
38              
39             typedef struct {
40             AV *tasks;
41             SV *final_cb;
42             I32 current_idx;
43             int running;
44             int delayed;
45             I32 total_tasks;
46             int unsafe;
47             CV *current_cv;
48             int *is_freed_ptr;
49             } series_ctx;
50              
51             typedef struct {
52             AV *tasks;
53             SV *final_cb;
54             I32 remaining;
55             I32 current_idx;
56             I32 total_tasks;
57             I32 limit;
58             I32 active;
59             int unsafe;
60             int running;
61             int delayed;
62             CV **cvs;
63             I32 num_cvs;
64             CV *shared_cv;
65             int *is_freed_ptr;
66             } plimit_ctx;
67              
68 24           static void parallel_cleanup(pTHX_ parallel_ctx **ctx_ptr) {
69 24 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
70 24           parallel_ctx *ctx = *ctx_ptr;
71 24           *ctx_ptr = NULL;
72              
73 24 100         if (ctx->is_freed_ptr) {
74 20           *(ctx->is_freed_ptr) = 1;
75             }
76              
77 24 100         if (ctx->shared_cv) {
78 4           CvXSUBANY(ctx->shared_cv).any_ptr = NULL;
79 4           SvREFCNT_dec((SV*)ctx->shared_cv);
80             }
81 24 100         if (ctx->cvs) {
82             I32 i;
83 6053 100         for (i = 0; i < ctx->num_cvs; i++) {
84 6033 100         if (ctx->cvs[i]) {
85 6023           CvXSUBANY(ctx->cvs[i]).any_ptr = NULL;
86 6023           SvREFCNT_dec((SV*)ctx->cvs[i]);
87             }
88             }
89 20           Safefree(ctx->cvs);
90             }
91 24 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
92 24 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
93 24           Safefree(ctx);
94             }
95              
96 6127           static void parallel_task_done(pTHX_ CV *cv) {
97 6127           dXSARGS;
98 6127 50         SvREFCNT_inc_simple_void(cv);
99 6127           sv_2mortal((SV*)cv);
100 6127           parallel_ctx *ctx = (parallel_ctx *)CvXSUBANY(cv).any_ptr;
101 6127 100         if (!ctx) {
102 2           XSRETURN_EMPTY;
103             }
104              
105 6125 100         if (ctx->cvs) {
106 6019           CvXSUBANY(cv).any_ptr = NULL;
107             }
108              
109 6125 100         if (--ctx->remaining <= 0) {
110 17           SV *cb = ctx->final_cb;
111 32 50         if (IS_PVCV(cb)) {
    100          
    50          
112 16           dSP;
113 16           ENTER;
114 16           SAVETMPS;
115 16           SvREFCNT_inc(cb);
116 16           sv_2mortal(cb);
117 16 50         PUSHMARK(SP);
118 16           PUTBACK;
119              
120 16           parallel_cleanup(aTHX_ &ctx);
121              
122 16           call_sv(cb, G_DISCARD | G_VOID);
123              
124 15 50         FREETMPS;
125 15           LEAVE;
126             } else {
127 1           parallel_cleanup(aTHX_ &ctx);
128             }
129             }
130 6124           XSRETURN_EMPTY;
131             }
132              
133 30           static void series_cleanup(pTHX_ series_ctx **ctx_ptr) {
134 30 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
135 30           series_ctx *ctx = *ctx_ptr;
136 30           *ctx_ptr = NULL;
137              
138 30 100         if (ctx->is_freed_ptr) {
139 26           *(ctx->is_freed_ptr) = 1;
140             }
141              
142 30 100         if (ctx->current_cv) {
143 8           CvXSUBANY(ctx->current_cv).any_ptr = NULL;
144 8           SvREFCNT_dec((SV*)ctx->current_cv);
145             }
146 30 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
147 30 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
148 30           Safefree(ctx);
149             }
150              
151             static void _series_next(pTHX_ series_ctx **ctx_ptr);
152              
153 5039           static void series_next_cb(pTHX_ CV *cv) {
154 5039           dXSARGS;
155 5039 50         SvREFCNT_inc_simple_void(cv);
156 5039           sv_2mortal((SV*)cv);
157 5039           series_ctx *ctx = (series_ctx *)CvXSUBANY(cv).any_ptr;
158 5039 100         if (!ctx) {
159 3           XSRETURN_EMPTY;
160             }
161              
162 5036 100         if (!ctx->unsafe) {
163 5030           CvXSUBANY(cv).any_ptr = NULL;
164 5030 50         if (ctx->current_cv == cv) {
165 5030           ctx->current_cv = NULL;
166 5030           SvREFCNT_dec((SV*)cv);
167             }
168             }
169              
170 5036 100         if (items > 0 && SvTRUE(ST(0))) {
    50          
171 3           ctx->current_idx = ctx->total_tasks;
172             }
173 5036           _series_next(aTHX_ &ctx);
174 5036           XSRETURN_EMPTY;
175             }
176              
177 5068           static void _series_next(pTHX_ series_ctx **ctx_ptr) {
178 5068 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
179 5068           series_ctx *ctx = *ctx_ptr;
180              
181 5068 100         if (ctx->running) {
182 5028           ctx->delayed = 1;
183 5028           return;
184             }
185              
186 40           ctx->running = 1;
187 40           ctx->delayed = 1;
188              
189 5087 100         while (ctx->delayed) {
190 5078           ctx->delayed = 0;
191 5078 50         SV **task_ary = (AvREAL(ctx->tasks) && !SvMAGICAL(ctx->tasks)) ? AvARRAY(ctx->tasks) : NULL;
    100          
192 5078 100         if (ctx->current_idx >= ctx->total_tasks) {
193 25           SV *cb = ctx->final_cb;
194 47 50         if (IS_PVCV(cb)) {
    100          
    50          
195 24           dSP;
196 24           ENTER;
197 24           SAVETMPS;
198 24           SvREFCNT_inc(cb);
199 24           sv_2mortal(cb);
200 24 50         PUSHMARK(SP);
201 24           PUTBACK;
202 24           series_cleanup(aTHX_ ctx_ptr);
203 24           call_sv(cb, G_DISCARD | G_VOID);
204 22 50         FREETMPS;
205 22           LEAVE;
206             } else {
207 1           series_cleanup(aTHX_ ctx_ptr);
208             }
209 23           return;
210             }
211              
212 5050 50         SV **fetch_ptr = (task_ary && ctx->current_idx <= AvFILL(ctx->tasks))
    50          
213 5050           ? &task_ary[ctx->current_idx]
214 10103 100         : av_fetch(ctx->tasks, ctx->current_idx, 0);
215 5053 50         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
216              
217 5053           ctx->current_idx++;
218              
219 10089 100         if (IS_PVCV(task_sv)) {
    100          
    50          
220 5042 100         if (!ctx->unsafe) {
221 5036 50         if (ctx->current_cv) {
222 0           CvXSUBANY(ctx->current_cv).any_ptr = NULL;
223 0           SvREFCNT_dec((SV*)ctx->current_cv);
224             }
225 5036           CV *cv = newXS(NULL, series_next_cb, __FILE__);
226 5036           CvXSUBANY(cv).any_ptr = ctx;
227 5036           ctx->current_cv = (CV*)SvREFCNT_inc((SV*)cv);
228 6 100         } else if (!ctx->current_cv) {
229 4           ctx->current_cv = newXS(NULL, series_next_cb, __FILE__);
230 4           CvXSUBANY(ctx->current_cv).any_ptr = ctx;
231             }
232              
233 5042           SV *next_rv = NULL;
234 5042           dSP;
235 5042           ENTER;
236 5042           SAVETMPS;
237 5042 100         if (!ctx->unsafe) {
238 5036           next_rv = sv_2mortal(newRV_noinc((SV*)ctx->current_cv));
239             } else {
240 6           next_rv = sv_2mortal(newRV_inc((SV*)ctx->current_cv));
241             }
242 5042 50         PUSHMARK(SP);
243 5042 50         XPUSHs(next_rv);
244 5042           PUTBACK;
245            
246 5042 100         U32 flags = G_DISCARD | (ctx->unsafe ? 0 : G_EVAL);
247 5042           call_sv(task_sv, flags);
248              
249 5041 100         if (!ctx->unsafe) {
250 5036           SPAGAIN;
251 5036 50         if (SvTRUE(ERRSV)) {
    100          
252 5 50         SV *err = sv_mortalcopy(ERRSV);
253 5           series_cleanup(aTHX_ ctx_ptr);
254 5           croak_sv(err);
255             }
256             }
257              
258 5036 50         FREETMPS;
259 5036           LEAVE;
260 5036 50         if (!*ctx_ptr) return;
261             } else {
262 11           ctx->delayed = 1;
263             }
264             }
265              
266 9 50         if (*ctx_ptr) (*ctx_ptr)->running = 0;
267             }
268              
269 25           static void plimit_cleanup(pTHX_ plimit_ctx **ctx_ptr) {
270 25 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
271 25           plimit_ctx *ctx = *ctx_ptr;
272 25           *ctx_ptr = NULL;
273              
274 25 100         if (ctx->is_freed_ptr) {
275 21           *(ctx->is_freed_ptr) = 1;
276             }
277              
278 25 100         if (ctx->shared_cv) {
279 4           CvXSUBANY(ctx->shared_cv).any_ptr = NULL;
280 4           SvREFCNT_dec((SV*)ctx->shared_cv);
281             }
282 25 100         if (ctx->cvs) {
283             I32 i;
284 1070 100         for (i = 0; i < ctx->num_cvs; i++) {
285 1049 100         if (ctx->cvs[i]) {
286 1037           CvXSUBANY(ctx->cvs[i]).any_ptr = NULL;
287 1037           SvREFCNT_dec((SV*)ctx->cvs[i]);
288             }
289             }
290 21           Safefree(ctx->cvs);
291             }
292 25 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
293 25 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
294 25           Safefree(ctx);
295             }
296              
297             static void _plimit_dispatch(pTHX_ plimit_ctx **ctx_ptr);
298              
299 1142           static void plimit_task_done(pTHX_ CV *cv) {
300 1142           dXSARGS;
301 1142 50         SvREFCNT_inc_simple_void(cv);
302 1142           sv_2mortal((SV*)cv);
303 1142           plimit_ctx *ctx = (plimit_ctx *)CvXSUBANY(cv).any_ptr;
304 1142 100         if (!ctx) {
305 2           XSRETURN_EMPTY;
306             }
307              
308 1140 100         if (ctx->cvs) {
309 1033           CvXSUBANY(cv).any_ptr = NULL;
310             }
311              
312 1140           ctx->active--;
313              
314 1140 100         if (--ctx->remaining <= 0) {
315 17           SV *cb = ctx->final_cb;
316 32 50         if (IS_PVCV(cb)) {
    100          
    50          
317 16           dSP;
318 16           ENTER;
319 16           SAVETMPS;
320 16           SvREFCNT_inc(cb);
321 16           sv_2mortal(cb);
322 16 50         PUSHMARK(SP);
323 16           PUTBACK;
324              
325 16           plimit_cleanup(aTHX_ &ctx);
326              
327 16           call_sv(cb, G_DISCARD | G_VOID);
328              
329 15 50         FREETMPS;
330 15           LEAVE;
331             } else {
332 1           plimit_cleanup(aTHX_ &ctx);
333             }
334             } else {
335 1123           _plimit_dispatch(aTHX_ &ctx);
336             }
337 1139           XSRETURN_EMPTY;
338             }
339              
340 1148           static void _plimit_dispatch(pTHX_ plimit_ctx **ctx_ptr) {
341 2259 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
342 1148           plimit_ctx *ctx = *ctx_ptr;
343              
344 1148 100         if (ctx->running) {
345 1111           ctx->delayed = 1;
346 1111           return;
347             }
348              
349 37           int is_freed = 0;
350 37           int *old_is_freed_ptr = ctx->is_freed_ptr;
351 37           ctx->is_freed_ptr = &is_freed;
352              
353 37           ctx->running = 1;
354 37           ctx->delayed = 1;
355              
356 52 100         while (ctx->delayed) {
357 37           ctx->delayed = 0;
358              
359 1171 50         while (!is_freed && ctx->active < ctx->limit && ctx->current_idx < ctx->total_tasks) {
    100          
    100          
360 1156 50         SV **task_ary = (AvREAL(ctx->tasks) && !SvMAGICAL(ctx->tasks)) ? AvARRAY(ctx->tasks) : NULL;
    100          
361 1153 50         SV **fetch_ptr = (task_ary && ctx->current_idx <= AvFILL(ctx->tasks))
    50          
362 1153           ? &task_ary[ctx->current_idx]
363 2309 100         : av_fetch(ctx->tasks, ctx->current_idx, 0);
364 1156 50         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
365              
366 1156           ctx->current_idx++;
367              
368 2282 100         if (IS_PVCV(task_sv)) {
    100          
    50          
369 1144           SV *done_rv = NULL;
370 1144           CV *cv = NULL;
371 1144           dSP;
372              
373 1144 100         if (!ctx->unsafe) {
374 1037           cv = newXS(NULL, plimit_task_done, __FILE__);
375 1037           CvXSUBANY(cv).any_ptr = ctx;
376 1037           ctx->cvs[ctx->current_idx - 1] = (CV*)SvREFCNT_inc((SV*)cv);
377             }
378              
379 1144           ctx->active++;
380 1144           int task_unsafe = ctx->unsafe;
381              
382 1144           ENTER;
383 1144           SAVETMPS;
384 1144 100         if (!ctx->unsafe) {
385 1037           done_rv = sv_2mortal(newRV_noinc((SV*)cv));
386             } else {
387 107           done_rv = sv_2mortal(newRV_inc((SV*)ctx->shared_cv));
388             }
389 1144 50         PUSHMARK(SP);
390 1144 50         XPUSHs(done_rv);
391 1144           PUTBACK;
392              
393 1144 100         U32 flags = G_DISCARD | (task_unsafe ? 0 : G_EVAL);
394 1144           call_sv(task_sv, flags);
395              
396 1143 100         if (!task_unsafe) {
397 1037           SPAGAIN;
398 1037 50         if (SvTRUE(ERRSV)) {
    100          
399 5 50         SV *err = sv_mortalcopy(ERRSV);
400 5 100         if (!is_freed) {
401 4           plimit_cleanup(aTHX_ ctx_ptr);
402             }
403 5 50         if (old_is_freed_ptr) *old_is_freed_ptr = 1;
404 5           croak_sv(err);
405             }
406             }
407              
408 1138 50         FREETMPS;
409 1138           LEAVE;
410 1138 100         if (is_freed) goto done;
411             } else {
412 12 100         if (--ctx->remaining <= 0) {
413 4           SV *cb = ctx->final_cb;
414 7 50         if (IS_PVCV(cb)) {
    50          
    50          
415 4           dSP;
416 4           ENTER;
417 4           SAVETMPS;
418 4           SvREFCNT_inc(cb);
419 4           sv_2mortal(cb);
420 4 50         PUSHMARK(SP);
421 4           PUTBACK;
422 4           plimit_cleanup(aTHX_ ctx_ptr);
423 4 50         if (old_is_freed_ptr) *old_is_freed_ptr = 1;
424 4           call_sv(cb, G_DISCARD | G_VOID);
425 3 50         FREETMPS;
426 3           LEAVE;
427             } else {
428 0           plimit_cleanup(aTHX_ ctx_ptr);
429 0 0         if (old_is_freed_ptr) *old_is_freed_ptr = 1;
430             }
431 3           goto done;
432             }
433 8           ctx->delayed = 1;
434             }
435             }
436             }
437              
438 15 50         if (!is_freed) {
439 15           ctx->is_freed_ptr = old_is_freed_ptr;
440 15           ctx->running = 0;
441             }
442 0           done:
443 30 100         if (is_freed && old_is_freed_ptr) {
    50          
444 15           *old_is_freed_ptr = 1;
445             }
446             }
447              
448             MODULE = EV::Future PACKAGE = EV::Future
449              
450             PROTOTYPES: DISABLE
451              
452             BOOT:
453 7 50         I_EV_API ("EV::Future");
    50          
    50          
454              
455             void
456             parallel(tasks, final_cb, ...)
457             AV *tasks
458             SV *final_cb
459             CODE:
460 26           int unsafe = 0;
461 26 100         if (items > 2 && SvTRUE(ST(2))) unsafe = 1;
    50          
462              
463 26           I32 len = av_len(tasks) + 1;
464 26 100         if (len <= 0) {
465 1 50         if (IS_PVCV(final_cb)) {
    50          
    50          
466 1           dSP;
467 1           ENTER;
468 1           SAVETMPS;
469 1 50         PUSHMARK(SP);
470 1           PUTBACK;
471 1           call_sv(final_cb, G_DISCARD | G_VOID);
472 1 50         FREETMPS;
473 1           LEAVE;
474             }
475 1           return;
476             }
477              
478 25           ENTER;
479              
480             int *is_freed;
481 25           Newxz(is_freed, 1, int);
482              
483             parallel_ctx *ctx;
484 25           Newx(ctx, 1, parallel_ctx);
485 25           ctx->is_freed_ptr = is_freed;
486 25           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
487 25           ctx->final_cb = SvREFCNT_inc(final_cb);
488 25           ctx->remaining = len;
489 25           ctx->cvs = NULL;
490 25           ctx->num_cvs = 0;
491 25           ctx->shared_cv = NULL;
492              
493             ifp_guard *guard;
494 25           Newx(guard, 1, ifp_guard);
495 25           guard->target_field = &ctx->is_freed_ptr;
496 25           guard->is_freed = is_freed;
497 25           SAVEDESTRUCTOR_X(ifp_guard_destroy, guard);
498              
499 25           SV *done_rv = NULL;
500 25 100         if (unsafe) {
501 4           ctx->shared_cv = newXS(NULL, parallel_task_done, __FILE__);
502 4           CvXSUBANY(ctx->shared_cv).any_ptr = ctx;
503 4           done_rv = sv_2mortal(newRV_inc((SV*)ctx->shared_cv));
504             } else {
505 21           ctx->num_cvs = len;
506 21           Newxz(ctx->cvs, len, CV*);
507             }
508              
509 25           dSP;
510              
511             I32 i;
512 25 100         U32 flags = G_DISCARD | (unsafe ? 0 : G_EVAL);
513              
514 6157 100         for (i = 0; i < len; i++) {
515 6141 50         if (*is_freed) break;
516              
517 6141 50         SV **task_ary = (AvREAL(tasks) && !SvMAGICAL(tasks)) ? AvARRAY(tasks) : NULL;
    100          
518 6138 50         SV **fetch_ptr = (task_ary && i <= AvFILL(tasks))
    100          
519 6137           ? &task_ary[i]
520 12279 100         : av_fetch(tasks, i, 0);
521 6141 100         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
522              
523 12266 100         if (IS_PVCV(task_sv)) {
    100          
    50          
524 6130           CV *cv = NULL;
525 6130 100         if (!unsafe) {
526 6024           cv = newXS(NULL, parallel_task_done, __FILE__);
527 6024           CvXSUBANY(cv).any_ptr = ctx;
528 6024           ctx->cvs[i] = (CV*)SvREFCNT_inc((SV*)cv);
529             }
530              
531 6130           ENTER;
532 6130           SAVETMPS;
533 6130 100         if (!unsafe) {
534 6024           done_rv = sv_2mortal(newRV_noinc((SV*)cv));
535             }
536 6130 50         PUSHMARK(SP);
537 6130 50         XPUSHs(done_rv);
538 6130           PUTBACK;
539              
540 6130           call_sv(task_sv, flags);
541 6129           SPAGAIN;
542 6129 100         if (!unsafe) {
543 6024 50         if (SvTRUE(ERRSV)) {
    100          
544 4 50         SV *err = sv_mortalcopy(ERRSV);
545 4 100         if (!*is_freed) {
546 3           parallel_cleanup(aTHX_ &ctx);
547             }
548 4           croak_sv(err);
549             }
550             }
551              
552 6125 100         FREETMPS;
553 6125           LEAVE;
554             } else {
555 11 100         if (--ctx->remaining <= 0) {
556 4           SV *cb = ctx->final_cb;
557 4 50         if (IS_PVCV(cb)) {
    50          
    50          
558 4           ENTER;
559 4           SAVETMPS;
560 4           SvREFCNT_inc(cb);
561 4           sv_2mortal(cb);
562 4 50         PUSHMARK(SP);
563 4           PUTBACK;
564 4           parallel_cleanup(aTHX_ &ctx);
565 4           call_sv(cb, G_DISCARD | G_VOID);
566 3 50         FREETMPS;
567 3           LEAVE;
568             } else {
569 0           parallel_cleanup(aTHX_ &ctx);
570             }
571 3           break;
572             }
573             }
574             }
575 19           LEAVE;
576              
577             void
578             series(tasks, final_cb, ...)
579             AV *tasks
580             SV *final_cb
581             CODE:
582 33           int unsafe = 0;
583 33 100         if (items > 2 && SvTRUE(ST(2))) unsafe = 1;
    50          
584              
585 33           I32 len = av_len(tasks) + 1;
586 33 100         if (len <= 0) {
587 1 50         if (IS_PVCV(final_cb)) {
    50          
    50          
588 1           dSP;
589 1           ENTER;
590 1           SAVETMPS;
591 1 50         PUSHMARK(SP);
592 1           PUTBACK;
593 1           call_sv(final_cb, G_DISCARD | G_VOID);
594 1 50         FREETMPS;
595 1           LEAVE;
596             }
597 1           return;
598             }
599              
600 32           ENTER;
601              
602             int *is_freed;
603 32           Newxz(is_freed, 1, int);
604              
605             series_ctx *ctx;
606 32           Newx(ctx, 1, series_ctx);
607 32           ctx->is_freed_ptr = is_freed;
608 32           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
609 32           ctx->final_cb = SvREFCNT_inc(final_cb);
610 32           ctx->current_idx = 0;
611 32           ctx->running = 0;
612 32           ctx->delayed = 0;
613 32           ctx->total_tasks = len;
614 32           ctx->unsafe = unsafe;
615 32           ctx->current_cv = NULL;
616              
617             ifp_guard *guard;
618 32           Newx(guard, 1, ifp_guard);
619 32           guard->target_field = &ctx->is_freed_ptr;
620 32           guard->is_freed = is_freed;
621 32           SAVEDESTRUCTOR_X(ifp_guard_destroy, guard);
622              
623 32           _series_next(aTHX_ &ctx);
624              
625 24           LEAVE;
626              
627             void
628             parallel_limit(tasks, limit, final_cb, ...)
629             AV *tasks
630             I32 limit
631             SV *final_cb
632             CODE:
633 26           int unsafe = 0;
634 26 100         if (items > 3 && SvTRUE(ST(3))) unsafe = 1;
    50          
635              
636 26           I32 len = av_len(tasks) + 1;
637 26 100         if (len <= 0) {
638 1 50         if (IS_PVCV(final_cb)) {
    50          
    50          
639 1           dSP;
640 1           ENTER;
641 1           SAVETMPS;
642 1 50         PUSHMARK(SP);
643 1           PUTBACK;
644 1           call_sv(final_cb, G_DISCARD | G_VOID);
645 1 50         FREETMPS;
646 1           LEAVE;
647             }
648 1           return;
649             }
650              
651 25 100         if (limit < 1) limit = 1;
652 25 100         if (limit > len) limit = len;
653              
654 25           ENTER;
655              
656             int *is_freed;
657 25           Newxz(is_freed, 1, int);
658              
659             plimit_ctx *ctx;
660 25           Newx(ctx, 1, plimit_ctx);
661 25           ctx->is_freed_ptr = is_freed;
662 25           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
663 25           ctx->final_cb = SvREFCNT_inc(final_cb);
664 25           ctx->remaining = len;
665 25           ctx->current_idx = 0;
666 25           ctx->total_tasks = len;
667 25           ctx->limit = limit;
668 25           ctx->active = 0;
669 25           ctx->unsafe = unsafe;
670 25           ctx->running = 0;
671 25           ctx->delayed = 0;
672 25           ctx->cvs = NULL;
673 25           ctx->num_cvs = 0;
674 25           ctx->shared_cv = NULL;
675              
676             ifp_guard *guard;
677 25           Newx(guard, 1, ifp_guard);
678 25           guard->target_field = &ctx->is_freed_ptr;
679 25           guard->is_freed = is_freed;
680 25           SAVEDESTRUCTOR_X(ifp_guard_destroy, guard);
681              
682 25 100         if (unsafe) {
683 4           ctx->shared_cv = newXS(NULL, plimit_task_done, __FILE__);
684 4           CvXSUBANY(ctx->shared_cv).any_ptr = ctx;
685             } else {
686 21           ctx->num_cvs = len;
687 21           Newxz(ctx->cvs, len, CV*);
688             }
689              
690 25           _plimit_dispatch(aTHX_ &ctx);
691              
692 18           LEAVE;