| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package AnyEvent::ProcessPool::Process; | 
| 2 |  |  |  |  |  |  | # ABSTRACT: Supervisor for a single, forked process | 
| 3 |  |  |  |  |  |  | $AnyEvent::ProcessPool::Process::VERSION = '0.06_001'; # TRIAL | 
| 4 |  |  |  |  |  |  |  | 
| 5 | 3 |  |  | 3 |  | 157939 | $AnyEvent::ProcessPool::Process::VERSION = '0.06001';use common::sense; | 
|  | 3 |  |  |  |  | 19 |  | 
|  | 3 |  |  |  |  | 19 |  | 
| 6 |  |  |  |  |  |  |  | 
| 7 | 3 |  |  | 3 |  | 1080 | use Moo; | 
|  | 3 |  |  |  |  | 23001 |  | 
|  | 3 |  |  |  |  | 14 |  | 
| 8 | 3 |  |  | 3 |  | 3062 | use Carp; | 
|  | 3 |  |  |  |  | 6 |  | 
|  | 3 |  |  |  |  | 134 |  | 
| 9 | 3 |  |  | 3 |  | 622 | use AnyEvent; | 
|  | 3 |  |  |  |  | 3738 |  | 
|  | 3 |  |  |  |  | 64 |  | 
| 10 | 3 |  |  | 3 |  | 1357 | use AnyEvent::Handle; | 
|  | 3 |  |  |  |  | 24612 |  | 
|  | 3 |  |  |  |  | 112 |  | 
| 11 | 3 |  |  | 3 |  | 24 | use AnyEvent::Util qw(fork_call portable_socketpair fh_nonblocking); | 
|  | 3 |  |  |  |  | 6 |  | 
|  | 3 |  |  |  |  | 146 |  | 
| 12 | 3 |  |  | 3 |  | 927 | use AnyEvent::ProcessPool::Task; | 
|  | 3 |  |  |  |  | 9 |  | 
|  | 3 |  |  |  |  | 98 |  | 
| 13 | 3 |  |  | 3 |  | 993 | use AnyEvent::ProcessPool::Util qw(next_id cpu_count); | 
|  | 3 |  |  |  |  | 12 |  | 
|  | 3 |  |  |  |  | 271 |  | 
| 14 | 3 |  |  | 3 |  | 20 | use Try::Catch; | 
|  | 3 |  |  |  |  | 7 |  | 
|  | 3 |  |  |  |  | 2099 |  | 
| 15 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # limit: optional cap on tasks per child; has_capacity compares count against it. | 
| 16 |  |  |  |  |  |  | has limit   => (is => 'ro'); | 
|  |  |  |  |  |  |  | # handle: set by spawn, cleared by run when a child is retired; is_running is its predicate. | 
| 17 |  |  |  |  |  |  | has handle  => (is => 'rw', clearer => 1, predicate => 'is_running'); | 
|  |  |  |  |  |  |  | # count: tasks sent to the current child; incremented in run, reset to 0 in spawn. | 
| 18 |  |  |  |  |  |  | has count   => (is => 'rw', default => sub{ 0 }); | 
|  |  |  |  |  |  |  | # stopped: set to 1 by stop; run and spawn return early once it is true. | 
| 19 |  |  |  |  |  |  | has stopped => (is => 'rw', default => sub{ 0 }); | 
| 20 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Moo destruction hook: calls stop when a handle is still set on the object. | 
| 21 |  |  |  |  |  |  | sub DEMOLISH { | 
| 22 | 55 |  |  | 55 | 0 | 12743 | my $self = shift; | 
| 23 | 55 | 100 |  |  |  | 630 | $self->stop if $self->is_running; | 
| 24 |  |  |  |  |  |  | } | 
| 25 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Marks this supervisor as stopped and shuts down the current handle, if any. | 
|  |  |  |  |  |  |  | # The handle attribute itself is not cleared here. | 
| 26 |  |  |  |  |  |  | sub stop { | 
| 27 | 69 |  |  | 69 | 0 | 105 | my $self = shift; | 
|  |  |  |  |  |  |  | # Set the flag first: run and spawn both return early once stopped is true. | 
| 28 | 69 |  |  |  |  | 157 | $self->stopped(1); | 
| 29 | 69 | 100 |  |  |  | 221 | $self->handle->push_shutdown if $self->handle; | 
| 30 |  |  |  |  |  |  | } | 
| 31 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Returns true when a limit value was supplied (i.e. limit is defined). | 
| 32 |  |  |  |  |  |  | sub has_limit { | 
| 33 | 21 |  |  | 21 | 0 | 45 | my $self = shift; | 
| 34 | 21 |  |  |  |  | 194 | return defined $self->limit; | 
| 35 |  |  |  |  |  |  | } | 
| 36 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Returns true when a handle is set and either no limit is configured or | 
|  |  |  |  |  |  |  | # fewer than limit tasks have been sent to the current child. | 
| 37 |  |  |  |  |  |  | sub has_capacity { | 
| 38 | 38 |  |  | 38 | 0 | 89 | my $self = shift; | 
| 39 | 38 |  | 66 |  |  | 214 | return $self->is_running && (!$self->has_limit || $self->count < $self->limit); | 
| 40 |  |  |  |  |  |  | } | 
| 41 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Sends $task to the child process and returns an AnyEvent condvar that is | 
|  |  |  |  |  |  |  | # sent the decoded reply task. Returns nothing (bare return) once stopped is set. | 
|  |  |  |  |  |  |  | # A new child is spawned first when has_capacity is false. | 
| 42 |  |  |  |  |  |  | sub run { | 
| 43 | 38 |  |  | 38 | 0 | 101 | my ($self, $task) = @_; | 
| 44 | 38 | 50 |  |  |  | 154 | return if $self->stopped; | 
| 45 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # No handle yet, or the task limit for the current child has been reached. | 
| 46 | 38 | 100 |  |  |  | 142 | if (!$self->has_capacity) { | 
| 47 | 25 | 100 |  |  |  | 157 | if (my $handle = $self->handle) { | 
|  |  |  |  |  |  |  | # Retire the old handle: detach it from the object and shut it down. | 
|  |  |  |  |  |  |  | # The on_eof closure holds a reference to the handle and drops it at EOF; | 
|  |  |  |  |  |  |  | # presumably this keeps the handle alive until the child closes its end - confirm. | 
| 48 | 8 |  |  |  |  | 259 | $self->clear_handle; | 
| 49 | 8 |  |  | 0 |  | 114 | $handle->on_eof(sub{ undef $handle }); | 
|  | 0 |  |  |  |  | 0 |  | 
| 50 | 8 |  |  |  |  | 86 | $handle->push_shutdown; | 
| 51 |  |  |  |  |  |  | } | 
| 52 |  |  |  |  |  |  |  | 
| 53 | 25 |  |  |  |  | 332 | $self->spawn; | 
| 54 |  |  |  |  |  |  | } | 
| 55 |  |  |  |  |  |  |  | 
| 56 | 38 |  |  |  |  | 2761 | my $cv = AE::cv; | 
| 57 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # One newline-terminated encoded task goes out; one line is read back as the reply. | 
| 58 | 38 |  |  |  |  | 1029 | $self->count($self->count + 1); | 
| 59 | 38 |  |  |  |  | 434 | $self->handle->push_write($task->encode . "\n"); | 
| 60 |  |  |  |  |  |  | $self->handle->push_read(line => "\n", sub{ | 
| 61 | 38 |  |  | 38 |  | 23361 | my ($handle, $line, $eol) = @_; | 
|  |  |  |  |  |  |  | # This $task shadows the outer one: it is the decoded reply, not the request. | 
| 62 | 38 |  |  |  |  | 140 | my $task = AnyEvent::ProcessPool::Task->decode($line); | 
| 63 | 38 |  |  |  |  | 192 | $cv->send($task); | 
| 64 | 38 |  |  |  |  | 5398 | }); | 
| 65 |  |  |  |  |  |  |  | 
| 66 | 38 |  |  |  |  | 4008 | return $cv; | 
| 67 |  |  |  |  |  |  | } | 
| 68 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Starts a new child process connected through a socket pair, stores the | 
|  |  |  |  |  |  |  | # parent-side AnyEvent::Handle in the handle attribute and resets count to 0. | 
|  |  |  |  |  |  |  | # Returns early, doing nothing, once stopped is set. | 
|  |  |  |  |  |  |  | # Naming: $child is the end the supervisor keeps (it talks to the child); | 
|  |  |  |  |  |  |  | # $parent is the end the forked process keeps (it talks to the parent). | 
| 69 |  |  |  |  |  |  | sub spawn { | 
| 70 | 27 |  |  | 27 | 0 | 57 | my $self = shift; | 
| 71 | 27 | 50 |  |  |  | 80 | return if $self->stopped; | 
| 72 |  |  |  |  |  |  |  | 
| 73 | 27 |  |  |  |  | 113 | my ($child, $parent) = portable_socketpair; | 
| 74 |  |  |  |  |  |  |  | 
| 75 | 27 |  |  |  |  | 1561 | fh_nonblocking $child, 1; | 
|  |  |  |  |  |  |  | # NOTE(review): on_eol does not look like an AnyEvent::Handle event name | 
|  |  |  |  |  |  |  | # (on_eof was probably meant); left unchanged to preserve behavior - confirm. | 
| 76 |  |  |  |  |  |  | my $handle = AnyEvent::Handle->new( | 
| 77 |  |  |  |  |  |  | fh => $child, | 
| 78 | 0 |  |  | 0 |  | 0 | on_eol => sub{ warn "$$ EOL: @_" }, | 
| 79 | 0 |  |  | 0 |  | 0 | on_error => sub{ warn "$$ ERROR: @_" }, | 
| 80 | 27 |  |  |  |  | 775 | ); | 
| 81 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # The block below runs in the forked process: it reads one encoded task per | 
|  |  |  |  |  |  |  | # line, executes it and writes the encoded result back as one line. | 
| 82 |  |  |  |  |  |  | my $forked = fork_call { | 
| 83 | 0 |  |  | 0 |  | 0 | close $child; | 
| 84 | 0 |  |  |  |  | 0 | local $| = 1; | 
| 85 |  |  |  |  |  |  |  | 
| 86 | 0 |  |  |  |  | 0 | my $count = 0; | 
| 87 |  |  |  |  |  |  |  | 
| 88 | 0 |  |  |  |  | 0 | while (defined(my $line = <$parent>)) { | 
| 89 | 0 |  |  |  |  | 0 | my $task = AnyEvent::ProcessPool::Task->decode($line); | 
| 90 | 0 |  |  |  |  | 0 | $task->execute; | 
| 91 |  |  |  |  |  |  |  | 
| 92 | 0 |  |  |  |  | 0 | syswrite $parent, $task->encode . "\n"; | 
| 93 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Leave the loop once this child has handled limit tasks. 'last' is the | 
|  |  |  |  |  |  |  | # loop exit; 'break' belongs to given/when and raises an error in a while loop. | 
| 94 | 0 | 0 | 0 |  |  | 0 | if ($self->has_limit && ++$count >= $self->limit) { | 
| 95 | 0 |  |  |  |  | 0 | last; | 
| 96 |  |  |  |  |  |  | } | 
| 97 |  |  |  |  |  |  | } | 
| 98 |  |  |  |  |  |  | } | 
|  |  |  |  |  |  |  | # Completion callback in the parent; intentionally empty, results and errors are ignored. | 
| 99 |  |  |  | 14 |  |  | sub { | 
| 100 | 27 |  |  |  |  | 5478 | }; | 
| 101 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # The supervisor only needs its own end of the pair. | 
|  |  |  |  |  |  |  | # NOTE(review): fork_call may defer the fork when AnyEvent::Util's fork limit is | 
|  |  |  |  |  |  |  | # reached; this close would then run before the child exists - confirm. | 
| 102 | 27 |  |  |  |  | 36571 | close $parent; | 
| 103 |  |  |  |  |  |  |  | 
| 104 | 27 |  |  |  |  | 436 | $self->handle($handle); | 
| 105 | 27 |  |  |  |  | 609 | $self->count(0); | 
| 106 |  |  |  |  |  |  | } | 
| 107 |  |  |  |  |  |  |  | 
| 108 |  |  |  |  |  |  | 1; | 
| 109 |  |  |  |  |  |  |  | 
| 110 |  |  |  |  |  |  | __END__ |