| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package WorkerManager::TheSchwartz; | 
| 2 | 1 |  |  | 1 |  | 1203 | use strict; | 
|  | 1 |  |  |  |  | 3 |  | 
|  | 1 |  |  |  |  | 28 |  | 
| 3 | 1 |  |  | 1 |  | 5 | use warnings; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 24 |  | 
| 4 |  |  |  |  |  |  |  | 
| 5 | 1 |  |  | 1 |  | 583 | use TheSchwartz; | 
|  | 1 |  |  |  |  | 72374 |  | 
|  | 1 |  |  |  |  | 33 |  | 
| 6 | 1 |  |  | 1 |  | 10 | use Time::Piece; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 14 |  | 
| 7 | 1 |  |  | 1 |  | 147 | use Module::Load (); | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 37 |  | 
| 8 | 1 |  |  | 1 |  | 490 | use Time::HiRes qw( time ); | 
|  | 1 |  |  |  |  | 1334 |  | 
|  | 1 |  |  |  |  | 13 |  | 
| 9 | 1 |  |  | 1 |  | 209 | use POSIX qw(getppid); | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 8 |  | 
| 10 | 1 |  |  | 1 |  | 80 | use Carp; | 
|  | 1 |  |  |  |  | 1 |  | 
|  | 1 |  |  |  |  | 681 |  | 
| 11 |  |  |  |  |  |  |  | 
| 12 |  |  |  |  |  |  | sub new { | 
| 13 | 0 |  |  | 0 | 0 |  | my ($class, $worker, $options) = @_; | 
| 14 | 0 |  | 0 |  |  |  | $options ||= {}; | 
| 15 |  |  |  |  |  |  |  | 
| 16 | 0 |  |  |  |  |  | my $databases; | 
| 17 | 0 | 0 |  |  |  |  | if ($databases = delete $options->{databases}) { | 
| 18 | 0 | 0 |  |  |  |  | $databases = [$databases] unless UNIVERSAL::isa($databases, 'ARRAY'); | 
| 19 |  |  |  |  |  |  | } else { | 
| 20 | 0 |  |  |  |  |  | croak 'not specified database information in config file for worker manager'; | 
| 21 |  |  |  |  |  |  | } | 
| 22 | 0 |  |  |  |  |  | my $client = TheSchwartz->new( databases => $databases, %$options); | 
| 23 |  |  |  |  |  |  |  | 
| 24 | 0 |  |  |  |  |  | my $self = bless { | 
| 25 |  |  |  |  |  |  | client => $client, | 
| 26 |  |  |  |  |  |  | worker => $worker, | 
| 27 |  |  |  |  |  |  | terminate => undef, | 
| 28 |  |  |  |  |  |  | start_time => undef, | 
| 29 |  |  |  |  |  |  | }, $class; | 
| 30 | 0 |  |  |  |  |  | $self->init; | 
| 31 | 0 |  |  |  |  |  | $self; | 
| 32 |  |  |  |  |  |  | } | 
| 33 |  |  |  |  |  |  |  | 
| 34 |  |  |  |  |  |  | sub init { | 
| 35 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 36 |  |  |  |  |  |  | $self->{client}->set_verbose( | 
| 37 |  |  |  |  |  |  | sub { | 
| 38 | 0 |  |  | 0 |  |  | my $msg = shift; | 
| 39 | 0 |  |  |  |  |  | my $job = shift; | 
| 40 |  |  |  |  |  |  | # $WorkerManager::LOGGER->('TheSchwartz', $msg) if($msg =~ /Working/); | 
| 41 | 0 | 0 |  |  |  |  | if($msg =~ /Working/){ | 
| 42 | 0 |  |  |  |  |  | $self->{start_time} = time; | 
| 43 |  |  |  |  |  |  | } | 
| 44 | 0 | 0 |  |  |  |  | return if($msg =~ /found no jobs/); | 
| 45 | 0 | 0 |  |  |  |  | if($msg =~ /^job completed|^job failed/){ | 
| 46 | 0 |  |  |  |  |  | $msg .= sprintf " %s", $job->funcname; | 
| 47 | 0 | 0 |  |  |  |  | $msg .= sprintf " process:%d", (time - $self->{start_time}) * 1000 if($self->{start_time}); | 
| 48 | 0 | 0 | 0 |  |  |  | $msg .= sprintf " delay:%d", ($self->{start_time} - $job->insert_time) * 1000 if($job && $self->{start_time}); | 
| 49 | 0 |  |  |  |  |  | $self->{start_time} = undef; | 
| 50 |  |  |  |  |  |  | }; | 
| 51 | 0 | 0 |  |  |  |  | $WorkerManager::LOGGER->('TheSchwartz', $msg) unless($msg =~ /found no jobs/); | 
| 52 | 0 |  |  |  |  |  | }); | 
| 53 | 0 | 0 |  |  |  |  | if (UNIVERSAL::isa($self->{worker}, 'ARRAY')){ | 
| 54 | 0 |  |  |  |  |  | for (@{$self->{worker}}){ | 
|  | 0 |  |  |  |  |  |  | 
| 55 | 0 |  |  |  |  |  | Module::Load::load($_); | 
| 56 | 0 | 0 |  |  |  |  | $_->can('work') or die "cannot ${_}->work"; | 
| 57 | 0 |  |  |  |  |  | $self->{client}->can_do($_); | 
| 58 |  |  |  |  |  |  | } | 
| 59 |  |  |  |  |  |  | } else { | 
| 60 | 0 |  |  |  |  |  | Module::Load::load($self->{worker}); | 
| 61 | 0 | 0 |  |  |  |  | $_->can('work') or die "cannot ${_}->work"; | 
| 62 | 0 |  |  |  |  |  | $self->{client}->can_do($self->{worker}); | 
| 63 |  |  |  |  |  |  | } | 
| 64 |  |  |  |  |  |  | } | 
| 65 |  |  |  |  |  |  |  | 
| 66 |  |  |  |  |  |  | sub work { | 
| 67 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 68 | 0 |  | 0 |  |  |  | my $max = shift || 100; | 
| 69 | 0 |  | 0 |  |  |  | my $delay = shift || 5; | 
| 70 | 0 |  |  |  |  |  | my $count = 0; | 
| 71 | 0 |  | 0 |  |  |  | while ($count < $max && ! $self->{terminate}) { | 
| 72 | 0 | 0 |  |  |  |  | if (getppid == 1) { | 
| 73 | 0 |  |  |  |  |  | die "my dad may be killed."; | 
| 74 | 0 |  |  |  |  |  | exit(1); | 
| 75 |  |  |  |  |  |  | } | 
| 76 | 0 | 0 |  |  |  |  | if($self->{client}->work_once){ | 
| 77 | 0 |  |  |  |  |  | $count++; | 
| 78 |  |  |  |  |  |  | } else { | 
| 79 | 0 |  |  |  |  |  | sleep $delay; | 
| 80 |  |  |  |  |  |  | } | 
| 81 |  |  |  |  |  |  | } | 
| 82 |  |  |  |  |  |  | } | 
| 83 |  |  |  |  |  |  |  | 
| 84 |  |  |  |  |  |  | sub terminate { | 
| 85 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 86 | 0 |  |  |  |  |  | $self->{terminate} = 1; | 
| 87 |  |  |  |  |  |  | } | 
| 88 |  |  |  |  |  |  |  | 
| 89 |  |  |  |  |  |  | 1; |