File Coverage

blib/lib/IO/Pipeline.pm
Criterion Covered Total %
statement 63 64 98.4
branch 16 22 72.7
condition 3 3 100.0
subroutine 18 18 100.0
pod 3 7 42.8
total 103 114 90.3


line stmt bran cond sub pod time code
1             package IO::Pipeline;
2              
3 1     1   620 use strict;
  1         2  
  1         31  
4 1     1   6 use warnings FATAL => 'all';
  1         2  
  1         27  
5 1     1   30 use 5.008001;
  1         6  
  1         42  
6 1     1   6 use Scalar::Util qw(blessed);
  1         1  
  1         122  
7 1     1   1134 use IO::Handle;
  1         8501  
  1         52  
8 1     1   10 use Exporter ();
  1         3  
  1         219  
9              
10             our @ISA = qw(Exporter);
11              
12             our @EXPORT = qw(pmap pgrep psink);
13              
14             our $VERSION = '0.009002'; # 0.9.2
15              
16             $VERSION = eval $VERSION;
17              
18             sub import {
19 1     1   32 warnings->unimport('void');
20 1         130 shift->export_to_level(1, @_);
21             }
22              
23 3     3 1 1615 sub pmap (&) { IO::Pipeline->from_code_map($_[0]) }
24 1     1 1 5 sub pgrep (&) { IO::Pipeline->from_code_grep($_[0]) }
25 1     1 1 6 sub psink (&) { IO::Pipeline->from_code_sink($_[0]) }
26              
27             use overload
28 1         7 '|' => '_pipe_operator',
29 1     1   1820 fallback => 1;
  1         1063  
30              
31             sub IO::Pipeline::CodeSink::print {
32 5     5   9 my $code = (shift)->{code};
33 5         10 foreach my $line (@_) {
34 5         7 local $_ = $line;
35 5         20 $code->($line);
36             }
37             }
38              
39             sub from_code_map {
40 4     4 0 137 bless({ map => [ $_[1] ] }, $_[0]);
41             }
42              
43             sub from_code_grep {
44 1     1 0 2 my ($class, $grep) = @_;
45 1 100   8   9 $class->from_code_map(sub { $grep->($_) ? ($_) : () });
  8         17  
46             }
47              
48             sub from_code_sink {
49 1     1 0 5 bless({ code => $_[1] }, 'IO::Pipeline::CodeSink');
50             }
51              
52             sub _pipe_operator {
53 5     5   11 my ($self, $other, $reversed) = @_;
54 5 100 100     44 if (blessed($other) && $other->isa('IO::Pipeline')) {
55 3 50       9 my ($left, $right) = $reversed ? ($other, $self) : ($self, $other);
56 3         5 my %new = (map => [ @{$left->{map}}, @{$right->{map}} ]);
  3         7  
  3         10  
57 3 50       9 die "Right hand side has a source, makes no sense"
58             if $right->{source};
59 3 50       9 $new{source} = $left->{source} if $left->{source};
60 3 50       7 die "Left hand side has a sink, makes no sense"
61             if $left->{sink};
62 3 50       9 $new{sink} = $right->{sink} if $right->{sink};
63 3         21 return bless(\%new, ref($self));
64             } else {
65 2 100       8 my ($is, $isnt) = $reversed ? qw(source sink) : qw(sink source);
66 2 50       8 if (my $fail = $self->{$is}) {
67 0         0 die "Tried to add ${is} ${other} but we already had ${fail}";
68             }
69 2         10 my $new = bless({ $is => $other, %$self }, ref($self));
70 2 100       7 if ($new->{$isnt}) {
71 1         6 $new->run;
72 1         88 return;
73             } else {
74 1         8 return $new;
75             }
76             }
77             }
78              
79             sub run {
80 1     1 0 2 my ($self) = @_;
81 1         3 my $source = $self->{source};
82 1         2 my $sink = $self->{sink};
83 1         39 LINE: while (defined(my $line = $source->getline)) {
84 8         324 my @lines = ($line);
85 8         9 foreach my $map (@{$self->{map}}) {
  8         19  
86 26         56 @lines = map $map->($_), @lines;
87 26 100       284 next LINE unless @lines;
88             }
89 5         14 $sink->print(@lines);
90             }
91             }
92              
93             =head1 NAME
94              
95             IO::Pipeline - map and grep for filehandles, unix pipe style
96              
97             =head1 SYNOPSIS
98              
99             my $source = <<'END';
100             2010-03-21 16:15:30 1NtNoI-000658-6V Completed
101             2010-03-21 16:17:29 1NtNlx-00062B-0R Completed
102             2010-03-21 16:20:37 1NtNtF-0006AE-G6 Completed
103             2010-03-21 16:28:37 no host name found for IP address 218.108.42.254
104             2010-03-21 16:28:51 H=(ZTZUWWCRQY) [218.108.42.254] F= rejected RCPT : rejected because 218.108.42.254 is in a black list at zen.spamhaus.org
105             2010-03-21 16:28:51 unexpected disconnection while reading SMTP command from (ZTZUWWCRQY) [218.108.42.254] (error: Connection reset by peer)
106             2010-03-21 16:35:57 no host name found for IP address 123.122.231.66
107             2010-03-21 16:35:59 H=(LFMTSDM) [123.122.231.66] F= rejected RCPT : rejected because 123.122.231.66 is in a black list at zen.spamhaus.org
108             END
109            
110             open my $in, '<', \$source
111             or die "Failed to create filehandle from scalar: $!";
112            
113             my $out;
114            
115             $in
116             | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
117             | pgrep { $_->[2] =~ /rejected|Completed/ }
118             | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
119             | pmap { join(' ', @$_)."\n" }
120             | psink { $out .= $_ };
121            
122             print $out;
123              
124             will print:
125              
126             2010-03-21 16:15:30 Completed
127             2010-03-21 16:17:29 Completed
128             2010-03-21 16:20:37 Completed
129             2010-03-21 16:28:51 Rejected
130             2010-03-21 16:35:59 Rejected
131              
132             =head1 DESCRIPTION
133              
134             IO::Pipeline was born of the idea that I really like writing map/grep type
135             expressions in perl, but writing:
136              
137             map { ... } <$fh>;
138              
139             does a slurp of the filehandle, and when processing big log files I tend
140             to Not Want That To Happen. Plus, map restricts us to right-to-left processing
141             and I've always been fond of the shell metaphor of connecting commands
142             together left-to-right in a pipeline.
143              
144             So, this module was born.
145              
146             use IO::Pipeline;
147              
148             will export three functions - L</pmap>, L</pgrep> and L</psink>. The first
149             two are the meat of the module, the last one is a means to test by sending
150             results somewhere other than a filehandle (or to chain IO::Pipeline output
151             on to ... well, anywhere else, really).
152              
153             pmap and pgrep both return pipeline objects (currently of class IO::Pipeline,
154             but this is considered an implementation detail, not a feature - so please
155             don't write code that relies on it) that provide an overloaded '|' operator.
156              
157             my $mapper = pmap { "[header] ".$_ };
158              
159             my $filter = pgrep { /ALERT/ };
160              
161             When you use | to chain two pipeline objects together, you get another
162             pipeline object:
163              
164             my $combined = $mapper | $filter;
165              
166             Although since we're going left to right, you probably want to do the grep
167             first:
168              
169             my $combined = $filter | $mapper;
170              
171             (but it's all the same to IO::Pipeline, of course)
172              
173             When you use | with a filehandle on one side, that sets the start or
174             finish of the pipeline, so:
175              
176             my $combined_with_input = $readable_fh | $combined;
177              
178             my $combined_with_output = $combined | $writeable_fh;
179              
180             and if you don't want a real filehandle for the second option, you can use
181             psink:
182              
183             my $output = '';
184            
185             my $combined_with_output = $combined | psink { $output .= $_ };
186              
187             Once both an input and an output have been provided, IO::Pipeline runs the
188             full pipeline, reading from the input and pushing one line at a time down
189             the pipe to the output until the input filehandle is exhausted.
190              
191             Non-completed pipeline objects are completely re-usable though - so you can
192             (and are expected to) do things like:
193              
194             my $combined_to_stdout = $combined | \*STDOUT;
195            
196             foreach my $file (@files_to_process) {
197            
198             open my $in, '<', $file
199             or die "Couldn't open ${file}: $!";
200            
201             $in | $combined_to_stdout;
202             }
203              
204             =head1 EXPORTED FUNCTIONS
205              
206             =head2 pmap
207              
208             my $mapper = pmap { };
209              
210             A pipeline part built with pmap gets invoked for each line on the pipeline,
211             with the line in both $_ and $_[0].
212              
213             It may, as with perl's map operator, return zero or more elements. If it
214             returns nothing at all, IO::Pipeline will go back to the start of the pipe
215             chain and read another line to restart processing with. If it returns
216             one or more lines, each one is fed in turn into the rest of the pipe chain.
217              
218             Most of the time, you probably just want to modify the line somehow and then
219             return it (note that $_ is a copy of the input line so this is safe):
220              
221             my $fix_teh = pmap { s/teh/the/g; $_; };
222              
223             Note that you still need to actively return $_ for the pipe to continue
224             (again, as with perl's map operator).
225              
226             =head2 pgrep
227              
228             my $filter = pgrep { };
229              
230             A pipeline part built with pgrep gets invoked for each line on the pipeline,
231             with the line in both $_ and $_[0].
232              
233             If it returns a true value, the line is passed on to the next stage of the
234             pipeline. If it returns a false value, the line is thrown away and IO::Pipeline
235             will go back to the start of the pipe chain and read another line to restart
236             processing with.
237              
238             The upshot of this is that any pgrep can be turned trivially into a pmap:
239              
240             my $filter = pgrep { /ALERT/ };
241              
242             is precisely equivalent to:
243              
244             my $filter = pmap { /ALERT/ ? ($_) : () };
245              
246             but the pgrep form is rather clearer.
247              
248             =head2 psink
249              
250             my $output = '';
251            
252             my $sink = psink { $output .= $_ };
253              
254             A pipe sink is an alternative to an output filehandle as the last element
255             of a pipeline. Where in the case of a normal filehandle a line would be
256             printed to the handle, given a sink IO::Pipeline will call the code block
257             provided. So:
258              
259             $pipeline | \*STDOUT;
260              
261             and
262              
263             $pipeline | psink { print STDOUT $_; }
264              
265             will have exactly the same end result.
266              
267             If you're looking for the source version of this, there isn't one built in
268             because L<IO::Handle::Util> already
269             provides an io_from_getline construct that does that, along with a bunch
270             more things that you may find very useful.
271              
272             =head1 DECONSTRUCTING THE SYNOPSIS
273              
274             Start with an input filehandle:
275              
276             $in
277              
278             Next, we split the line up - so
279              
280             2010-03-21 16:15:30 1NtNoI-000658-6V Completed
281              
282             becomes
283              
284             [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]
285              
286             using a regexp in list context so that all the match values fall out into
287             a new anonymous array reference:
288              
289             | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
290              
291             Now we've separated out the message, we want to throw away anything that isn't
292             either a 'rejected' or 'Completed' line, so we test the last element of the
293             split line for that:
294              
295             | pgrep { $_->[2] =~ /rejected|Completed/ }
296              
297             Now we know which is which, we want to turn
298              
299             [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]
300              
301             into
302              
303             [ '2010-03-21', '16:15:30', 'Completed' ]
304              
305             and similarly for rejected lines. Since we know both lines are one or the
306             other, we can simply test for 'rejected' in the line -
307              
308             $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed'
309              
310             and then we construct a new array reference consisting of the first two
311             elements of the original array
312              
313             @{$_}[0, 1]
314              
315             plus the new value for the third element:
316              
317             | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
318              
319             This done, we can now reassemble the line using join (remembering to add a
320             newline since IO::Pipeline doesn't in case you didn't want one)
321              
322             | pmap { join(' ', @$_)."\n" }
323              
324             and then in lieu of sending it somewhere else, since this is just a
325             demonstration code fragment, add a sink that appends things onto the end of
326             a variable so that we can examine the results:
327              
328             | psink { $out .= $_ };
329              
330             =head1 AUTHOR
331              
332             Matt S. Trout (mst)
333              
334             =head2 CONTRIBUTORS
335              
336             None as yet, though I'm sure that'll change as soon as people spot the
337             giant gaping holes that inevitably exist in any software only used by
338             the author so far.
339              
340             =head1 COPYRIGHT
341              
342             Copyright (c) 2010 the IO::Pipeline L</AUTHOR> and L</CONTRIBUTORS>
343             as listed above.
344              
345             =head1 LICENSE
346              
347             This library is free software and may be distributed under the same terms
348             as perl itself.
349              
350             =head1 SUPPORT
351              
352             Right now, your best routes are probably (a) to come ask questions on
353             #perl on irc.freenode.net or #perl-help on irc.perl.org (I'm on there with
354             nick mst if nobody else around at the time manages to help you first) or
355             (b) to email me directly at the address given in L</AUTHOR> above. You're
356             also welcome to use rt.cpan.org to report bugs (which you can do without
357             a login by mailing bugs-IO-Pipeline at that domain), but please cc my
358             email address as well on grounds of me being a Bad Person and thereby not
359             always spotting tickets.
360              
361             =head1 SOURCE CODE
362              
363             This code lives in git.shadowcat.co.uk and can be viewed via gitweb using
364              
365             http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=p5sagit/IO-Pipeline.git;a=summary
366              
367             or checked out via git-daemon using
368              
369             git://git.shadowcat.co.uk/p5sagit/IO-Pipeline.git
370              
371             =cut
372              
373             1;