File Coverage

blib/lib/MCE/Core/Input/Request.pm
Criterion Covered Total %
statement 82 102 80.3
branch 30 58 51.7
condition 5 15 33.3
subroutine 4 8 50.0
pod n/a
total 121 183 66.1


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Array reference and Glob reference input reader.
4             ##
5             ## This package provides the request chunk method used internally by the worker
6             ## process. Distribution follows a bank-queuing model.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Input::Request;
13              
14 44     44   2512 use strict;
  44         75  
  44         1672  
15 44     44   165 use warnings;
  44         73  
  44         4195  
16              
17             our $VERSION = '1.902';
18              
19             ## Items below are folded into MCE.
20              
21             package # hide from rpm
22             MCE;
23              
24 44     44   262 no warnings qw( threads recursion uninitialized );
  44         68  
  44         44081  
25              
26             ###############################################################################
27             ## ----------------------------------------------------------------------------
28             ## Worker process -- Request chunk.
29             ##
30             ###############################################################################
31              
32             sub _worker_request_chunk {
33              
34 57     57   212 my ($self, $_proc_type) = @_;
35              
36 57         295 @_ = ();
37              
38             _croak('MCE::_worker_request_chunk: (user_func) is not specified')
39 57 50       340 unless (defined $self->{user_func});
40              
41 57 50       479 my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
42 57         174 my $_chn = $self->{_chn};
43 57         156 my $_DAT_LOCK = $self->{_dat_lock};
44 57         178 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
45 57         144 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
46 57         162 my $_lock_chn = $self->{_lock_chn};
47 57         126 my $_chunk_size = $self->{chunk_size};
48 57         127 my $_use_slurpio = $self->{use_slurpio};
49 57   33     682 my $_RS = $self->{RS} || $/;
50 57         335 my $_wuf = $self->{_wuf};
51              
52 57         127 my ($_dat_ex, $_dat_un, $_pid);
53              
54 57 50       276 if ($_lock_chn) {
55 0 0       0 $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
56              
57             # inlined for performance
58             $_dat_ex = sub {
59             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
60 0 0   0   0 if $_is_MSWin32;
61             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
62 0 0       0 unless $_DAT_LOCK->{ $_pid };
63 0         0 };
64             $_dat_un = sub {
65             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
66 0 0   0   0 if $_DAT_LOCK->{ $_pid };
67 0         0 };
68             }
69              
70 57         224 my ($_chunk_id, $_len, $_output_tag);
71 57         0 my ($_chop_len, $_chop_str, $_p);
72              
73 57 100       450 if ($_proc_type == REQUEST_ARRAY) {
    100          
74 40         324 $_output_tag = OUTPUT_A_REF;
75 40         99 $_chop_len = 0;
76             }
77             elsif ($_proc_type == REQUEST_HASH) {
78 3         48 $_output_tag = OUTPUT_H_REF;
79 3         9 $_chop_len = 0;
80             }
81             else {
82 14         75 $_output_tag = OUTPUT_G_REF;
83 14 50 33     243 if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") {
84 0         0 $_chop_str = substr($_RS, 1);
85 0         0 $_chop_len = length $_chop_str;
86             } else {
87 14         79 $_chop_str = '';
88 14         38 $_chop_len = 0;
89             }
90             }
91              
92             ## -------------------------------------------------------------------------
93              
94 57     0   990 $self->{_next_jmp} = sub { goto _WORKER_REQUEST_CHUNK__NEXT; };
  0         0  
95 57     0   624 $self->{_last_jmp} = sub { goto _WORKER_REQUEST_CHUNK__LAST; };
  0         0  
96              
97 57         164 local $_;
98              
99             _WORKER_REQUEST_CHUNK__NEXT:
100              
101 57         141 while (1) {
102 205 50       830 undef $_ if (length > MAX_GC_SIZE);
103              
104 205         638 $_ = '';
105              
106             ## Obtain the next chunk of data.
107             {
108 205 50       336 local $\ = undef if (defined $\);
  205         729  
109 205 50       3551 local $/ = $LF if ($/ ne $LF );
110              
111 205 50       498 $_dat_ex->() if $_lock_chn;
112 205         351 print {$_DAT_W_SOCK} $_output_tag . $LF . $_chn . $LF;
  205         3261  
113 205 50       619 MCE::Util::_sock_ready($_DAU_W_SOCK, -1) if $_is_MSWin32;
114 205         128373 chomp($_len = <$_DAU_W_SOCK>);
115              
116 205 100       1173 unless ($_len) {
117 57 50       354 $_dat_un->() if $_lock_chn;
118 57         442 return;
119             }
120              
121 148         859 chomp($_chunk_id = <$_DAU_W_SOCK>);
122              
123 148 50 66     1422 if ($_chunk_id > 1 && $_chop_len) {
124 0         0 $_p = $_chop_len; $_ = $_chop_str;
  0         0  
125             } else {
126 148         350 $_p = 0;
127             }
128              
129 148         626 read $_DAU_W_SOCK, $_, $_len, $_p;
130              
131 148 50       499 $_dat_un->() if $_lock_chn;
132             }
133              
134             ## Call user function.
135 148 100       469 if ($_proc_type == REQUEST_ARRAY) {
    100          
136 126         1941 my $_chunk_ref = $self->{thaw}($_); undef $_;
  126         313  
137 126 100       349 $_ = ($_chunk_size == 1) ? $_chunk_ref->[0] : $_chunk_ref;
138 126         534 $_wuf->($self, $_chunk_ref, $_chunk_id);
139             }
140             elsif ($_proc_type == REQUEST_HASH) {
141 15         26 my $_chunk_ref = { @{ $self->{thaw}($_) } }; undef $_;
  15         401  
  15         61  
142 15         23 $_ = $_chunk_ref;
143 15         62 $_wuf->($self, $_chunk_ref, $_chunk_id);
144             }
145             else {
146 7         18 $_ = ${ $self->{thaw}($_) };
  7         207  
147 7 100       42 if ($_use_slurpio) {
148 2 50 33     13 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
149 0         0 substr($_, -$_chop_len, $_chop_len, '');
150             }
151 2         6 local $_ = \$_;
152 2         10 $_wuf->($self, $_, $_chunk_id);
153             }
154             else {
155 5 50       27 if ($_chunk_size == 1) {
156 0 0 0     0 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
157 0         0 substr($_, -$_chop_len, $_chop_len, '');
158             }
159 0         0 $_wuf->($self, [ $_ ], $_chunk_id);
160             }
161             else {
162 5         15 my @_recs;
163             {
164 5 50       23 local $/ = $_RS if ($/ ne $_RS);
  5         30  
165 5         112 _sync_buffer_to_array(\$_, \@_recs, $_chop_str);
166 5         13 undef $_;
167             }
168 5 50       18 if ($_chop_len) {
169 0         0 for my $i (0 .. @_recs - 1) {
170 0 0       0 if (substr($_recs[$i], -$_chop_len) eq $_chop_str) {
171 0         0 substr($_recs[$i], -$_chop_len, $_chop_len, '');
172             }
173             }
174             }
175 5         16 local $_ = \@_recs;
176 5         29 $_wuf->($self, \@_recs, $_chunk_id);
177             }
178             }
179             }
180             }
181              
182             _WORKER_REQUEST_CHUNK__LAST:
183              
184 0           return;
185             }
186              
187             1;
188              
189             __END__
190              
191             ###############################################################################
192             ## ----------------------------------------------------------------------------
193             ## Module usage.
194             ##
195             ###############################################################################
196              
197             =head1 NAME
198              
199             MCE::Core::Input::Request - Array reference and Glob reference input reader
200              
201             =head1 VERSION
202              
203             This document describes MCE::Core::Input::Request version 1.902
204              
205             =head1 DESCRIPTION
206              
207             This package provides the request chunk method used internally by the worker
208             process. Distribution follows a bank-queuing model.
209              
210             There is no public API.
211              
212             =head1 SEE ALSO
213              
214             The syntax for the C<input_data> option is described in L<MCE::Core>.
215              
216             =head1 AUTHOR
217              
218             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
219              
220             =cut
221