File Coverage

blib/lib/MCE/Shared/Queue.pm
Criterion Covered Total %
statement 121 436 27.7
branch 38 288 13.1
condition 14 121 11.5
subroutine 24 48 50.0
pod 16 16 100.0
total 213 909 23.4


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Hybrid-queue helper class.
4             ##
5             ###############################################################################
6              
7             package MCE::Shared::Queue;
8              
9 8     8   9380 use strict;
  8         16  
  8         200  
10 8     8   32 use warnings;
  8         16  
  8         166  
11              
12 8     8   130 use 5.010001;
  8         19  
13              
14 8     8   37 no warnings qw( threads recursion uninitialized numeric );
  8         37  
  8         412  
15              
16             our $VERSION = '1.885';
17              
18             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
19              
20 8     8   43 use Scalar::Util qw( looks_like_number );
  8         13  
  8         335  
21 8     8   43 use MCE::Shared::Base ();
  8         8  
  8         154  
22 8     8   45 use MCE::Util ();
  8         8  
  8         83  
23 8     8   29 use MCE::Mutex ();
  8         56  
  8         331  
24              
25             use overload (
26 8         43 q("") => \&MCE::Shared::Base::_stringify,
27             q(0+) => \&MCE::Shared::Base::_numify,
28             fallback => 1
29 8     8   40 );
  8         16  
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Attributes used internally.
34             ## _qr_sock _qw_sock _datp _datq _dsem _heap _init_pid _porder _type
35             ## _ar_sock _aw_sock _asem _tsem
36             ##
37             ###############################################################################
38              
39             our ($HIGHEST, $LOWEST, $FIFO, $LIFO, $LILO, $FILO) = (1, 0, 1, 0, 1, 0);
40             my ($PORDER, $TYPE, $AWAIT) = ($HIGHEST, $FIFO, 0);
41              
42             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
43             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
44             my $_reset_flg = 1;
45              
46             my %_valid_fields_new = map { $_ => 1 } qw(
47             await barrier fast porder queue type
48             );
49              
50             sub _croak {
51 0     0   0 goto &MCE::Shared::Base::_croak;
52             }
53             sub CLONE {
54 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
55             }
56              
57             sub DESTROY {
58 26     26   66 my ($_Q) = @_;
59 26 50       84 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
60              
61 26         199 undef $_Q->{_datp}, undef $_Q->{_datq}, undef $_Q->{_heap};
62              
63 26 50       150 if ($_Q->{_init_pid} eq $_pid) {
64 26         152 MCE::Util::_destroy_socks($_Q, qw(_aw_sock _ar_sock _qw_sock _qr_sock));
65             }
66              
67 26         1327 return;
68             }
69              
70             ###############################################################################
71             ## ----------------------------------------------------------------------------
72             ## Instance instantiation.
73             ##
74             ###############################################################################
75              
76             # new ( options )
77              
78             sub new {
79 26     26 1 147 my ($_class, %_argv) = @_;
80 26   33     65 my $_Q = {}; bless($_Q, ref($_class) || $_class);
  26         142  
81              
82 26         114 for my $_p (keys %_argv) {
83             _croak("Queue: ($_p) is not a valid constructor argument")
84 36 50       109 unless (exists $_valid_fields_new{$_p});
85             }
86              
87 26         246 $_Q->{_asem} = 0; # Semaphore count variable for the ->await method
88 26         48 $_Q->{_datp} = {}; # Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
89 26         57 $_Q->{_heap} = []; # Priority heap [ pN, p2, p1 ] in heap order
90             # fyi, _datp will always dequeue before _datq
91              
92             # --------------------------------------------------------------------------
93              
94 26 50       73 $_Q->{_await} = defined $_argv{await} ? $_argv{await} : $AWAIT;
95 26 100       65 $_Q->{_porder} = defined $_argv{porder} ? $_argv{porder} : $PORDER;
96 26 50       204 $_Q->{_type} = defined $_argv{type} ? $_argv{type} : $TYPE;
97              
98 26 50       60 if (exists $_argv{queue}) {
99             _croak('Queue: (queue) is not an ARRAY reference')
100 0 0       0 if (ref $_argv{queue} ne 'ARRAY');
101 0         0 $_Q->{_datq} = $_argv{queue};
102             }
103             else {
104 26         45 $_Q->{_datq} = [];
105             }
106              
107             # --------------------------------------------------------------------------
108              
109 26         152 $_Q->{_qr_mutex} = MCE::Mutex->new();
110 26 50       20640 $_Q->{_init_pid} = $_tid ? $$ .'.'. $_tid : $$;
111 26         335 $_Q->{_dsem} = 0;
112              
113 26         133 MCE::Util::_sock_pair($_Q, qw(_qr_sock _qw_sock), undef, 1);
114 26 50       5099 MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock), undef, 1) if $_Q->{_await};
115              
116             MCE::Shared::Object::_reset(), $_reset_flg = ''
117 26 50 66     161 if ($_reset_flg && $INC{'MCE/Shared/Server.pm'});
118              
119 26         131 return $_Q;
120             }
121              
122             ###############################################################################
123             ## ----------------------------------------------------------------------------
124             ## Public methods.
125             ##
126             ###############################################################################
127              
128             # await ( pending_threshold )
129              
130             sub await {
131             # Handled by MCE::Shared::Object when shared.
132 0     0 1 0 return;
133             }
134              
135             # clear ( )
136              
137             sub clear {
138 0     0 1 0 my ($_Q) = @_;
139              
140 0         0 %{ $_Q->{_datp} } = ();
  0         0  
141 0         0 @{ $_Q->{_datq} } = ();
  0         0  
142 0         0 @{ $_Q->{_heap} } = ();
  0         0  
143              
144 0         0 return;
145             }
146              
147             # end ( )
148              
149             sub end {
150 0     0 1 0 my ($_Q) = @_;
151              
152 0 0       0 if (!exists $_Q->{_ended}) {
153 0         0 for my $_i (1 .. $_Q->{_dsem}) { syswrite($_Q->{_qw_sock}, $LF) }
  0         0  
154 0         0 $_Q->{_dsem} = 0, $_Q->{_ended} = undef;
155             }
156              
157 0         0 return;
158             }
159              
160             # enqueue ( item [, item, ... ] )
161              
162             sub enqueue {
163 0     0 1 0 my $_Q = shift;
164              
165 0 0       0 return unless (scalar @_);
166              
167 0 0       0 if (exists $_Q->{_ended}) {
168 0         0 warn "Queue: (enqueue) called on queue that has been 'end'ed\n";
169 0         0 return;
170             }
171              
172 0 0       0 if ($_Q->{_dsem}) {
173 0         0 for my $_i (1 .. scalar @_) {
174 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
175 0 0       0 last unless $_Q->{_dsem};
176             }
177             }
178              
179 0         0 push @{ $_Q->{_datq} }, @_;
  0         0  
180              
181 0         0 return;
182             }
183              
184             # enqueuep ( priority, item [, item, ... ] )
185              
186             sub enqueuep {
187 0     0 1 0 my ($_Q, $_p) = (shift, shift);
188              
189 0 0 0     0 _croak('Queue: (enqueuep priority) is not an integer')
190             if (!looks_like_number($_p) || int($_p) != $_p);
191              
192 0 0       0 return unless (scalar @_);
193              
194 0 0       0 if (exists $_Q->{_ended}) {
195 0         0 warn "Queue: (enqueuep) called on queue that has been 'end'ed\n";
196 0         0 return;
197             }
198              
199 0 0       0 if ($_Q->{_dsem}) {
200 0         0 for my $_i (1 .. scalar @_) {
201 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
202 0 0       0 last unless $_Q->{_dsem};
203             }
204             }
205              
206 0         0 $_Q->_enqueuep($_p, @_);
207              
208 0         0 return;
209             }
210              
211             # dequeue ( )
212             # dequeue ( count )
213              
214             sub dequeue {
215 0     0 1 0 my ($_Q, $_cnt) = @_;
216 0         0 my (@_items, $_has_data, $_buf);
217              
218 0 0 0     0 if (defined $_cnt && $_cnt ne '1') {
219 0 0 0     0 _croak('Queue: (dequeue count argument) is not valid')
      0        
220             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
221              
222 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
223              
224 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
225 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
226 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
227             }
228             }
229 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
230              
231 0         0 for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
  0         0  
232             }
233             else {
234 0 0 0     0 $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
235 0         0 $_buf = $_Q->_dequeue();
236             }
237              
238 0 0       0 return @_items if (scalar @_items);
239 0 0       0 return $_buf if ($_has_data);
240 0 0       0 return () if (exists $_Q->{_ended});
241              
242 0         0 $_Q->{_dsem} += 1, MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
243              
244 0         0 goto \&dequeue;
245             }
246              
247             # dequeue_nb ( )
248             # dequeue_nb ( count )
249              
250             sub dequeue_nb {
251 0     0 1 0 my ($_Q, $_cnt) = @_;
252              
253 0 0 0     0 if (defined $_cnt && $_cnt ne '1') {
254 0 0 0     0 _croak('Queue: (dequeue_nb count argument) is not valid')
      0        
255             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
256              
257 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
258              
259 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
260 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
261 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
262             }
263             }
264              
265 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
266              
267 0         0 return map { $_Q->_dequeue() } 1 .. $_cnt;
  0         0  
268             }
269              
270 0         0 my $_buf = $_Q->_dequeue();
271              
272 0 0       0 return defined($_buf) ? $_buf : ();
273             }
274              
275             # dequeue_timed ( timeout )
276             # dequeue_timed ( timeout, count )
277              
278             sub dequeue_timed {
279 0     0 1 0 my ($_Q, $_timeout, $_cnt) = @_;
280              
281 0 0       0 if (defined $_timeout) {
282 0 0       0 _croak('Queue: (dequeue_timed timeout argument) is not valid')
283             if (!looks_like_number($_timeout));
284             }
285              
286 0 0 0     0 if (defined $_cnt && $_cnt ne '1') {
287 0 0 0     0 _croak('Queue: (dequeue_timed count argument) is not valid')
      0        
288             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
289              
290 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
291              
292 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
293 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
294 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
295             }
296             }
297              
298 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
299              
300 0         0 return map { $_Q->_dequeue() } 1 .. $_cnt;
  0         0  
301             }
302              
303 0         0 my $_buf = $_Q->_dequeue();
304              
305 0 0       0 return defined($_buf) ? $_buf : ();
306             }
307              
308             # pending ( )
309              
310             sub pending {
311 0     0 1 0 my ($_Q) = @_;
312 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
313              
314 0 0       0 if (scalar @{ $_Q->{_heap} }) {
  0         0  
315 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
316 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
317             }
318             }
319              
320             return (exists $_Q->{_ended})
321 0 0       0 ? $_pending ? $_pending : undef
    0          
322             : $_pending;
323             }
324              
325             # insert ( index, item [, item, ... ] )
326              
327             sub insert {
328 0     0 1 0 my ($_Q, $_i) = (shift, shift);
329              
330 0 0 0     0 _croak('Queue: (insert index) is not an integer')
331             if (!looks_like_number($_i) || int($_i) != $_i);
332              
333 0 0       0 return unless (scalar @_);
334              
335 0 0       0 if (exists $_Q->{_ended}) {
336 0         0 warn "Queue: (insert) called on queue that has been 'end'ed\n";
337 0         0 return;
338             }
339              
340 0 0       0 if ($_Q->{_dsem}) {
341 0         0 for my $_i (1 .. scalar @_) {
342 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
343 0 0       0 last unless $_Q->{_dsem};
344             }
345             }
346              
347 0 0       0 if (abs($_i) > scalar @{ $_Q->{_datq} }) {
  0         0  
348 0 0       0 if ($_i >= 0) {
349 0 0       0 if ($_Q->{_type}) {
350 0         0 push @{ $_Q->{_datq} }, @_;
  0         0  
351             } else {
352 0         0 unshift @{ $_Q->{_datq} }, @_;
  0         0  
353             }
354             }
355             else {
356 0 0       0 if ($_Q->{_type}) {
357 0         0 unshift @{ $_Q->{_datq} }, @_;
  0         0  
358             } else {
359 0         0 push @{ $_Q->{_datq} }, @_;
  0         0  
360             }
361             }
362             }
363             else {
364 0 0       0 if (!$_Q->{_type}) {
365             $_i = ($_i >= 0)
366 0 0       0 ? scalar(@{ $_Q->{_datq} }) - $_i
  0         0  
367             : abs($_i);
368             }
369 0         0 splice @{ $_Q->{_datq} }, $_i, 0, @_;
  0         0  
370             }
371              
372 0         0 return;
373             }
374              
375             # insertp ( priority, index, item [, item, ... ] )
376              
377             sub insertp {
378 0     0 1 0 my ($_Q, $_p, $_i) = (shift, shift, shift);
379              
380 0 0 0     0 _croak('Queue: (insertp priority) is not an integer')
381             if (!looks_like_number($_p) || int($_p) != $_p);
382 0 0 0     0 _croak('Queue: (insertp index) is not an integer')
383             if (!looks_like_number($_i) || int($_i) != $_i);
384              
385 0 0       0 return unless (scalar @_);
386              
387 0 0       0 if (exists $_Q->{_ended}) {
388 0         0 warn "Queue: (insertp) called on queue that has been 'end'ed\n";
389 0         0 return;
390             }
391              
392 0 0       0 if ($_Q->{_dsem}) {
393 0         0 for my $_i (1 .. scalar @_) {
394 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
395 0 0       0 last unless $_Q->{_dsem};
396             }
397             }
398              
399 0 0 0     0 if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) {
  0         0  
400              
401 0 0       0 if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} }) {
  0         0  
402 0 0       0 if ($_i >= 0) {
403 0 0       0 if ($_Q->{_type}) {
404 0         0 push @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
405             } else {
406 0         0 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
407             }
408             }
409             else {
410 0 0       0 if ($_Q->{_type}) {
411 0         0 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
412             } else {
413 0         0 push @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
414             }
415             }
416             }
417             else {
418 0 0       0 if (!$_Q->{_type}) {
419             $_i = ($_i >=0)
420 0 0       0 ? scalar(@{ $_Q->{_datp}->{$_p} }) - $_i
  0         0  
421             : abs($_i);
422             }
423 0         0 splice @{ $_Q->{_datp}->{$_p} }, $_i, 0, @_;
  0         0  
424             }
425             }
426             else {
427 0         0 $_Q->_enqueuep($_p, @_);
428             }
429              
430 0         0 return;
431             }
432              
433             # peek ( index )
434             # peek ( )
435              
436             sub peek {
437 0     0 1 0 my ($_Q, $_i) = @_;
438              
439 0 0       0 if ($_i) {
440 0 0 0     0 _croak('Queue: (peek index) is not an integer')
441             if (!looks_like_number($_i) || int($_i) != $_i);
442             }
443 0         0 else { $_i = 0 }
444              
445 0 0       0 return undef if (abs($_i) > scalar @{ $_Q->{_datq} });
  0         0  
446              
447 0 0       0 if (!$_Q->{_type}) {
448             $_i = ($_i >= 0)
449 0 0       0 ? scalar(@{ $_Q->{_datq} }) - ($_i + 1)
  0         0  
450             : abs($_i + 1);
451             }
452              
453 0         0 return $_Q->{_datq}->[$_i];
454             }
455              
456             # peekp ( priority, index )
457             # peekp ( priority )
458              
459             sub peekp {
460 0     0 1 0 my ($_Q, $_p, $_i) = @_;
461              
462 0 0       0 if ($_i) {
463 0 0 0     0 _croak('Queue: (peekp index) is not an integer')
464             if (!looks_like_number($_i) || int($_i) != $_i);
465             }
466 0         0 else { $_i = 0 }
467              
468 0 0 0     0 _croak('Queue: (peekp priority) is not an integer')
469             if (!looks_like_number($_p) || int($_p) != $_p);
470              
471 0 0       0 return undef unless (exists $_Q->{_datp}->{$_p});
472 0 0       0 return undef if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} });
  0         0  
473              
474 0 0       0 if (!$_Q->{_type}) {
475             $_i = ($_i >= 0)
476 0 0       0 ? scalar(@{ $_Q->{_datp}->{$_p} }) - ($_i + 1)
  0         0  
477             : abs($_i + 1);
478             }
479              
480 0         0 return $_Q->{_datp}->{$_p}->[$_i];
481             }
482              
483             # peekh ( index )
484             # peekh ( )
485              
486             sub peekh {
487 0     0 1 0 my ($_Q, $_i) = @_;
488              
489 0 0       0 if ($_i) {
490 0 0 0     0 _croak('Queue: (peekh index) is not an integer')
491             if (!looks_like_number($_i) || int($_i) != $_i);
492             }
493 0         0 else { $_i = 0 }
494              
495 0 0       0 return undef if (abs($_i) > scalar @{ $_Q->{_heap} });
  0         0  
496 0         0 return $_Q->{_heap}->[$_i];
497             }
498              
499             # heap ( )
500              
501             sub heap {
502 0     0 1 0 return @{ shift->{_heap} };
  0         0  
503             }
504              
505             ###############################################################################
506             ## ----------------------------------------------------------------------------
507             ## Private methods.
508             ##
509             ###############################################################################
510              
511             # Add items to the tail of the queue with priority level.
512              
513             sub _enqueuep {
514 0     0   0 my ($_Q, $_p) = (shift, shift);
515              
516             # Enlist priority into the heap.
517 0 0 0     0 if (!exists $_Q->{_datp}->{$_p} || @{ $_Q->{_datp}->{$_p} } == 0) {
  0         0  
518              
519 0 0       0 unless (scalar @{ $_Q->{_heap} }) {
  0 0       0  
520 0         0 push @{ $_Q->{_heap} }, $_p;
  0         0  
521             }
522 0         0 elsif ($_Q->{_porder}) {
523 0         0 $_Q->_heap_insert_high($_p);
524             }
525             else {
526 0         0 $_Q->_heap_insert_low($_p);
527             }
528             }
529              
530             # Append item(s) into the queue.
531 0         0 push @{ $_Q->{_datp}->{$_p} }, @_;
  0         0  
532              
533 0         0 return;
534             }
535              
536             # Return one item from the queue.
537              
538             sub _dequeue {
539 0     0   0 my ($_Q) = @_;
540              
541             # Return item from the non-priority queue.
542 0 0       0 unless (scalar @{ $_Q->{_heap} }) {
  0         0  
543             return ($_Q->{_type})
544 0 0       0 ? shift @{ $_Q->{_datq} } : pop @{ $_Q->{_datq} };
  0         0  
  0         0  
545             }
546              
547 0         0 my $_p = $_Q->{_heap}->[0];
548              
549             # Delist priority from the heap when 1 item remains.
550 0 0       0 shift @{ $_Q->{_heap} } if (@{ $_Q->{_datp}->{$_p} } == 1);
  0         0  
  0         0  
551              
552             # Return item from the priority queue.
553             return ($_Q->{_type})
554 0 0       0 ? shift @{ $_Q->{_datp}->{$_p} } : pop @{ $_Q->{_datp}->{$_p} };
  0         0  
  0         0  
555             }
556              
557             # Helper method for getting the reference to the underlying array.
558             # Use with test scripts for comparing data only (not a public API).
559              
560             sub _get_aref {
561 0     0   0 my ($_Q, $_p) = @_;
562              
563 0 0       0 if (defined $_p) {
564 0 0 0     0 _croak('Queue: (get_aref priority) is not an integer')
565             if (!looks_like_number($_p) || int($_p) != $_p);
566              
567 0 0       0 return undef unless (exists $_Q->{_datp}->{$_p});
568 0         0 return $_Q->{_datp}->{$_p};
569             }
570              
571 0         0 return $_Q->{_datq};
572             }
573              
574             # Insert priority into the heap. A lower priority level comes first.
575              
576             sub _heap_insert_low {
577 0     0   0 my ($_Q, $_p) = @_;
578              
579             # Insert priority at the head of the heap.
580 0 0       0 if ($_p < $_Q->{_heap}->[0]) {
    0          
581 0         0 unshift @{ $_Q->{_heap} }, $_p;
  0         0  
582             }
583              
584             # Insert priority at the end of the heap.
585             elsif ($_p > $_Q->{_heap}->[-1]) {
586 0         0 push @{ $_Q->{_heap} }, $_p;
  0         0  
587             }
588              
589             # Insert priority through binary search.
590             else {
591 0         0 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  0         0  
  0         0  
592              
593 0         0 while ($_lower < $_upper) {
594 0         0 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
595 0 0       0 if ($_p > $_Q->{_heap}->[$_midpoint]) {
596 0         0 $_lower = $_midpoint + 1;
597             } else {
598 0         0 $_upper = $_midpoint;
599             }
600             }
601              
602             # Insert priority into the heap.
603 0         0 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  0         0  
604             }
605              
606 0         0 return;
607             }
608              
609             # Insert priority into the heap. A higher priority level comes first.
610              
611             sub _heap_insert_high {
612 0     0   0 my ($_Q, $_p) = @_;
613              
614             # Insert priority at the head of the heap.
615 0 0       0 if ($_p > $_Q->{_heap}->[0]) {
    0          
616 0         0 unshift @{ $_Q->{_heap} }, $_p;
  0         0  
617             }
618              
619             # Insert priority at the end of the heap.
620             elsif ($_p < $_Q->{_heap}->[-1]) {
621 0         0 push @{ $_Q->{_heap} }, $_p;
  0         0  
622             }
623              
624             # Insert priority through binary search.
625             else {
626 0         0 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  0         0  
  0         0  
627              
628 0         0 while ($_lower < $_upper) {
629 0         0 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
630 0 0       0 if ($_p < $_Q->{_heap}->[$_midpoint]) {
631 0         0 $_lower = $_midpoint + 1;
632             } else {
633 0         0 $_upper = $_midpoint;
634             }
635             }
636              
637             # Insert priority into the heap.
638 0         0 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  0         0  
639             }
640              
641 0         0 return;
642             }
643              
644             ###############################################################################
645             ## ----------------------------------------------------------------------------
646             ## Server functions.
647             ##
648             ###############################################################################
649              
650             {
651 8     8   24303 use bytes;
  8         16  
  8         41  
652              
653             use constant {
654 8         7541 SHR_O_QUA => 'O~QUA', # Queue await
655             SHR_O_QUD => 'O~QUD', # Queue dequeue
656             SHR_O_QUN => 'O~QUN', # Queue dequeue non-blocking
657             SHR_O_QUT => 'O~QUT', # Queue dequeue timed
658 8     8   329 };
  8         8  
659              
660             my (
661             $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_obj, $_freeze, $_thaw,
662             $_cnt, $_id, $_has_data, $_pending, $_t
663             );
664              
665             my %_output_function = (
666              
667             SHR_O_QUA.$LF => sub { # Queue await
668             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
669              
670             chomp($_id = <$_DAU_R_SOCK>),
671             chomp($_t = <$_DAU_R_SOCK>);
672              
673             my $_Q = $_obj->{ $_id } || do {
674             print {$_DAU_R_SOCK} $LF;
675             return;
676             };
677             $_Q->{_tsem} = $_t;
678              
679             if ($_Q->pending() <= $_t) {
680             syswrite($_Q->{_aw_sock}, $LF);
681             } else {
682             $_Q->{_asem} += 1;
683             }
684              
685             print {$_DAU_R_SOCK} $LF;
686              
687             return;
688             },
689              
690             SHR_O_QUD.$LF => sub { # Queue dequeue
691             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
692              
693             chomp($_id = <$_DAU_R_SOCK>),
694             chomp($_cnt = <$_DAU_R_SOCK>);
695              
696             $_cnt = 0 if ($_cnt == 1);
697              
698             my $_Q = $_obj->{ $_id } || do {
699             print {$_DAU_R_SOCK} '-1'.$LF;
700             return;
701             };
702              
703             my (@_items, $_buf);
704              
705             if ($_cnt) {
706             $_pending = @{ $_Q->{_datq} };
707              
708             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
709             for my $_h (@{ $_Q->{_heap} }) {
710             $_pending += @{ $_Q->{_datp}->{$_h} };
711             }
712             }
713             $_cnt = $_pending if $_pending < $_cnt;
714              
715             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
716             }
717             else {
718             $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
719             $_buf = $_Q->_dequeue();
720             }
721              
722             if ($_cnt) {
723             $_buf = $_freeze->(\@_items);
724             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
725             }
726             elsif ($_has_data) {
727             $_buf = $_freeze->([ $_buf ]);
728             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
729             }
730             elsif (exists $_Q->{_ended}) {
731             print {$_DAU_R_SOCK} '-2'.$LF;
732             }
733             else {
734             print {$_DAU_R_SOCK} '-1'.$LF;
735             $_Q->{_dsem} += 1;
736             }
737              
738             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
739             for my $_i (1 .. $_Q->{_asem}) {
740             syswrite($_Q->{_aw_sock}, $LF);
741             }
742             $_Q->{_asem} = 0;
743             }
744              
745             return;
746             },
747              
748             SHR_O_QUN.$LF => sub { # Queue dequeue non-blocking
749             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
750              
751             chomp($_id = <$_DAU_R_SOCK>),
752             chomp($_cnt = <$_DAU_R_SOCK>);
753              
754             my $_Q = $_obj->{ $_id } || do {
755             print {$_DAU_R_SOCK} '-1'.$LF;
756             return;
757             };
758              
759             if ($_cnt == 1) {
760             my $_buf = $_Q->_dequeue();
761              
762             if (defined $_buf) {
763             $_buf = $_freeze->([ $_buf ]);
764             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
765             }
766             else {
767             print {$_DAU_R_SOCK} '-1'.$LF;
768             }
769             }
770             else {
771             my @_items;
772             my $_pending = @{ $_Q->{_datq} };
773              
774             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
775             for my $_h (@{ $_Q->{_heap} }) {
776             $_pending += @{ $_Q->{_datp}->{$_h} };
777             }
778             }
779             $_cnt = $_pending if $_pending < $_cnt;
780              
781             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
782              
783             if ($_cnt) {
784             my $_buf = $_freeze->(\@_items);
785             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
786             }
787             else {
788             print {$_DAU_R_SOCK} '-1'.$LF;
789             }
790             }
791              
792             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
793             for my $_i (1 .. $_Q->{_asem}) {
794             syswrite($_Q->{_aw_sock}, $LF);
795             }
796             $_Q->{_asem} = 0;
797             }
798              
799             return;
800             },
801              
802             SHR_O_QUT.$LF => sub { # Queue dequeue timed
803             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
804              
805             chomp($_id = <$_DAU_R_SOCK>);
806              
807             my $_Q = $_obj->{ $_id } || do {
808             print {$_DAU_R_SOCK} $LF;
809             return;
810             };
811              
812             $_Q->{_dsem} -= 1 if $_Q->{_dsem};
813              
814             print {$_DAU_R_SOCK} $LF;
815              
816             return;
817             },
818              
819             );
820              
821             sub _init_mgr {
822 0     0   0 my $_function;
823 0         0 ( $_DAU_R_SOCK_REF, $_obj, $_function, $_freeze, $_thaw ) = @_;
824              
825 0         0 for my $key ( keys %_output_function ) {
826 0 0       0 last if exists($_function->{$key});
827 0         0 $_function->{$key} = $_output_function{$key};
828             }
829              
830 0         0 return;
831             }
832             }
833              
834             ###############################################################################
835             ## ----------------------------------------------------------------------------
836             ## Object package.
837             ##
838             ###############################################################################
839              
840             ## Items below are folded into MCE::Shared::Object.
841              
842             package # hide from rpm
843             MCE::Shared::Object;
844              
845 8     8   51 use strict;
  8         16  
  8         139  
846 8     8   27 use warnings;
  8         51  
  8         381  
847              
848 8     8   45 no warnings qw( threads recursion uninitialized numeric once );
  8         16  
  8         279  
849              
850 8     8   37 use bytes;
  8         21  
  8         24  
851              
852 8     8   179 no overloading;
  8         13  
  8         9129  
853              
854             my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
855              
856             my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
857             $_freeze, $_thaw);
858              
859             sub _init_queue {
860 22     22   166 ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
861             $_freeze, $_thaw) = @_;
862              
863 22         74 return;
864             }
865              
866             sub _req_queue {
867 24 50   24   74 local $\ = undef if (defined $\);
868 24 50       60 local $/ = $LF if ($/ ne $LF);
869 24         48 local $MCE::Signal::SIG;
870              
871             {
872 24         31 local $MCE::Signal::IPC = 1;
  24         33  
873 24 50       102 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
874              
875 24         775 print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
876 24         293 print({$_DAU_W_SOCK} $_[1]);
  24         229  
877 24         5435 chomp($_[2] = <$_DAU_W_SOCK>);
878              
879 24 50       195 read($_DAU_W_SOCK, $_[3], $_[2]) if ($_[2] > 0);
880              
881 24 50       217 $_dat_un->() if !$_is_MSWin32;
882             }
883              
884 24 50       95 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
885             }
886              
887             sub await {
888 0     0   0 my $_id = shift()->[0];
889 0 0       0 return unless ( my $_Q = $_obj->{ $_id } );
890 0 0       0 return unless ( exists $_Q->{_qr_sock} );
891              
892 0   0     0 my $_t = shift || 0;
893              
894             _croak('Queue: (await) is not enabled for this queue')
895 0 0       0 unless (exists $_Q->{_ar_sock});
896 0 0 0     0 _croak('Queue: (await threshold) is not an integer')
897             if (!looks_like_number($_t) || int($_t) != $_t);
898              
899 0 0       0 $_t = 0 if ($_t < 0);
900 0         0 _req1('O~QUA', $_id.$LF . $_t.$LF);
901              
902 0 0       0 MCE::Util::_sock_ready($_Q->{_ar_sock}) if $_is_MSWin32;
903 0         0 MCE::Util::_sysread($_Q->{_ar_sock}, my($_b), 1);
904              
905 0         0 return;
906             }
907              
908             sub dequeue {
909 16     16   23480 my ($self, $_cnt) = @_;
910 16         56 my $_id = $self->[0];
911              
912 16 50       98 return unless ( my $_Q = $_obj->{ $_id } );
913 16 50       76 return unless ( exists $_Q->{_qr_sock} );
914              
915 16 100 100     138 if (defined $_cnt && $_cnt ne '1') {
916 6 50 33     89 _croak('Queue: (dequeue count argument) is not valid')
      33        
917             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
918             } else {
919 10         24 $_cnt = 1;
920             }
921              
922 16         85 _req_queue('O~QUD', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));
923              
924 16 100 66     312 return $_thaw->($_buf)[0] if ($_len > 0 && $_cnt == 1);
925 6 50       21 return @{ $_thaw->($_buf) } if ($_len > 0);
  6         130  
926 0 0       0 return if ($_len == -2);
927              
928 0 0       0 MCE::Util::_sock_ready($_Q->{_qr_sock}) if $_is_MSWin32;
929 0         0 MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
930              
931 0         0 goto \&dequeue;
932             }
933              
934             sub dequeue_nb {
935 4     4   3207 my ($self, $_cnt) = @_;
936 4         19 my $_id = $self->[0];
937              
938 4 50       21 return unless ( my $_Q = $_obj->{ $_id } );
939 4 50       17 return unless ( exists $_Q->{_qr_sock} );
940              
941 4 50 33     87 if (defined $_cnt && $_cnt ne '1') {
942 0 0 0     0 _croak('Queue: (dequeue_nb count argument) is not valid')
      0        
943             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
944             } else {
945 4         14 $_cnt = 1;
946             }
947              
948 4         24 _req_queue('O~QUN', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));
949              
950 4 50       21 return if ($_len < 0);
951              
952 4 50       86 ($_cnt == 1) ? $_thaw->($_buf)[0] : @{ $_thaw->($_buf) };
  0         0  
953             }
954              
955             sub dequeue_timed {
956 4     4   37 my ($self, $_timeout, $_cnt) = @_;
957 4         19 my $_id = $self->[0];
958 4         9 my $_start;
959              
960 4 50       19 return unless ( my $_Q = $_obj->{ $_id } );
961 4 50       20 return unless ( exists $_Q->{_qr_sock} );
962              
963 4 50       27 if (defined $_timeout) {
964 0 0 0     0 _croak('Queue: (dequeue_timed timeout argument) is not valid')
965             if (!looks_like_number($_timeout) || $_timeout < 0);
966 0         0 $_start = MCE::Util::_time();
967             }
968              
969 4 50 33     23 if (defined $_cnt && $_cnt ne '1') {
970 0 0 0     0 _croak('Queue: (dequeue_timed count argument) is not valid')
      0        
971             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
972             } else {
973 4         14 $_cnt = 1;
974             }
975              
976 4 50 33     18 if (! $_timeout || $_timeout < 0.0) {
977 4         26 _req_queue('O~QUN', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));
978 4 50       18 return if ($_len < 0);
979 4 50       82 return ($_cnt == 1) ? $_thaw->($_buf)[0] : @{ $_thaw->($_buf) };
  0         0  
980             }
981              
982 0         0 _req_queue('O~QUD', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));
983              
984 0 0 0     0 return $_thaw->($_buf)[0] if ($_len > 0 && $_cnt == 1);
985 0 0       0 return @{ $_thaw->($_buf) } if ($_len > 0);
  0         0  
986 0 0       0 return if ($_len == -2);
987              
988 0         0 $_Q->{_qr_mutex}->lock();
989 0         0 $_timeout = $_timeout - (MCE::Util::_time() - $_start) - 0.045;
990 0 0       0 $_timeout = 0.0 if $_timeout < 0.045;
991              
992 0         0 CORE::vec(my $_r, CORE::fileno($_Q->{_qr_sock}), 1) = 1;
993 0 0       0 if (CORE::select($_r, undef, undef, $_timeout) > 0) {
994 0         0 MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
995 0         0 $_Q->{_qr_mutex}->unlock();
996 0         0 _req_queue('O~QUN', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));
997 0 0       0 return if ($_len < 0);
998 0 0       0 return ($_cnt == 1) ? $_thaw->($_buf)[0] : @{ $_thaw->($_buf) };
  0         0  
999             }
1000              
1001 0         0 $_Q->{_qr_mutex}->unlock();
1002 0         0 _req1('O~QUT', $_id.$LF);
1003 0         0 MCE::Util::_sleep(0.045); # yield
1004              
1005 0         0 return ();
1006             }
1007              
1008             sub pending {
1009 4 50 33 4   10187 (@_ == 1 && !wantarray) ? _size('pending', @_) : _auto('pending', @_);
1010             }
1011              
1012             1;
1013              
1014             __END__