File Coverage

blib/lib/MCE/Core/Input/Sequence.pm
Criterion Covered Total %
statement 62 97 63.9
branch 20 60 33.3
condition 7 17 41.1
subroutine 6 8 75.0
pod n/a
total 95 182 52.2


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Sequence of numbers (for task_id == 0).
4             ##
5             ## This package provides a sequence of numbers used internally by the worker
6             ## process. Distribution follows a bank-queuing model.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Input::Sequence;
13              
14 13     13   1111 use strict;
  13         31  
  13         434  
15 13     13   90 use warnings;
  13         55  
  13         833  
16              
17             our $VERSION = '1.889';
18              
19             ## Items below are folded into MCE.
20              
21             package # hide from rpm
22             MCE;
23              
24 13     13   98 no warnings qw( threads recursion uninitialized );
  13         29  
  13         15260  
25              
26             my $_que_read_size = $MCE::_que_read_size;
27             my $_que_template = $MCE::_que_template;
28              
29             ###############################################################################
30             ## ----------------------------------------------------------------------------
31             ## Worker process -- Sequence Queue (distribution via bank queuing model).
32             ##
33             ###############################################################################
34              
35             sub _worker_sequence_queue {
36              
37 14     14   38 my ($self) = @_;
38              
39 14         35 @_ = ();
40              
41             _croak('MCE::_worker_sequence_queue: (user_func) is not specified')
42 14 50       168 unless (defined $self->{user_func});
43              
44 14 50       105 my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
45 14         33 my $_QUE_R_SOCK = $self->{_que_r_sock};
46 14         32 my $_QUE_W_SOCK = $self->{_que_w_sock};
47 14   50     80 my $_bounds_only = $self->{bounds_only} || 0;
48 14         41 my $_chunk_size = $self->{chunk_size};
49 14         368 my $_wuf = $self->{_wuf};
50              
51 14         47 my ($_next, $_chunk_id, $_seq_n, $_begin, $_end, $_step, $_fmt);
52 14         0 my ($_DAT_LOCK, $_dat_ex, $_dat_un, $_pid, $_abort, $_offset);
53              
54 14 50       89 $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
55              
56             # inlined for performance
57 14         70 $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 2 + 10 )};
58             $_dat_ex = sub {
59             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
60 65 50   65   134 if $_is_MSWin32;
61             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
62 65 50       291 unless $_DAT_LOCK->{ $_pid };
63 14         413 };
64             $_dat_un = sub {
65             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
66 65 50   65   615 if $_DAT_LOCK->{ $_pid };
67 14         47 };
68              
69 14 50       62 if (ref $self->{sequence} eq 'ARRAY') {
70 14         26 ($_begin, $_end, $_step, $_fmt) = @{ $self->{sequence} };
  14         46  
71             }
72             else {
73 0         0 $_begin = $self->{sequence}->{begin};
74 0         0 $_end = $self->{sequence}->{end};
75 0         0 $_step = $self->{sequence}->{step};
76 0         0 $_fmt = $self->{sequence}->{format};
77             }
78              
79 14         34 $_abort = $self->{_abort_msg};
80 14         52 $_chunk_id = $_offset = 0;
81              
82 14 50       46 $_fmt =~ s/%// if (defined $_fmt);
83              
84             ## -------------------------------------------------------------------------
85              
86 14     0   44 $self->{_next_jmp} = sub { goto _WORKER_SEQUENCE__NEXT; };
  0         0  
87 14     0   41 $self->{_last_jmp} = sub { goto _WORKER_SEQUENCE__LAST; };
  0         0  
88              
89 14         29 local $_;
90              
91             _WORKER_SEQUENCE__NEXT:
92              
93 14         38 while (1) {
94              
95             ## Obtain the next chunk_id and sequence number.
96 65         167 $_dat_ex->();
97 65 50       189 MCE::Util::_sock_ready($_QUE_R_SOCK) if $_is_MSWin32;
98 65         182 MCE::Util::_sysread($_QUE_R_SOCK, $_next, $_que_read_size);
99              
100 65         265 ($_chunk_id, $_offset) = unpack($_que_template, $_next);
101              
102 65 100       184 if ($_offset >= $_abort) {
103 14         167 syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset));
104 14         84 $_dat_un->();
105 14         146 return;
106             }
107              
108             syswrite(
109 51         747 $_QUE_W_SOCK, pack($_que_template, $_chunk_id + 1, $_offset + 1)
110             );
111              
112 51         199 $_dat_un->();
113 51         115 $_chunk_id++;
114              
115             ## Call user function.
116 51 100 66     195 if ($_chunk_size == 1 || $_begin == $_end) {
117 36         68 $_ = $_offset * $_step + $_begin;
118 36 50       90 $_ = _sprintf("%$_fmt", $_) if (defined $_fmt);
119 36 50 33     155 if ($_chunk_size > 1 || $_bounds_only) {
120 0 0       0 $_ = ($_bounds_only) ? [ $_, $_ ] : [ $_ ];
121             }
122 36         119 $_wuf->($self, $_, $_chunk_id);
123             }
124             else {
125 15         39 my $_n_begin = ($_offset * $_chunk_size) * $_step + $_begin;
126 15         54 my @_n = (); $_seq_n = $_n_begin;
  15         27  
127              
128             ## -------------------------------------------------------------------
129              
130 15 50       32 if ($_bounds_only) {
131 0         0 my ($_tmp_b, $_tmp_e) = ($_seq_n);
132              
133 0 0       0 if ($_begin <= $_end) {
134 0 0       0 if ($_step * ($_chunk_size - 1) + $_n_begin <= $_end) {
    0          
135 0         0 $_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin;
136             }
137             elsif ($_step == 1) {
138 0         0 $_tmp_e = $_end;
139             }
140             else {
141 0         0 for my $_i (1 .. $_chunk_size) {
142 0 0       0 last if ($_seq_n > $_end);
143 0         0 $_tmp_e = $_seq_n;
144 0         0 $_seq_n = $_step * $_i + $_n_begin;
145             }
146             }
147             }
148             else {
149 0 0       0 if ($_step * ($_chunk_size - 1) + $_n_begin >= $_end) {
    0          
150 0         0 $_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin;
151             }
152             elsif ($_step == -1) {
153 0         0 $_tmp_e = $_end;
154             }
155             else {
156 0         0 for my $_i (1 .. $_chunk_size) {
157 0 0       0 last if ($_seq_n < $_end);
158 0         0 $_tmp_e = $_seq_n;
159 0         0 $_seq_n = $_step * $_i + $_n_begin;
160             }
161             }
162             }
163              
164 0 0       0 @_n = (defined $_fmt)
165             ? ( _sprintf("%$_fmt",$_tmp_b), _sprintf("%$_fmt",$_tmp_e) )
166             : ( $_tmp_b, $_tmp_e );
167             }
168              
169             ## -------------------------------------------------------------------
170              
171             else {
172 15 50       33 if ($_begin <= $_end) {
173 15 50 33     115 if (!defined $_fmt && $_step == 1 && abs($_end) < ~1 && abs($_begin) < ~1) {
      33        
      33        
174 15 100       59 $_ = ($_seq_n + $_chunk_size <= $_end)
175             ? [ $_seq_n .. $_seq_n + $_chunk_size - 1 ]
176             : [ $_seq_n .. $_end ];
177              
178 15         62 $_wuf->($self, $_, $_chunk_id);
179 15         35 next;
180             }
181             else {
182 0           for my $_i (1 .. $_chunk_size) {
183 0 0         last if ($_seq_n > $_end);
184              
185 0 0         push @_n, (defined $_fmt)
186             ? _sprintf("%$_fmt", $_seq_n) : $_seq_n;
187              
188 0           $_seq_n = $_step * $_i + $_n_begin;
189             }
190             }
191             }
192             else {
193 0           for my $_i (1 .. $_chunk_size) {
194 0 0         last if ($_seq_n < $_end);
195              
196 0 0         push @_n, (defined $_fmt)
197             ? _sprintf("%$_fmt", $_seq_n) : $_seq_n;
198              
199 0           $_seq_n = $_step * $_i + $_n_begin;
200             }
201             }
202             }
203              
204             ## -------------------------------------------------------------------
205              
206 0           $_ = \@_n;
207 0           $_wuf->($self, \@_n, $_chunk_id);
208             }
209             }
210              
211             _WORKER_SEQUENCE__LAST:
212              
213 0           return;
214             }
215              
216             1;
217              
218             __END__