File Coverage

blib/lib/POE/Component/EasyDBI.pm
Criterion Covered Total %
statement 232 391 59.3
branch 88 206 42.7
condition 20 78 25.6
subroutine 30 35 85.7
pod 1 19 5.2
total 371 729 50.8


line stmt bran cond sub pod time code
1             package POE::Component::EasyDBI;
2              
3 2     2   469820 use strict;
  2         20  
  2         72  
4 2     2   12 use warnings FATAL =>'all';
  2         4  
  2         132  
5              
6             # Initialize our version
7             our $VERSION = '1.30';
8              
9             # Import what we need from the POE namespace
10 2     2   11 use POE;
  2         4  
  2         13  
11 2     2   715 use POE::Session;
  2         4  
  2         9  
12 2     2   1160 use POE::Filter::Reference;
  2         15205  
  2         62  
13 2     2   944 use POE::Filter::Line;
  2         3148  
  2         76  
14 2     2   1485 use POE::Wheel::Run;
  2         34819  
  2         65  
15 2     2   1249 use POE::Component::EasyDBI::SubProcess;
  2         6  
  2         87  
16              
17 2     2   1078 use Params::Util qw( _ARRAY _HASH );
  2         12716  
  2         145  
18 2     2   22 use Scalar::Util qw( reftype );
  2         5  
  2         79  
19              
20              
21             # Miscellaneous modules
22 2     2   41 use Carp;
  2         5  
  2         108  
23 2     2   11 use vars qw($AUTOLOAD);
  2         4  
  2         9828  
24              
25             our %COMMANDS = map { $_ => 1 } qw(
26             commit
27             rollback
28             begin_work
29             func
30             method
31             insert
32             do
33             single
34             quote
35             arrayhash
36             hashhash
37             hasharray
38             array
39             arrayarray
40             hash
41             keyvalhash
42             shutdown
43             combo
44             );
45              
46             sub AUTOLOAD {
47 13     13   8917 my $self = shift;
48 13         29 my $method = $AUTOLOAD;
49 13         105 $method =~ s/.*:://;
50 13 50       69 my $call = ( $method =~ s/^_// ) ? 1 : 0;
51              
52 13 50       70 return unless $method =~ /[^A-Z]/;
53              
54 13         70 $method = lc( $method );
55              
56             croak "EasyDBI command $method does not exist"
57 13 50       36 unless ( exists( $COMMANDS{ $method } ) );
58              
59 13 100       180 my @args = ( $method eq 'shutdown' ) ? @_ : { @_ };
60 13 50       37 if ( $call ) {
61 0         0 $poe_kernel->call($self->{ID} => $method => @args);
62             } else {
63 13         85 $poe_kernel->post($self->{ID} => $method => @args);
64             }
65             }
66              
67             # TODO use constants?
68             sub MAX_RETRIES () { 5 }
69             sub DEBUG () { 0 }
70              
71             # Autoflush on STDOUT
72             # Select returns last selected handle
73             # So, reselect it after selecting STDOUT and setting Autoflush
74             #select((select(STDOUT), $| = 1)[0]);
75              
76             sub spawn {
77 1     1 0 1623 my ($self,$session) = &new;
78 1         6 return $session;
79             }
80              
81             sub create {
82 0     0 0 0 &spawn;
83             }
84              
85             sub new {
86 2     2 0 1598 my $class = shift;
87              
88             # The options hash
89 2         4 my %opt;
90              
91             # Support passing in a hash ref or a regular hash
92 2 50 33     14 if ((@_ & 1) && _HASH($_[0])) {
93 0         0 %opt = %{$_[0]};
  0         0  
94             } else {
95             # Sanity checking
96 2 50       8 if (@_ & 1) {
97 0         0 croak('POE::Component::EasyDBI requires an even number of options '
98             .'passed to new()/spawn() call');
99             }
100 2         22 %opt = @_;
101             }
102              
103             # lowercase keys
104 2         11 %opt = map { lc($_) => $opt{$_} } keys %opt;
  16         51  
105              
106 2         28 my @valid = qw(
107             dsn
108             username
109             password
110             options
111             alias
112             max_retries
113             ping_timeout
114             use_cancel
115             no_connect_failures
116             connect_error
117             reconnect_wait
118             connected
119             no_warnings
120             sig_ignore_off
121             no_cache
122             alt_fork
123             stopwatch
124             query_logger
125             );
126              
127             # check the DSN
128             # username/password/port other options
129             # should be part of the DSN
130 2 50       8 if (!exists($opt{dsn})) {
131 0         0 croak('DSN is required to spawn a new POE::Component::EasyDBI '
132             .'instance!');
133             }
134              
135             # check the USERNAME
136 2 50       9 if (!exists($opt{username})) {
137 0         0 croak('username is required to spawn a new POE::Component::EasyDBI '
138             .'instance!');
139             }
140              
141             # check the PASSWORD
142 2 50       6 if (!exists($opt{password})) {
143 0         0 croak('password is required to spawn a new POE::Component::EasyDBI '
144             .'instance!');
145             }
146              
147             # check the reconnect wait time
148 2 50 33     17 if (exists($opt{reconnect_wait}) && $opt{reconnect_wait} < 1) {
    50          
149 0         0 warn('A reconnect_wait of less than 1 second is NOT recommended, '
150             .'continuing anyway');
151             } elsif (!$opt{reconnect_wait}) {
152 2         7 $opt{reconnect_wait} = 2;
153             }
154              
155             # check the session alias
156 2 50       7 if (!exists($opt{alias})) {
157             # Debugging info...
158 0         0 DEBUG && warn 'Using default Alias EasyDBI';
159              
160             # Set the default
161 0         0 $opt{alias} = 'EasyDBI';
162             }
163              
164             # check for connect error event
165 2 50       6 if (exists($opt{connect_error})) {
166 2 50       23 if (_ARRAY($opt{connect_error})) {
167 2 50       5 unless ($#{$opt{connect_error}} > 0) {
  2         11  
168 0         0 warn('connect_error must be an array reference that contains '
169             .'at least a session and event, ignoring');
170 0         0 delete $opt{connect_error};
171             }
172             } else {
173 0         0 warn('connect_error must be an array reference that contains at '
174             .'least a session and event, ignoring');
175 0         0 delete $opt{connect_error};
176             }
177             }
178              
179             # check for connect event
180 2 50       8 if (exists($opt{connected})) {
181 2 50       9 if (_ARRAY($opt{connected})) {
182 2 50       4 unless ($#{$opt{connected}} > 0) {
  2         12  
183 0         0 warn('connected must be an array reference that contains '
184             .'at least a session and event, ignoring');
185 0         0 delete $opt{connected};
186             }
187             } else {
188 0         0 warn('connected must be an array reference that contains at '
189             .'least a session and event, ignoring');
190 0         0 delete $opt{connected};
191             }
192             }
193              
194 2 50       8 if (exists($opt{options})) {
195 0 0       0 unless (_HASH($opt{options})) {
196 0         0 warn('options must be a hash ref, ignoring');
197 0         0 delete $opt{options};
198             }
199             }
200              
201 2 50       8 if ($opt{stopwatch}) {
202 2     2   19 eval "use Time::Stopwatch";
  2         4  
  2         26  
  2         143  
203 2 50       9 if ($@) {
204 0         0 warn('cannot use stopwatch (ignored), Time::Stopwatch not installed? ');
205 0         0 delete $opt{stopwatch};
206             }
207             }
208              
209 2         8 my $keep = { map { $_ => delete $opt{$_} } @valid };
  36         88  
210 2         13 $keep->{'parent_pid'} = $$;
211              
212             # Anything left over is unrecognized
213 2 50       9 if (keys %opt) {
214 0         0 croak('Unrecognized keys/options ('.join(',',(keys %opt))
215             .') were present in new() call to POE::Component::EasyDBI!');
216             }
217              
218 2   33     15 my $self = bless($keep,ref $class || $class);
219              
220             # Create a new session for ourself
221             my $session = POE::Session->create(
222             # Our subroutines
223             'object_states' => [
224             $self => {
225             # Maintenance events
226             '_start' => 'start',
227             '_stop' => 'stop',
228             'setup_wheel' => 'setup_wheel',
229             'sig_child' => 'sig_child',
230              
231             # child events
232             'child_error' => 'child_error',
233             'child_closed' => 'child_closed',
234             'child_STDOUT' => 'child_STDOUT',
235             'child_STDERR' => 'child_STDERR',
236              
237             # database events
238 36         218 (map { $_ => 'db_handler', uc($_) => 'db_handler' } keys %COMMANDS),
239              
240             # redefine
241             'combo' => 'combo_query',
242             'COMBO' => 'combo_query',
243             'shutdown' => 'shutdown_poco',
244             'SHUTDOWN' => 'shutdown_poco',
245              
246              
247             # Queue handling
248             'send_query' => 'send_query',
249             'check_queue' => 'check_queue',
250             'print_queue' => 'print_queue',
251             },
252             ],
253              
254             # Set up the heap for ourself
255             'heap' => {
256             # The queue of DBI calls
257             'queue' => [],
258             'idcounter' => 0,
259              
260             # The Wheel::Run object placeholder
261             'wheel' => undef,
262              
263             # How many times have we re-created the wheel?
264             'retries' => 0,
265              
266             # Are we shutting down?
267             'shutdown' => 0,
268              
269             # Valid options
270             'opts' => $keep,
271              
272             # The alia/s we will run under
273             'alias' => $keep->{alias},
274              
275             # Number of times to retry connection
276             # (if connection failures option is off)
277             'max_retries' => $keep->{max_retries} || MAX_RETRIES,
278              
279             # Connection failure option
280 2 50 50     19 'no_connect_failures' => $keep->{no_connect_failures} || 0,
      50        
281              
282             # extra params for actions
283             action_params => {
284             commit => [],
285             rollback => [],
286             begin_work => [],
287             func => [qw( args )],
288             method => [qw( function args )],
289             single => [qw( separator )],
290             insert => [qw( insert hash table last_insert_id )],
291             array => [qw( chunked separator )],
292             arrayarray => [qw( chunked )],
293             keyvalhash => [qw( primary_key chunked )],
294             hashhash => [qw( primary_key chunked )],
295             hasharray => [qw( primary_key chunked )],
296             arrayhash => [qw( chunked )],
297             },
298             },
299             ) or die 'Unable to create a new session!';
300              
301             # save the session id
302 2         1011 $self->{ID} = $session->ID;
303              
304 2 100       200 return wantarray ? ($self,$session) : $self;
305             }
306              
307             # This subroutine handles shutdown signals
308             sub shutdown_poco {
309 2     2 0 777 my ($kernel, $heap) = @_[KERNEL,HEAP];
310              
311             # Check for duplicate shutdown signals
312 2 50       12 if ($heap->{shutdown}) {
313             # Okay, let's see what's going on
314 0 0 0     0 if ($heap->{shutdown} == 1 && ! defined $_[ARG0]) {
    0          
315             # Duplicate shutdown events
316 0         0 DEBUG && warn 'Duplicate shutdown event fired!';
317 0         0 return;
318             } elsif ($heap->{shutdown} == 2) {
319             # Tried to shutdown_NOW again...
320 0         0 DEBUG && warn 'Duplicate shutdown NOW fired!';
321 0         0 return;
322             }
323             }
324              
325             # Check if we got "NOW"
326 2 50 33     14 if (defined($_[ARG0]) && uc($_[ARG0]) eq 'NOW') {
327             # Actually shut down!
328 0         0 $heap->{shutdown} = 2;
329              
330 0 0       0 if ($heap->{wheel}) {
331             # KILL our subprocess
332 0         0 $heap->{wheel}->kill(9);
333             }
334              
335             # Delete the wheel, so we have nothing to keep
336             # the GC from destructing us...
337 0         0 delete $heap->{wheel};
338              
339             # Go over our queue, and do some stuff
340 0         0 foreach my $queue (@{$heap->{queue}}) {
  0         0  
341             # Skip the special EXIT actions we might have put on the queue
342 0 0 0     0 if ($queue->{action} && $queue->{action} eq 'EXIT') { next }
  0         0  
343              
344             # Post a failure event to all the queries on the Queue
345             # informing them that we have been shutdown...
346 0         0 $queue->{error} = 'POE::Component::EasyDBI was '
347             .'shut down forcibly!';
348 0         0 $kernel->post( $queue->{session}, $queue->{event}, $queue);
349              
350             # Argh, decrement the refcount
351 0         0 $kernel->refcount_decrement( $queue->{session}, 'EasyDBI' );
352             }
353              
354             # Tell the kernel to kill us!
355 0         0 $kernel->signal( $_[SESSION], 'KILL' );
356             } else {
357             # Gracefully shut down...
358 2         7 $heap->{shutdown} = 1;
359              
360             # Put into the queue EXIT for the child
361 2         15 $kernel->yield( 'send_query', {
362             action => 'EXIT',
363             sql => undef,
364             placeholders => undef,
365             }
366             );
367             }
368             }
369              
370             sub combo_query {
371 2     2 0 334 my ($kernel, $heap, $args) = @_[KERNEL,HEAP,ARG0];
372              
373             # Get the arguments
374 2 50       15 unless (_HASH($args)) {
375 0         0 $args = { @_[ ARG0 .. $#_ ] };
376             }
377              
378             # Add some stuff to the args
379             # defaults to sender, but can be specified
380 2 50       17 unless (defined($args->{session})) {
381 2         8 $args->{session} = $_[SENDER]->ID();
382 2         14 DEBUG && print "setting session to $args->{session}\n";
383             }
384              
385 2         7 $args->{action} = $_[STATE];
386              
387 2 50       10 if (!exists($args->{event})) {
388             # Nothing much we can do except drop this quietly...
389 0         0 warn "Did not receive an event argument from caller ".$_[SENDER]->ID
390             ." State: " . $_[STATE] . " Args: " . join('',%$args);
391 0         0 return;
392             } else {
393 2         8 my $a = ref($args->{event});
394 2 0 33     19 unless (!ref($a) || $a =~ m/postback/i || $a =~ m/callback/i) {
      33        
395 0         0 warn "Received an malformed event argument from caller"
396             ." (only postbacks, callbacks and scalar allowed) "
397             .$_[SENDER]->ID." -> State: " . $_[STATE] . " Event: $args->{event}"
398             ." Args: " . %$args;
399 0         0 return;
400             }
401             }
402              
403 2         7 my @res;
404             my $handle = sub {
405 4     4   10 push(@res, shift);
406 4 100       27 $poe_kernel->post( $args->{session} => $args->{event} => @res ) if (delete $res[ -1 ]->{__last});
407 2         102 };
408              
409 2         8 foreach my $i ( 0 .. $#{ $args->{queries} } ) {
  2         13  
410 4         20 my ($type, $arg) = %{ $args->{queries}->[ $i ] };
  4         28  
411              
412             # Copy pass-through options
413 4         9 for my $key ( keys %{ $args } ) {
  4         21  
414 16 100 66     87 next if defined $arg->{$key} || $key eq 'queries';
415 12         31 $arg->{$key} = $args->{$key};
416             }
417              
418 4         12 $arg->{event} = $handle;
419 4         8 $arg->{__last} = ( $i == $#{ $args->{queries} } );
  4         31  
420 4         29 $kernel->call( $_[SESSION] => $type => $arg );
421             }
422              
423 2         39 return;
424             }
425              
426             # This subroutine handles queries
427             sub db_handler {
428 27     27 0 6799 my ($kernel, $heap) = @_[KERNEL,HEAP];
429              
430             # Get the arguments
431 27         46 my $args;
432 27 50       118 if (_HASH($_[ARG0])) {
433 27         55 $args = $_[ARG0];
434             } else {
435 0         0 warn "first parameter must be a hash ref, trying to adjust. "
436             ."(fix this to get rid of this message)";
437 0         0 $args = { @_[ARG0 .. $#_ ] };
438             }
439              
440             # fix a stupid spelling mistake
441 27 50       71 if ($args->{seperator}) {
442 0         0 $args->{separator} = $args->{seperator};
443             }
444              
445             # Add some stuff to the args
446             # defaults to sender, but can be specified
447 27 100       78 unless (defined($args->{session})) {
448 23         60 $args->{session} = $_[SENDER]->ID();
449 23         121 DEBUG && print "setting session to $args->{session}\n";
450             }
451              
452 27         123 $args->{action} = $_[STATE];
453              
454 27 50       168 if (!exists($args->{event})) {
455             # Nothing much we can do except drop this quietly...
456 0         0 warn "Did not receive an event argument from caller ".$_[SENDER]->ID
457             ." State: " . $_[STATE] . " Args: " . join(',',%$args);
458 0         0 return;
459             } else {
460 27         76 my $a = ref($args->{event});
461 27 0 33     85 unless (!ref($a) || $a =~ m/postback/i || $a =~ m/callback/i) {
      33        
462 0         0 warn "Received an malformed event argument from caller"
463             ." (only postbacks, callbacks and scalar allowed) "
464             .$_[SENDER]->ID." -> State: " . $_[STATE] . " Event: $args->{event}"
465             ." Args: " . %$args;
466 0         0 return;
467             }
468             }
469              
470 27 100       69 if (!defined($args->{sql})) {
471 6 50       60 unless ($args->{action} =~ m/^(insert|func|method|commit|rollback|begin_work)$/i) {
472 0         0 $args->{error} = 'sql is not defined!';
473             # Okay, send the error to the Failure Event
474 0         0 $kernel->post($args->{session}, $args->{event}, $args);
475 0         0 return;
476             }
477             } else {
478 21 50       50 if (ref($args->{sql})) {
479 0         0 $args->{error} = 'sql is not a scalar!';
480 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
481 0         0 my $callback = delete $args->{event};
482 0         0 $callback->($args);
483             } else {
484             # send the error to the Failure Event
485 0         0 $kernel->post($args->{session}, $args->{event}, $args);
486             }
487 0         0 return;
488             }
489             }
490              
491             # Check for placeholders
492 27 100       64 if (!defined($args->{placeholders})) {
493             # Create our own empty placeholders
494 23         96 $args->{placeholders} = [];
495             } else {
496 4 50       19 unless (_ARRAY($args->{placeholders})) {
497 0         0 $args->{error} = 'placeholders is not an array!';
498 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
499 0         0 my $callback = delete $args->{event};
500 0         0 $callback->($args);
501             } else {
502             # Okay, send the error to the Failure Event
503 0         0 $kernel->post($args->{session}, $args->{event}, $args);
504             }
505 0         0 return;
506             }
507             }
508              
509             # Check for primary_key on HASHHASH or ARRAYHASH queries
510 27 50 33     193 if ($args->{action} eq 'HASHHASH' || $args->{action} eq 'HASHARRAY') {
511 0 0       0 if (!defined($args->{primary_key})) {
512 0         0 $args->{error} = 'primary_key is not defined! It must '
513             .'be a column name or a 1 based index of a column';
514 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
515 0         0 my $callback = delete $args->{event};
516 0         0 $callback->($args);
517             } else {
518 0         0 $kernel->post($args->{session}, $args->{event}, $args);
519             }
520 0         0 return;
521             } else {
522 0         0 $args->{error} = 'primary_key is not a scalar!';
523 0 0       0 if (ref($args->{sql})) {
524             # send the error to the Failure Event
525 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
526 0         0 my $callback = delete $args->{event};
527 0         0 $callback->($args);
528             } else {
529 0         0 $kernel->post($args->{session}, $args->{event}, $args);
530             }
531 0         0 return;
532             }
533             }
534             }
535              
536             # Check if we have shutdown or not
537 27 50       133 if ($heap->{shutdown}) {
538 0         0 $args->{error} = 'POE::Component::EasyDBI is shutting'
539             .' down now, requests are not accepted!';
540             # Do not accept this query
541 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
542 0         0 my $callback = delete $args->{event};
543 0         0 $callback->($args);
544             } else {
545 0         0 $kernel->post($args->{session}, $args->{event}, $args);
546             }
547 0         0 return;
548             }
549              
550             # Increment the refcount for the session that is sending us this query
551 27         117 $kernel->refcount_increment($args->{session}, 'EasyDBI');
552              
553 27 100       1009 if ($args->{session} ne $_[SENDER]->ID()) {
554 4         44 $kernel->refcount_increment($_[SENDER]->ID(), 'EasyDBI');
555 4         129 $args->{sendersession} = $_[SENDER]->ID();
556             }
557              
558             # Okay, fire off this query!
559 27         234 $kernel->call($_[SESSION] => 'send_query' => $args);
560              
561 27         187 return;
562             }
563              
564             # This subroutine starts the process of sending a query
565             sub send_query {
566 29     29 0 1608 my ($kernel, $heap, $args) = @_[KERNEL,HEAP,ARG0];
567              
568             # Validate that we have something
569 29 50 33     169 if (!defined($args) || !_HASH($args) ) {
570 0         0 return;
571             }
572              
573             # Add the ID to the query
574 29         72 $args->{id} = $heap->{idcounter}++;
575              
576             # Add this query to the queue
577 29         43 push(@{ $heap->{queue} }, $args);
  29         70  
578              
579             # Send the query!
580 29         106 $kernel->call($_[SESSION], 'check_queue');
581              
582 29         212 return;
583             }
584              
585             # This subroutine is the meat - sends queries to the subprocess
586             sub check_queue {
587 58     58 0 2532 my ($kernel, $heap) = @_[KERNEL,HEAP];
588              
589             # Check if the subprocess is currently active
590 58 100       175 return unless (!$heap->{active});
591              
592             # Check if we have a query in the queue
593 56 100       93 return unless (scalar(@{ $heap->{queue} }) > 0);
  56         183  
594              
595             # shutting down?
596 29 50       72 return unless ($heap->{shutdown} != 2);
597              
598 29 50       80 if ($heap->{opts}{stopwatch}) {
599 29         222 tie $heap->{queue}->[0]->{stopwatch}, 'Time::Stopwatch';
600             }
601              
602             # Copy what we need from the top of the queue
603 29         289 my %queue;
604 29         52 foreach (
605             qw(id sql action placeholders no_cache begin_work commit method)
606 29         124 ,@{$heap->{action_params}->{$heap->{queue}->[0]->{action}}}
607             ) {
608 266 100       583 next unless (defined($heap->{queue}->[0]->{$_}));
609 122         260 $queue{$_} = $heap->{queue}->[0]->{$_};
610             }
611              
612             # Send data only if we are not shutting down...
613             # Set the child to 'active'
614 29         52 $heap->{active} = 1;
615              
616             # Put it in the wheel
617 29         149 $heap->{wheel}->put(\%queue);
618              
619 29         4248 return;
620             }
621              
622             sub print_queue {
623 0     0 0 0 my ($kernel, $heap) = @_[KERNEL,HEAP];
624 0         0 return scalar @{$heap->{queue}};
  0         0  
625             }
626              
627             # This starts the EasyDBI
628             sub start {
629 2     2 0 1628 my ($kernel, $heap) = @_[KERNEL,HEAP];
630              
631             # Set up the alias for ourself
632 2 50       23 $kernel->alias_set($heap->{alias}) if ($heap->{alias} ne '');
633              
634             # Create the wheel
635 2         83 $kernel->call( $_[SESSION] => 'setup_wheel' );
636              
637 2         79 return;
638             }
639              
640             # This sets up the WHEEL
641             sub setup_wheel {
642 2     2 0 117 my ($kernel, $heap) = @_[KERNEL,HEAP];
643              
644             # Are we shutting down?
645 2 50       9 if ($heap->{shutdown}) {
646             # if ($heap->{wheel}) {
647             # $heap->{wheel}->kill();
648             # }
649             # Do not re-create the wheel...
650 0         0 delete $heap->{wheel};
651 0         0 return;
652             }
653              
654             # Check if we should set up the wheel
655 2 50       9 if ($heap->{retries} == $heap->{max_retries}) {
656             die 'POE::Component::EasyDBI tried '.$heap->{max_retries}
657 0         0 .' times to create a Wheel and is giving up...';
658             }
659              
660             my %prog = (
661             'Program' => \&POE::Component::EasyDBI::SubProcess::main,
662 2         11 'ProgramArgs' => [ $heap->{opts} ],
663             );
664              
665 2 50 33     23 if ($heap->{opts}{alt_fork} && $^O ne 'MSWin32') {
666 0 0       0 my $X = $heap->{opts}{alt_fork} ne "1" ? $heap->{opts}{alt_fork} : $^X;
667 0         0 %prog = (
668             'Program' => "$X -MPOE::Component::EasyDBI::SubProcess -I".join(' -I',@INC)
669             ." -e 'POE::Component::EasyDBI::SubProcess::main(1)'",
670             );
671             }
672              
673             # $kernel->sig_child( $heap->{wheel_pid} )
674             # if ( $heap->{wheel_pid} );
675              
676             # Set up the SubProcess we communicate with
677 2 50       34 $heap->{wheel} = POE::Wheel::Run->new(
678             # What we will run in the separate process
679             %prog,
680              
681             # Kill off existing FD's unless we're running in HELL^H^H^H^HMSWin32
682             'CloseOnCall' => ($^O eq 'MSWin32' ? 0 : 1),
683              
684             # Redirect errors to our error routine
685             'ErrorEvent' => 'child_error',
686              
687             # Send child died to our child routine
688             'CloseEvent' => 'child_closed',
689              
690             # Send input from child
691             'StdoutEvent' => 'child_STDOUT',
692              
693             # Send input from child STDERR
694             'StderrEvent' => 'child_STDERR',
695              
696             # Set our filters
697             # Communicate with child via Storable::nfreeze
698             'StdinFilter' => POE::Filter::Reference->new(),
699             # Receive input via Storable::nfreeze
700             'StdoutFilter' => POE::Filter::Reference->new(),
701             # Plain ol' error lines
702             'StderrFilter' => POE::Filter::Line->new(),
703             );
704              
705 2         13995 $heap->{wheel_pid} = $heap->{wheel}->PID();
706              
707 2 50       128 if ( $kernel->can( "sig_child" ) ) {
708 2         71 $kernel->sig_child( $heap->{wheel_pid} => 'sig_child' );
709             } else {
710 0         0 $kernel->sig( 'CHLD', 'sig_child' );
711             }
712              
713             # Check for errors
714 2 50       873 if (! defined $heap->{wheel}) {
715 0         0 die 'Unable to create a new wheel!';
716             } else {
717             # Increment our retry count
718 2         28 $heap->{retries}++;
719              
720             # Set the wheel to inactive
721 2         29 $heap->{active} = 0;
722              
723 2 50       48 if ($heap->{opts}{alt_fork}) {
724 0         0 $heap->{wheel}->put($heap->{opts});
725             }
726              
727             # Check for queries
728 2         98 $kernel->call($_[SESSION], 'check_queue');
729             }
730              
731 2         103 return;
732             }
733              
734       2 0   sub stop {
735             # nothing to see here, move along
736             }
737              
738             # Deletes a query from the queue, if it is not active
739             sub delete_query {
740 0     0 0 0 my ($kernel, $heap) = @_[KERNEL,HEAP];
741             # ARG0 = ID
742 0         0 my $id = $_[ARG0];
743              
744             # Validation
745 0 0       0 if (!defined($id)) {
746             # Debugging
747 0         0 DEBUG && warn 'In Delete_Query event with no arguments!';
748 0         0 return;
749             }
750              
751             # Check if the id exists + not at the top of the queue :)
752 0 0       0 if (defined($heap->{queue}->[0])) {
753 0 0       0 if ($heap->{queue}->[0]->{id} eq $id) {
754             # Query is still active, nothing we can do...
755 0         0 return undef;
756             } else {
757             # Search through the rest of the queue and see what we get
758 0         0 foreach my $count (@{ $heap->{queue} }) {
  0         0  
759 0 0       0 if ($heap->{queue}->[$count]->{id} eq $id) {
760             # Found a match, delete it!
761 0         0 splice(@{ $heap->{queue} }, $count, 1);
  0         0  
762              
763             # Return success
764 0         0 return 1;
765             }
766             }
767             }
768             }
769              
770             # If we got here, we didn't find anything
771 0         0 return undef;
772             }
773              
774             # Handles child DIE'ing
775             sub child_closed {
776 0     0 0 0 my ($kernel, $heap) = @_[KERNEL,HEAP];
777              
778 0         0 DEBUG && warn 'POE::Component::EasyDBI\'s Wheel closed';
779 0 0       0 if ($heap->{shutdown}) {
780             # if ($heap->{wheel}) {
781             # $heap->{wheel}->kill();
782             # }
783 0         0 delete $heap->{wheel};
784 0         0 return;
785             }
786              
787             # Emit debugging information
788 0         0 DEBUG && warn 'Restarting it...';
789              
790             # Create the wheel again
791 0         0 delete $heap->{wheel};
792 0         0 $kernel->call($_[SESSION], 'setup_wheel');
793              
794 0         0 return;
795             }
796              
797             # Handles child error
798             sub child_error {
799 2     2 0 1772 my $heap = $_[HEAP];
800              
801             # Emit warnings only if debug is on
802 2         15 DEBUG && do {
803             # Copied from POE::Wheel::Run manpage
804             my ($operation, $errnum, $errstr) = @_[ARG0 .. ARG2];
805             warn "POE::Component::EasyDBI got an $operation error $errnum from Subprocess: '$errstr' shutdown: $heap->{shutdown}\n";
806             };
807              
808 2 50       13 if ($heap->{shutdown}) {
809 2 50       10 if ($heap->{wheel}) {
810 2         11 $heap->{wheel}->kill();
811             }
812 2         126 delete $heap->{wheel};
813 2         1326 return;
814             }
815             }
816              
817             # Handles child STDOUT output
818             sub child_STDOUT {
819 29     29 0 17510958 my ($self, $kernel, $heap, $data) = @_[OBJECT,KERNEL,HEAP,ARG0];
820              
821             # Validate the argument
822 29 50       155 unless ( _HASH($data) ) {
823 0         0 warn "POE::Component::EasyDBI did not get a hash from the child ( $data )";
824 0         0 return;
825             }
826              
827              
828             # DEBUG && do {
829             # require Data::Dumper;
830             # print Data::Dumper->Dump([$data,$heap->{queue}[0]]);
831             # };
832              
833             # Check for special DB messages with ID of 'DBI'
834 29 50       116 if ($data->{id} eq 'DBI') {
835             # Okay, we received a DBI error -> error in connection...
836 0 0       0 if ($heap->{no_connect_failures}) {
837 0         0 my $qc = {};
838 0 0       0 if (defined($heap->{queue}->[0])) {
839 0         0 $qc = $heap->{queue}[0];
840             }
841 0         0 $qc->{error} = $data->{error};
842 0 0 0     0 if (_ARRAY($heap->{opts}{connect_error})) {
    0          
843 0         0 $kernel->post(@{$heap->{opts}{connect_error}}, $qc);
  0         0  
844 0 0       0 if (_HASH($heap->{queue}[0])) {
845 0         0 delete $heap->{queue}[0]->{error};
846             }
847             } elsif ($qc->{session} && $qc->{event}) {
848 0 0 0     0 if ( reftype( $qc->{event} ) && reftype( $qc->{event} ) eq 'CODE' ) {
849 0         0 my $callback = delete $qc->{event};
850 0         0 $callback->($qc);
851             } else {
852 0         0 $kernel->post($qc->{session}, $qc->{event}, $qc);
853             }
854             } else {
855 0         0 warn "No connect_error defined and no queries in the queue while "
856             ."error occurred: $data->{error}";
857             }
858             # print "DBI error: $data->{error}, retrying\n";
859 0         0 return;
860             }
861              
862             # Shutdown ourself!
863 0         0 $kernel->call($_[SESSION], 'shutdown', 'NOW');
864              
865             # Too bad that we have to die...
866 0         0 croak("Could not connect to DBI or database went away: $data->{error}");
867             }
868              
869 29 100       106 if ($data->{id} eq 'DBI-CONNECTED') {
870 2 50       35 if (_ARRAY($heap->{opts}{connected})) {
871 2         18 my $query_copy = {};
872 2 50       29 if (defined($heap->{queue}->[0])) {
873 0         0 $query_copy = { %{$heap->{queue}[0]} };
  0         0  
874             }
875 2         27 $kernel->post(@{$heap->{opts}{connected}}, $query_copy);
  2         36  
876             }
877 2         523 return;
878             }
879              
880 27         38 my $query;
881 27         44 my $refcount_decrement = 0;
882              
883 27 50       73 if (exists($data->{chunked})) {
884             # Get the query from the queue
885 0 0       0 if (exists($data->{last_chunk})) {
886             # last chunk, delete it out of the queue
887 0         0 $query = shift(@{ $heap->{queue} });
  0         0  
888 0         0 $refcount_decrement = 1;
889             } else {
890 0         0 $query = $heap->{queue}->[0];
891             }
892             } else {
893             # Check to see if the ID matches with the top of the queue
894 27 50       85 if ($data->{id} ne $heap->{queue}->[0]->{id}) {
895 0         0 die "Internal error in queue/child consistency! ( CHILD: $data->{id} "
896             ."QUEUE: $heap->{queue}->[0]->{id} )";
897             }
898             # Get the query from the top of the queue
899 27         39 $query = shift(@{ $heap->{queue} });
  27         59  
900 27         54 $refcount_decrement = 1;
901             }
902              
903             # copy the query data, so we don't clobber the
904             # stored data when using chunks
905             #my $query_copy = { %{ $query } };
906              
907             # marry data from the child to the data from the queue
908             #%$query_copy = (%$query_copy, %$data);
909              
910             # my $query_copy = { (%$query, %$data) };
911              
912             # my $query_copy = $query;
913             # foreach (keys %$data) { $query_copy->{$_} = $data->{$_}; }
914              
915 27         250 my $query_copy = { %$query, %$data };
916              
917             # undocumented
918             $poe_kernel->call( $self->{query_logger} => _log => $query_copy )
919 27 50       340 if ( $self->{query_logger} );
920              
921             # my ($ses,$evt) = ("$query_copy->{session}", "$query_copy->{event}");
922              
923             # $kernel->call($ses => $evt => $query_copy);
924              
925             #undef $query;
926             #foreach my $k (keys %$data) {
927             # $query_copy->{$k} = $data->{$k};
928             #}
929              
930 27 100 66     192 if ( reftype( $query_copy->{event} ) && reftype( $query_copy->{event} ) eq 'CODE' ) {
931 7         17 DEBUG && print "calling callback\n";
932 7         21 my $callback = delete $query_copy->{event};
933 7         45 $callback->($query_copy);
934             } else {
935             #DEBUG && print "calling event $query->{event} in session $query->{session} from our session ".$_[SESSION]->ID."\n";
936 20         63 $kernel->call($query_copy->{session} => $query_copy->{event} => $query_copy);
937             }
938              
939             # Decrement the refcount for the session that sent us a query
940 27 50       11073 if ($refcount_decrement) {
941 27         86 $heap->{active} = 0;
942 27         92 $kernel->refcount_decrement($query_copy->{session}, 'EasyDBI');
943              
944 27 100 66     1280 if (defined($query_copy->{sendersession}) && $query_copy->{sendersession} ne $query_copy->{session}) {
945 4         17 $kernel->refcount_decrement($query_copy->{sendersession}, 'EasyDBI');
946             }
947              
948             # Now, that we have got a result, check if we need to send another query
949 27         202 $kernel->call($_[SESSION], 'check_queue');
950             }
951              
952 27         343 return;
953             }
954              
955             # Handles child STDERR output
956             sub child_STDERR {
957 0     0 0 0 my $input = $_[ARG0];
958              
959             # Skip empty lines as the POE::Filter::Line manpage says...
960 0 0       0 if ($input eq '') { return }
  0         0  
961              
962 0 0 0     0 return if ($_[HEAP]->{opts}->{no_warnings} && !DEBUG);
963              
964 0         0 warn "$input\n";
965             }
966              
967             sub sig_child {
968 2     2 0 5490 my ($kernel, $heap) = @_[KERNEL, HEAP];
969              
970 2         8 delete $heap->{wheel_pid};
971 2         32 $kernel->sig_handled();
972              
973 2 50       38 if ( $heap->{shutdown} ) {
974             # Remove our alias so we can be properly terminated
975 2 50       14 $kernel->alias_remove($heap->{alias}) if ($heap->{alias} ne '');
976             # and the child
977 2         128 $kernel->sig( 'CHLD' );
978             }
979             }
980              
981             # ----------------
982             # Object methods
983              
984             sub ID {
985 1     1 1 3 my $self = shift;
986 1         47 return $self->{ID};
987             }
988              
989             # not called directly
990             sub DESTROY {
991 1     1   366 my $self = shift;
992 1 50 33     12 if (ref($self) && $self->ID) {
993             # $poe_kernel->post($self->ID => 'shutdown' => @_);
994             } else {
995 0           return undef;
996             }
997             }
998              
999             # End of module
1000             1;
1001              
1002             # egg: I like Sealab 2021. This is why you can see lots of 2021 refences in my code.
1003              
1004             #=item C
1005             #
1006             #Optional. If set to a true value, it will install a signal handler that will
1007             #call $sth->cancel when a INT signal is received by the sub-process. You may
1008             #want to read the docs on your driver to see if this is supported.
1009              
1010             __END__