File Coverage

blib/lib/Hypersonic/Future/Pool.pm
Criterion Covered Total %
statement 61 75 81.3
branch 0 4 0.0
condition 2 4 50.0
subroutine 11 13 84.6
pod 0 3 0.0
total 74 99 74.7


line stmt bran cond sub pod time code
1             package Hypersonic::Future::Pool;
2              
3 9     9   3234 use strict;
  9         13  
  9         318  
4 9     9   47 use warnings;
  9         13  
  9         477  
5 9     9   123 use 5.010;
  9         21  
6              
7             our $VERSION = '0.15';
8              
9 9     9   32 use XS::JIT::Builder;
  9         12  
  9         518  
10 9     9   34 use Hypersonic::Future qw(MAX_FUTURES);
  9         11  
  9         494  
11 9     9   60 use Hypersonic::JIT::Util;
  9         10  
  9         323  
12              
13             # Thread pool configuration
14             use constant {
               # Compile-time configuration and enum values for the pool backend.
               # MAX_POOLS is the fallback for $opts->{max_pools} in generate_c_code.
15 9         1124 MAX_POOLS => 256,
               # NOTE(review): xs_pool_new (emitted by _gen_xs_functions) hard-codes
               # the C literals 8 and 4096 instead of interpolating these two
               # constants; changing them here does not change the generated default.
16             DEFAULT_WORKERS => 8,
17             DEFAULT_QUEUE_SIZE => 4096,
               # The OP_TYPE_* and POOL_STATE_* values below are repeated as literal
               # '#define' lines in generate_c_code; keep both copies in sync.
18             OP_TYPE_CODE => 1,
19             OP_TYPE_DB_QUERY => 2,
20             OP_TYPE_DB_EXECUTE => 3,
21             POOL_STATE_UNINITIALIZED => 0,
22             POOL_STATE_RUNNING => 1,
23             POOL_STATE_SHUTDOWN => 2,
24 9     9   34 };
  9         9  
25              
26 9     9   35 use Exporter 'import';
  9         12  
  9         20660  
27             our @EXPORT_OK = qw(
28             MAX_POOLS DEFAULT_WORKERS DEFAULT_QUEUE_SIZE
29             OP_TYPE_CODE OP_TYPE_DB_QUERY OP_TYPE_DB_EXECUTE
30             POOL_STATE_UNINITIALIZED POOL_STATE_RUNNING POOL_STATE_SHUTDOWN
31             );
32              
33             # JIT state - our so Future.pm can set it
34             our $COMPILED = 0;
35              
36             sub get_xs_functions {
               # Returns a hashref keyed by fully qualified Perl sub name. Each value
               # has the form { source => <generated C function name>, is_xs_native => 1 }.
               # Arguments are not read.
               # NOTE(review): the consumer of this table is outside this view;
               # presumably the JIT loader uses it to bind the Perl names - confirm.
               # Each 'source' is expected to match an xs_function() name emitted by
               # generate_c_code / _gen_xs_functions - verify when adding entries.
37             return {
38             # Lifecycle - instance methods (extract slot from $$self)
39 9     9 0 492 'Hypersonic::Future::Pool::new' => { source => 'xs_pool_new', is_xs_native => 1 },
40             'Hypersonic::Future::Pool::init' => { source => 'xs_pool_init', is_xs_native => 1 },
41             'Hypersonic::Future::Pool::shutdown' => { source => 'xs_pool_shutdown', is_xs_native => 1 },
42             'Hypersonic::Future::Pool::DESTROY' => { source => 'xs_pool_destroy', is_xs_native => 1 },
43              
44             # Operations - instance methods (direct XS, no Perl wrappers)
45             'Hypersonic::Future::Pool::submit' => { source => 'xs_pool_submit', is_xs_native => 1 },
46             'Hypersonic::Future::Pool::process_ready' => { source => 'xs_pool_process_ready', is_xs_native => 1 },
47              
48             # State inspection - will have custom OPs
49             'Hypersonic::Future::Pool::is_initialized' => { source => 'xs_pool_is_initialized', is_xs_native => 1 },
50             'Hypersonic::Future::Pool::pending_count' => { source => 'xs_pool_pending_count', is_xs_native => 1 },
51             'Hypersonic::Future::Pool::get_notify_fd' => { source => 'xs_pool_get_notify_fd', is_xs_native => 1 },
52             'Hypersonic::Future::Pool::workers' => { source => 'xs_pool_workers', is_xs_native => 1 },
53             'Hypersonic::Future::Pool::slot' => { source => 'xs_pool_slot', is_xs_native => 1 },
54              
55             # Global pool helpers (class methods)
56             'Hypersonic::Future::Pool::init_global' => { source => 'xs_pool_init_global', is_xs_native => 1 },
57             'Hypersonic::Future::Pool::shutdown_global' => { source => 'xs_pool_shutdown_global', is_xs_native => 1 },
58             'Hypersonic::Future::Pool::default_pool' => { source => 'xs_pool_default_pool', is_xs_native => 1 },
59              
60             # Slot-based helpers for event loop (class methods)
61             'Hypersonic::Future::Pool::_get_notify_fd_slot' => { source => 'xs_pool_get_notify_fd_slot', is_xs_native => 1 },
62             'Hypersonic::Future::Pool::_process_ready_slot' => { source => 'xs_pool_process_ready_slot', is_xs_native => 1 },
63              
64             # Custom op registration
65             'Hypersonic::Future::Pool::_register_ops' => { source => 'xs_pool_register_ops', is_xs_native => 1 },
66             };
67             }
68              
69             sub generate_c_code {
               # Emits the C source for the thread-pool backend into $builder: the
               # #defines, the ThreadPoolOp and PoolContext structs, the static pool
               # registry and freelist, the notification helpers, the worker thread
               # function, and finally the XS entry points (via _gen_xs_functions).
               #
               # Parameters:
               #   $class   - invocant; used to call _gen_xs_functions.
               #   $builder - object providing the chainable line/blank/xs_function
               #              methods used below (presumably XS::JIT::Builder - confirm).
               #   $opts    - optional hashref; only the key 'max_pools' is read, and
               #              it defaults to MAX_POOLS.
               # Returns: nothing explicit; the value of the final builder chain.
70 9     9 0 21 my ($class, $builder, $opts) = @_;
71              
72 9   50     29 $opts //= {};
73 9   50     68 my $max_pools = $opts->{max_pools} // MAX_POOLS;
               # NOTE(review): $max_pools is interpolated into the '#define MAX_POOLS'
               # line below without validation; a non-integer value would presumably
               # surface as a C compile error rather than a Perl-level error.
74 9         36 my $inline = Hypersonic::JIT::Util->inline_keyword;
75              
76             # PERL_VERSION_* macros are already provided by XS::JIT preamble
77             # No need to emit them here
78              
79             # System includes via centralized utility
80 9         40 Hypersonic::JIT::Util->add_standard_includes($builder,
81             qw(unistd fcntl threading));
82              
83             # Platform-specific eventfd (Linux only)
84 9         38 Hypersonic::JIT::Util->add_platform_eventfd($builder);
85              
86             # Defines
87 9         84 $builder->line("#define MAX_POOLS $max_pools")
88             ->line('#define POOL_STATE_UNINITIALIZED 0')
89             ->line('#define POOL_STATE_RUNNING 1')
90             ->line('#define POOL_STATE_SHUTDOWN 2')
91             ->line('#define OP_TYPE_CODE 1')
92             ->line('#define OP_TYPE_DB_QUERY 2')
93             ->line('#define OP_TYPE_DB_EXECUTE 3')
94             ->blank;
95              
96             # ThreadPoolOp - a queued operation
97 9         108 $builder->line('typedef struct ThreadPoolOp {')
98             ->line(' int op_type;')
99             ->line(' int future_slot;')
100             ->line(' int pool_slot; /* Which pool owns this op */')
101             ->line(' SV *code; /* For CODE type: coderef to execute */')
102             ->line(' SV *args; /* Arrayref of arguments */')
103             ->line(' SV *result; /* Result after execution */')
104             ->line(' char *error; /* Error message if failed */')
105             ->line(' int completed; /* 1 when done */')
106             ->line(' struct ThreadPoolOp *next;')
107             ->line('} ThreadPoolOp;')
108             ->blank;
109              
110             # PoolContext - per-pool state (mirrors FutureContext pattern)
111 9         154 $builder->line('typedef struct PoolContext {')
112             ->line(' int state; /* POOL_STATE_* */')
113             ->line(' int in_use; /* Slot allocation flag */')
114             ->line(' int workers; /* Number of worker threads */')
115             ->line(' int queue_size; /* Max queue depth */')
116             ->line(' pthread_t *threads; /* Dynamic array of worker handles */')
117             ->line(' pthread_mutex_t mutex;')
118             ->line(' pthread_cond_t cond;')
119             ->line(' pthread_cond_t done_cond;')
120             ->line(' ThreadPoolOp *queue_head;')
121             ->line(' ThreadPoolOp *queue_tail;')
122             ->line(' ThreadPoolOp *completed_head;')
123             ->line(' ThreadPoolOp *completed_tail;')
124             ->line(' int queue_count;')
125             ->line(' int completed_count;')
126             ->line(' int notify_fd;')
127             ->line('#if !USE_EVENTFD')
128             ->line(' int notify_pipe[2];')
129             ->line('#endif')
130             ->line('} PoolContext;')
131             ->blank;
132              
133             # Pool registry and freelist (like future_registry)
134 9         51 $builder->line('static PoolContext pool_registry[MAX_POOLS];')
135             ->line('static int pool_freelist[MAX_POOLS];')
136             ->line('static int pool_freelist_count = 0;')
137             ->line('static int pool_freelist_initialized = 0;')
138             ->line('static int default_pool_slot = -1; /* Global default pool */')
139             ->blank;
140              
141             # Freelist init
               # Slots are pushed from MAX_POOLS - 1 down to 0, and pool_alloc_slot pops
               # from the end of the array, so slot 0 is handed out first.
142 9         100 $builder->line("static $inline void pool_freelist_init(void) {")
143             ->line(' int i;')
144             ->line(' if (pool_freelist_initialized) return;')
145             ->line(' for (i = MAX_POOLS - 1; i >= 0; i--) {')
146             ->line(' pool_freelist[pool_freelist_count++] = i;')
147             ->line(' }')
148             ->line(' pool_freelist_initialized = 1;')
149             ->line('}')
150             ->blank;
151              
152             # Alloc slot
153 9         113 $builder->line("static $inline int pool_alloc_slot(void) {")
154             ->line(' if (!pool_freelist_initialized) pool_freelist_init();')
155             ->line(' if (pool_freelist_count > 0) {')
156             ->line(' int slot = pool_freelist[--pool_freelist_count];')
157             ->line(' PoolContext *ctx = &pool_registry[slot];')
158             ->line(' memset(ctx, 0, sizeof(PoolContext));')
159             ->line(' ctx->in_use = 1;')
160             ->line(' ctx->state = POOL_STATE_UNINITIALIZED;')
161             ->line(' return slot;')
162             ->line(' }')
163             ->line(' return -1;')
164             ->line('}')
165             ->blank;
166              
167             # Free slot
               # Releases the threads array and returns the slot to the freelist. It
               # does not inspect ctx->state or join threads; the DESTROY XS function
               # shuts a running pool down before calling it.
168 9         108 $builder->line('static void pool_free_slot(int slot) {')
169             ->line(' if (slot < 0 || slot >= MAX_POOLS) return;')
170             ->line(' PoolContext *ctx = &pool_registry[slot];')
171             ->line(' if (!ctx->in_use) return;')
172             ->line(' if (ctx->threads) {')
173             ->line(' free(ctx->threads);')
174             ->line(' ctx->threads = NULL;')
175             ->line(' }')
176             ->line(' ctx->in_use = 0;')
177             ->line(' if (pool_freelist_count < MAX_POOLS) {')
178             ->line(' pool_freelist[pool_freelist_count++] = slot;')
179             ->line(' }')
180             ->line('}')
181             ->blank;
182              
183             # Helper: signal completion for a specific pool
               # NOTE(review): the return value of write() is ignored here, as is the
               # return value of read() in the eventfd branch of the clear helper below.
184 9         124 $builder->line('static void pool_signal_completion_slot(int slot) {')
185             ->line(' PoolContext *pool = &pool_registry[slot];')
186             ->line('#if USE_EVENTFD')
187             ->line(' uint64_t val = 1;')
188             ->line(' write(pool->notify_fd, &val, sizeof(val));')
189             ->line('#else')
190             ->line(' char c = 1;')
191             ->line(' write(pool->notify_pipe[1], &c, 1);')
192             ->line('#endif')
193             ->line('}')
194             ->blank;
195              
196             # Helper: clear notification for a specific pool
197 9         80 $builder->line('static void pool_clear_notification_slot(int slot) {')
198             ->line(' PoolContext *pool = &pool_registry[slot];')
199             ->line('#if USE_EVENTFD')
200             ->line(' uint64_t val;')
201             ->line(' read(pool->notify_fd, &val, sizeof(val));')
202             ->line('#else')
203             ->line(' char buf[64];')
204             ->line(' while (read(pool->notify_pipe[0], buf, sizeof(buf)) > 0) {}')
205             ->line('#endif')
206             ->line('}')
207             ->blank;
208              
209             # Helper: get notify_fd for a pool slot (for event loop integration)
210 9         79 $builder->line('static int pool_get_notify_fd_slot(int slot) {')
211             ->line(' if (slot < 0 || slot >= MAX_POOLS) return -1;')
212             ->line(' PoolContext *pool = &pool_registry[slot];')
213             ->line(' if (pool->state != POOL_STATE_RUNNING) return -1;')
214             ->line(' return pool->notify_fd;')
215             ->line('}')
216             ->blank;
217              
218             # Helper: process ready operations for a pool slot (for event loop integration)
219             # This is a simplified version that just clears notification - actual processing
220             # should be done via XS to properly handle Perl callbacks
221 9         64 $builder->line('static void pool_process_ready_slot(int slot) {')
222             ->line(' if (slot < 0 || slot >= MAX_POOLS) return;')
223             ->line(' PoolContext *pool = &pool_registry[slot];')
224             ->line(' if (pool->state != POOL_STATE_RUNNING) return;')
225             ->line(' pool_clear_notification_slot(slot);')
226             ->line('}')
227             ->blank;
228              
229             # Worker thread function - takes pool_slot as argument
230             # Fixed: wait while NOT shutdown AND no work (handles UNINITIALIZED state too)
               # The worker does not execute the op: it only moves it from the pending
               # queue to the completed queue and signals the notify fd. The Perl code
               # itself is invoked later by xs_pool_process_ready.
231 9         415 $builder->line('static void* pool_worker_fn(void *arg) {')
232             ->line(' int pool_slot = (int)(intptr_t)arg;')
233             ->line(' PoolContext *pool = &pool_registry[pool_slot];')
234             ->line(' while (1) {')
235             ->line(' ThreadPoolOp *op = NULL;')
236             ->line(' pthread_mutex_lock(&pool->mutex);')
237             ->line(' /* Wait while not shutdown and no work available */')
238             ->line(' while (pool->state != POOL_STATE_SHUTDOWN && !pool->queue_head) {')
239             ->line(' pthread_cond_wait(&pool->cond, &pool->mutex);')
240             ->line(' }')
241             ->line(' /* Exit if shutdown and queue empty */')
242             ->line(' if (pool->state == POOL_STATE_SHUTDOWN && !pool->queue_head) {')
243             ->line(' pthread_mutex_unlock(&pool->mutex);')
244             ->line(' break;')
245             ->line(' }')
246             ->line(' /* Dequeue */')
247             ->line(' op = pool->queue_head;')
248             ->line(' pool->queue_head = op->next;')
249             ->line(' if (!pool->queue_head) pool->queue_tail = NULL;')
250             ->line(' pool->queue_count--;')
251             ->line(' op->next = NULL;')
252             ->line(' /* Move to completed queue */')
253             ->line(' if (pool->completed_tail) {')
254             ->line(' pool->completed_tail->next = op;')
255             ->line(' } else {')
256             ->line(' pool->completed_head = op;')
257             ->line(' }')
258             ->line(' pool->completed_tail = op;')
259             ->line(' pool->completed_count++;')
260             ->line(' pthread_mutex_unlock(&pool->mutex);')
261             ->line(' /* Notify main thread */')
262             ->line(' pool_signal_completion_slot(pool_slot);')
263             ->line(' }')
264             ->line(' return NULL;')
265             ->line('}')
266             ->blank;
267              
268             # Generate custom OPs for hot-path methods (disabled for debugging)
269             # $class->_gen_custom_ops($builder);
270              
271             # Generate XS functions
272 9         62 $class->_gen_xs_functions($builder);
273              
274             # Placeholder _register_ops that does nothing
               # NOTE(review): _gen_custom_ops emits an XS function with this same name,
               # so re-enabling the call above would presumably require dropping this
               # placeholder to avoid a duplicate definition.
275 9         141 $builder->xs_function('xs_pool_register_ops')
276             ->xs_preamble
277             ->xs_return('0')
278             ->xs_end;
279             }
280              
281             sub _gen_custom_ops {
               # Emits custom-OP versions of is_initialized and pending_count: the two
               # pp_* functions, their XOP declarations, their call checkers, and an
               # xs_pool_register_ops that registers them.
               # Currently unused: the only call site visible, in generate_c_code, is
               # commented out.
               #
               # Parameters:
               #   $class   - invocant (not otherwise used in this sub).
               #   $builder - object providing line/xop_declare/ck_*/xs_* methods
               #              (presumably XS::JIT::Builder - confirm).
               # Returns: nothing explicit; the value of the final builder chain.
               # NOTE(review): both pp_* functions index pool_registry with the slot
               # read from the object without a bounds check.
282 0     0   0 my ($class, $builder) = @_;
283              
284             # Custom OP: pp_pool_is_initialized
285 0         0 $builder->line('static OP* pp_pool_is_initialized(pTHX) {')
286             ->line(' dSP;')
287             ->line(' SV* self = TOPs;')
288             ->line(' int slot = SvIV(SvRV(self));')
289             ->line(' SETs(boolSV(pool_registry[slot].state == POOL_STATE_RUNNING));')
290             ->line(' return NORMAL;')
291             ->line('}')
292             ->blank;
293              
294             # Custom OP: pp_pool_pending_count
               # NOTE(review): unlike xs_pool_pending_count, this locks pool->mutex
               # without first checking that the pool state is POOL_STATE_RUNNING; the
               # mutex is only initialised by xs_pool_init.
295 0         0 $builder->line('static OP* pp_pool_pending_count(pTHX) {')
296             ->line(' dSP;')
297             ->line(' SV* self = TOPs;')
298             ->line(' int slot = SvIV(SvRV(self));')
299             ->line(' PoolContext *pool = &pool_registry[slot];')
300             ->line(' pthread_mutex_lock(&pool->mutex);')
301             ->line(' int count = pool->queue_count + pool->completed_count;')
302             ->line(' pthread_mutex_unlock(&pool->mutex);')
303             ->line(' SETs(sv_2mortal(newSViv(count)));')
304             ->line(' return NORMAL;')
305             ->line('}')
306             ->blank;
307              
308             # XOP declarations
309 0         0 $builder->xop_declare('pool_is_initialized_xop', 'pp_pool_is_initialized', 'pool is_initialized')
310             ->xop_declare('pool_pending_count_xop', 'pp_pool_pending_count', 'pool pending_count');
311              
312             # Call checkers - generate the S_ck_ck_* functions
313 0         0 $builder->ck_start('ck_pool_is_initialized')
314             ->ck_preamble
315             ->ck_build_unop('pp_pool_is_initialized', '0')
316             ->ck_end;
317              
318 0         0 $builder->ck_start('ck_pool_pending_count')
319             ->ck_preamble
320             ->ck_build_unop('pp_pool_pending_count', '0')
321             ->ck_end;
322              
323             # Call checker registration function
324             # cv_set_call_checker requires Perl 5.14+ - check at JIT time, not C compile time
325 0         0 $builder->xs_function('xs_pool_register_ops')
326             ->xs_preamble
327             ->line('register_xop_pool_is_initialized_xop(aTHX);')
328             ->line('register_xop_pool_pending_count_xop(aTHX);');
329              
330             # JIT optimization: only emit cv_set_call_checker code if Perl >= 5.14
331             # This eliminates dead code from the generated C file on older Perls
332 0 0       0 if ($] >= 5.014000) {
333 0         0 $builder->line('{')
334             ->line(' CV *cv;')
335             ->line(' cv = get_cv("Hypersonic::Future::Pool::is_initialized", 0);')
336             ->line(' if (cv) cv_set_call_checker(cv, S_ck_ck_pool_is_initialized, &PL_sv_undef);')
337             ->line(' cv = get_cv("Hypersonic::Future::Pool::pending_count", 0);')
338             ->line(' if (cv) cv_set_call_checker(cv, S_ck_ck_pool_pending_count, &PL_sv_undef);')
339             ->line('}');
340             }
341              
342 0         0 $builder->xs_return('0')
343             ->xs_end;
344             }
345              
346             sub _gen_xs_functions {
347 9     9   21 my ($class, $builder) = @_;
348              
349             # XS: new - allocate slot and return blessed IV ref
350             # Usage: Pool->new() or Pool->new(workers => N, queue_size => N)
351 9         307 $builder->xs_function('xs_pool_new')
352             ->xs_preamble
353             ->line('int workers = 8; /* default */')
354             ->line('int queue_size = 4096; /* default */')
355             ->line('/* Parse named parameters */')
356             ->line('int i;')
357             ->line('for (i = 1; i < items - 1; i += 2) {')
358             ->line(' const char *key = SvPV_nolen(ST(i));')
359             ->line(' SV *val = ST(i + 1);')
360             ->line(' if (strEQ(key, "workers")) workers = SvIV(val);')
361             ->line(' else if (strEQ(key, "queue_size")) queue_size = SvIV(val);')
362             ->line('}')
363             ->line('int slot = pool_alloc_slot();')
364             ->line('if (slot < 0) croak("Pool registry full (max %d pools)", MAX_POOLS);')
365             ->line('PoolContext *ctx = &pool_registry[slot];')
366             ->line('ctx->workers = workers;')
367             ->line('ctx->queue_size = queue_size;')
368             ->line('SV *slot_sv = newSViv(slot);')
369             ->line('SV *self_ref = newRV_noinc(slot_sv);')
370             ->line('sv_bless(self_ref, gv_stashpv("Hypersonic::Future::Pool", GV_ADD));')
371             ->line('ST(0) = sv_2mortal(self_ref);')
372             ->xs_return('1')
373             ->xs_end;
374              
375             # XS: init - initialize the pool (instance method)
376 9         294 $builder->xs_function('xs_pool_init')
377             ->xs_preamble
378             ->line('int i;')
379             ->line('int slot = SvIV(SvRV(ST(0)));')
380             ->line('PoolContext *pool = &pool_registry[slot];')
381             ->line('if (pool->state == POOL_STATE_RUNNING) {')
382             ->line(' ST(0) = sv_2mortal(newSViv(1));')
383             ->line(' XSRETURN(1);')
384             ->line('}')
385             ->line('pthread_mutex_init(&pool->mutex, NULL);')
386             ->line('pthread_cond_init(&pool->cond, NULL);')
387             ->line('pthread_cond_init(&pool->done_cond, NULL);')
388             ->line('#if USE_EVENTFD')
389             ->line('pool->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);')
390             ->line('if (pool->notify_fd < 0) croak("eventfd() failed");')
391             ->line('#else')
392             ->line('if (pipe(pool->notify_pipe) < 0) croak("pipe() failed");')
393             ->line('fcntl(pool->notify_pipe[0], F_SETFL, O_NONBLOCK);')
394             ->line('fcntl(pool->notify_pipe[1], F_SETFL, O_NONBLOCK);')
395             ->line('pool->notify_fd = pool->notify_pipe[0];')
396             ->line('#endif')
397             ->line('pool->threads = (pthread_t*)malloc(pool->workers * sizeof(pthread_t));')
398             ->line('if (!pool->threads) croak("malloc failed for threads");')
399             ->line('pool->state = POOL_STATE_RUNNING; /* Set BEFORE creating threads */')
400             ->line('for (i = 0; i < pool->workers; i++) {')
401             ->line(' if (pthread_create(&pool->threads[i], NULL, pool_worker_fn, (void*)(intptr_t)slot) != 0) {')
402             ->line(' croak("pthread_create failed");')
403             ->line(' }')
404             ->line('}')
405             ->line('ST(0) = sv_2mortal(newSViv(1));')
406             ->xs_return('1')
407             ->xs_end;
408              
409             # XS: shutdown - shutdown the pool (instance method)
410 9         337 $builder->xs_function('xs_pool_shutdown')
411             ->xs_preamble
412             ->line('int i;')
413             ->line('int slot = SvIV(SvRV(ST(0)));')
414             ->line('PoolContext *pool = &pool_registry[slot];')
415             ->line('if (pool->state != POOL_STATE_RUNNING) {')
416             ->line(' ST(0) = sv_2mortal(newSViv(0));')
417             ->line(' XSRETURN(1);')
418             ->line('}')
419             ->line('pthread_mutex_lock(&pool->mutex);')
420             ->line('pool->state = POOL_STATE_SHUTDOWN;')
421             ->line('pthread_cond_broadcast(&pool->cond);')
422             ->line('pthread_mutex_unlock(&pool->mutex);')
423             ->line('for (i = 0; i < pool->workers; i++) {')
424             ->line(' pthread_join(pool->threads[i], NULL);')
425             ->line('}')
426             ->line('#if USE_EVENTFD')
427             ->line('close(pool->notify_fd);')
428             ->line('#else')
429             ->line('close(pool->notify_pipe[0]);')
430             ->line('close(pool->notify_pipe[1]);')
431             ->line('#endif')
432             ->line('pthread_mutex_destroy(&pool->mutex);')
433             ->line('pthread_cond_destroy(&pool->cond);')
434             ->line('pthread_cond_destroy(&pool->done_cond);')
435             ->line('ST(0) = sv_2mortal(newSViv(1));')
436             ->xs_return('1')
437             ->xs_end;
438              
439             # XS: DESTROY - cleanup on destruction
440             # Skip cleanup during global destruction (PL_dirty) - let OS clean up threads
441 9         344 $builder->xs_function('xs_pool_destroy')
442             ->xs_preamble
443             ->line('int i;')
444             ->line('/* Skip during global destruction - pthread_join can hang/crash */')
445             ->line('if (PL_dirty) XSRETURN(0);')
446             ->line('int slot = SvIV(SvRV(ST(0)));')
447             ->line('if (slot >= 0 && slot < MAX_POOLS) {')
448             ->line(' PoolContext *pool = &pool_registry[slot];')
449             ->line(' if (pool->in_use && pool->state == POOL_STATE_RUNNING) {')
450             ->line(' /* Auto-shutdown on destruction */')
451             ->line(' pthread_mutex_lock(&pool->mutex);')
452             ->line(' pool->state = POOL_STATE_SHUTDOWN;')
453             ->line(' pthread_cond_broadcast(&pool->cond);')
454             ->line(' pthread_mutex_unlock(&pool->mutex);')
455             ->line(' for (i = 0; i < pool->workers; i++) {')
456             ->line(' pthread_join(pool->threads[i], NULL);')
457             ->line(' }')
458             ->line('#if USE_EVENTFD')
459             ->line(' close(pool->notify_fd);')
460             ->line('#else')
461             ->line(' close(pool->notify_pipe[0]);')
462             ->line(' close(pool->notify_pipe[1]);')
463             ->line('#endif')
464             ->line(' pthread_mutex_destroy(&pool->mutex);')
465             ->line(' pthread_cond_destroy(&pool->cond);')
466             ->line(' pthread_cond_destroy(&pool->done_cond);')
467             ->line(' }')
468             ->line(' pool_free_slot(slot);')
469             ->line('}')
470             ->xs_return('0')
471             ->xs_end;
472              
473             # XS: submit - queue an operation (instance method)
474             # Usage: $pool->submit($future, $code, $args_aref)
475             # $future can be a Future object or an integer slot
476 9         1363 $builder->xs_function('xs_pool_submit')
477             ->xs_preamble
478             ->line('if (items < 3) croak("Usage: $pool->submit($future, $code, $args_aref)");')
479             ->line('int pool_slot = SvIV(SvRV(ST(0)));')
480             ->line('PoolContext *pool = &pool_registry[pool_slot];')
481             ->line('if (pool->state != POOL_STATE_RUNNING) croak("Pool not initialized");')
482             ->line('/* Extract future slot - accept Future object or integer */')
483             ->line('int future_slot;')
484             ->line('SV *future_sv = ST(1);')
485             ->line('if (SvROK(future_sv) && sv_derived_from(future_sv, "Hypersonic::Future")) {')
486             ->line(' future_slot = SvIV(SvRV(future_sv));')
487             ->line('} else {')
488             ->line(' future_slot = SvIV(future_sv);')
489             ->line('}')
490             ->line('SV *code = ST(2);')
491             ->line('SV *args = (items > 3) ? ST(3) : NULL;')
492             ->line('ThreadPoolOp *op = (ThreadPoolOp*)calloc(1, sizeof(ThreadPoolOp));')
493             ->line('if (!op) croak("malloc failed");')
494             ->line('op->op_type = OP_TYPE_CODE;')
495             ->line('op->future_slot = future_slot;')
496             ->line('op->pool_slot = pool_slot;')
497             ->line('op->code = SvREFCNT_inc(code);')
498             ->line('if (args) op->args = SvREFCNT_inc(args);')
499             ->line('pthread_mutex_lock(&pool->mutex);')
500             ->line('if (pool->queue_tail) {')
501             ->line(' pool->queue_tail->next = op;')
502             ->line('} else {')
503             ->line(' pool->queue_head = op;')
504             ->line('}')
505             ->line('pool->queue_tail = op;')
506             ->line('pool->queue_count++;')
507             ->line('pthread_cond_signal(&pool->cond);')
508             ->line('pthread_mutex_unlock(&pool->mutex);')
509             ->line('ST(0) = sv_2mortal(newSViv(1));')
510             ->xs_return('1')
511             ->xs_end;
512              
513             # XS: process_ready - process completed ops (instance method)
514 9         634 $builder->xs_function('xs_pool_process_ready')
515             ->xs_preamble
516             ->line('int i;')
517             ->line('int pool_slot = SvIV(SvRV(ST(0)));')
518             ->line('PoolContext *pool = &pool_registry[pool_slot];')
519             ->line('if (pool->state != POOL_STATE_RUNNING) { ST(0) = sv_2mortal(newSViv(0)); XSRETURN(1); }')
520             ->line('pool_clear_notification_slot(pool_slot);')
521             ->line('int processed = 0;')
522             ->line('while (1) {')
523             ->line(' ThreadPoolOp *op = NULL;')
524             ->line(' pthread_mutex_lock(&pool->mutex);')
525             ->line(' if (pool->completed_head) {')
526             ->line(' op = pool->completed_head;')
527             ->line(' pool->completed_head = op->next;')
528             ->line(' if (!pool->completed_head) pool->completed_tail = NULL;')
529             ->line(' pool->completed_count--;')
530             ->line(' }')
531             ->line(' pthread_mutex_unlock(&pool->mutex);')
532             ->line(' if (!op) break;')
533             ->line(' /* Execute the code in main Perl thread */')
534             ->line(' dSP;')
535             ->line(' ENTER; SAVETMPS;')
536             ->line(' PUSHMARK(SP);')
537             ->line(' if (op->args && SvROK(op->args)) {')
538             ->line(' AV *args_av = (AV*)SvRV(op->args);')
539             ->line(' int len = av_len(args_av) + 1;')
540             ->line(' for (i = 0; i < len; i++) {')
541             ->line(' SV **elem = av_fetch(args_av, i, 0);')
542             ->line(' if (elem) XPUSHs(*elem);')
543             ->line(' }')
544             ->line(' }')
545             ->line(' PUTBACK;')
546             ->line(' int count = call_sv(op->code, G_EVAL | G_ARRAY);')
547             ->line(' SPAGAIN;')
548             ->line(' SV *errsv = ERRSV;')
549             ->line(' /* Access future registry directly */')
550             ->line(' extern FutureContext future_registry[];')
551             ->line(' extern void future_invoke_callbacks(int, int);')
552             ->line(' FutureContext *ctx = &future_registry[op->future_slot];')
553             ->line(' if (SvTRUE(errsv)) {')
554             ->line(' STRLEN len;')
555             ->line(' const char *msg = SvPV(errsv, len);')
556             ->line(' ctx->fail_message = (char*)malloc(len + 1);')
557             ->line(' memcpy(ctx->fail_message, msg, len);')
558             ->line(' ctx->fail_message[len] = 0;')
559             ->line(' ctx->state = FUTURE_STATE_FAILED;')
560             ->line(' while (count-- > 0) POPs;')
561             ->line(' PUTBACK;')
562             ->line(' FREETMPS; LEAVE;')
563             ->line(' future_invoke_callbacks(op->future_slot, FUTURE_CB_FAIL);')
564             ->line(' } else if (count > 0) {')
565             ->line(' ctx->result_values = (SV**)malloc(count * sizeof(SV*));')
566             ->line(' for (i = count - 1; i >= 0; i--) {')
567             ->line(' ctx->result_values[i] = SvREFCNT_inc(POPs);')
568             ->line(' }')
569             ->line(' ctx->result_count = count;')
570             ->line(' ctx->state = FUTURE_STATE_DONE;')
571             ->line(' PUTBACK;')
572             ->line(' FREETMPS; LEAVE;')
573             ->line(' future_invoke_callbacks(op->future_slot, FUTURE_CB_DONE);')
574             ->line(' } else {')
575             ->line(' ctx->state = FUTURE_STATE_DONE;')
576             ->line(' PUTBACK;')
577             ->line(' FREETMPS; LEAVE;')
578             ->line(' future_invoke_callbacks(op->future_slot, FUTURE_CB_DONE);')
579             ->line(' }')
580             ->line(' if (op->code) SvREFCNT_dec(op->code);')
581             ->line(' if (op->args) SvREFCNT_dec(op->args);')
582             ->line(' free(op);')
583             ->line(' processed++;')
584             ->line('}')
585             ->line('ST(0) = sv_2mortal(newSViv(processed));')
586             ->xs_return('1')
587             ->xs_end;
588              
589             # XS: is_initialized (instance method - also has custom OP)
590 9         58 $builder->xs_function('xs_pool_is_initialized')
591             ->xs_preamble
592             ->line('int slot = SvIV(SvRV(ST(0)));')
593             ->line('ST(0) = boolSV(pool_registry[slot].state == POOL_STATE_RUNNING);')
594             ->xs_return('1')
595             ->xs_end;
596              
# XS: pending_count (instance method - also has custom OP)
#
# Returns queue_count + completed_count for the invocant's pool, read while
# holding the pool mutex. Returns 0 when the pool is not in
# POOL_STATE_RUNNING or when the slot number is outside [0, MAX_POOLS).
# Croaks if the invocant is not a reference, instead of dereferencing it.
$builder->xs_function('xs_pool_pending_count')
    ->xs_preamble
    ->line('/* Reject class-method calls and forged invocants before touching the registry */')
    ->line('if (items < 1 || !SvROK(ST(0))) croak("Usage: $pool->pending_count()");')
    ->line('int slot = SvIV(SvRV(ST(0)));')
    ->line('if (slot < 0 || slot >= MAX_POOLS) { ST(0) = sv_2mortal(newSViv(0)); XSRETURN(1); }')
    ->line('PoolContext *pool = &pool_registry[slot];')
    ->line('if (pool->state != POOL_STATE_RUNNING) { ST(0) = sv_2mortal(newSViv(0)); XSRETURN(1); }')
    ->line('pthread_mutex_lock(&pool->mutex);')
    ->line('int count = pool->queue_count + pool->completed_count;')
    ->line('pthread_mutex_unlock(&pool->mutex);')
    ->line('ST(0) = sv_2mortal(newSViv(count));')
    ->xs_return('1')
    ->xs_end;
609              
# XS: get_notify_fd (instance method)
#
# Returns the notify_fd of the invocant's pool, or -1 when the pool is not in
# POOL_STATE_RUNNING or the slot number is outside [0, MAX_POOLS) (the same
# sentinel the class-method variant xs_pool_get_notify_fd_slot uses).
# Croaks if the invocant is not a reference, instead of dereferencing it.
$builder->xs_function('xs_pool_get_notify_fd')
    ->xs_preamble
    ->line('/* Reject class-method calls and forged invocants before touching the registry */')
    ->line('if (items < 1 || !SvROK(ST(0))) croak("Usage: $pool->get_notify_fd()");')
    ->line('int slot = SvIV(SvRV(ST(0)));')
    ->line('if (slot < 0 || slot >= MAX_POOLS) { ST(0) = sv_2mortal(newSViv(-1)); XSRETURN(1); }')
    ->line('PoolContext *pool = &pool_registry[slot];')
    ->line('if (pool->state != POOL_STATE_RUNNING) { ST(0) = sv_2mortal(newSViv(-1)); XSRETURN(1); }')
    ->line('ST(0) = sv_2mortal(newSViv(pool->notify_fd));')
    ->xs_return('1')
    ->xs_end;
619              
# XS: workers (instance method)
#
# Returns the worker count stored in the invocant's registry slot. The pool
# state is not consulted. Returns 0 when the slot number is outside
# [0, MAX_POOLS) rather than reading past the registry, and croaks if the
# invocant is not a reference.
$builder->xs_function('xs_pool_workers')
    ->xs_preamble
    ->line('/* Reject class-method calls and forged invocants before touching the registry */')
    ->line('if (items < 1 || !SvROK(ST(0))) croak("Usage: $pool->workers()");')
    ->line('int slot = SvIV(SvRV(ST(0)));')
    ->line('if (slot < 0 || slot >= MAX_POOLS) { ST(0) = sv_2mortal(newSViv(0)); XSRETURN(1); }')
    ->line('ST(0) = sv_2mortal(newSViv(pool_registry[slot].workers));')
    ->xs_return('1')
    ->xs_end;
627              
# XS: slot (instance method - returns the slot number)
#
# Returns the integer stored in the scalar the invocant refers to. The value
# is returned as-is; it is not checked against the pool registry. Croaks if
# the invocant is not a reference, instead of dereferencing it.
$builder->xs_function('xs_pool_slot')
    ->xs_preamble
    ->line('/* SvRV on a non-reference is undefined, so check first */')
    ->line('if (items < 1 || !SvROK(ST(0))) croak("Usage: $pool->slot()");')
    ->line('int slot = SvIV(SvRV(ST(0)));')
    ->line('ST(0) = sv_2mortal(newSViv(slot));')
    ->xs_return('1')
    ->xs_end;
635              
636             # XS: get_notify_fd_slot (class method for event loop)
                # Takes the pool slot number as the second stack argument, ST(1), instead
                # of reading it from a blessed invocant. Returns the pool's notify_fd, or
                # -1 when the slot is outside [0, MAX_POOLS) or the pool is not in
                # POOL_STATE_RUNNING. Croaks with a usage message when fewer than two
                # arguments are supplied.
637 9         95 $builder->xs_function('xs_pool_get_notify_fd_slot')
638             ->xs_preamble
639             ->line('if (items < 2) croak("Usage: Pool->_get_notify_fd_slot($slot)");')
640             ->line('int slot = SvIV(ST(1));')
                # Range check comes before the registry is indexed.
641             ->line('if (slot < 0 || slot >= MAX_POOLS) { ST(0) = sv_2mortal(newSViv(-1)); XSRETURN(1); }')
642             ->line('PoolContext *pool = &pool_registry[slot];')
643             ->line('if (pool->state != POOL_STATE_RUNNING) { ST(0) = sv_2mortal(newSViv(-1)); XSRETURN(1); }')
644             ->line('ST(0) = sv_2mortal(newSViv(pool->notify_fd));')
645             ->xs_return('1')
646             ->xs_end;
647              
648             # XS: process_ready_slot (class method for event loop)
                # Takes the pool slot number in ST(1). Returns the number of completed ops
                # handled, or 0 when the slot is outside [0, MAX_POOLS) or the pool is not
                # in POOL_STATE_RUNNING. Croaks on fewer than two arguments.
                #
                # For each op taken off the pool's completed list it calls op->code in
                # list context under G_EVAL, passing the elements of op->args, stores the
                # outcome in future_registry[op->future_slot], fires the matching
                # callbacks, then releases the op.
                #
                # NOTE(review): both malloc() results (fail_message, result_values) are
                # written to without a NULL check; op->future_slot indexes
                # future_registry without a range check; and op->code is passed to
                # call_sv before the later "if (op->code)" guard - confirm op->code can
                # never be NULL here or move the guard.
649 9         610 $builder->xs_function('xs_pool_process_ready_slot')
650             ->xs_preamble
651             ->line('int i;')
652             ->line('if (items < 2) croak("Usage: Pool->_process_ready_slot($slot)");')
653             ->line('int pool_slot = SvIV(ST(1));')
654             ->line('if (pool_slot < 0 || pool_slot >= MAX_POOLS) { ST(0) = sv_2mortal(newSViv(0)); XSRETURN(1); }')
655             ->line('PoolContext *pool = &pool_registry[pool_slot];')
656             ->line('if (pool->state != POOL_STATE_RUNNING) { ST(0) = sv_2mortal(newSViv(0)); XSRETURN(1); }')
657             ->line('pool_clear_notification_slot(pool_slot);')
658             ->line('int processed = 0;')
                # One op is unlinked per iteration while the mutex is held; the mutex is
                # released again before any Perl code is called.
659             ->line('while (1) {')
660             ->line(' ThreadPoolOp *op = NULL;')
661             ->line(' pthread_mutex_lock(&pool->mutex);')
662             ->line(' if (pool->completed_head) {')
663             ->line(' op = pool->completed_head;')
664             ->line(' pool->completed_head = op->next;')
665             ->line(' if (!pool->completed_head) pool->completed_tail = NULL;')
666             ->line(' pool->completed_count--;')
667             ->line(' }')
668             ->line(' pthread_mutex_unlock(&pool->mutex);')
669             ->line(' if (!op) break;')
                # Standard call_sv framing: open a scope, push the arguments, call.
670             ->line(' dSP;')
671             ->line(' ENTER; SAVETMPS;')
672             ->line(' PUSHMARK(SP);')
                # op->args is treated as an array reference when it is a reference at all.
                # NOTE(review): the referent type is not checked - presumably the submit
                # side guarantees an array; confirm.
673             ->line(' if (op->args && SvROK(op->args)) {')
674             ->line(' AV *args_av = (AV*)SvRV(op->args);')
675             ->line(' int len = av_len(args_av) + 1;')
676             ->line(' for (i = 0; i < len; i++) {')
677             ->line(' SV **elem = av_fetch(args_av, i, 0);')
678             ->line(' if (elem) XPUSHs(*elem);')
679             ->line(' }')
680             ->line(' }')
681             ->line(' PUTBACK;')
682             ->line(' int count = call_sv(op->code, G_EVAL | G_ARRAY);')
683             ->line(' SPAGAIN;')
684             ->line(' SV *errsv = ERRSV;')
685             ->line(' extern FutureContext future_registry[];')
686             ->line(' extern void future_invoke_callbacks(int, int);')
687             ->line(' FutureContext *ctx = &future_registry[op->future_slot];')
                # Failure path: $@ is true, so its text is copied into a malloc'd,
                # NUL-terminated fail_message and any returned values are discarded.
688             ->line(' if (SvTRUE(errsv)) {')
689             ->line(' STRLEN len;')
690             ->line(' const char *msg = SvPV(errsv, len);')
691             ->line(' ctx->fail_message = (char*)malloc(len + 1);')
692             ->line(' memcpy(ctx->fail_message, msg, len);')
693             ->line(' ctx->fail_message[len] = 0;')
694             ->line(' ctx->state = FUTURE_STATE_FAILED;')
695             ->line(' while (count-- > 0) POPs;')
696             ->line(' PUTBACK;')
697             ->line(' FREETMPS; LEAVE;')
698             ->line(' future_invoke_callbacks(op->future_slot, FUTURE_CB_FAIL);')
                # Success with values: popped last-to-first so result_values keeps the
                # order the callee returned them in. Each SV gets an extra reference
                # before FREETMPS runs (presumably so it outlives the temps scope).
699             ->line(' } else if (count > 0) {')
700             ->line(' ctx->result_values = (SV**)malloc(count * sizeof(SV*));')
701             ->line(' for (i = count - 1; i >= 0; i--) {')
702             ->line(' ctx->result_values[i] = SvREFCNT_inc(POPs);')
703             ->line(' }')
704             ->line(' ctx->result_count = count;')
705             ->line(' ctx->state = FUTURE_STATE_DONE;')
706             ->line(' PUTBACK;')
707             ->line(' FREETMPS; LEAVE;')
708             ->line(' future_invoke_callbacks(op->future_slot, FUTURE_CB_DONE);')
                # Success with no values: only the state changes.
709             ->line(' } else {')
710             ->line(' ctx->state = FUTURE_STATE_DONE;')
711             ->line(' PUTBACK;')
712             ->line(' FREETMPS; LEAVE;')
713             ->line(' future_invoke_callbacks(op->future_slot, FUTURE_CB_DONE);')
714             ->line(' }')
                # Drop the references the op held on its code and args, then free the op.
715             ->line(' if (op->code) SvREFCNT_dec(op->code);')
716             ->line(' if (op->args) SvREFCNT_dec(op->args);')
717             ->line(' free(op);')
718             ->line(' processed++;')
719             ->line('}')
720             ->line('ST(0) = sv_2mortal(newSViv(processed));')
721             ->xs_return('1')
722             ->xs_end;
723              
# XS: init_global - create/init default pool (class method)
# Usage: Pool->init_global() or Pool->init_global(workers => N)
#
# Returns a reference blessed into Hypersonic::Future::Pool whose referent
# holds the default pool's slot number. If a default pool is already in use
# and running it is returned unchanged and the arguments are ignored.
#
# Named parameters (read as key/value pairs from ST(1) onwards):
#   workers    - number of worker threads (default 8, must be >= 1)
#   queue_size - stored in the pool's queue_size field (default 4096)
#
# Croaks when workers < 1, when the registry is full, or when eventfd/pipe,
# malloc or pthread_create fails. On any failure after the slot has been
# allocated, everything acquired so far is released first: started threads
# are told to shut down and joined, the thread array is freed, the notify
# descriptors are closed, the mutex and condition variables are destroyed
# and the slot is returned with pool_free_slot. default_pool_slot is only
# assigned once the pool is fully running, so a failed call never leaves it
# pointing at a half-built pool.
$builder->xs_function('xs_pool_init_global')
    ->xs_preamble
    ->line('/* If default pool exists and is running, return it */')
    ->line('if (default_pool_slot >= 0) {')
    ->line('    PoolContext *pool = &pool_registry[default_pool_slot];')
    ->line('    if (pool->in_use && pool->state == POOL_STATE_RUNNING) {')
    ->line('        SV *slot_sv = newSViv(default_pool_slot);')
    ->line('        SV *self_ref = newRV_noinc(slot_sv);')
    ->line('        sv_bless(self_ref, gv_stashpv("Hypersonic::Future::Pool", GV_ADD));')
    ->line('        ST(0) = sv_2mortal(self_ref);')
    ->line('        XSRETURN(1);')
    ->line('    }')
    ->line('}')
    ->line('/* Parse named parameters */')
    ->line('int workers = 8;')
    ->line('int queue_size = 4096;')
    ->line('int i;')
    ->line('for (i = 1; i < items - 1; i += 2) {')
    ->line('    const char *key = SvPV_nolen(ST(i));')
    ->line('    SV *val = ST(i + 1);')
    ->line('    if (strEQ(key, "workers")) workers = SvIV(val);')
    ->line('    else if (strEQ(key, "queue_size")) queue_size = SvIV(val);')
    ->line('}')
    ->line('/* Validate before any resource is acquired: a non-positive count would')
    ->line('   size the thread array with a zero or wrapped-around byte count */')
    ->line('if (workers < 1) croak("init_global: workers must be >= 1 (got %d)", workers);')
    ->line('/* Allocate slot */')
    ->line('int slot = pool_alloc_slot();')
    ->line('if (slot < 0) croak("Pool registry full (max %d pools)", MAX_POOLS);')
    ->line('PoolContext *pool = &pool_registry[slot];')
    ->line('pool->workers = workers;')
    ->line('pool->queue_size = queue_size;')
    ->line('pool->threads = NULL;')
    ->line('/* Initialize pool */')
    ->line('pthread_mutex_init(&pool->mutex, NULL);')
    ->line('pthread_cond_init(&pool->cond, NULL);')
    ->line('pthread_cond_init(&pool->done_cond, NULL);')
    ->line('/* croak() does not return, so failures are recorded here and')
    ->line('   reported only after the cleanup block below has run */')
    ->line('const char *fail_msg = NULL;')
    ->line('int have_fd = 0;')
    ->line('int started = 0;')
    ->line('#if USE_EVENTFD')
    ->line('pool->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);')
    ->line('if (pool->notify_fd < 0) fail_msg = "eventfd() failed";')
    ->line('else have_fd = 1;')
    ->line('#else')
    ->line('if (pipe(pool->notify_pipe) < 0) {')
    ->line('    fail_msg = "pipe() failed";')
    ->line('} else {')
    ->line('    have_fd = 1;')
    ->line('    fcntl(pool->notify_pipe[0], F_SETFL, O_NONBLOCK);')
    ->line('    fcntl(pool->notify_pipe[1], F_SETFL, O_NONBLOCK);')
    ->line('    pool->notify_fd = pool->notify_pipe[0];')
    ->line('}')
    ->line('#endif')
    ->line('if (!fail_msg) {')
    ->line('    pool->threads = (pthread_t*)malloc(pool->workers * sizeof(pthread_t));')
    ->line('    if (!pool->threads) fail_msg = "malloc failed for threads";')
    ->line('}')
    ->line('if (!fail_msg) {')
    ->line('    pool->state = POOL_STATE_RUNNING;')
    ->line('    for (i = 0; i < pool->workers; i++) {')
    ->line('        if (pthread_create(&pool->threads[i], NULL, pool_worker_fn, (void*)(intptr_t)slot) != 0) {')
    ->line('            fail_msg = "pthread_create failed";')
    ->line('            break;')
    ->line('        }')
    ->line('        started++;')
    ->line('    }')
    ->line('}')
    ->line('if (fail_msg) {')
    ->line('    /* Unwind in the same order shutdown_global tears a pool down */')
    ->line('    if (pool->threads) {')
    ->line('        pthread_mutex_lock(&pool->mutex);')
    ->line('        pool->state = POOL_STATE_SHUTDOWN;')
    ->line('        pthread_cond_broadcast(&pool->cond);')
    ->line('        pthread_mutex_unlock(&pool->mutex);')
    ->line('        /* Only the handles pthread_create actually filled in are joined */')
    ->line('        for (i = 0; i < started; i++) pthread_join(pool->threads[i], NULL);')
    ->line('        free(pool->threads);')
    ->line('        pool->threads = NULL;')
    ->line('    }')
    ->line('    if (have_fd) {')
    ->line('#if USE_EVENTFD')
    ->line('        close(pool->notify_fd);')
    ->line('#else')
    ->line('        close(pool->notify_pipe[0]);')
    ->line('        close(pool->notify_pipe[1]);')
    ->line('#endif')
    ->line('    }')
    ->line('    pthread_mutex_destroy(&pool->mutex);')
    ->line('    pthread_cond_destroy(&pool->cond);')
    ->line('    pthread_cond_destroy(&pool->done_cond);')
    ->line('    pool_free_slot(slot);')
    ->line('    croak("%s", fail_msg);')
    ->line('}')
    ->line('/* Publish the default pool only once it is fully running */')
    ->line('default_pool_slot = slot;')
    ->line('SV *slot_sv = newSViv(slot);')
    ->line('SV *self_ref = newRV_noinc(slot_sv);')
    ->line('sv_bless(self_ref, gv_stashpv("Hypersonic::Future::Pool", GV_ADD));')
    ->line('ST(0) = sv_2mortal(self_ref);')
    ->xs_return('1')
    ->xs_end;
783              
784             # XS: shutdown_global - shutdown default pool (class method)
                # Returns 0 without doing anything when there is no default pool
                # (default_pool_slot < 0) or it is not in POOL_STATE_RUNNING. Otherwise
                # tears the pool down and returns 1.
                #
                # NOTE(review): pool->threads is not freed here and ops still linked on
                # the pool's lists are not drained - confirm pool_free_slot (not visible
                # in this section) takes care of both.
785 9         366 $builder->xs_function('xs_pool_shutdown_global')
786             ->xs_preamble
787             ->line('int i;')
788             ->line('if (default_pool_slot < 0) {')
789             ->line(' ST(0) = sv_2mortal(newSViv(0));')
790             ->line(' XSRETURN(1);')
791             ->line('}')
792             ->line('PoolContext *pool = &pool_registry[default_pool_slot];')
793             ->line('if (pool->state != POOL_STATE_RUNNING) {')
794             ->line(' ST(0) = sv_2mortal(newSViv(0));')
795             ->line(' XSRETURN(1);')
796             ->line('}')
                # The state change and the broadcast happen under the mutex; the joins
                # below then wait for every worker thread to exit.
797             ->line('pthread_mutex_lock(&pool->mutex);')
798             ->line('pool->state = POOL_STATE_SHUTDOWN;')
799             ->line('pthread_cond_broadcast(&pool->cond);')
800             ->line('pthread_mutex_unlock(&pool->mutex);')
801             ->line('for (i = 0; i < pool->workers; i++) {')
802             ->line(' pthread_join(pool->threads[i], NULL);')
803             ->line('}')
                # Descriptors and synchronisation objects are released only after all
                # workers have been joined.
804             ->line('#if USE_EVENTFD')
805             ->line('close(pool->notify_fd);')
806             ->line('#else')
807             ->line('close(pool->notify_pipe[0]);')
808             ->line('close(pool->notify_pipe[1]);')
809             ->line('#endif')
810             ->line('pthread_mutex_destroy(&pool->mutex);')
811             ->line('pthread_cond_destroy(&pool->cond);')
812             ->line('pthread_cond_destroy(&pool->done_cond);')
813             ->line('pool_free_slot(default_pool_slot);')
814             ->line('default_pool_slot = -1;')
815             ->line('ST(0) = sv_2mortal(newSViv(1));')
816             ->xs_return('1')
817             ->xs_end;
818              
819             # XS: default_pool - get default pool (class method)
                # Returns undef when no default pool has been recorded
                # (default_pool_slot < 0) or when that registry slot is not in_use.
                # Otherwise returns a new mortal reference, blessed into
                # Hypersonic::Future::Pool, to a fresh IV holding the slot number.
                #
                # NOTE(review): unlike init_global, this checks only in_use and not
                # state == POOL_STATE_RUNNING, so a pool that is in use but not running
                # is still returned - confirm that is intended.
820 9         201 $builder->xs_function('xs_pool_default_pool')
821             ->xs_preamble
822             ->line('if (default_pool_slot < 0) {')
823             ->line(' ST(0) = &PL_sv_undef;')
824             ->line(' XSRETURN(1);')
825             ->line('}')
826             ->line('PoolContext *pool = &pool_registry[default_pool_slot];')
827             ->line('if (!pool->in_use) {')
828             ->line(' ST(0) = &PL_sv_undef;')
829             ->line(' XSRETURN(1);')
830             ->line('}')
                # newRV_noinc hands the only reference on slot_sv to the wrapper.
831             ->line('SV *slot_sv = newSViv(default_pool_slot);')
832             ->line('SV *self_ref = newRV_noinc(slot_sv);')
833             ->line('sv_bless(self_ref, gv_stashpv("Hypersonic::Future::Pool", GV_ADD));')
834             ->line('ST(0) = sv_2mortal(self_ref);')
835             ->xs_return('1')
836             ->xs_end;
837             }
838              
# compile(%opts)
#
# Class method. Makes sure the pool's XS code has been built. The pool has
# no compile step of its own: it is compiled together with
# Hypersonic::Future, so the options are forwarded there unless $COMPILED
# is already true. Always returns 1.
sub compile {
    my ($class, %opts) = @_;

    unless ($COMPILED) {
        # Future's compile step builds the pool functions as well.
        Hypersonic::Future->compile(%opts);
    }

    return 1;
}
849              
850             # All methods are XS - see get_xs_functions:
851             # new, init, shutdown, DESTROY
852             # submit, process_ready
853             # is_initialized, pending_count, get_notify_fd, workers, slot
854             # init_global, shutdown_global, default_pool
855              
856             1;
857              
858             __END__