File Coverage

blib/lib/ETL/Pipeline.pm
Criterion Covered Total %
statement 390 413 94.4
branch 166 216 76.8
condition 16 30 53.3
subroutine 49 49 100.0
pod 21 22 95.4
total 642 730 87.9


line stmt bran cond sub pod time code
1             =pod
2              
3             =head1 NAME
4              
5             ETL::Pipeline - Extract-Transform-Load pattern for data file conversions
6              
7             =head1 SYNOPSIS
8              
9             use ETL::Pipeline;
10              
11             # The object oriented interface...
12             ETL::Pipeline->new( {
13             work_in => {search => 'C:\Data', iname => qr/Ficticious/},
14             input => ['Excel', iname => qr/\.xlsx?$/ ],
15             mapping => {Name => 'A', Address => 'B', ID => 'C' },
16             constants => {Type => 1, Information => 'Demographic' },
17             output => ['SQL', table => 'NewData' ],
18             } )->process;
19              
20             # Or using method calls...
21             my $etl = ETL::Pipeline->new;
22             $etl->work_in ( search => 'C:\Data', iname => qr/Ficticious/ );
23             $etl->input ( 'Excel', iname => qr/\.xlsx?$/i );
24             $etl->mapping ( Name => 'A', Address => 'B', ID => 'C' );
25             $etl->constants( Type => 1, Information => 'Demographic' );
26             $etl->output ( 'SQL', table => 'NewData' );
27             $etl->process;
28              
29             =cut
30              
31             package ETL::Pipeline;
32              
33 14     14   6782 use 5.014000;
  14         19303  
34 14     14   153 use warnings;
  14         464  
  14         10540  
35              
36 14     14   166 use Carp;
  14         73  
  14         1010  
37 14     14   5797 use Data::DPath qw/dpath/;
  14         1254872  
  14         89  
38 14     14   7568 use Data::Traverse qw/traverse/;
  14         7272  
  14         639  
39 14     14   6591 use List::AllUtils qw/any first/;
  14         181804  
  14         1006  
40 14     14   6449 use Moose;
  14         4575252  
  14         70  
41 14     14   80644 use MooseX::Types::Path::Class qw/Dir/;
  14         1434838  
  14         83  
42 14     11   19182 use Path::Class::Rule;
  11         134287  
  11         397  
43 11     11   119 use Scalar::Util qw/blessed/;
  11         28  
  11         508  
44 11     11   6031 use String::Util qw/hascontent trim/;
  11         37196  
  11         37042  
45              
46              
47             our $VERSION = '3.00';
48              
49              
50             =head1 DESCRIPTION
51              
52             B<ETL> stands for I<Extract-Transform-Load>. ETL isn't just for Data
53             Warehousing. ETL works on almost any type of data conversion. You read the
54             source, translate the data for your target, and store the result.
55              
56             By dividing a conversion into 3 steps, we isolate the input from the output...
57              
58             =over
59              
60             =item * Centralizes data formatting and validation.
61              
62             =item * Makes new input formats a breeze.
63              
64             =item * Makes new outputs just as easy.
65              
66             =back
67              
68             B<ETL::Pipeline> takes your data files from extract to load. It reads an input
69             source, translates the data, and writes it to an output destination. For
70             example, I use these pipelines for reading an Excel spreadsheet (input) and
71             saving the information in an SQL database (output).
72              
73             use ETL::Pipeline;
74             ETL::Pipeline->new( {
75             work_in => {search => 'C:\Data', find => qr/Ficticious/},
76             input => ['Excel', find => qr/\.xlsx?$/],
77             mapping => {Name => 'A', Complaint => 'B', ID => 'C'},
78             constants => {Client => 1, Type => 'Complaint'}
79             output => ['SQL', table => 'NewData']
80             } )->process;
81              
82             Or like this, calling the methods instead of through the constructor...
83              
84             use ETL::Pipeline;
85             my $etl = ETL::Pipeline->new;
86             $etl->work_in ( search => 'C:\Data', find => qr/Ficticious/ );
87             $etl->input ( 'Excel', find => qr/\.xlsx?$/ );
88             $etl->mapping ( Name => 'A', Complaint => 'B', ID => 'C' );
89             $etl->constants( Client => 1, Type => 'Complaint' );
90             $etl->output ( 'SQL', table => 'NewData' );
91             $etl->process;
92              
93             These are equivalent. They do exactly the same thing. You can pick whichever
94             best suits your style.
95              
96             =head2 What is a pipeline?
97              
98             The term I<pipeline> describes a complete ETL process - extract, transform,
99             and load. Or more accurately - input, mapping, output. Raw data enters one end
100             of the pipe (input) and useful information comes out the other (output). An
101             B<ETL::Pipeline> object represents a complete pipeline.
102              
103             =head2 Upgrade Warning
104              
105             B<WARNING:> The API for input sources has changed in version 3.00. Custom input
106             sources written for an earlier version will not work with version 3.00 and
107             later. You will need to re-write your custom input sources.
108              
109             See L<ETL::Pipeline::Input> for more details.
110              
111             =head1 METHODS & ATTRIBUTES
112              
113             =head2 Managing the pipeline
114              
115             =head3 new
116              
117             Create a new ETL pipeline. The constructor accepts a hash reference whose keys
118             are B<ETL::Pipeline> attributes. See the corresponding attribute documentation
119             for details about acceptable values.
120              
121             =over
122              
123             =item aliases
124              
125             =item constants
126              
127             =item data_in
128              
129             =item input
130              
131             =item mapping
132              
133             =item on_record
134              
135             =item output
136              
137             =item session
138              
139             =item work_in
140              
141             =back
142              
143             =cut
144              
145             sub BUILD {
146 69     69 0 161 my $self = shift;
147 69         119 my $arguments = shift;
148              
149             # "_chain" is a special argument to the constructor that implements the
150             # "chain" method. It copies information from an existing object. This allows
151             # pipelines to share settings.
152             #
153             # Always handle "_chain" first. That way "work_in" and "data_in" arguments
154             # can override the defaults.
155 69 100       216 if (defined $arguments->{_chain}) {
156 4         25 my $object = $arguments->{_chain};
157 4 50 33     30 croak '"chain" requires an ETL::Pipeline object' unless
158             defined( blessed( $object ) )
159             && $object->isa( 'ETL::Pipeline' )
160             ;
161 4 100       69 $self->_work_in( $object->_work_in ) if defined $object->_work_in;
162 4 100       80 $self->_data_in( $object->_data_in ) if defined $object->_data_in;
163 4         74 $self->_session( $object->_session );
164             }
165              
166             # The order of these two is important. "work_in" resets "data_in" with a
167             # trigger. "work_in" must be set first so that we don't lose the value
168             # from "data_in".
169 69 100       180 if (defined $arguments->{work_in}) {
170 61         145 my $values = $arguments->{work_in};
171 61 50       288 $self->work_in( ref( $values ) eq '' ? $values : @$values );
172             }
173 69 100       239 if (defined $arguments->{data_in}) {
174 3         138 my $values = $arguments->{data_in};
175 3 50       16 $self->data_in( ref( $values ) eq '' ? $values : @$values );
176             }
177              
178             # The input and output configurations may be single string values or an
179             # array of arguments. It depends on what each source or destination expects.
180 69 100       184 if (defined $arguments->{input}) {
181 61         163 my $values = $arguments->{input};
182 61 100       339 $self->input( ref( $values ) eq '' ? $values : @$values );
183             }
184 69 100       230 if (defined $arguments->{output}) {
185 61         253 my $values = $arguments->{output};
186 61 100       285 $self->output( ref( $values ) eq '' ? $values : @$values );
187             }
188              
189             # Save any alias definition for use in "record".
190 69 50       1656 if (defined $arguments->{aliases}) {
191 1 0       27 if (ref( $arguments->{aliases} ) eq 'ARRAY') {
192 1         8 $self->aliases( @{$arguments->{aliases}} );
  1         2  
193             } else {
194 1         25 $self->aliases( $arguments->{aliases} );
195             }
196             }
197             }
198              
199              
200             =head3 aliases
201              
202             B<aliases> defines alternate names for input fields. This is how column headers
203             work, for example. You can define your own shortcuts using this method or
204             declaring B<aliases> in L</new>. Aliases can make complex field names more
205             readable.
206              
207             B<aliases> accepts a list of hash references. Each hash reference has one or
208             more alias-to-field definitions. The hash key is the alias name. The value is
209             any field name recognized by L</get>.
210              
211             Aliases are resolved in the order they are added. That way, your pipelines know
212             where each value came from, if that's important. Aliases set by the input source
213             always sort before aliases set by the script. Within a hash, all definitions are
214             considered equal and may sort in any order.
215              
216             # Array definitions to force sorting.
217             my $etl = ETL::Pipeline->new( {aliases => [{A => '0'}, {B => '1'}], ...} );
218             $etl->aliases( {C => '2'}, {D => '3'} );
219              
220             # Hash where it can sort either way.
221             my $etl = ETL::Pipeline->new( {aliases => {A => '0', B => '1'}, ...} );
222             $etl->aliases( {C => '2', D => '3'} );
223              
224             B<aliases> returns a sorted list of all aliases for fields in this input source.
225              
226             I recommend using the hash, unless order matters. In that case, use the array
227             form instead.
228              
229             B<Special Note:> Custom input sources call B<aliases> to add their own
230             shortcuts, such as column headers. These aliases are always evaluated I<before>
231             those set by L</new> or calls to this method by the script.
232              
233             =cut
234              
235             sub aliases {
236 81     81 1 165 my $self = shift;
237              
238             # Add any new aliases first.
239 81         1901 my $list = $self->_alias->{$self->_alias_type};
240 81         302 push( @$list, $_ ) foreach (@_);
241              
242             # Update the cache, if it already exists. This should be VERY, VERY rare.
243             # But I wanted to plan for it so that things behave as expected.
244 81 100       2630 if ($self->_alias_cache_built) {
245 2         36 my $cache = $self->_alias_cache;
246 2         29 foreach my $item (@_) {
247 1         7 while (my ($alias, $location) = each %$item) {
248 1 0       1 $cache->{$alias} = [] unless exists $cache->{alias};
249 1         20 push @{$cache->{alias}}, $self->_as_dpath( $location );
  1         8  
250             }
251             }
252             }
253              
254             # Return a flattened list of aliases. Input source defined aliases first.
255             # Then user defined aliases.
256 81         153 my @all;
257 81         197 push( @all, @{$self->_alias->{$_}} ) foreach (qw/input pipeline/);
  161         3652  
258 81         264 return @all;
259             }
260              
261              
262             =head3 chain
263              
264             This method creates a new pipeline using the same L</work_in>, L</data_in>, and
265             L</session> as the current pipeline. It returns a new instance of
266             B<ETL::Pipeline>.
267              
268             B<chain> takes the same arguments as L</new>. It passes those arguments through
269             to the constructor of the new object.
270              
271             See the section on L</Multiple input sources> for examples of chaining.
272              
273             =cut
274              
275             sub chain {
276 4     4 1 40 my ($self, $arguments) = @_;
277              
278             # Create the new object. Use the internal "_chain" argument to do the
279             # actual work of chaining.
280 4 50       15 if (defined $arguments) { $arguments->{_chain} = $self ; }
  1         2  
281 4         33 else { $arguments = {_chain => $self}; }
282              
283 4         74 return ETL::Pipeline->new( $arguments );
284             }
285              
286              
287             =head3 constants
288              
289             B<constants> sets output fields to literal values. L</mapping> accepts input
290             field names as strings. Instead of obtuse Perl tricks for marking literals,
291             B<constants> explicitly handles them.
292              
293             Hash keys are output field names. The L</output> class defines acceptable
294             field names. The hash values are literals.
295              
296             # Get the current mapping...
297             my $transformation = $etl->constants;
298              
299             # Set the output field "Name" to the string "John Doe"...
300             $etl->constants( Name => 'John Doe' );
301              
302             B<Note:> B<constants> does not accept code references, array references, or hash
303             references. It only works with literal values. Use L</mapping> instead for
304             calculated items.
305              
306             With no parameters, B<constants> returns the current hash reference. If you pass
307             in a hash reference, B<constants> replaces the current hash with this new one.
308             If you pass in a list of key value pairs, B<constants> adds them to the current
309             hash.
310              
311             =head3 has_constants
312              
313             Returns a true value if this pipeline has one or more constants defined. A false
314             value means no constants.
315              
316             =cut
317              
318             has '_constants' => (
319             handles => {_add_constants => 'set', has_constants => 'count'},
320             init_arg => 'constants',
321             is => 'rw',
322             isa => 'HashRef[Maybe[Str]]',
323             traits => [qw/Hash/],
324             );
325              
326              
327             sub constants {
328 1     1 1 2 my $self = shift;
329 1         29 my @pairs = @_;
330              
331 1 0 0     6 if (scalar( @pairs ) == 1 && ref( $pairs[0] ) eq 'HASH') {
    0          
332 1         2 return $self->_constants( $pairs[0] );
333             } elsif (scalar @pairs) {
334 1         26 return $self->_add_constants( @pairs );
335             } else {
336 1         7 return $self->_constants;
337             }
338             }
339              
340              
341             =head3 data_in
342              
343             The working directory (L</work_in>) usually contains the raw data files. In
344             some cases, though, the actual data sits in a subdirectory underneath
345             L</work_in>. B<data_in> tells the pipeline where to find the input file.
346              
347             B<data_in> accepts a search pattern - name, glob, or regular expression. It
348             searches L</work_in> for the first matching directory. The search is case
349             insensitive.
350              
351             If you pass an empty string to B<data_in>, the pipeline resets B<data_in> to
352             the L</work_in> directory. This is useful when chaining pipelines. If one
353             changes the data directory, the next in line can change back.
354              
355             =cut
356              
357             has '_data_in' => (
358             coerce => 1,
359             init_arg => undef,
360             is => 'rw',
361             isa => Dir,
362             );
363              
364              
365             sub data_in {
366 88     88 1 2804 my $self = shift;
367              
368 88 100       223 if (scalar @_) {
369 4 50       89 croak 'The working folder was not set' unless defined $self->_work_in;
370              
371 4         8 my $name = shift;
372 4 50       16 if (hascontent( $name )) {
373 4         70 my $next = Path::Class::Rule
374             ->new
375             ->min_depth( 1 )
376             ->iname( $name )
377             ->directory
378             ->iter( $self->_work_in )
379             ;
380 4         874 my $match = $next->();
381 4 50       9208 croak 'No matching directories' unless defined $match;
382 4         121 return $self->_data_in( $match );
383 0         0 } else { return $self->_data_in( $self->_work_in ); }
384 83         2394 } else { return $self->_data_in; }
385             }
386              
387              
388             =head3 input
389              
390             B<input> sets and returns the L<ETL::Pipeline::Input> object. This object reads
391             the data. With no parameters, B<input> returns the current
392             L<ETL::Pipeline::Input> object.
393              
394             my $source = $etl->input();
395              
396             Set the input source by calling B<input> with parameters...
397              
398             $etl->input( 'Excel', find => qr/\.xlsx/i );
399              
400             The first parameter is a class name. B<input> looks for a Perl module matching
401             this name in the C<ETL::Pipeline::Input> namespace. In this example, the actual
402             class name becomes C<ETL::Pipeline::Input::Excel>.
403              
404             The rest of the parameters are passed directly to the C<new> method of that
405             class.
406              
407             B<Technical Note:> Want to use a custom class from B<Local> instead of
408             B<ETL::Pipeline::Input>? Put a B<+> (plus sign) in front of the class name.
409             For example, this command uses the input class B<Local::CustomExtract>.
410              
411             $etl->input( '+Local::CustomExtract' );
412              
413             =cut
414              
415             has '_input' => (
416             does => 'ETL::Pipeline::Input',
417             init_arg => undef,
418             is => 'rw',
419             );
420              
421              
422             sub input {
423 199     200 1 398 my $self = shift;
424              
425 199 100       669 return $self->_input( $self->_object_of_class( 'Input', @_ ) ) if scalar @_;
426 138         3610 return $self->_input;
427             }
428              
429              
430             =head3 mapping
431              
432             B<mapping> ties the input fields with the output fields. Hash keys are output
433             field names. The L</output> class defines acceptable field names. The hash
434             values can be anything accepted by the L</get> method. See L</get> for more
435             information.
436              
437             If L</get> returns an ARRAY reference (aka multiple values), they will be
438             concatenated in the output with a semi-colon between values - B<; >. You can
439             override the seperator by setting the value to an ARRAY reference. The first
440             element is a regular field name for L</get>. The second element is a new
441             seperator string.
442              
443             With no parameters, B<mapping> returns the current hash reference. If you pass
444             in a hash reference, B<mapping> replaces the current hash with this new one. If
445             you pass in a list of key value pairs, B<mapping> adds them to the current hash.
446              
447             # Get the current mapping...
448             my $transformation = $etl->mapping;
449              
450             # Add the output field "Name" with data from input column "A"...
451             $etl->mapping( Name => 'A' );
452              
453             # Change "Name" to get data from "Full Name" or "FullName"...
454             $etl->mapping( Name => qr/Full\s*Name/i );
455              
456             # "Name" gets the lower case of input column "A"...
457             $etl->mapping( Name => sub {
458             my ($etl, $record) = @_;
459             return lc $record{A};
460             } );
461              
462             # Replace the entire mapping so only "Name" is output...
463             $etl->mapping( {Name => 'C'} );
464              
465             Want to save a literal value? Use L</constants> instead.
466              
467             =head4 Complex data structures
468              
469             B<mapping> only sets scalar values. If the matching fields contain sub-records,
470             L</record> throws an error message and sets the output field to C<undef>.
471              
472             =head3 has_mapping
473              
474             Returns a true value if this pipeline has one or more field mappings defined. A
475             false value means no mappings are present.
476              
477             =cut
478              
479             has '_mapping' => (
480             handles => {_add_mapping => 'set', has_mapping => 'count'},
481             init_arg => 'mapping',
482             is => 'rw',
483             isa => 'HashRef',
484             traits => [qw/Hash/],
485             );
486              
487              
488             sub mapping {
489 118     119 1 201 my $self = shift;
490 118         212 my @pairs = @_;
491              
492 118 50 33     445 if (scalar( @pairs ) == 1 && ref( $pairs[0] ) eq 'HASH') {
    50          
493 0         0 return $self->_mapping( $pairs[0] );
494             } elsif (scalar @pairs) {
495 0         0 return $self->_add_mapping( @pairs );
496             } else {
497 118         2715 return $self->_mapping;
498             }
499             }
500              
501              
502             =head3 on_record
503              
504             Executes a customized subroutine on every record before any mapping. The code
505             can modify the record and your changes will feed into the mapping. You can use
506             B<on_record> for filtering, debugging, or just about anything.
507              
508             B<on_record> accepts a code reference. L</record> executes this code for every
509             input record.
510              
511             The code reference receives two parameters - the C<ETL::Pipeline> object and the
512             input record. The record is passed as a hash reference. If B<on_record> returns
513             a false value, L</record> will never send this record to the output destination.
514             It's as if this record never existed.
515              
516             ETL::Pipeline->new( {
517             ...
518             on_record => sub {
519             my ($etl, $record) = @_;
520             foreach my $field (keys %$record) {
521             my $value = $record->{$field};
522             $record->{$field} = ($value eq 'NA' ? '' : $value);
523             }
524             },
525             ...
526             } )->process;
527              
528             # -- OR --
529             $etl->on_record( sub {
530             my ($etl, $record) = @_;
531             foreach my $field (keys %$record) {
532             my $value = $record->{$field};
533             $record->{$field} = ($value eq 'NA' ? '' : $value);
534             }
535             } );
536              
537             B<Note:> L</record> automatically removes leading and trailing whitespace. You
538             do not need B<on_record> for that.
539              
540             =cut
541              
542             has 'on_record' => (
543             is => 'rw',
544             isa => 'Maybe[CodeRef]',
545             );
546              
547              
548             =head3 output
549              
550             B<output> sets and returns the L<ETL::Pipeline::Output> object. This object
551             writes records to their final destination. With no parameters, B<output> returns
552             the current L<ETL::Pipeline::Output> object.
553              
554             Set the output destination by calling B<output> with parameters...
555              
556             $etl->output( 'SQL', table => 'NewData' );
557              
558             The first parameter is a class name. B<output> looks for a Perl module
559             matching this name in the C<ETL::Pipeline::Output> namespace. In this example,
560             the actual class name becomes C<ETL::Pipeline::Output::SQL>.
561              
562             The rest of the parameters are passed directly to the C<new> method of that
563             class.
564              
565             B<Technical Note:> Want to use a custom class from B<Local> instead of
566             B<ETL::Pipeline::Output>? Put a B<+> (plus sign) in front of the class name.
567             For example, this command uses the input class B<Local::CustomLoad>.
568              
569             $etl->output( '+Local::CustomLoad' );
570              
571             =cut
572              
573             has '_output' => (
574             does => 'ETL::Pipeline::Output',
575             init_arg => undef,
576             is => 'rw',
577             );
578              
579              
580             sub output {
581 135     136 1 10774 my $self = shift;
582              
583 135 100       467 return $self->_output( $self->_object_of_class( 'Output', @_ ) ) if scalar @_;
584 74         2036 return $self->_output;
585             }
586              
587              
588             =head3 process
589              
590             B<process> kicks off the entire data conversion process. It takes no
591             parameters. All of the setup is done by the other methods.
592              
593             B<process> returns the B<ETL::Pipeline> object so you can do things like
594             this...
595              
596             ETL::Pipeline->new( {...} )->process->chain( ... )->process;
597              
598             =cut
599              
600             sub process {
601 52     53 1 118 my $self = shift;
602              
603             # Make sure we have all the required information.
604 52         230 my ($success, $error) = $self->is_valid;
605 52 50       138 croak $error unless $success;
606              
607             # Sort aliases from the input source before any that were set in the object.
608 52         1405 $self->_alias_type( 'input' );
609              
610             # Kick off the process. The input source loops over the records. It calls
611             # the "record" method, described below.
612 52         1255 $self->_output->open( $self );
613 52         220 $self->status( 'START' );
614 52         277 $self->input->run( $self );
615 51         1946 $self->_decrement_count; # "record" adds 1 at the end, so this goes one past the last record.
616 51         163 $self->status( 'END' );
617 51         1660 $self->_output->close( $self );
618              
619             # Return the pipeline object so that we can chain calls. Useful shorthand
620             # when running multiple pipelines.
621 51         156 return $self;
622             }
623              
624              
625             =head3 session
626              
627             B<ETL::Pipeline> supports sessions. A session allows input and output objects
628             to share information along a chain. For example, imagine 3 Excel files being
629             loaded into an Access database. All 3 files go into the same Access database.
630             The first pipeline creates the database and saves its path in the session. That
631             pipeline chains with a second pipeline. The second pipeline retrieves the
632             Access filename from the session.
633              
634             The B<session> method provides access to session level variables. As you write
635             your own L<ETL::Pipeline::Output> classes, they can use session variables for
636             sharing information.
637              
638             The first parameter is the variable name. If you pass only the variable name,
639             B<session> returns the value.
640              
641             my $database = $etl->session( 'access_file' );
642             my $identifier = $etl->session( 'session_identifier' );
643              
644             A second parameter is the value.
645              
646             $etl->session( access_file => 'C:\ExcelData.accdb' );
647              
648             You can set multiple variables in one call.
649              
650             $etl->session( access_file => 'C:\ExcelData.accdb', name => 'Abe' );
651              
652             If you pass in a hash referece, it completely replaces the current session with
653             the new values.
654              
655             When retrieving an array or hash reference, B<session> automatically
656             dereferences it if called in a list context. In a scalar context, B<session>
657             returns the reference.
658              
659             # Returns the list of names as a list.
660             foreach my $name ($etl->session( 'name_list' )) { ... }
661              
662             # Returns a list reference instead of a list.
663             my $reference = $etl->session( 'name_list' );
664              
665             =head3 session_has
666              
667             B<session_has> checks for a specific session variable. It returns I<true> if
668             the variable exists and I<false> if it doesn't.
669              
670             B<session_has> only checks existence. It does not tell you if the value is
671             defined.
672              
673             if ($etl->session_has( 'access_file' )) { ... }
674              
675             =cut
676              
677             # Alternate design: Use attributes for session level information.
678             # Result: Discarded
679             #
680             # Instead of keeping session variables in a hash, the class would have an
681             # attribute corresponding to the session data it can keep. Since
682             # ETL::Pipeline::Input and ETL::Pipeline::Output objects have access to the
683             # the pipeline, they can share data through the attributes.
684             #
685             # For any session information, the developer must subclass ETL::Pipeline. The
686             # ETL::Pipeline::Input or ETL::Pipeline::Output classes would be tied to that
687             # specific subclass. And if you needed to combine two sets of session
688             # variables, well that just means another class type. That's very confusing.
689             #
690             # Attributes make development of new input and output classes very difficult.
691             # The hash is simple. It decouples the input/output classes from pipeline.
692             # That keeps customization simpler.
693              
694              
# Private hash holding session-level variables shared between the pipeline and
# its input/output objects. Access goes through the public "session" and
# "session_has" methods; "init_arg => undef" keeps it out of the constructor.
has '_session' => (
	default  => sub { {} },
	handles  => {
		_add_session => 'set',     # store one key/value pair
		_get_session => 'get',     # fetch the value for one key
		session_has  => 'exists',  # public: does this key exist at all?
	},
	init_arg => undef,
	is       => 'rw',
	isa      => 'HashRef[Any]',
	traits   => [qw/Hash/],
);
707              
708              
# Public accessor for session-level variables (see POD above).
#   - Multiple arguments: treated as key/value pairs to store.
#   - One HASH reference: replaces the entire session hash.
#   - One plain key: returns the stored value, dereferencing ARRAY/HASH
#     values when called in list context.
#   - No arguments: returns undef.
sub session {
	my $self = shift;

	if (scalar( @_ ) > 1) {
		# Store every key/value pair passed in.
		my %parameters = @_;
		while (my ($key, $value) = each %parameters) {
			$self->_add_session( $key, $value );
		}
		# @_ still holds the original pairs, so $_[1] is the value of the
		# first pair — the method returns the first value it was given.
		return $_[1];
	} elsif (scalar( @_ ) == 1) {
		my $key = shift;
		if (ref( $key ) eq 'HASH') {
			# A hash reference wholesale replaces the current session.
			return $self->_session( $key );
		} elsif (wantarray) {
			# List context: dereference ARRAY/HASH values for the caller.
			my $result = $self->_get_session( $key );
			if (ref( $result ) eq 'ARRAY') { return @$result; }
			elsif (ref( $result ) eq 'HASH' ) { return %$result; }
			else { return $result; }
		} else { return $self->_get_session( $key ); }
	} else { return undef; }
}
730              
731              
732             =head3 work_in
733              
734             The working directory sets the default place for finding files. All searches
735             start here and only descend subdirectories. Temporary or output files go into
736             this directory as well.
737              
738             B<work_in> has two forms: C<work_in( 'C:\Data' );> or
739             C<< work_in( root => 'C:\Data', iname => 'Ficticious' ); >>.
740              
741             The first form specifies the exact directory path. In our example, the working
742             directory is F<C:\Data>.
743              
744             The second form searches the file system for a matching directory. Take this
745             example...
746              
747             $etl->work_in( root => 'C:\Data', iname => 'Ficticious' );
748              
749             It scans the F<C:\Data> directory for a subdirectory named F<Ficticious>, like
750             this: F<C:\Data\Ficticious>. The search is B<not> recursive. It locates files
751             in the B<root> folder.
752              
753             B<work_in> accepts any of the tests provided by L<Path::Iterator::Rule>. The
754             values of these arguments are passed directly into the test. For boolean tests
755             (e.g. readable, exists, etc.), pass an C<undef> value.
756              
757             B<work_in> automatically applies the C<directory> filter. Do not set it
758             yourself.
759              
760             C<iname> is the most common one that I use. It matches the file name, supports
761             wildcards and regular expressions, and is case insensitive.
762              
763             # Search using a regular expression...
764             $etl->work_in( iname => qr/\.xlsx$/, root => 'C:\Data' );
765              
766             # Search using a file glob...
767             $etl->work_in( iname => '*.xlsx', root => 'C:\Data' );
768              
769             The code throws an error if no directory matches the criteria. Only the first
770             match is used.
771              
772             B<work_in> automatically resets L</data_in>.
773              
774             =cut
775              
# The working directory (see "work_in" POD above). Coerced into a directory
# object; the trigger normalizes relative paths and resets "_data_in" so file
# searches start from the new location.
has '_work_in' => (
	coerce   => 1,
	init_arg => undef,
	is       => 'rw',
	isa      => Dir,
	trigger  => \&_trigger_work_in,
);
783              
784              
# Get or set the working directory (see POD above).
#   - No arguments: return the current working directory.
#   - One argument: set the working directory to that exact path.
#   - Key/value pairs: search the file system for a matching directory using
#     Path::Iterator::Rule style tests. "root" (default ".") is where the
#     search starts; all other keys are passed through as rule tests.
# Croaks when a search finds no matching directory; confesses when a test
# name is invalid.
sub work_in {
	my $self = shift;

	if (scalar( @_ ) == 1) {
		return $self->_work_in( shift );
	} elsif(scalar( @_ ) > 1) {
		my %options = @_;

		my $root = delete( $options{root} ) // '.';

		if (scalar %options) {
			my $rule = Path::Class::Rule->new->directory;
			while (my ($name, $value) = each %options) {
				# Dynamic method call instead of string eval. The test name
				# comes from caller data, so a string eval would let it
				# inject arbitrary code. "$rule->$name(...)" resolves the
				# method at run time safely; block eval traps unknown tests.
				my $updated = eval { $rule->$name( $value ) };
				confess $@ unless $@ eq '';
				$rule = $updated;
			}
			# Only the first match is used, per the documented contract.
			my $next = $rule->iter( $root );
			my $match = $next->();
			croak 'No matching directories' unless defined $match;
			return $self->_work_in( $match );
		} else { return $self->_work_in( $root ); }
	} else { return $self->_work_in; }
}
809              
810              
# Trigger for the "_work_in" attribute. A relative path is rewritten as an
# absolute one, which fires this trigger a second time; only that second
# pass (absolute path, or undef) resets "_data_in". This guarantees
# "_data_in" changes exactly once per assignment.
sub _trigger_work_in {
	my ($self, $directory) = @_;

	if (defined( $directory ) && $directory->is_relative) {
		# Normalize to an absolute path — re-enters this trigger.
		$self->_work_in( $directory->cleanup->absolute );
	} else {
		# Final value (absolute or undef): reset the search directory.
		$self->_data_in( $directory );
	}
}
823              
824              
825             =head2 Used in mapping
826              
827             These methods may be used by code references in the L</mapping> attribute. They
828             will return information about/from the current record.
829              
830             =head3 count
831              
832             This attribute tells you how many records have been read from the input source.
833             This value is incremented before any filtering. So it even counts records that
834             are bypassed by L</on_record>.
835              
836             The first record is always number B<1>.
837              
838             =cut
839              
# Number of records read from the input source, starting at 1. Incremented
# before filtering, so records skipped by "on_record" still count. The
# Counter trait supplies the private increment/decrement handles.
has 'count' => (
	default => '1',
	handles => {_decrement_count => 'dec', _increment_count => 'inc'},
	is      => 'ro',
	isa     => 'Int',
	traits  => [qw/Counter/],
);
847              
848              
849             =head3 get
850              
851             Retrieve the value from the record for the given field name. This method accepts
852             two parameters - a field name and the current record. It returns the exact value
853             found at the matching node.
854              
855             =head4 Field names
856              
857             The field name can be...
858              
859             =over
860              
861             =item A string containing a hash key
862              
863             =item An array index (all digits)
864              
865             =item A string containing a L<Data::DPath> path (starts with B</>)
866              
867             =item A regular expression reference
868              
869             =item A code reference
870              
871             =back
872              
873             Hash keys and regular expressions match both field names and aliases. These are
874             the only types that match aliases. Hash keys cannot be all digits and cannot
875             begin with the B</> character. Otherwise B<get> mis-identifies them.
876              
877             B<get> interprets strings of all digits as array indexes (numbers). Excel files,
878             for example, return an array instead of a hash. And this is an easy way to
879             reference columns in order.
880              
881             B<get> treats a string beginning with a B</> (slash) as a L<Data::DPath> path.
882             This lets you very specifically traverse a complex data structure, such as those
883             from XML or JSON.
884              
885             For a regular expression, B<get> matches hash keys at the top level of the data
886             structure plus aliases.
887              
888             And with a code reference, B<get> executes the subroutine. The return value
889             becomes the field value. The code reference is called in a scalar context. If
890             you need to return multiple values, then return an ARRAY or HASH reference.
891              
892             A code reference receives two parameters - the B<ETL::Pipeline> object and the
893             current record.
894              
895             =head4 Current record
896              
897             The current record is optional. B<get> will use L</this> if you do not pass in a
898             record. By accepting a record, you can use B<get> on sub-records. So by default,
899             B<get> returns a value from the top record. Use the second parameter to retrieve
900             values from a sub-record.
901              
902             B<get> only applies aliases when using L</this>. Aliases do not apply to
903             sub-records.
904              
905             =head4 Return value
906              
907             B<get> always returns a scalar value, but not always a string. The return value
908             might be a string, ARRAY reference, or HASH reference.
909              
910             B<get> does not flatten out the nodes that it finds. It merely returns a
911             reference to whatever is in the data structure at the named point. The calling
912             code must account for the possibility of finding an array or hash or string.
913              
914             =cut
915              
# Retrieve a value from a record by field name (see POD above).
# Parameters: the field (hash key, array index, Data::DPath path, Regexp, or
# CODE reference) and an optional record. Without a record it uses "this"
# and also searches aliases. Returns a scalar: the single match, an ARRAY
# reference of multiple matches, or undef when nothing matched.
sub get {
	my ($self, $field, $record) = @_;

	# Because the reference may be stored, I want to force a new copy every
	# time. Otherwise scripts may get invalid values from previous records.
	my $found = [];

	# Use the current record from the attribute unless the programmer explicitly
	# sent in a record. By sending in a record, "get" works on sub-records. But
	# the default behaviour is what you would expect. "$full" marks the
	# top-level record — aliases only apply there, never to sub-records.
	my $full = 0;
	unless (defined $record) {
		$record = $self->this;
		$full = 1;
	}

	# Execute code reference. This is all there is to do. We send back whatever
	# the code returns.
	if (ref( $field ) eq 'CODE') {
		@$found = $field->( $self, $record );
	}

	# Anything else we match - either a field name or an alias. The sequence is
	# the same for both.
	else {
		# Match field names first. "_as_dpath" also tells us (via the flag)
		# whether this kind of field can match an alias.
		my $check_alias = 0;
		$field = $self->_as_dpath( $field, \$check_alias );
		@$found = dpath( $field )->match( $record );

		if ($check_alias && $full) {
			# Build the cache first time through. Re-use it later to save time.
			# The cache maps each alias to the dpath(s) of its real field(s).
			unless ($self->_alias_cache_built) {
				my $cache = $self->_alias_cache;
				foreach my $item ($self->aliases) {
					while (my ($alias, $location) = each %$item) {
						$cache->{$alias} = [] unless exists $cache->{$alias};
						push @{$cache->{$alias}}, $self->_as_dpath( $location );
					}
				}
			}

			# Search the actual data in all of the fields from matching aliases.
			# Alias hits are appended after direct field-name hits.
			my @search = dpath( $field )->match( $self->_alias_cache );
			foreach my $list (@search) {
				foreach my $location (@$list) {
					my @values = dpath( $location )->match( $record );
					push @$found, @values;
				}
			}
		}
	}

	# Send back the final value: a lone match is unwrapped, multiple matches
	# come back as an ARRAY reference, no match is undef.
	return (scalar( @$found ) <= 1 ? $found->[0] : $found);
}
972              
973              
974             # Format the match string for Data::DPath. I allow scripts to use shortcut
975             # formatting so they are easier to read. This method translates those into a
976             # correct Data::DPath path.
# Translate the shortcut field formats that scripts use into a proper
# Data::DPath path. Takes the field plus an optional SCALAR reference; when
# given, that flag is set to 1 for field kinds that can also match an alias
# (regular expressions and bare hash keys) and 0 otherwise.
sub _as_dpath {
	my ($self, $field, $alias) = @_;

	my ($path, $matches_alias);
	if (ref( $field ) eq 'Regexp') {
		# Regular expression: match against hash keys (and aliases).
		($path, $matches_alias) = ("/*[key =~ /$field/]", 1);
	} elsif ($field =~ m/^\d+$/) {
		# All digits: treat as an array index.
		($path, $matches_alias) = ("/*[$field]", 0);
	} elsif ($field !~ m|^/|) {
		# Bare word: a top-level hash key (or alias).
		($path, $matches_alias) = ("/$field", 1);
	} else {
		# Leading slash: already a full Data::DPath path.
		($path, $matches_alias) = ($field, 0);
	}

	$$alias = $matches_alias if ref( $alias ) eq 'SCALAR';
	return $path;
}
994              
995              
996             # Alternate designs...
997             #
998             # I considered building a temporary hash keyed by the alias names. Then I could
999             # apply a full Data::DPath to retrieve aliased fields. But a path like "/*[0]"
1000             # would always match both the main record and the aliases. I would always be
1001             # returning multiple values when the user clearly expected one. It makes aliases
1002             # pretty much useless.
1003              
1004              
1005             =head3 this
1006              
1007             The current record. The L</record> method sets B<this> before it does anything
1008             else. L</get> will use B<this> if you don't pass in a record. It makes a
1009             convenient shortcut so you don't have to pass the record into every call.
1010              
1011             B<this> can be any valid Perl data. Usually a hash reference or array reference.
1012             The input source controls the type.
1013              
1014             =cut
1015              
# The current record being processed. Set by "record" (and temporarily by
# "foreach" for sub-records) through the private writer. Can be any Perl
# data — the input source decides the structure.
has 'this' => (
	is     => 'ro',
	isa    => 'Maybe[Any]',
	writer => '_set_this',
);
1021              
1022              
1023             =head2 Used by input sources
1024              
1025             =head3 aliases (see above)
1026              
1027             Your input source can use the L</aliases> method documented above to set
1028             column headers as field names. Excel files, for example, would call L</aliases>
1029             to assign letters to column numbers, like a real spreadsheet.
1030              
1031             =head3 record
1032              
1033             The input source calls this method for each data record. This is where
1034             L<ETL::Pipeline> applies the mapping, constants, and sends the results on to the
1036             output destination.
1037              
1038             B<record> takes one parameter - the current record. The record can be any Perl
1039             data structure - hash, array, or scalar. B<record> uses L<Data::DPath> to
1040             traverse the structure.
1041              
1042             B<record> calls L</get> on each field in L</mapping>. B</get> traverses the
1043             data structure retrieving the correct values. B<record> concatenates multiple
1044             matches into a single, scalar value for the output.
1045              
1046             =cut
1047              
# Process one input record (see POD above): trim whitespace, apply the
# "on_record" filter, merge constants, run the mapping through "get", then
# write the result to the output destination and bump the record count.
sub record {
	my ($self, $record) = @_;

	# Save the current record so that other methods and helper functions can
	# access it without the programmer passing it around.
	$self->_set_this( $record );

	# Remove leading and trailing whitespace from all fields. We always want to
	# do this. Otherwise we end up with weird looking text. I do this first so
	# that all the customized code sees is the filtered data. For hashes,
	# Data::Traverse puts the value in $b; otherwise it is in $a.
	traverse { trim( m/HASH/ ? $b : $a ) } $record;

	# Run the custom record filter, if there is one. If the filter returns
	# "false", then we bypass this entire record.
	my $code = $self->on_record;
	my $continue = 1;

	$continue = $code->( $self, $record ) if defined $code;
	unless ($continue) {
		$self->_increment_count; # Record processed.
		return;
	}

	# Insert constants into the output. Do this before the mapping. The mapping
	# will always override constants. I want data from the input.
	#
	# I had used a regular hash. Perl kept re-using the same memory location.
	# The records were overwriting each other. Switched to a hash reference so I
	# can force Perl to allocate new memory for every record.
	my $save = {};
	if ($self->has_constants) {
		my $constants = $self->_constants;
		%$save = %$constants;
	}

	# This is the transform step. It converts the input record into an output
	# record. An ARRAY value in the mapping is [field, separator].
	my $mapping = $self->mapping;
	while (my ($to, $from) = each %$mapping) {
		my $seperator = '; ';
		if (ref( $from ) eq 'ARRAY') {
			$seperator = $from->[1];
			$from = $from->[0]; # Do this LAST!
		}

		# "get" may return a plain scalar, an ARRAY reference of multiple
		# matches, or some other reference. Only flat values are usable;
		# nested structures are reported as errors and mapped to undef.
		my $values = $self->get( $from );
		if (ref( $values ) eq '' ) { $save->{$to} = $values; }
		elsif (ref( $values ) eq 'ARRAY') {
			my $invalid = first { defined( $_ ) && ref( $_ ) ne '' } @$values;
			if (defined $invalid) {
				my $type = ref( $invalid );
				$self->status( 'ERROR', "Data structure of type $type found by mapping '$from' to '$to'" );
				$save->{$to} = undef;
			} else {
				# Concatenate multiple matches, skipping blank values.
				my @usable = grep { hascontent( $_ ) } @$values;
				if(scalar @usable) { $save->{$to} = join( $seperator, @usable ); }
				else { $save->{$to} = undef; }
			}
		} else { $save->{$to} = undef; }
	}

	# We're done with this record. Finish up.
	$self->_output->write( $self, $save );
	$self->status( 'STATUS' );

	# Increase the record count. Do this last so that any status messages from
	# the input source reflect the correct record number.
	$self->_increment_count;
}
1117              
1118              
1119             =head3 status
1120              
1121             This method displays a status message. B<ETL::Pipeline> calls this method to
1122             report on the progress of pipeline. It takes one or two parameters - the message
1123             type (required) and the message itself (optional).
1124              
1125             The type can be anything. These are the ones that B<ETL::Pipeline> uses...
1126              
1127             =over
1128              
1129             =item DEBUG
1130              
1131             Messages used for debugging problems. You should only use these temporarily to
1132             look for specific issues. Otherwise they clog up the display for the end user.
1133              
1134             =item END
1135              
1136             The pipeline has finished. The input source is closed. The output destination
1137             is still open. It will be closed immediately after. There is no message text.
1138              
1139             =item ERROR
1140              
1141             Report an error message to the user. These are not necessarily fatal errors.
1142              
1143             =item INFO
1144              
1145             An informational message to the user.
1146              
1147             =item START
1148              
1149             The pipeline is just starting. The output destination is open. But the input
1150             source is not. There is no message text.
1151              
1152             =item STATUS
1153              
1154             Progress update. This is sent after every input record.
1155              
1156             =back
1157              
1158             See L</Custom logging> for information about adding your own log method.
1159              
1160             =cut
1161              
# Display a status message (see POD above). Takes the message type (START,
# END, STATUS, or anything else such as DEBUG/ERROR/INFO) and an optional
# message. STATUS prints progress every 50 records; other types include the
# record number and, when available, the input source location.
sub status {
	my ($self, $type, $message) = @_;
	$type = uc( $type );

	if ($type eq 'START') {
		say 'Processing...';
	} elsif ($type eq 'END') {
		say 'Finished!';
	} elsif ($type eq 'STATUS') {
		# Only report every 50th record to keep the output readable.
		my $count = $self->count;
		say "Processed record #$count..." unless $count % 50;
	} else {
		my $count = $self->count;
		my $source = $self->input->source;

		# Include the input source position when the input provides one.
		if (hascontent( $source )) {
			say "$type [record #$count at $source] $message";
		} else {
			say "$type [record #$count] $message";
		}
	}
}
1186              
1187              
1188             =head2 Utility Functions
1189              
1190             These methods can be used inside L</mapping> code references. Unless otherwise
1191             noted, these all work on L<the current record|/this>.
1192              
1193             my $etl = ETL::Pipeline->new( {
1194             ...
1195             mapping => {A => sub { shift->function( ... ) }},
1196             ...
1197             } );
1198              
1199             =head3 coalesce
1200              
1201             Emulates the SQL Server C<COALESCE> command. It takes a list of field names for
1202             L</get> and returns the value of the first non-blank field.
1203              
1204             # First non-blank field
1205             $etl->coalesce( 'Patient', 'Complainant', 'From' );
1206              
1207             # Actual value if no non-blank fields
1208             $etl->coalesce( 'Date', \$today );
1209              
1210             In the first example, B<coalesce> looks at the B<Patient> field first. If it's
1211             blank, then B<coalesce> looks at the B<Complainant> field. Same thing - if it's
1212             blank, B<coalesce> returns the B<From> field.
1213              
1214             I<Blank> means C<undef>, empty string, or all whitespace. This is different
1215             than the SQL version.
1216              
1217             The second examples shows an actual value passed as a scalar reference. Because
1218             it's a reference, B<coalesce> recognizes that it is not a field name for
1219             L</get>. B<coalesce> uses the value in C<$today> if the B<Date> field is blank.
1220              
1221             B<coalesce> returns an empty string if all of the fields are blank.
1222              
1223             =cut
1224              
# Emulate SQL's COALESCE (see POD above): return the value of the first
# non-blank entry. Plain entries are field names resolved through "get";
# SCALAR references are literal values. Returns an empty string when every
# entry is blank (undef, empty, or all whitespace).
sub coalesce {
	my ($self, @candidates) = @_;

	foreach my $candidate (@candidates) {
		# A SCALAR reference is an actual value, not a field name.
		my $value = ref( $candidate ) eq 'SCALAR'
			? $$candidate
			: $self->get( $candidate )
			;
		return $value if hascontent( $value );
	}
	return '';
}
1238              
1239              
1240             =head3 foreach
1241              
1242             Executes a CODE reference against repeating sub-records. XML files, for example,
1243             have repeating nodes. B<foreach> allows you to format multiple fields from the
1244             same record. It looks like this...
1245              
1246             # Capture the resulting strings.
1247             my @results = $etl->foreach( sub { ... }, '/File/People' );
1248              
1249             # Combine the resulting strings.
1250             join( '; ', $etl->foreach( sub { ... }, '/File/People' ) );
1251              
1252             B<foreach> calls L</get> to retrieve a list of sub-records. It replaces L</this>
1253             with each sub-record in turn and executes the code reference. You can use any of
1254             the standard utility functions inside the code reference. They will operate
1255             only on the current sub-record.
1256              
1257             B<foreach> returns a single string per sub-record. Blank strings are discarded.
1258             I<Blank> means C<undef>, empty strings, or all whitespace. You can filter
1259             sub-records by returning C<undef> from the code reference.
1260              
1261             For example, you might do something like this to format names from XML...
1262              
1263             # Format names "last, first" and put a semi-colon between multiple names.
1264             $etl->format( '; ', $etl->foreach(
1265             sub { $etl->format( ', ', '/Last', '/First' ) },
1266             '/File/People'
1267             ) );
1268              
1269             # Same thing, but using parameters.
1270             $etl->format( '; ', $etl->foreach(
1271             sub {
1272             my ($object, $record) = @_;
1273             $object->format( ', ', '/Last', '/First' )
1274             },
1275             '/File/People'
1276             ) );
1277              
1278             B<foreach> passed two parameters to the code reference...
1279              
1280             =over
1281              
1282             =item The current B<ETL::Pipeline> object.
1283              
1284             =item The current sub-record. This will be the same value as L</this>.
1285              
1286             =back
1287              
1288             The code reference should return a string. If it returns an ARRAY reference,
1289             B<foreach> flattens it, discarding any blank elements. So if you have to return
1290             multiple values, B<foreach> tries to do something intelligent.
1291              
1292             B<foreach> sets L</this> before executing the CODE reference. The code can call
1293             any of the other utility functions with field names relative to the sub-record.
1294             I<Please note, the code cannot access fields outside of the sub-record>.
1295             Instead, cache these in a local variable before calling B<foreach>.
1296              
1297             my $x = $etl->get( '/File/MainPerson' );
1298             join( '; ', $etl->foreach( sub {
1299             my $y = $etl->format( ', ', '/Last', '/First' );
1300             "$y is with $x";
1301             }, '/File/People' );
1302              
1303             =head4 Calling foreach
1304              
1305             B<foreach> accepts the code reference as the first parameter. All remaining
1306             parameters are field names. B<foreach> passes them through L</get> one at a
1307             time. Each field should resolve to a repeating node.
1308              
1309             B<foreach> returns a list. The list may be empty or have one element. But it is
1310             always a list. You can use Perl functions such as C<join> to convert the list
1311             into a single value.
1312              
1313             =cut
1314              
# Execute a CODE reference against repeating sub-records (see POD above).
# Takes the code reference followed by field names; each field is resolved
# through "get" into sub-records. Sets "this" to each sub-record before
# calling the code, then restores the original record. Returns the list of
# non-blank string results.
sub foreach {
	my $self = shift;
	my $code = shift;

	# Cache the current record. I need to restore this later so other function
	# calls work normally.
	my $this = $self->this;

	# Retrieve the repeating sub-records. Each field may match one sub-record
	# or an ARRAY of them; flatten everything into one list.
	my $all = [];
	foreach my $item (@_) {
		my $current = $self->get( $item );
		if (ref( $current ) eq 'ARRAY') { push @$all, @$current; }
		else { push @$all, $current; }
	}

	# Execute the code reference against each sub-record. The sub-record is
	# available as "this", as $_, and as the second parameter.
	my @results;
	foreach my $record (@$all) {
		$self->_set_this( $record );
		local $_ = $record;
		my @values = $code->( $self, $_ );

		# A single ARRAY reference return is flattened into the results.
		if (scalar( @values ) == 1 && ref( $values[0] ) eq 'ARRAY') {
			push @results, @{$values[0]};

		} else { push @results, @values; }
	}

	# Restore the current record and return all of the results, discarding
	# blanks and anything that is not a plain string.
	$self->_set_this( $this );
	return grep { ref( $_ ) eq '' && hascontent( $_ ) } @results;
}
1347              
1348              
1349             =head3 format
1350              
1351             Builds a string from a list of fields, discarding blank fields. That's the main
1352             purpose of the function - don't use entirely blank strings. This prevents things
1353             like orphaned commas from showing up in your data.
1354              
1355             B<format> can both concatenate (C<join>) fields or format them (C<sprintf>).
1356             A SCALAR reference signifies a format. A regular string indicates concatenation.
1357              
1358             # Concatenate fields (aka join)
1359             $etl->format( "\n\n", 'D', 'E', 'F' );
1360              
1361             # Format fields (aka sprintf)
1362             $etl->format( \'%s, %s (%s)', 'D', 'E', 'F' );
1363              
1364             You can nest constructs with an ARRAY reference. The separator or format string
1365             is the first element. The remaining elements are more fields (or other nested
1366             ARRAY references). Basically, B<format> recursively calls itself passing the
1367             array as parameters.
1368              
1369             # Blank lines between. Third line is two fields seperated by a space.
1370             $etl->format( "\n\n", 'D', 'E', [' ', 'F', 'G'] );
1371              
1372             # Blank lines between. Third line is formatted.
1373             $etl->format( "\n\n", 'D', 'E', [\'-- from %s %s', 'F', 'G'] );
1374              
1375             I<Blank> means C<undef>, empty string, or all whitespace. B<format> returns an
1376             empty string if all of fields are blank.
1377              
1378             =head4 Format until
1379              
1380             B<format> optionally accepts a CODE reference to stop processing early.
1381             B<format> passes each value into the code reference. If the code returns
1382             B<true>, then B<format> stops processing fields and returns. The code reference
1383             comes before the seperator/format.
1384              
1385             # Concatenate fields until one of them is the word "END".
1386             $etl->format( sub { $_ eq 'END' }, "\n\n", '/*[idx > 8]' );
1387              
1388             B<format> sets C<$_> to the field value. It also passes the value as the first
1389             and only parameter. Your code can use either C<$_> or C<shift> to access the
1390             value.
1391              
1392             You can include code references inside an ARRAY reference too. The code only
1393             stops processing inside that substring. It continues processing the outer set of
1394             fields after the ARRAY.
1395              
1396             # The last line concatenates fields until one of them is the word "END".
1397             $etl->format( "\n\n", 'A', 'B', [sub { $_ eq 'END' }, ' ', '/*[idx > 8]'] );
1398              
1399             # Do the conditional concatenate in the middle. Results in 3 lines.
1400             $etl->format( "\n\n", 'A', [sub { $_ eq 'END' }, ' ', '/*[idx > 8]'], 'B' );
1401              
1402             What happens if you have a CODE reference and an ARRAY reference, like this?
1403              
1404             $etl->format( sub { $_ eq 'END' }, "\n\n", 'A', [' ', 'B', 'C'], 'D' );
1405              
1406             B<format> retrieves the ARRAY reference as a single string. It then sends that
1407             entire string through the CODE reference. If the code returns B<true>,
1408             processing stops. In other words, B<format> treats the results of an ARRAY
1409             reference just like any other field.
1410              
1411             =cut
1412              
# Build a single string from a list of fields, discarding blank values so
# that stray separators (orphaned commas, empty lines) never appear in the
# output. A SCALAR reference as the separator means "sprintf format"; a
# plain string means "join". An optional leading CODE reference stops
# processing as soon as it returns true for a field value.
sub format {
	my $self  = shift;
	my $first = shift;

	# The optional stop-condition CODE reference comes before the
	# separator/format argument.
	my ($stop_check, $glue);
	if (ref( $first ) eq 'CODE') {
		$stop_check = $first;
		$glue       = shift;
	} else {
		$glue = $first;
	}

	# Collect field values. Nested ARRAY references are resolved by a
	# recursive call, so their result is treated like any other field.
	my @collected;
	my $halt = 0;

	foreach my $field (@_) {
		my $value = ref( $field ) eq 'ARRAY'
			? $self->format( @$field )
			: $self->get( $field )
		;
		$value = [$value] unless ref( $value ) eq 'ARRAY';

		if (defined $stop_check) {
			foreach my $single (@$value) {
				local $_ = $single;
				if ($stop_check->( $_ )) {
					$halt = 1;
					last;
				}
				push @collected, $single;
			}
		} else {
			push @collected, @$value;
		}

		# The stop condition terminates the outer loop too.
		last if $halt;
	}

	# SCALAR reference = sprintf format; plain string = join separator.
	# An entirely blank result list yields an empty string.
	if (ref( $glue ) eq 'SCALAR') {
		return '' unless any { hascontent( $_ ) } @collected;
		no warnings 'redundant';
		return sprintf( $$glue, @collected );
	}
	return join( $glue, grep { hascontent( $_ ) } @collected );
}
1463              
1464              
1465             =head3 from
1466              
1467             Return data from a hash, like the one from L<ETL::Pipeline::Output::Memory>. The
1468             first parameter is the hash reference. The remaining parameters are field names
1469             whose values become the hash keys. It's a convenient shorthand for accessing
1470             a hash, with all of the error checking built in.
1471              
1472             $etl->from( $etl->output->hash, qr/myID/i, qr/Site/i );
1473              
1474             To pass a string literal, use a scalar reference.
1475              
1476             $etl->from( \%hash, qr/myID/i, \'Date' );
1477              
1478             This is equivalent to...
1479              
1480             $hash{$etl->get( qr/myID/i )}->{'Date'}
1481              
1482             B<from> returns C<undef> if any one key does not exist.
1483              
1484             B<from> automatically dereferences arrays. So if you store multiple values, the
1485             function returns them as a list instead of the list reference. Scalar values and
1486             hash references are returned as-is.
1487              
1488             =cut
1489              
# Walk a nested hash using field values as keys. The first parameter is the
# hash reference; each remaining parameter is either a field name passed to
# "get" or a SCALAR reference holding a literal key. Returns undef as soon
# as any step fails. A final ARRAY reference is automatically dereferenced
# into a list.
sub from {
	my ($self, $value, @fields) = @_;

	foreach my $field (@fields) {
		return undef unless ref( $value ) eq 'HASH';
		return undef unless defined $field;

		if (ref( $field ) eq 'SCALAR') {
			# Literal key - use it as-is.
			$value = $value->{$$field};
		} else {
			# Field name - look up the key in the current record.
			my $key = $self->get( $field );
			return undef unless hascontent( $key );
			$value = $value->{$key};
		}
	}
	return ref( $value ) eq 'ARRAY' ? @$value : $value;
}
1506              
1507              
1508             =head3 name
1509              
1510             Format fields as a person's name. Names are common data elements. This function
1511             provides a common format. Yet is flexible enough to handle customization.
1512              
1513             # Simple name formatted as "last, first".
1514             $etl->name( 'Last', 'First' );
1515              
1516             # Simple name formatted "first last". The format is the first element.
1517             $etl->name( \'%s %s', 'First', 'Last' );
1518              
1519             # Add a role or description in parenthesis, if it's there.
1520             $etl->name( 'Last', 'First', ['Role'] );
1521              
1522             # Add two fields with a custom format if at least one exists.
1523             $etl->name( 'Last', 'First', [\'(%s; %s)', 'Role', 'Type'] );
1524              
1525             # Same thing, but only adds the semi-colon if both values are there.
1526             $etl->name( 'Last', 'First', [['; ', 'Role', 'Type']] );
1527              
1528             # Long hand way of writing the above.
1529             $etl->name( 'Last', 'First', [\'(%s)', ['; ', 'Role', 'Type']] );
1530              
1531             If B<name> doesn't do what you want, try L</build>. L</build> is more flexible.
1532             As a matter of fact, B<name> calls L</build> internally.
1533              
1534             =cut
1535              
# Format fields as a person's name, e.g. "Last, First (Role)". A leading
# SCALAR reference overrides the default "last, first" format. An ARRAY
# reference supplies the role fields, optionally starting with its own
# format (default: parenthesis). Anything after the ARRAY reference is
# extra text appended to the result. Delegates the real work to "format".
sub name {
	my $self = shift;

	# Leading SCALAR reference overrides the default "Last, First" layout.
	my $name_format = ref( $_[0] ) eq 'SCALAR' ? shift : ', ';
	my @name_fields;

	my $role_format = \'(%s)';
	my @role_fields;

	# Everything up to the first ARRAY reference is a name field. The ARRAY
	# reference (if present) holds the role fields. Remaining parameters
	# are left in @_ and appended verbatim.
	while (defined( my $piece = shift )) {
		if (ref( $piece ) eq 'ARRAY') {
			$role_format = shift( @$piece ) if ref( $piece->[0] ) eq 'SCALAR';
			@role_fields = @$piece;
			last;
		}
		push @name_fields, $piece;
	}
	my $last_name = shift @name_fields;

	# "format" concatenates with single spaces and drops blank elements,
	# so missing roles or middle names leave no stray punctuation.
	return $self->format( ' ',
		[$name_format, $last_name, [' ', @name_fields]],
		[$role_format, @role_fields],
		@_
	);
}
1564              
1565              
1566             =head3 piece
1567              
1568             Split a string and extract one or more of the individual pieces. This can come
1569             in handy with file names, for example. A file split on the period has two pieces
1570             - the name and the extension, piece 1 and piece 2 respectively. Here are some
1571             examples...
1572              
1573             # File name: Example.JPG
1574             # Returns: Example
1575             $etl->piece( 'Filename', qr|\.|, 1 );
1576              
1577             # Returns: JPG
1578             $etl->piece( 'Filename', qr|\.|, 2 );
1579              
1580             B<piece> takes a minimum of 3 parameters...
1581              
1582             =over
1583              
1584             =item 1. Any field name valid for L</get>
1585              
1586             =item 2. Regular expression for splitting the field
1587              
1588             =item 3. Piece number to extract (the first piece is B<1>, not B<0>)
1589              
1590             =back
1591              
1592             B<piece> accepts any field name valid with L</get>. Multiple values are
1593             concatenated with a single space. You can specify a different seperator using
1594             the same syntax as L</mapping> - an array reference. In that array, the first
1595             element is the field name and the second is the seperator string.
1596              
1597             The second parameter for B<piece> is a regular expression. B<piece> passes this
1598             to C<split> and breaks apart the field value.
1599              
1600             The third parameter returns one or more pieces from the split string. In the
1601             simplest form, this is a single number. And B<piece> returns that piece from the
1602             split string. Note that pieces start at number 1, not 0 like array indexes.
1603              
1604             A negative piece number starts from the end of the string. For example, B<-2>
1605             returns the second to last piece. You can also include a length - number of
1606             pieces to return starting at the given position. The default length is B<1>.
1607              
1608             # Filename: abc_def_ghi_jkl_mno_pqr
1609             # Returns: abc def
1610             $etl->piece( 'Filename', qr/_/, '1,2' );
1611              
1612             # Returns: ghi jkl mno
1613             $etl->piece( 'Filename', qr/_/, '3,3' );
1614              
1615             # Returns: mno pqr
1616             $etl->piece( 'Filename', qr/_/, '-2,2' );
1617              
1618             Notice that the multiple pieces are re-joined using a space. You can specify the
1619             seperator string after the length. Do not put spaces after the commas. B<piece>
1620             will mistakenly use it as part of the seperator.
1621              
1622             # Filename: abc_def_ghi_jkl_mno_pqr
1623             # Returns: abc+def
1624             $etl->piece( 'Filename', qr/_/, '1,2,+' );
1625              
1626             # Returns: ghi,jkl,mno
1627             $etl->piece( 'Filename', qr/_/, '3,3,,' );
1628              
1629             # Returns: ghi -jkl -mno
1630             $etl->piece( 'Filename', qr/_/, '3,3, -' );
1631              
1632             A blank length returns all pieces from the start position to the end, just like
1633             the Perl C<splice> function.
1634              
1635             # Filename: abc_def_ghi_jkl_mno_pqr
1636             # Returns: ghi jkl mno pqr
1637             $etl->piece( 'Filename', qr/_/, '3,' );
1638              
1639             # Returns: ghi+jkl+mno+pqr
1640             $etl->piece( 'Filename', qr/_/, '3,,+' );
1641              
1642             =head4 Recursive pieces
1643              
1644             Imagine a name like I<Public, John Q., MD>. How would you parse out the middle
1645             initial by hand? First, you piece the string by comma. Next you split the
1646             second piece of that by a space. B<piece> lets you do the same thing.
1647              
1648             # Name: Public, John Q., MD
1649             # Returns: Q.
1650             $etl->piece( 'Name', qr/,/, 2, qr/ /, 2 );
1651              
1652             # Returns: John
1653             $etl->piece( 'Name', qr/,/, 2, qr/ /, 1 );
1654              
1655             B<piece> will take the results from the first split and use it as the input to
1656             the second split. It will continue to do this for as many pairs of expressions
1657             and piece numbers as you send.
1658              
1659             =cut
1660              
# Split a string and extract one or more pieces. Parameters come in pairs
# after the field name: a regular expression for "split" and a location
# string "start[,length[,separator]]". Piece numbers start at 1; negative
# numbers count from the end of the string. Pairs are applied recursively -
# the result of one split feeds the next.
#
# Fix: the multi-piece branches previously computed the splice offset as
# $location[0] - 1 unconditionally, which shifted NEGATIVE start positions
# one element too far left (e.g. '-2,2' returned the wrong pieces). The
# offset now mirrors the single-piece branch: only positive positions are
# converted from 1-based to 0-based.
sub piece {
	my $self  = shift;
	my $field = shift;

	# Retrieve the initial value. An ARRAY reference supplies a custom
	# separator for joining multi-value fields (default: single space).
	my $seperator = ' ';
	if (ref( $field ) eq 'ARRAY') {
		$seperator = $field->[1] // ' ';
		$field     = $field->[0];
	}
	my $value = $self->get( $field );
	$value = trim( join( $seperator, @$value ) ) if ref( $value ) eq 'ARRAY';

	# Recursively split the string, one expression/location pair at a time.
	while (scalar @_) {
		my $split    = shift;
		my @location = split /,/, shift, 3;

		my @pieces = split( $split, $value );
		if (scalar( @location ) == 0) {
			# No location at all - default to the first piece.
			$value = $pieces[0];
		} elsif (scalar( @location ) == 1) {
			# Single piece. Positive numbers are 1-based; negative numbers
			# index from the end, exactly like Perl array indexes.
			my $index = $location[0] > 0 ? $location[0] - 1 : $location[0];
			$value = $pieces[$index];
		} else {
			# Start position plus optional length and separator. Negative
			# start positions count from the end (same rule as above).
			my $start = $location[0] > 0 ? $location[0] - 1 : $location[0];
			my @parts;
			if (hascontent( $location[1] )) {
				@parts = splice @pieces, $start, $location[1];
			} else {
				# Blank length = everything from the start position on.
				@parts = splice @pieces, $start;
			}
			my $joiner = scalar( @location ) == 3 ? $location[2] : ' ';
			$value = join( $joiner, @parts );
		}
		$value = trim( $value );
	}

	# Return the value extracted from the last split - never undef.
	return $value // '';
}
1708              
1709              
1710             =head3 replace
1711              
1712             Substitute one string for another. This function uses the C<s///> operator and
1713             returns the modified string. B<replace> accepts a field name for L</get>. A
1714             little more convenient that calling L</get> and applying C<s///> yourself.
1715              
1716             B<replace> takes three parameters...
1717              
1718             =over
1719              
1720             =item The field to change
1721              
1722             =item The regular expression to match against
1723              
1724             =item The string to replace the match with
1725              
1726             =back
1727              
1728             All instances of the matching pattern are replaced. For the patterns, you can
1729             use strings or regular expression references.
1730              
1731             =cut
1732              
# Substitute one string for another inside a field value. Retrieves the
# field with "get", applies a global s///, and returns the modified copy.
# Both $match and $change may be plain strings or regular expression
# references.
sub replace {
	my ($self, $field, $match, $change) = @_;

	# Fetch the current value, then replace every occurrence of the match.
	my $text = $self->get( $field );
	$text =~ s/$match/$change/g;
	return $text;
}
1740              
1741              
1742             =head2 Other
1743              
1744             =head3 is_valid
1745              
1746             This method returns true or false. True means that the pipeline is ready to
1747             go. False, of course, means that there's a problem. In a list context,
1748             B<is_valid> returns the false value and an error message. On success, the
1749             error message is C<undef>.
1750              
1751             =cut
1752              
# Check that the pipeline is ready to run: a working folder, an input
# source, an output destination, and either a mapping or constants. Returns
# a boolean in scalar context; in list context, returns the boolean plus an
# error message (undef on success).
sub is_valid {
	my $self = shift;

	# Test each requirement in order, recording the first failure only.
	my $error;
	if (!defined $self->_work_in) {
		$error = 'The working folder was not set';
	} elsif (!defined $self->_input) {
		$error = 'The "input" object was not set';
	} elsif (!defined $self->_output) {
		$error = 'The "output" object was not set';
	} elsif (!$self->has_mapping && !$self->has_constants) {
		$error = 'The mapping was not set';
	}

	my $okay = defined( $error ) ? 0 : 1;
	return wantarray ? ($okay, $error) : $okay;
}
1773              
1774              
1775             #----------------------------------------------------------------------
1776             # Internal methods and attributes.
1777              
1778             # These attributes define field aliases. This is how column names work for Excel
1779             # and CSV. The script may also define aliases to shortcut long names.
1780              
# Field aliases, split into two groups: "input" aliases (e.g. column names
# supplied by the input source) and "pipeline" aliases (defined by the ETL
# script). Each group is an ordered list of {alias => field} hashes.
has '_alias' => (
	default => sub { {input => [], pipeline => []} },
	init_arg => undef,
	is => 'ro',
	isa => 'HashRef[ArrayRef[HashRef[Str]]]',
);

# Cache of resolved alias lookups (alias name => list of field names).
# "_alias_cache_built" reports how many entries the cache holds - zero
# means the cache has not been populated yet.
has '_alias_cache' => (
	default => sub { {} },
	handles => {_alias_cache_built => 'count'},
	is => 'ro',
	isa => 'HashRef[ArrayRef[Str]]',
	traits => [qw/Hash/],
);

# Which alias group ("input" or "pipeline") new aliases are added to.
has '_alias_type' => (
	default => 'pipeline',
	init_arg => undef,
	is => 'rw',
	isa => 'Str',
);
1802              
1803              
1804             # This private method creates the ETL::Pipeline::Input and ETL::Pipeline::Output
1805             # objects. It allows me to centralize the error handling. The program dies if
1806             # there's an error. It means that something is wrong with the corresponding
1807             # class. And I don't want to hide those errors. You can only fix errors if you
1808             # know about them.
1809             #
1810             # Override or modify this method if you want to perform extra checks.
1811             #
1812             # The first parameter is a string with either "Input" or "Output".
1813             # The method appends this value onto "ETL::Pipeline". For example, "Input"
1814             # becomes "ETL::Pipeline::Input".
1815             #
1816             # The rest of the parameters are passed directly into the constructor for the
1817             # class this method instantiates.
# Instantiate an ETL::Pipeline::Input or ETL::Pipeline::Output object,
# centralizing the error handling: the program dies if the class fails to
# load or construct, because such errors mean the class itself is broken
# and must not be hidden.
#
# The first parameter is "Input" or "Output"; it is appended to
# "ETL::Pipeline" to form the base namespace. The remaining parameters (or
# a single ARRAY reference of them) start with the class name, followed by
# constructor attributes. A leading "+" on the class name marks it as fully
# qualified.
sub _object_of_class {
	my ($self, $action, @arguments) = @_;

	# Accept either a flat list or one ARRAY reference of arguments.
	@arguments = @{$arguments[0]}
		if scalar( @arguments ) == 1 && ref( $arguments[0] ) eq 'ARRAY';

	# Resolve the class name relative to "ETL::Pipeline::$action" unless it
	# is fully qualified ("+" prefix) or already carries the base prefix.
	my $class = shift @arguments;
	if (substr( $class, 0, 1 ) eq '+') {
		$class = substr( $class, 1 );
	} else {
		my $base = "ETL::Pipeline::$action";
		$class = "${base}::$class" if substr( $class, 0, length( $base ) ) ne $base;
	}

	my %attributes = @arguments;
	$attributes{pipeline} = $self;

	# String eval is deliberate: the class name is only known at run time,
	# so it must be loaded dynamically.
	my $object = eval "use $class; $class->new( \%attributes )";
	croak "Error creating $class...\n$@\n" unless defined $object;
	return $object;
}
1843              
1844              
1845             =head1 ADVANCED TOPICS
1846              
1847             =head2 Multiple input sources
1848              
1849             It is not uncommon to receive your data spread across more than one file. How
1850             do you guarantee that each pipeline pulls files from the same working directory
1851             (L</work_in>)? You L</chain> the pipelines together.
1852              
1853             The L</chain> method works like this...
1854              
1855             ETL::Pipeline->new( {
1856             work_in => {search => 'C:\Data', find => qr/Ficticious/},
1857             input => ['Excel', iname => 'main.xlsx' ],
1858             mapping => {Name => 'A', Address => 'B', ID => 'C' },
1859             constants => {Type => 1, Information => 'Demographic' },
1860             output => ['SQL', table => 'NewData' ],
1861             } )->process->chain( {
1862             input => ['Excel', iname => 'notes.xlsx' ],
1863             mapping => {User => 'A', Text => 'B', Date => 'C' },
1864             constants => {Type => 2, Information => 'Note' },
1865             output => ['SQL', table => 'OtherData' ],
1866             } )->process;
1867              
1868             When the first pipeline finishes, it creates a new object with the same
1869             L</work_in>. The code then calls L</process> on the new object. The second
1870             pipeline copies L</work_in> from the first pipeline.
1871              
1872             =head2 Writing an input source
1873              
1874             B<ETL::Pipeline> provides some basic, generic input sources. Inevitably, you
1875             will come across data that doesn't fit one of these. No problem.
1876             B<ETL::Pipeline> lets you create your own input sources.
1877              
1878             An input source is a L<Moose> class that implements the L<ETL::Pipeline::Input>
1879             role. The role requires that you define the L<ETL::Pipeline::Input/run> method.
1880             B<ETL::Pipeline> calls that method. Name your class B<ETL::Pipeline::Input::*>
1881             and the L</input> method can find it automatically.
1882              
1883             See L<ETL::Pipeline::Input> for more details.
1884              
1885             =head2 Writing an output destination
1886              
1887             B<ETL::Pipeline> does not have any default output destinations. Output
1888             destinations are customized. You have something you want done with the data.
1889             And that something intimately ties into your specific business. You will have
1890             to write at least one output destination to do anything useful.
1891              
1892             An output destination is a L<Moose> class that implements the
1893             L<ETL::Pipeline::Output> role. The role defines required methods.
1894             B<ETL::Pipeline> calls those methods. Name your class
1895             B<ETL::Pipeline::Output::*> and the L</output> method can find it automatically.
1896              
1897             See L<ETL::Pipeline::Output> for more details.
1898              
1899             =head2 Why are the inputs and outputs separate?
1900              
1901             Wouldn't it make sense to have an input source for Excel and an output
1902             destination for Excel?
1903              
1904             Input sources are generic. It takes the same code to read from one Excel file
1905             as another. Output destinations, on the other hand, are customized for your
1906             business - with data validation and business logic.
1907              
1908             B<ETL::Pipeline> assumes that you have multiple input sources. Different
1909             feeds use different formats. But output destinations will be much fewer.
1910             You're writing data into a centralized place.
1911              
1912             For these reasons, it makes sense to keep the input sources and output
1913             destinations separate. You can easily add more inputs without affecting the
1914             outputs.
1915              
1916             =head2 Custom logging
1917              
1918             The default L<status> method sends updates to STDOUT. If you want to add log
1919             files or integrate with a GUI, then subclass B<ETL::Pipeline> and
1920             L<override|Moose::Manual::MethodModifiers/OVERRIDE-AND-SUPER> the L</status>
1921             method.
1922              
1923             =head1 SEE ALSO
1924              
1925             L<ETL::Pipeline::Input>, L<ETL::Pipeline::Output>
1926              
1927             =head2 Input Source Formats
1928              
1929             L<ETL::Pipeline::Input::Excel>, L<ETL::Pipeline::Input::DelimitedText>,
1930             L<ETL::Pipeline::Input::JsonFiles>, L<ETL::Pipeline::Input::Xml>,
1931             L<ETL::Pipeline::Input::XmlFiles>
1932              
1933             =head1 REPOSITORY
1934              
1935             L<https://github.com/rbwohlfarth/ETL-Pipeline>
1936              
1937             =head1 AUTHOR
1938              
1939             Robert Wohlfarth <robert.j.wohlfarth@vumc.org>
1940              
1941             =head1 COPYRIGHT AND LICENSE
1942              
1943             Copyright (c) 2021 Robert Wohlfarth
1944              
1945             This module is free software; you can redistribute it and/or modify it
1946             under the same terms as Perl 5.10.0. For more details, see the full text
1947             of the licenses in the directory LICENSES.
1948              
1949             This program is distributed in the hope that it will be useful, but
1950             without any warranty; without even the implied warranty of
1951             merchantability or fitness for a particular purpose.
1951              
1952             =cut
1953              
# Remove the Moose keywords from the namespace and make the class immutable
# for faster object construction at run time.
no Moose;
__PACKAGE__->meta->make_immutable;