File Coverage

blib/lib/MCE/Shared/Queue.pm
Criterion Covered Total %
statement 108 381 28.3
branch 31 242 12.8
condition 12 92 13.0
subroutine 23 46 50.0
pod 15 15 100.0
total 189 776 24.3


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Hybrid-queue helper class.
4             ##
5             ###############################################################################
6              
7             package MCE::Shared::Queue;
8              
9 8     8   8833 use strict;
  8         16  
  8         232  
10 8     8   40 use warnings;
  8         16  
  8         224  
11              
12 8     8   126 use 5.010001;
  8         29  
13              
14 8     8   48 no warnings qw( threads recursion uninitialized numeric );
  8         16  
  8         523  
15              
16             our $VERSION = '1.881';
17              
18             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
19              
20 8     8   53 use Scalar::Util qw( looks_like_number );
  8         16  
  8         347  
21 8     8   43 use MCE::Shared::Base ();
  8         13  
  8         187  
22 8     8   40 use MCE::Util ();
  8         21  
  8         99  
23 8     8   32 use MCE::Mutex ();
  8         16  
  8         433  
24              
25             use overload (
26 8         46 q("") => \&MCE::Shared::Base::_stringify,
27             q(0+) => \&MCE::Shared::Base::_numify,
28             fallback => 1
29 8     8   40 );
  8         11  
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Attributes used internally.
34             ## _qr_sock _qw_sock _datp _datq _dsem _heap _init_pid _porder _type
35             ## _ar_sock _aw_sock _asem _tsem
36             ##
37             ###############################################################################
38              
39             our ($HIGHEST, $LOWEST, $FIFO, $LIFO, $LILO, $FILO) = (1, 0, 1, 0, 1, 0);
40             my ($PORDER, $TYPE, $AWAIT) = ($HIGHEST, $FIFO, 0);
41              
42             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
43             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
44             my $_reset_flg = 1;
45              
46             my %_valid_fields_new = map { $_ => 1 } qw(
47             await barrier fast porder queue type
48             );
49              
50             sub _croak {
51 0     0   0 goto &MCE::Shared::Base::_croak;
52             }
53             sub CLONE {
54 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
55             }
56              
57             sub DESTROY {
58 26     26   91 my ($_Q) = @_;
59 26 50       107 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
60              
61 26         396 undef $_Q->{_datp}, undef $_Q->{_datq}, undef $_Q->{_heap};
62              
63 26 50       128 if ($_Q->{_init_pid} eq $_pid) {
64 26         155 MCE::Util::_destroy_socks($_Q, qw(_aw_sock _ar_sock _qw_sock _qr_sock));
65             }
66              
67 26         1621 return;
68             }
69              
70             ###############################################################################
71             ## ----------------------------------------------------------------------------
72             ## Instance instantiation.
73             ##
74             ###############################################################################
75              
76             # new ( options )
77              
78             sub new {
79 26     26 1 201 my ($_class, %_argv) = @_;
80 26   33     53 my $_Q = {}; bless($_Q, ref($_class) || $_class);
  26         173  
81              
82 26         129 for my $_p (keys %_argv) {
83             _croak("Queue: ($_p) is not a valid constructor argument")
84 36 50       99 unless (exists $_valid_fields_new{$_p});
85             }
86              
87 26         255 $_Q->{_asem} = 0; # Semaphore count variable for the ->await method
88 26         60 $_Q->{_datp} = {}; # Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
89 26         52 $_Q->{_heap} = []; # Priority heap [ pN, p2, p1 ] in heap order
90             # fyi, _datp will always dequeue before _datq
91              
92             # --------------------------------------------------------------------------
93              
94 26 50       71 $_Q->{_await} = defined $_argv{await} ? $_argv{await} : $AWAIT;
95 26 100       125 $_Q->{_porder} = defined $_argv{porder} ? $_argv{porder} : $PORDER;
96 26 50       86 $_Q->{_type} = defined $_argv{type} ? $_argv{type} : $TYPE;
97              
98 26 50       53 if (exists $_argv{queue}) {
99             _croak('Queue: (queue) is not an ARRAY reference')
100 0 0       0 if (ref $_argv{queue} ne 'ARRAY');
101 0         0 $_Q->{_datq} = $_argv{queue};
102             }
103             else {
104 26         52 $_Q->{_datq} = [];
105             }
106              
107             # --------------------------------------------------------------------------
108              
109 26 50       143 $_Q->{_init_pid} = $_tid ? $$ .'.'. $_tid : $$;
110 26         55 $_Q->{_dsem} = 0;
111              
112 26         210 MCE::Util::_sock_pair($_Q, qw(_qr_sock _qw_sock), undef, 1);
113 26 50       6367 MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock), undef, 1) if $_Q->{_await};
114              
115             MCE::Shared::Object::_reset(), $_reset_flg = ''
116 26 50 66     242 if ($_reset_flg && $INC{'MCE/Shared/Server.pm'});
117              
118 26         143 return $_Q;
119             }
120              
121             ###############################################################################
122             ## ----------------------------------------------------------------------------
123             ## Public methods.
124             ##
125             ###############################################################################
126              
127             # await ( pending_threshold )
128              
129             sub await {
130             # Handled by MCE::Shared::Object when shared.
131 0     0 1 0 return;
132             }
133              
134             # clear ( )
135              
136             sub clear {
137 0     0 1 0 my ($_Q) = @_;
138              
139 0         0 %{ $_Q->{_datp} } = ();
  0         0  
140 0         0 @{ $_Q->{_datq} } = ();
  0         0  
141 0         0 @{ $_Q->{_heap} } = ();
  0         0  
142              
143 0         0 return;
144             }
145              
146             # end ( )
147              
148             sub end {
149 0     0 1 0 my ($_Q) = @_;
150              
151 0 0       0 if (!exists $_Q->{_ended}) {
152 0         0 for my $_i (1 .. $_Q->{_dsem}) { syswrite($_Q->{_qw_sock}, $LF) }
  0         0  
153 0         0 $_Q->{_dsem} = 0, $_Q->{_ended} = undef;
154             }
155              
156 0         0 return;
157             }
158              
159             # enqueue ( item [, item, ... ] )
160              
161             sub enqueue {
162 0     0 1 0 my $_Q = shift;
163              
164 0 0       0 return unless (scalar @_);
165              
166 0 0       0 if (exists $_Q->{_ended}) {
167 0         0 warn "Queue: (enqueue) called on queue that has been 'end'ed\n";
168 0         0 return;
169             }
170              
171 0 0       0 if ($_Q->{_dsem}) {
172 0         0 for my $_i (1 .. scalar @_) {
173 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
174 0 0       0 last unless $_Q->{_dsem};
175             }
176             }
177              
178 0         0 push @{ $_Q->{_datq} }, @_;
  0         0  
179              
180 0         0 return;
181             }
182              
183             # enqueuep ( priority, item [, item, ... ] )
184              
185             sub enqueuep {
186 0     0 1 0 my ($_Q, $_p) = (shift, shift);
187              
188 0 0 0     0 _croak('Queue: (enqueuep priority) is not an integer')
189             if (!looks_like_number($_p) || int($_p) != $_p);
190              
191 0 0       0 return unless (scalar @_);
192              
193 0 0       0 if (exists $_Q->{_ended}) {
194 0         0 warn "Queue: (enqueuep) called on queue that has been 'end'ed\n";
195 0         0 return;
196             }
197              
198 0 0       0 if ($_Q->{_dsem}) {
199 0         0 for my $_i (1 .. scalar @_) {
200 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
201 0 0       0 last unless $_Q->{_dsem};
202             }
203             }
204              
205 0         0 $_Q->_enqueuep($_p, @_);
206              
207 0         0 return;
208             }
209              
210             # dequeue ( count )
211             # dequeue ( )
212              
213             sub dequeue {
214 0     0 1 0 my ($_Q, $_cnt) = @_;
215 0         0 my (@_items, $_has_data, $_buf);
216              
217 0 0 0     0 if (defined $_cnt && $_cnt ne '1') {
218 0 0 0     0 _croak('Queue: (dequeue count argument) is not valid')
      0        
219             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
220              
221 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
222              
223 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
224 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
225 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
226             }
227             }
228 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
229              
230 0         0 for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
  0         0  
231             }
232             else {
233 0 0 0     0 $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
234 0         0 $_buf = $_Q->_dequeue();
235             }
236              
237 0 0       0 return @_items if (scalar @_items);
238 0 0       0 return $_buf if ($_has_data);
239 0 0       0 return () if (exists $_Q->{_ended});
240              
241 0         0 $_Q->{_dsem} += 1, MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
242              
243 0         0 goto \&dequeue;
244             }
245              
246             # dequeue_nb ( count )
247             # dequeue_nb ( )
248              
249             sub dequeue_nb {
250 0     0 1 0 my ($_Q, $_cnt) = @_;
251              
252 0 0 0     0 if (defined $_cnt && $_cnt ne '1') {
253 0 0 0     0 _croak('Queue: (dequeue count argument) is not valid')
      0        
254             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
255              
256 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
257              
258 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
259 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
260 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
261             }
262             }
263              
264 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
265              
266 0         0 return map { $_Q->_dequeue() } 1 .. $_cnt;
  0         0  
267             }
268              
269 0         0 my $_buf = $_Q->_dequeue();
270              
271 0 0       0 return defined($_buf) ? $_buf : ();
272             }
273              
274             # pending ( )
275              
276             sub pending {
277 0     0 1 0 my ($_Q) = @_;
278 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
279              
280 0 0       0 if (scalar @{ $_Q->{_heap} }) {
  0         0  
281 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
282 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
283             }
284             }
285              
286             return (exists $_Q->{_ended})
287 0 0       0 ? $_pending ? $_pending : undef
    0          
288             : $_pending;
289             }
290              
291             # insert ( index, item [, item, ... ] )
292              
293             sub insert {
294 0     0 1 0 my ($_Q, $_i) = (shift, shift);
295              
296 0 0 0     0 _croak('Queue: (insert index) is not an integer')
297             if (!looks_like_number($_i) || int($_i) != $_i);
298              
299 0 0       0 return unless (scalar @_);
300              
301 0 0       0 if (exists $_Q->{_ended}) {
302 0         0 warn "Queue: (insert) called on queue that has been 'end'ed\n";
303 0         0 return;
304             }
305              
306 0 0       0 if ($_Q->{_dsem}) {
307 0         0 for my $_i (1 .. scalar @_) {
308 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
309 0 0       0 last unless $_Q->{_dsem};
310             }
311             }
312              
313 0 0       0 if (abs($_i) > scalar @{ $_Q->{_datq} }) {
  0         0  
314 0 0       0 if ($_i >= 0) {
315 0 0       0 if ($_Q->{_type}) {
316 0         0 push @{ $_Q->{_datq} }, @_;
  0         0  
317             } else {
318 0         0 unshift @{ $_Q->{_datq} }, @_;
  0         0  
319             }
320             }
321             else {
322 0 0       0 if ($_Q->{_type}) {
323 0         0 unshift @{ $_Q->{_datq} }, @_;
  0         0  
324             } else {
325 0         0 push @{ $_Q->{_datq} }, @_;
  0         0  
326             }
327             }
328             }
329             else {
330 0 0       0 if (!$_Q->{_type}) {
331             $_i = ($_i >= 0)
332 0 0       0 ? scalar(@{ $_Q->{_datq} }) - $_i
  0         0  
333             : abs($_i);
334             }
335 0         0 splice @{ $_Q->{_datq} }, $_i, 0, @_;
  0         0  
336             }
337              
338 0         0 return;
339             }
340              
341             # insertp ( priority, index, item [, item, ... ] )
342              
343             sub insertp {
344 0     0 1 0 my ($_Q, $_p, $_i) = (shift, shift, shift);
345              
346 0 0 0     0 _croak('Queue: (insertp priority) is not an integer')
347             if (!looks_like_number($_p) || int($_p) != $_p);
348 0 0 0     0 _croak('Queue: (insertp index) is not an integer')
349             if (!looks_like_number($_i) || int($_i) != $_i);
350              
351 0 0       0 return unless (scalar @_);
352              
353 0 0       0 if (exists $_Q->{_ended}) {
354 0         0 warn "Queue: (insertp) called on queue that has been 'end'ed\n";
355 0         0 return;
356             }
357              
358 0 0       0 if ($_Q->{_dsem}) {
359 0         0 for my $_i (1 .. scalar @_) {
360 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
361 0 0       0 last unless $_Q->{_dsem};
362             }
363             }
364              
365 0 0 0     0 if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) {
  0         0  
366              
367 0 0       0 if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} }) {
  0         0  
368 0 0       0 if ($_i >= 0) {
369 0 0       0 if ($_Q->{_type}) {
370 0         0 push @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
371             } else {
372 0         0 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
373             }
374             }
375             else {
376 0 0       0 if ($_Q->{_type}) {
377 0         0 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
378             } else {
379 0         0 push @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
380             }
381             }
382             }
383             else {
384 0 0       0 if (!$_Q->{_type}) {
385             $_i = ($_i >=0)
386 0 0       0 ? scalar(@{ $_Q->{_datp}->{$_p} }) - $_i
  0         0  
387             : abs($_i);
388             }
389 0         0 splice @{ $_Q->{_datp}->{$_p} }, $_i, 0, @_;
  0         0  
390             }
391             }
392             else {
393 0         0 $_Q->_enqueuep($_p, @_);
394             }
395              
396 0         0 return;
397             }
398              
399             # peek ( index )
400             # peek ( )
401              
402             sub peek {
403 0     0 1 0 my ($_Q, $_i) = @_;
404              
405 0 0       0 if ($_i) {
406 0 0 0     0 _croak('Queue: (peek index) is not an integer')
407             if (!looks_like_number($_i) || int($_i) != $_i);
408             }
409 0         0 else { $_i = 0 }
410              
411 0 0       0 return undef if (abs($_i) > scalar @{ $_Q->{_datq} });
  0         0  
412              
413 0 0       0 if (!$_Q->{_type}) {
414             $_i = ($_i >= 0)
415 0 0       0 ? scalar(@{ $_Q->{_datq} }) - ($_i + 1)
  0         0  
416             : abs($_i + 1);
417             }
418              
419 0         0 return $_Q->{_datq}->[$_i];
420             }
421              
422             # peekp ( priority, index )
423             # peekp ( priority )
424              
425             sub peekp {
426 0     0 1 0 my ($_Q, $_p, $_i) = @_;
427              
428 0 0       0 if ($_i) {
429 0 0 0     0 _croak('Queue: (peekp index) is not an integer')
430             if (!looks_like_number($_i) || int($_i) != $_i);
431             }
432 0         0 else { $_i = 0 }
433              
434 0 0 0     0 _croak('Queue: (peekp priority) is not an integer')
435             if (!looks_like_number($_p) || int($_p) != $_p);
436              
437 0 0       0 return undef unless (exists $_Q->{_datp}->{$_p});
438 0 0       0 return undef if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} });
  0         0  
439              
440 0 0       0 if (!$_Q->{_type}) {
441             $_i = ($_i >= 0)
442 0 0       0 ? scalar(@{ $_Q->{_datp}->{$_p} }) - ($_i + 1)
  0         0  
443             : abs($_i + 1);
444             }
445              
446 0         0 return $_Q->{_datp}->{$_p}->[$_i];
447             }
448              
449             # peekh ( index )
450             # peekh ( )
451              
452             sub peekh {
453 0     0 1 0 my ($_Q, $_i) = @_;
454              
455 0 0       0 if ($_i) {
456 0 0 0     0 _croak('Queue: (peekh index) is not an integer')
457             if (!looks_like_number($_i) || int($_i) != $_i);
458             }
459 0         0 else { $_i = 0 }
460              
461 0 0       0 return undef if (abs($_i) > scalar @{ $_Q->{_heap} });
  0         0  
462 0         0 return $_Q->{_heap}->[$_i];
463             }
464              
465             # heap ( )
466              
467             sub heap {
468 0     0 1 0 return @{ shift->{_heap} };
  0         0  
469             }
470              
471             ###############################################################################
472             ## ----------------------------------------------------------------------------
473             ## Private methods.
474             ##
475             ###############################################################################
476              
477             # Add items to the tail of the queue with priority level.
478              
479             sub _enqueuep {
480 0     0   0 my ($_Q, $_p) = (shift, shift);
481              
482             # Enlist priority into the heap.
483 0 0 0     0 if (!exists $_Q->{_datp}->{$_p} || @{ $_Q->{_datp}->{$_p} } == 0) {
  0         0  
484              
485 0 0       0 unless (scalar @{ $_Q->{_heap} }) {
  0 0       0  
486 0         0 push @{ $_Q->{_heap} }, $_p;
  0         0  
487             }
488 0         0 elsif ($_Q->{_porder}) {
489 0         0 $_Q->_heap_insert_high($_p);
490             }
491             else {
492 0         0 $_Q->_heap_insert_low($_p);
493             }
494             }
495              
496             # Append item(s) into the queue.
497 0         0 push @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
498              
499 0         0 return;
500             }
501              
502             # Return one item from the queue.
503              
504             sub _dequeue {
505 0     0   0 my ($_Q) = @_;
506              
507             # Return item from the non-priority queue.
508 0 0       0 unless (scalar @{ $_Q->{_heap} }) {
  0         0  
509             return ($_Q->{_type})
510 0 0       0 ? shift @{ $_Q->{_datq} } : pop @{ $_Q->{_datq} };
  0         0  
  0         0  
511             }
512              
513 0         0 my $_p = $_Q->{_heap}->[0];
514              
515             # Delist priority from the heap when 1 item remains.
516 0 0       0 shift @{ $_Q->{_heap} } if (@{ $_Q->{_datp}->{$_p} } == 1);
  0         0  
  0         0  
517              
518             # Return item from the priority queue.
519             return ($_Q->{_type})
520 0 0       0 ? shift @{ $_Q->{_datp}->{$_p} } : pop @{ $_Q->{_datp}->{$_p} };
  0         0  
  0         0  
521             }
522              
523             # Helper method for getting the reference to the underlying array.
524             # Use with test scripts for comparing data only (not a public API).
525              
526             sub _get_aref {
527 0     0   0 my ($_Q, $_p) = @_;
528              
529 0 0       0 if (defined $_p) {
530 0 0 0     0 _croak('Queue: (get_aref priority) is not an integer')
531             if (!looks_like_number($_p) || int($_p) != $_p);
532              
533 0 0       0 return undef unless (exists $_Q->{_datp}->{$_p});
534 0         0 return $_Q->{_datp}->{$_p};
535             }
536              
537 0         0 return $_Q->{_datq};
538             }
539              
540             # Insert priority into the heap. A lower priority level comes first.
541              
542             sub _heap_insert_low {
543 0     0   0 my ($_Q, $_p) = @_;
544              
545             # Insert priority at the head of the heap.
546 0 0       0 if ($_p < $_Q->{_heap}->[0]) {
    0          
547 0         0 unshift @{ $_Q->{_heap} }, $_p;
  0         0  
548             }
549              
550             # Insert priority at the end of the heap.
551             elsif ($_p > $_Q->{_heap}->[-1]) {
552 0         0 push @{ $_Q->{_heap} }, $_p;
  0         0  
553             }
554              
555             # Insert priority through binary search.
556             else {
557 0         0 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  0         0  
  0         0  
558              
559 0         0 while ($_lower < $_upper) {
560 0         0 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
561 0 0       0 if ($_p > $_Q->{_heap}->[$_midpoint]) {
562 0         0 $_lower = $_midpoint + 1;
563             } else {
564 0         0 $_upper = $_midpoint;
565             }
566             }
567              
568             # Insert priority into the heap.
569 0         0 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  0         0  
570             }
571              
572 0         0 return;
573             }
574              
575             # Insert priority into the heap. A higher priority level comes first.
576              
577             sub _heap_insert_high {
578 0     0   0 my ($_Q, $_p) = @_;
579              
580             # Insert priority at the head of the heap.
581 0 0       0 if ($_p > $_Q->{_heap}->[0]) {
    0          
582 0         0 unshift @{ $_Q->{_heap} }, $_p;
  0         0  
583             }
584              
585             # Insert priority at the end of the heap.
586             elsif ($_p < $_Q->{_heap}->[-1]) {
587 0         0 push @{ $_Q->{_heap} }, $_p;
  0         0  
588             }
589              
590             # Insert priority through binary search.
591             else {
592 0         0 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  0         0  
  0         0  
593              
594 0         0 while ($_lower < $_upper) {
595 0         0 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
596 0 0       0 if ($_p < $_Q->{_heap}->[$_midpoint]) {
597 0         0 $_lower = $_midpoint + 1;
598             } else {
599 0         0 $_upper = $_midpoint;
600             }
601             }
602              
603             # Insert priority into the heap.
604 0         0 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  0         0  
605             }
606              
607 0         0 return;
608             }
609              
610             ###############################################################################
611             ## ----------------------------------------------------------------------------
612             ## Server functions.
613             ##
614             ###############################################################################
615              
616             {
617 8     8   28539 use bytes;
  8         19  
  8         45  
618              
619             use constant {
620 8         8191 SHR_O_QUA => 'O~QUA', # Queue await
621             SHR_O_QUD => 'O~QUD', # Queue dequeue
622             SHR_O_QUN => 'O~QUN', # Queue dequeue non-blocking
623 8     8   371 };
  8         24  
624              
625             my (
626             $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_obj, $_freeze, $_thaw,
627             $_cnt, $_id, $_has_data, $_pending, $_t
628             );
629              
630             my %_output_function = (
631              
632             SHR_O_QUA.$LF => sub { # Queue await
633             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
634              
635             chomp($_id = <$_DAU_R_SOCK>),
636             chomp($_t = <$_DAU_R_SOCK>);
637              
638             my $_Q = $_obj->{ $_id } || do {
639             print {$_DAU_R_SOCK} $LF;
640             };
641             $_Q->{_tsem} = $_t;
642              
643             if ($_Q->pending() <= $_t) {
644             syswrite($_Q->{_aw_sock}, $LF);
645             } else {
646             $_Q->{_asem} += 1;
647             }
648              
649             print {$_DAU_R_SOCK} $LF;
650              
651             return;
652             },
653              
654             SHR_O_QUD.$LF => sub { # Queue dequeue
655             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
656              
657             chomp($_id = <$_DAU_R_SOCK>),
658             chomp($_cnt = <$_DAU_R_SOCK>);
659              
660             $_cnt = 0 if ($_cnt == 1);
661              
662             my $_Q = $_obj->{ $_id } || do {
663             print {$_DAU_R_SOCK} '-1'.$LF;
664             return;
665             };
666              
667             my (@_items, $_buf);
668              
669             if ($_cnt) {
670             $_pending = @{ $_Q->{_datq} };
671              
672             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
673             for my $_h (@{ $_Q->{_heap} }) {
674             $_pending += @{ $_Q->{_datp}->{$_h} };
675             }
676             }
677             $_cnt = $_pending if $_pending < $_cnt;
678              
679             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
680             }
681             else {
682             $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
683             $_buf = $_Q->_dequeue();
684             }
685              
686             if ($_cnt) {
687             $_buf = $_freeze->(\@_items);
688             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
689             }
690             elsif ($_has_data) {
691             $_buf = $_freeze->([ $_buf ]);
692             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
693             }
694             elsif (exists $_Q->{_ended}) {
695             print {$_DAU_R_SOCK} '-2'.$LF;
696             }
697             else {
698             print {$_DAU_R_SOCK} '-1'.$LF;
699             $_Q->{_dsem} += 1;
700             }
701              
702             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
703             for my $_i (1 .. $_Q->{_asem}) {
704             syswrite($_Q->{_aw_sock}, $LF);
705             }
706             $_Q->{_asem} = 0;
707             }
708              
709             return;
710             },
711              
712             SHR_O_QUN.$LF => sub { # Queue dequeue non-blocking
713             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
714              
715             chomp($_id = <$_DAU_R_SOCK>),
716             chomp($_cnt = <$_DAU_R_SOCK>);
717              
718             my $_Q = $_obj->{ $_id } || do {
719             print {$_DAU_R_SOCK} '-1'.$LF;
720             return;
721             };
722              
723             if ($_cnt == 1) {
724             my $_buf = $_Q->_dequeue();
725              
726             if (defined $_buf) {
727             $_buf = $_freeze->([ $_buf ]);
728             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
729             }
730             else {
731             print {$_DAU_R_SOCK} '-1'.$LF;
732             }
733             }
734             else {
735             my @_items;
736             my $_pending = @{ $_Q->{_datq} };
737              
738             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
739             for my $_h (@{ $_Q->{_heap} }) {
740             $_pending += @{ $_Q->{_datp}->{$_h} };
741             }
742             }
743             $_cnt = $_pending if $_pending < $_cnt;
744              
745             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
746              
747             if ($_cnt) {
748             my $_buf = $_freeze->(\@_items);
749             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
750             }
751             else {
752             print {$_DAU_R_SOCK} '-1'.$LF;
753             }
754             }
755              
756             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
757             for my $_i (1 .. $_Q->{_asem}) {
758             syswrite($_Q->{_aw_sock}, $LF);
759             }
760             $_Q->{_asem} = 0;
761             }
762              
763             return;
764             },
765              
766             );
767              
768             sub _init_mgr {
769 0     0   0 my $_function;
770 0         0 ( $_DAU_R_SOCK_REF, $_obj, $_function, $_freeze, $_thaw ) = @_;
771              
772 0         0 for my $key ( keys %_output_function ) {
773 0 0       0 last if exists($_function->{$key});
774 0         0 $_function->{$key} = $_output_function{$key};
775             }
776              
777 0         0 return;
778             }
779             }
780              
781             ###############################################################################
782             ## ----------------------------------------------------------------------------
783             ## Object package.
784             ##
785             ###############################################################################
786              
787             ## Items below are folded into MCE::Shared::Object.
788              
789             package # hide from rpm
790             MCE::Shared::Object;
791              
792 8     8   65 use strict;
  8         51  
  8         231  
793 8     8   43 use warnings;
  8         16  
  8         285  
794              
795 8     8   45 no warnings qw( threads recursion uninitialized numeric once );
  8         11  
  8         309  
796              
797 8     8   43 use bytes;
  8         16  
  8         93  
798              
799 8     8   308 no overloading;
  8         16  
  8         7306  
800              
801             my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
802              
803             my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
804             $_freeze, $_thaw);
805              
806             sub _init_queue {
807 22     22   251 ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
808             $_freeze, $_thaw) = @_;
809              
810 22         99 return;
811             }
812              
813             sub _req_queue {
814 20 50   20   109 local $\ = undef if (defined $\);
815 20 50       86 local $/ = $LF if ($/ ne $LF);
816 20         60 local $MCE::Signal::SIG;
817              
818             {
819 20         45 local $MCE::Signal::IPC = 1;
  20         44  
820 20 50       148 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
821              
822 20         1114 print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
823 20         392 print({$_DAU_W_SOCK} $_[1]);
  20         1476  
824 20         5135 chomp($_[2] = <$_DAU_W_SOCK>);
825              
826 20 50       270 read($_DAU_W_SOCK, $_[3], $_[2]) if ($_[2] > 0);
827              
828 20 50       157 $_dat_un->() if !$_is_MSWin32;
829             }
830              
831 20 50       141 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
832             }
833              
834             sub await {
835 0     0   0 my $_id = shift()->[0];
836 0 0       0 return unless ( my $_Q = $_obj->{ $_id } );
837 0 0       0 return unless ( exists $_Q->{_qr_sock} );
838              
839 0   0     0 my $_t = shift || 0;
840              
841             _croak('Queue: (await) is not enabled for this queue')
842 0 0       0 unless (exists $_Q->{_ar_sock});
843 0 0 0     0 _croak('Queue: (await threshold) is not an integer')
844             if (!looks_like_number($_t) || int($_t) != $_t);
845              
846 0 0       0 $_t = 0 if ($_t < 0);
847 0         0 _req1('O~QUA', $_id.$LF . $_t.$LF);
848              
849 0 0       0 MCE::Util::_sock_ready($_Q->{_ar_sock}) if $_is_MSWin32;
850 0         0 MCE::Util::_sysread($_Q->{_ar_sock}, my($_b), 1);
851              
852 0         0 return;
853             }
854              
855             sub dequeue {
856 16     16   21378 my ($self, $_cnt) = @_;
857 16         121 my $_id = $self->[0];
858              
859 16 50       136 return unless ( my $_Q = $_obj->{ $_id } );
860 16 50       91 return unless ( exists $_Q->{_qr_sock} );
861              
862 16 100 100     158 if (defined $_cnt && $_cnt ne '1') {
863 6 50 33     182 _croak('Queue: (dequeue count argument) is not valid')
      33        
864             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
865             }
866             else {
867 10         24 $_cnt = 1;
868             }
869              
870 16         113 _req_queue('O~QUD', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));
871              
872 16 100 66     398 return $_thaw->($_buf)[0] if ($_len > 0 && $_cnt == 1);
873 6 50       51 return @{ $_thaw->($_buf) } if ($_len > 0);
  6         309  
874 0 0       0 return if ($_len == -2);
875              
876 0 0       0 MCE::Util::_sock_ready($_Q->{_qr_sock}) if $_is_MSWin32;
877 0         0 MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
878              
879 0         0 goto \&dequeue;
880             }
881              
882             sub dequeue_nb {
883 4     4   5035 my ($self, $_cnt) = @_;
884 4         25 my $_id = $self->[0];
885              
886 4 50       31 return unless ( my $_Q = $_obj->{ $_id } );
887 4 50       27 return unless ( exists $_Q->{_qr_sock} );
888              
889 4 50 33     35 if (defined $_cnt && $_cnt ne '1') {
890 0 0 0     0 _croak('Queue: (dequeue_nb count argument) is not valid')
      0        
891             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
892             }
893             else {
894 4         13 $_cnt = 1;
895             }
896              
897 4         35 _req_queue('O~QUN', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));
898              
899 4 50       37 return if ($_len < 0);
900              
901             ($_cnt == 1)
902             ? $_thaw->($_buf)[0]
903 4 50       103 : @{ $_thaw->($_buf) };
  0         0  
904             }
905              
906             sub pending {
907 4 50 33 4   6205 (@_ == 1 && !wantarray) ? _size('pending', @_) : _auto('pending', @_);
908             }
909              
910             1;
911              
912             __END__