File Coverage

blib/lib/MCE/Grep.pm
Criterion Covered Total %
statement 175 236 74.1
branch 81 168 48.2
condition 24 58 41.3
subroutine 15 19 78.9
pod 5 5 100.0
total 300 486 61.7


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel grep model similar to the native grep function.
4             ##
5             ###############################################################################
6              
7             package MCE::Grep;
8              
9 4     4   338360 use strict;
  4         5  
  4         170  
10 4     4   18 use warnings;
  4         21  
  4         262  
11              
12 4     4   27 no warnings qw( threads recursion uninitialized );
  4         11  
  4         299  
13              
14             our $VERSION = '1.902';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 4     4   25 use Scalar::Util qw( looks_like_number weaken );
  4         11  
  4         257  
21 4     4   2088 use MCE;
  4         12  
  4         24  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_prev_c, $_tag) = ({}, {}, {}, {}, 'MCE::Grep');
38              
39             sub import {
40 4     4   66 my ($_class, $_pkg) = (shift, caller);
41              
42 4         16 my $_p = $_def->{$_pkg} = {
43             MAX_WORKERS => 'auto',
44             CHUNK_SIZE => 'auto',
45             };
46              
47             ## Import functions.
48 4 50       22 if ($_pkg !~ /^MCE::/) {
49 4     4   26 no strict 'refs'; no warnings 'redefine';
  4     4   6  
  4         164  
  4         17  
  4         5  
  4         12876  
50 4         10 *{ $_pkg.'::mce_grep_f' } = \&run_file;
  4         24  
51 4         6 *{ $_pkg.'::mce_grep_s' } = \&run_seq;
  4         16  
52 4         6 *{ $_pkg.'::mce_grep' } = \&run;
  4         17  
53             }
54              
55             ## Process module arguments.
56 4         12 while ( my $_argument = shift ) {
57 0         0 my $_arg = lc $_argument;
58              
59 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
60 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
61 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
62 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
63 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
64 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
65 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
66              
67             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
68 0 0       0 if ( $_arg eq 'sereal' ) {
69 0 0       0 if ( shift eq '0' ) {
70 0         0 require Storable;
71 0         0 $_p->{FREEZE} = \&Storable::freeze;
72 0         0 $_p->{THAW} = \&Storable::thaw;
73             }
74 0         0 next;
75             }
76              
77 0         0 _croak("Error: ($_argument) invalid module option");
78             }
79              
80 4         25 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
81              
82 4         21 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
83             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
84 4 50       20 unless ($_p->{CHUNK_SIZE} eq 'auto');
85              
86 4         102 return;
87             }
88              
89             ###############################################################################
90             ## ----------------------------------------------------------------------------
91             ## Gather callback for storing by chunk_id => chunk_ref into a hash.
92             ##
93             ###############################################################################
94              
95             my ($_total_chunks, %_tmp);
96              
97             sub _gather {
98              
99 37     37   54 my ($_chunk_id, $_data_ref) = @_;
100              
101 37         138 $_tmp{$_chunk_id} = $_data_ref;
102 37         30 $_total_chunks++;
103              
104 37         66 return;
105             }
106              
107             ###############################################################################
108             ## ----------------------------------------------------------------------------
109             ## Init and finish routines.
110             ##
111             ###############################################################################
112              
113             sub MCE::Grep::_guard::DESTROY {
114 0     0   0 my ($_pkg, $_id) = @{ $_[0] };
  0         0  
115              
116 0 0 0     0 if (defined $_pkg && $_id eq "$$.$_tid") {
117 0         0 @{ $_[0] } = ();
  0         0  
118 0         0 MCE::Grep->finish($_pkg);
119             }
120              
121 0         0 return;
122             }
123              
124             sub init (@) {
125              
126 3 50 33 3 1 591 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
127 3         27 my $_pkg = "$$.$_tid.".caller();
128              
129 3 50       24 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
130              
131             _croak("$_tag: (HASH) not allowed as input by this MCE model")
132 3 50       12 if ( ref $_params->{$_pkg}{input_data} eq 'HASH' );
133              
134 3         6 @_ = ();
135              
136             defined wantarray
137 3 50       9 ? bless([$_pkg, "$$.$_tid"], MCE::Grep::_guard::)
138             : ();
139             }
140              
141             sub finish (@) {
142              
143 7 50 33 7 1 990 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
144 7 100       87 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
145              
146 7 100 66     92 if ( $_pkg eq 'MCE' ) {
    100          
147 4         10 for my $_k ( keys %{ $_MCE } ) { MCE::Grep->finish($_k, 1); }
  4         40  
  2         108  
148             }
149             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
150 1 50       23 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
151 1         5 $_total_chunks = undef, undef %_tmp;
152              
153 1         6 delete $_prev_c->{$_pkg};
154 1         14 delete $_MCE->{$_pkg};
155             }
156              
157 7         18 @_ = ();
158              
159 7         23 return;
160             }
161              
162             ###############################################################################
163             ## ----------------------------------------------------------------------------
164             ## Parallel grep with MCE -- file.
165             ##
166             ###############################################################################
167              
168             sub run_file (&@) {
169              
170 2 50 33 2 1 1387 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
171              
172 2         3 my $_code = shift; my $_file = shift;
  2         3  
173 2         13 my $_pid = "$$.$_tid.".caller();
174              
175 2 50       6 if (defined (my $_p = $_params->{$_pid})) {
176 2 50       5 delete $_p->{input_data} if (exists $_p->{input_data});
177 2 50       4 delete $_p->{sequence} if (exists $_p->{sequence});
178             }
179             else {
180 0         0 $_params->{$_pid} = {};
181             }
182              
183 2 100 66     50 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
184 1 50       36 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
185 1 50       9 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
186 1 50       6 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
187 1         11 $_params->{$_pid}{_file} = $_file;
188             }
189             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
190 1         3 $_params->{$_pid}{_file} = $_file;
191             }
192             else {
193 0         0 _croak("$_tag: (file) is not specified or valid");
194             }
195              
196 2         3 @_ = ();
197              
198 2         5 return run($_code);
199             }
200              
201             ###############################################################################
202             ## ----------------------------------------------------------------------------
203             ## Parallel grep with MCE -- sequence.
204             ##
205             ###############################################################################
206              
207             sub run_seq (&@) {
208              
209 1 50 33 1 1 936 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
210              
211 1         2 my $_code = shift;
212 1         7 my $_pid = "$$.$_tid.".caller();
213              
214 1 50       4 if (defined (my $_p = $_params->{$_pid})) {
215 1 50       3 delete $_p->{input_data} if (exists $_p->{input_data});
216 1 50       3 delete $_p->{_file} if (exists $_p->{_file});
217             }
218             else {
219 0         0 $_params->{$_pid} = {};
220             }
221              
222 1         1 my ($_begin, $_end);
223              
224 1 50 33     7 if (ref $_[0] eq 'HASH') {
    50          
    50          
225 0         0 $_begin = $_[0]->{begin}, $_end = $_[0]->{end};
226 0         0 $_params->{$_pid}{sequence} = $_[0];
227             }
228             elsif (ref $_[0] eq 'ARRAY') {
229 0 0 0     0 if (@{ $_[0] } > 3 && $_[0]->[3] =~ /\d$/) {
  0         0  
230 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[-1];
231 0         0 $_params->{$_pid}{sequence} = [ $_[0]->[0], $_[0]->[-1] ];
232             }
233             else {
234 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[1];
235 0         0 $_params->{$_pid}{sequence} = $_[0];
236             }
237             }
238             elsif (ref $_[0] eq '' || ref($_[0]) =~ /^Math::/) {
239 1 50 33     3 if (@_ > 3 && $_[3] =~ /\d$/) {
240 0         0 $_begin = $_[0], $_end = $_[-1];
241 0         0 $_params->{$_pid}{sequence} = [ $_[0], $_[-1] ];
242             }
243             else {
244 1         2 $_begin = $_[0], $_end = $_[1];
245 1         3 $_params->{$_pid}{sequence} = [ @_ ];
246             }
247             }
248             else {
249 0         0 _croak("$_tag: (sequence) is not specified or valid");
250             }
251              
252 1 50       2 _croak("$_tag: (begin) is not specified for sequence")
253             unless (defined $_begin);
254 1 50       3 _croak("$_tag: (end) is not specified for sequence")
255             unless (defined $_end);
256              
257 1         1 $_params->{$_pid}{sequence_run} = undef;
258              
259 1         3 @_ = ();
260              
261 1         3 return run($_code);
262             }
263              
264             ###############################################################################
265             ## ----------------------------------------------------------------------------
266             ## Parallel grep with MCE.
267             ##
268             ###############################################################################
269              
270             sub run (&@) {
271              
272 7 50 33 7 1 2994 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
273              
274 7         12 my $_code = shift; $_total_chunks = 0; undef %_tmp;
  7         12  
  7         39  
275 7 100       40 my $_pkg = caller() eq 'MCE::Grep' ? caller(1) : caller();
276 7         39 my $_pid = "$$.$_tid.$_pkg";
277              
278 7         11 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  7         13  
279 7         11 my $_r = ref $_[0];
280              
281 7 100 66     63 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::|Iterator::)/) {
282 1 50       8 _croak("$_tag: (HASH) not allowed as input by this MCE model")
283             if $_r eq 'HASH';
284 1         4 $_input_data = shift;
285             }
286              
287 7 50       23 if (defined (my $_p = $_params->{$_pid})) {
288             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
289 7 50       63 if (exists $_p->{max_workers});
290              
291 7 100 100     50 delete $_p->{sequence} if (defined $_input_data || scalar @_);
292 7 50       15 delete $_p->{user_func} if (exists $_p->{user_func});
293 7 50       15 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
294 7 50       11 delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
295 7 50       13 delete $_p->{bounds_only} if (exists $_p->{bounds_only});
296 7 50       19 delete $_p->{gather} if (exists $_p->{gather});
297             }
298              
299 7         8 my $_chunk_size = do {
300 7   50     15 my $_p = $_params->{$_pid} || {};
301             (defined $_p->{init_relay} || defined $_def->{$_pkg}{INIT_RELAY}) ? 1 :
302             MCE::_parse_chunk_size(
303 7 50 33     79 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
304             $_input_data, scalar @_
305             );
306             };
307              
308 7 50       21 if (defined (my $_p = $_params->{$_pid})) {
309 7 100       14 if (exists $_p->{_file}) {
310 2         5 $_input_data = delete $_p->{_file};
311             } else {
312 5 50       12 $_input_data = $_p->{input_data} if exists $_p->{input_data};
313             }
314             }
315              
316             ## -------------------------------------------------------------------------
317              
318 7         50 MCE::_save_state($_MCE->{$_pid});
319              
320 7 100 66     90 if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
321 3 50       9 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
322 3         9 $_prev_c->{$_pid} = $_code;
323              
324             my %_opts = (
325             max_workers => $_max_workers, task_name => $_tag,
326             user_func => sub {
327              
328 37     37   77 my ($_mce, $_chunk_ref, $_chunk_id) = @_;
329 37         79 my $_wantarray = $_mce->{user_args}[0];
330              
331 37 50       69 if ($_wantarray) {
332 37         54 my @_a;
333              
334 37 100       128 if (ref $_chunk_ref eq 'SCALAR') {
335 1 50       8 local $/ = $_mce->{RS} if defined $_mce->{RS};
336 1         64 open my $_MEM_FH, '<', $_chunk_ref;
337 1         7 binmode $_MEM_FH, ':raw';
338 1 100       8 while (<$_MEM_FH>) { push (@_a, $_) if &{ $_code }; }
  9         100  
  9         14  
339 1         5 close $_MEM_FH;
340 1         8 weaken $_MEM_FH;
341             }
342             else {
343 36 100       50 if (ref $_chunk_ref) {
344 27         35 push @_a, grep { &{ $_code } } @{ $_chunk_ref };
  27         63  
  27         110  
  27         55  
345             } else {
346 9         19 push @_a, grep { &{ $_code } } $_chunk_ref;
  9         12  
  9         31  
347             }
348             }
349              
350 37         412 MCE->gather($_chunk_id, \@_a);
351             }
352             else {
353 0         0 my $_cnt = 0;
354              
355 0 0       0 if (ref $_chunk_ref eq 'SCALAR') {
356 0 0       0 local $/ = $_mce->{RS} if defined $_mce->{RS};
357 0         0 open my $_MEM_FH, '<', $_chunk_ref;
358 0         0 binmode $_MEM_FH, ':raw';
359 0 0       0 while (<$_MEM_FH>) { $_cnt++ if &{ $_code }; }
  0         0  
  0         0  
360 0         0 close $_MEM_FH;
361 0         0 weaken $_MEM_FH;
362             }
363             else {
364 0 0       0 if (ref $_chunk_ref) {
365 0         0 $_cnt += grep { &{ $_code } } @{ $_chunk_ref };
  0         0  
  0         0  
  0         0  
366             } else {
367 0         0 $_cnt += grep { &{ $_code } } $_chunk_ref;
  0         0  
  0         0  
368             }
369             }
370              
371 0 0       0 MCE->gather($_cnt) if defined $_wantarray;
372             }
373             },
374 3         60 );
375              
376 3 50       15 if (defined (my $_p = $_params->{$_pid})) {
377 3         3 for my $_k (keys %{ $_p }) {
  3         18  
378 3 50       9 next if ($_k eq 'sequence_run');
379 3 50       12 next if ($_k eq 'input_data');
380 3 50       9 next if ($_k eq 'chunk_size');
381              
382             _croak("$_tag: ($_k) is not a valid constructor argument")
383 3 50       9 unless (exists $MCE::_valid_fields_new{$_k});
384              
385 3         9 $_opts{$_k} = $_p->{$_k};
386             }
387             }
388              
389 3         9 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
390             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
391 15 50 33     99 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
392             }
393              
394 3         36 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
395             }
396              
397             ## -------------------------------------------------------------------------
398              
399 7         13 my $_cnt = 0; my $_wantarray = wantarray;
  7         13  
400              
401 7 100       39 $_MCE->{$_pid}{use_slurpio} = ($_chunk_size > &MCE::MAX_RECS_SIZE) ? 1 : 0;
402 7         40 $_MCE->{$_pid}{user_args} = [ $_wantarray ];
403              
404             $_MCE->{$_pid}{gather} = $_wantarray
405 7 50   0   43 ? \&_gather : sub { $_cnt += $_[0]; return; };
  0         0  
  0         0  
406              
407 7 100       25 if (defined $_input_data) {
    100          
408 3         5 @_ = ();
409 3         25 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
410 3         11 delete $_MCE->{$_pid}{input_data};
411             }
412             elsif (scalar @_) {
413 3         36 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
414 1         23 delete $_MCE->{$_pid}{input_data};
415             }
416             else {
417 1 50 33     15 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
418             $_MCE->{$_pid}->run({
419             chunk_size => $_chunk_size,
420             sequence => $_params->{$_pid}{sequence}
421 1         7 }, 0);
422 1 50       4 if (exists $_params->{$_pid}{sequence_run}) {
423 1         2 delete $_params->{$_pid}{sequence_run};
424 1         16 delete $_params->{$_pid}{sequence};
425             }
426 1         2 delete $_MCE->{$_pid}{sequence};
427             }
428             }
429              
430 5         29 MCE::_restore_state();
431              
432 5 50       8 if ($_wantarray) {
    0          
433 5         27 return map { @{ $_ } } delete @_tmp{ 1 .. $_total_chunks };
  37         27  
  37         181  
434             }
435             elsif (defined $_wantarray) {
436 0           return $_cnt;
437             }
438              
439 0           return;
440             }
441              
442             ###############################################################################
443             ## ----------------------------------------------------------------------------
444             ## Private methods.
445             ##
446             ###############################################################################
447              
448             sub _croak {
449              
450 0     0     goto &MCE::_croak;
451             }
452              
453             1;
454              
455             __END__
456              
457             ###############################################################################
458             ## ----------------------------------------------------------------------------
459             ## Module usage.
460             ##
461             ###############################################################################
462              
463             =head1 NAME
464              
465             MCE::Grep - Parallel grep model similar to the native grep function
466              
467             =head1 VERSION
468              
469             This document describes MCE::Grep version 1.902
470              
471             =head1 SYNOPSIS
472              
473             ## Exports mce_grep, mce_grep_f, and mce_grep_s
474             use MCE::Grep;
475              
476             ## Array or array_ref
477             my @a = mce_grep { $_ % 5 == 0 } 1..10000;
478             my @b = mce_grep { $_ % 5 == 0 } \@list;
479              
480             ## Important; pass an array_ref for deeply input data
481             my @c = mce_grep { $_->[1] % 2 == 0 } [ [ 0, 1 ], [ 0, 2 ], ... ];
482             my @d = mce_grep { $_->[1] % 2 == 0 } \@deeply_list;
483              
484             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
485             ## Workers read directly and not involve the manager process
486             my @e = mce_grep_f { /pattern/ } "/path/to/file"; # efficient
487              
488             ## Involves the manager process, therefore slower
489             my @f = mce_grep_f { /pattern/ } $file_handle;
490             my @g = mce_grep_f { /pattern/ } $io;
491             my @h = mce_grep_f { /pattern/ } \$scalar;
492              
493             ## Sequence of numbers (begin, end [, step, format])
494             my @i = mce_grep_s { %_ * 3 == 0 } 1, 10000, 5;
495             my @j = mce_grep_s { %_ * 3 == 0 } [ 1, 10000, 5 ];
496              
497             my @k = mce_grep_s { %_ * 3 == 0 } {
498             begin => 1, end => 10000, step => 5, format => undef
499             };
500              
501             =head1 DESCRIPTION
502              
503             This module provides a parallel grep implementation via Many-Core Engine.
504             MCE incurs a small overhead due to passing of data. A fast code block will
505             run faster natively. However, the overhead will likely diminish as the
506             complexity increases for the code.
507              
508             my @m1 = grep { $_ % 5 == 0 } 1..1000000; ## 0.065 secs
509             my @m2 = mce_grep { $_ % 5 == 0 } 1..1000000; ## 0.194 secs
510              
511             Chunking, enabled by default, greatly reduces the overhead behind the scene.
512             The time for mce_grep below also includes the time for data exchanges between
513             the manager and worker processes. More parallelization will be seen when the
514             code incurs additional CPU time.
515              
516             my @m1 = grep { /[2357][1468][9]/ } 1..1000000; ## 0.353 secs
517             my @m2 = mce_grep { /[2357][1468][9]/ } 1..1000000; ## 0.218 secs
518              
519             Even faster is mce_grep_s; useful when input data is a range of numbers.
520             Workers generate sequences mathematically among themselves without any
521             interaction from the manager process. Two arguments are required for
522             mce_grep_s (begin, end). Step defaults to 1 if begin is smaller than end,
523             otherwise -1.
524              
525             my @m3 = mce_grep_s { /[2357][1468][9]/ } 1, 1000000; ## 0.165 secs
526              
527             Although this document is about MCE::Grep, the L<MCE::Stream> module can write
528             results immediately without waiting for all chunks to complete. This is made
529             possible by passing the reference to an array (in this case @m4 and @m5).
530              
531             use MCE::Stream default_mode => 'grep';
532              
533             my @m4; mce_stream \@m4, sub { /[2357][1468][9]/ }, 1..1000000;
534              
535             ## Completed in 0.203 secs. This is amazing considering the
536             ## overhead for passing data between the manager and workers.
537              
538             my @m5; mce_stream_s \@m5, sub { /[2357][1468][9]/ }, 1, 1000000;
539              
540             ## Completed in 0.120 secs. Like with mce_grep_s, specifying a
541             ## sequence specification turns out to be faster due to lesser
542             ## overhead for the manager process.
543              
544             A common scenario is grepping for pattern(s) inside a massive log file.
545             Notice how parallelism increases as complexity increases for the pattern.
546             Testing was done against a 300 MB file containing 250k lines.
547              
548             use MCE::Grep;
549              
550             my @m; open my $LOG, "<", "/path/to/log/file" or die "$!\n";
551              
552             @m = grep { /pattern/ } <$LOG>; ## 0.756 secs
553             @m = grep { /foobar|[2357][1468][9]/ } <$LOG>; ## 24.681 secs
554              
555             ## Parallelism with mce_grep. This involves the manager process
556             ## due to processing a file handle.
557              
558             @m = mce_grep { /pattern/ } <$LOG>; ## 0.997 secs
559             @m = mce_grep { /foobar|[2357][1468][9]/ } <$LOG>; ## 7.439 secs
560              
561             ## Even faster with mce_grep_f. Workers access the file directly
562             ## with zero interaction from the manager process.
563              
564             my $LOG = "/path/to/file";
565             @m = mce_grep_f { /pattern/ } $LOG; ## 0.112 secs
566             @m = mce_grep_f { /foobar|[2357][1468][9]/ } $LOG; ## 6.840 secs
567              
568             =head1 PARSING HUGE FILES
569              
570             The MCE::Grep module lacks an optimization for quickly determining if a match
571             is found from not knowing the pattern inside the code block. Use the following
572             snippet as a template to achieve better performance. Also, take a look at
573             examples/egrep.pl, included with the distribution.
574              
575             use MCE::Loop;
576              
577             MCE::Loop->init(
578             max_workers => 8, use_slurpio => 1
579             );
580              
581             my $pattern = 'karl';
582             my $hugefile = 'very_huge.file';
583              
584             my @result = mce_loop_f {
585             my ($mce, $slurp_ref, $chunk_id) = @_;
586              
587             ## Quickly determine if a match is found.
588             ## Process slurped chunk only if true.
589              
590             if ($$slurp_ref =~ /$pattern/m) {
591             my @matches;
592              
593             ## The following is fast on Unix. Performance degrades
594             ## drastically on Windows beyond 4 workers.
595              
596             open my $MEM_FH, '<', $slurp_ref;
597             binmode $MEM_FH, ':raw';
598             while (<$MEM_FH>) { push @matches, $_ if (/$pattern/); }
599             close $MEM_FH;
600              
601             ## Therefore, use the following construct on Windows.
602              
603             while ( $$slurp_ref =~ /([^\n]+\n)/mg ) {
604             my $line = $1; # save $1 to not lose the value
605             push @matches, $line if ($line =~ /$pattern/);
606             }
607              
608             ## Gather matched lines.
609              
610             MCE->gather(@matches);
611             }
612              
613             } $hugefile;
614              
615             print join('', @result);
616              
617             =head1 OVERRIDING DEFAULTS
618              
619             The following list options which may be overridden when loading the module.
620              
621             use Sereal qw( encode_sereal decode_sereal );
622             use CBOR::XS qw( encode_cbor decode_cbor );
623             use JSON::XS qw( encode_json decode_json );
624              
625             use MCE::Grep
626             max_workers => 4, # Default 'auto'
627             chunk_size => 100, # Default 'auto'
628             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
629             freeze => \&encode_sereal, # \&Storable::freeze
630             thaw => \&decode_sereal, # \&Storable::thaw
631             init_relay => 0, # Default undef; MCE 1.882+
632             use_threads => 0, # Default undef; MCE 1.882+
633             ;
634              
635             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
636             Specify C<< Sereal => 0 >> to use Storable instead.
637              
638             use MCE::Grep Sereal => 0;
639              
640             =head1 CUSTOMIZING MCE
641              
642             =over 3
643              
644             =item MCE::Grep->init ( options )
645              
646             =item MCE::Grep::init { options }
647              
648             =back
649              
650             The init function accepts a hash of MCE options. The gather option, if
651             specified, is ignored due to being used internally by the module.
652              
653             In scalar context (API available since 1.897), call C<MCE::Grep->finish>
654             automatically upon leaving the scope or program.
655              
656             use MCE::Grep;
657              
658             my $guard = MCE::Grep->init(
659             chunk_size => 1, max_workers => 4,
660              
661             user_begin => sub {
662             print "## ", MCE->wid, " started\n";
663             },
664              
665             user_end => sub {
666             print "## ", MCE->wid, " completed\n";
667             }
668             );
669              
670             my @a = mce_grep { $_ % 5 == 0 } 1..100;
671              
672             print "\n", "@a", "\n";
673              
674             -- Output
675              
676             ## 2 started
677             ## 3 started
678             ## 1 started
679             ## 4 started
680             ## 3 completed
681             ## 4 completed
682             ## 1 completed
683             ## 2 completed
684              
685             5 10 15 20 25 30 35 40 45 50 55 60 65 70 75 80 85 90 95 100
686              
687             =head1 API DOCUMENTATION
688              
689             =over 3
690              
691             =item MCE::Grep->run ( sub { code }, list )
692              
693             =item mce_grep { code } list
694              
695             =back
696              
697             Input data may be defined using a list or an array reference. Unlike MCE::Loop,
698             Flow, and Step, specifying a hash reference as input data isn't allowed.
699              
700             ## Array or array_ref
701             my @a = mce_grep { /[2357]/ } 1..1000;
702             my @b = mce_grep { /[2357]/ } \@list;
703              
704             ## Important; pass an array_ref for deeply input data
705             my @c = mce_grep { $_->[1] =~ /[2357]/ } [ [ 0, 1 ], [ 0, 2 ], ... ];
706             my @d = mce_grep { $_->[1] =~ /[2357]/ } \@deeply_list;
707              
708             ## Not supported
709             my @z = mce_grep { ... } \%hash;
710              
711             =over 3
712              
713             =item MCE::Grep->run_file ( sub { code }, file )
714              
715             =item mce_grep_f { code } file
716              
717             =back
718              
719             The fastest of these is the /path/to/file. Workers communicate the next offset
720             position among themselves with zero interaction by the manager process.
721              
722             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
723              
724             my @c = mce_grep_f { /pattern/ } "/path/to/file"; # faster
725             my @d = mce_grep_f { /pattern/ } $file_handle;
726             my @e = mce_grep_f { /pattern/ } $io; # IO::All
727             my @f = mce_grep_f { /pattern/ } \$scalar;
728              
729             =over 3
730              
731             =item MCE::Grep->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
732              
733             =item mce_grep_s { code } $beg, $end [, $step, $fmt ]
734              
735             =back
736              
737             Sequence may be defined as a list, an array reference, or a hash reference.
738             The functions require both begin and end values to run. Step and format are
739             optional. The format is passed to sprintf (% may be omitted below).
740              
741             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
742              
743             my @f = mce_grep_s { /[1234]\.[5678]/ } $beg, $end, $step, $fmt;
744             my @g = mce_grep_s { /[1234]\.[5678]/ } [ $beg, $end, $step, $fmt ];
745              
746             my @h = mce_grep_s { /[1234]\.[5678]/ } {
747             begin => $beg, end => $end,
748             step => $step, format => $fmt
749             };
750              
751             =over 3
752              
753             =item MCE::Grep->run ( sub { code }, iterator )
754              
755             =item mce_grep { code } iterator
756              
757             =back
758              
759             An iterator reference may be specified for input_data. Iterators are described
760             under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
761              
762             my @a = mce_grep { $_ % 3 == 0 } make_iterator(10, 30, 2);
763              
764             =head1 MANUAL SHUTDOWN
765              
766             =over 3
767              
768             =item MCE::Grep->finish
769              
770             =item MCE::Grep::finish
771              
772             =back
773              
774             Workers remain persistent as much as possible after running. Shutdown occurs
775             automatically when the script terminates. Call finish when workers are no
776             longer needed.
777              
778             use MCE::Grep;
779              
780             MCE::Grep->init(
781             chunk_size => 20, max_workers => 'auto'
782             );
783              
784             my @a = mce_grep { ... } 1..100;
785              
786             MCE::Grep->finish;
787              
788             =head1 INDEX
789              
790             L<MCE|MCE>, L<MCE::Core>
791              
792             =head1 AUTHOR
793              
794             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
795              
796             =cut
797