File Coverage

blib/lib/MCE/Core/Input/Handle.pm
Criterion Covered Total %
statement 78 134 58.2
branch 29 70 41.4
condition 5 20 25.0
subroutine 6 9 66.6
pod n/a
total 118 233 50.6


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## File path and Scalar reference input reader.
4             ##
5             ## This package provides the read handle method used internally by the worker
6             ## process. Distribution follows a bank-queuing model.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Input::Handle;
13              
14 13     13   1061 use strict;
  13         30  
  13         688  
15 13     13   85 use warnings;
  13         25  
  13         1758  
16              
17             our $VERSION = '1.889';
18              
19             ## Items below are folded into MCE.
20              
21             package # hide from rpm
22             MCE;
23              
24 13     13   112 no warnings qw( threads recursion uninitialized );
  13         30  
  13         20543  
25              
26             my $_que_read_size = $MCE::_que_read_size;
27             my $_que_template = $MCE::_que_template;
28              
29             ###############################################################################
30             ## ----------------------------------------------------------------------------
31             ## Worker process -- Read handle.
32             ##
33             ###############################################################################
34              
35             sub _systell {
36             # To minimize memory consumption, SEEK_CUR equals 1 on most platforms.
37             # e.g. use Fcntl qw(SEEK_CUR);
38              
39 0     0   0 sysseek($_[0], 0, 1);
40             }
41              
42             sub _worker_read_handle {
43              
44 14     14   51 my ($self, $_proc_type, $_input_data) = @_;
45              
46 14         31 @_ = ();
47              
48             _croak('MCE::_worker_read_handle: (user_func) is not specified')
49 14 50       182 unless (defined $self->{user_func});
50              
51 14 50       79 my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
52 14         41 my $_QUE_R_SOCK = $self->{_que_r_sock};
53 14         33 my $_QUE_W_SOCK = $self->{_que_w_sock};
54 14         30 my $_chunk_size = $self->{chunk_size};
55 14         30 my $_use_slurpio = $self->{use_slurpio};
56 14         51 my $_parallel_io = $self->{parallel_io};
57 14   33     101 my $_RS = $self->{RS} || $/;
58 14         33 my $_wuf = $self->{_wuf};
59              
60 14         53 my ($_data_size, $_next, $_chunk_id, $_offset_pos, $_IN_FILE, $_tmp_cs);
61 14         0 my ($_DAT_LOCK, $_dat_ex, $_dat_un, $_pid, $_chop_len, $_chop_str, $_p);
62              
63 14 50       58 $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
64              
65             # inlined for performance
66 14         70 $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 2 + 10 )};
67             $_dat_ex = sub {
68             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
69 65 50   65   134 if $_is_MSWin32;
70             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
71 65 50       301 unless $_DAT_LOCK->{ $_pid };
72 14         71 };
73             $_dat_un = sub {
74             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
75 65 50   65   725 if $_DAT_LOCK->{ $_pid };
76 14         44 };
77              
78 14 50 33     71 if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") {
79 0         0 $_chop_str = substr($_RS, 1);
80 0         0 $_chop_len = length $_chop_str;
81             } else {
82 14         51 $_chop_str = '';
83 14         27 $_chop_len = 0;
84             }
85              
86             $_data_size = ($_proc_type == READ_MEMORY)
87 14 50       269 ? length ${ $_input_data } : -s $_input_data;
  0         0  
88              
89 14         45 $_chunk_id = $_offset_pos = 0;
90              
91 14 50       576 open $_IN_FILE, '<', $_input_data or die "$_input_data: $!\n";
92 14         64 binmode $_IN_FILE;
93              
94             ## -------------------------------------------------------------------------
95              
96 14     0   78 $self->{_next_jmp} = sub { goto _WORKER_READ_HANDLE__NEXT; };
  0         0  
97 14     0   47 $self->{_last_jmp} = sub { goto _WORKER_READ_HANDLE__LAST; };
  0         0  
98              
99 14         29 local $_;
100              
101             _WORKER_READ_HANDLE__NEXT:
102              
103 14         26 while (1) {
104 65 50       112 my @_recs; undef $_ if (length > MAX_GC_SIZE);
  65         179  
105              
106 65         108 $_ = '';
107              
108             ## Obtain the next chunk_id and offset position.
109 65         176 $_dat_ex->();
110 65 50       214 MCE::Util::_sock_ready($_QUE_R_SOCK) if $_is_MSWin32;
111 65         192 MCE::Util::_sysread($_QUE_R_SOCK, $_next, $_que_read_size);
112              
113 65         333 ($_chunk_id, $_offset_pos) = unpack($_que_template, $_next);
114              
115 65 100       191 if ($_offset_pos >= $_data_size) {
116 14         211 syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos));
117 14         88 $_dat_un->();
118 14         223 close $_IN_FILE; undef $_IN_FILE;
  14         80  
119 14         146 return;
120             }
121              
122 51 50 66     233 if (++$_chunk_id > 1 && $_chop_len) {
123 0         0 $_p = $_chop_len; $_ = $_chop_str;
  0         0  
124             } else {
125 51         83 $_p = 0;
126             }
127              
128             ## Read data.
129 51 50       115 if ($_chunk_size <= MAX_RECS_SIZE) { # One or many records.
130 51 50       165 local $/ = $_RS if ($/ ne $_RS);
131 51         923 seek $_IN_FILE, $_offset_pos, 0;
132              
133 51 100       166 if ($_chunk_size == 1) {
134 36 50       96 if ($_p) {
135 0         0 $_ .= <$_IN_FILE>;
136             } else {
137 36         539 $_ = <$_IN_FILE>;
138             }
139             }
140             else {
141 15 50       34 if ($_use_slurpio) {
142 0         0 for my $i (0 .. $_chunk_size - 1) {
143 0         0 $_ .= <$_IN_FILE>;
144             }
145             }
146             else {
147 15 50       29 if ($_chop_len) {
148 0 0       0 $_recs[0] = ($_chunk_id > 1) ? $_chop_str : '';
149 0         0 $_recs[0] .= <$_IN_FILE>;
150 0         0 for my $i (1 .. $_chunk_size - 1) {
151 0         0 $_recs[$i] = $_chop_str;
152 0         0 $_recs[$i] .= <$_IN_FILE>;
153 0 0       0 if (length $_recs[$i] == $_chop_len) {
154 0         0 delete $_recs[$i];
155 0         0 last;
156             }
157             }
158             }
159             else {
160 15         48 for my $i (0 .. $_chunk_size - 1) {
161 30         346 $_recs[$i] = <$_IN_FILE>;
162 30 100       114 unless (defined $_recs[$i]) {
163 3         12 delete $_recs[$i];
164 3         7 last;
165             }
166             }
167             }
168             }
169             }
170              
171 51         1022 syswrite(
172             $_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE)
173             );
174 51         180 $_dat_un->();
175             }
176             else { # Large chunk.
177 0 0       0 local $/ = $_RS if ($/ ne $_RS);
178              
179 0 0 0     0 if ($_parallel_io && $_RS eq $LF) {
180 0         0 syswrite(
181             $_QUE_W_SOCK,
182             pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size)
183             );
184 0         0 $_dat_un->();
185              
186 0         0 $_tmp_cs = $_chunk_size;
187 0         0 seek $_IN_FILE, $_offset_pos, 0;
188              
189 0 0       0 if ($_offset_pos) {
190 0   0     0 $_tmp_cs -= length <$_IN_FILE> || 0;
191             }
192              
193 0 0       0 if ($_proc_type == READ_FILE) {
194 0         0 sysseek($_IN_FILE, tell( $_IN_FILE ), 0);
195 0         0 sysread($_IN_FILE, $_, $_tmp_cs, $_p);
196 0         0 seek $_IN_FILE, _systell($_IN_FILE), 0;
197             }
198             else {
199 0         0 read $_IN_FILE, $_, $_tmp_cs, $_p;
200             }
201              
202 0         0 $_ .= <$_IN_FILE>;
203             }
204             else {
205 0 0       0 if ($_proc_type == READ_FILE) {
206 0         0 sysseek($_IN_FILE, $_offset_pos, 0);
207 0         0 sysread($_IN_FILE, $_, $_chunk_size, $_p);
208 0         0 seek $_IN_FILE, _systell($_IN_FILE), 0;
209             }
210             else {
211 0         0 seek $_IN_FILE, $_offset_pos, 0;
212 0         0 read $_IN_FILE, $_, $_chunk_size, $_p;
213             }
214              
215 0         0 $_ .= <$_IN_FILE>;
216              
217 0         0 syswrite(
218             $_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE)
219             );
220 0         0 $_dat_un->();
221             }
222             }
223              
224             ## Call user function.
225 51 50       195 if ($_use_slurpio) {
226 0 0 0     0 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
227 0         0 substr($_, -$_chop_len, $_chop_len, '');
228             }
229 0         0 local $_ = \$_;
230 0         0 $_wuf->($self, $_, $_chunk_id);
231             }
232             else {
233 51 100       136 if ($_chunk_size == 1) {
234 36 50 33     103 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
235 0         0 substr($_, -$_chop_len, $_chop_len, '');
236             }
237 36         170 $_wuf->($self, [ $_ ], $_chunk_id);
238             }
239             else {
240 15 50       43 if ($_chunk_size > MAX_RECS_SIZE) {
241 0 0       0 local $/ = $_RS if ($/ ne $_RS);
242 0         0 _sync_buffer_to_array(\$_, \@_recs, $_chop_str);
243 0         0 undef $_;
244             }
245 15 50       42 if ($_chop_len) {
246 0         0 for my $i (0 .. @_recs - 1) {
247 0 0       0 if (substr($_recs[$i], -$_chop_len) eq $_chop_str) {
248 0         0 substr($_recs[$i], -$_chop_len, $_chop_len, '');
249             }
250             }
251             }
252 15         32 local $_ = \@_recs;
253 15         57 $_wuf->($self, \@_recs, $_chunk_id);
254             }
255             }
256             }
257              
258             _WORKER_READ_HANDLE__LAST:
259              
260 0           close $_IN_FILE; undef $_IN_FILE;
  0            
261              
262 0           return;
263             }
264              
265             1;
266              
267             __END__