File Coverage

lib/AnyEvent/ForkObject.pm
Criterion Covered Total %
statement 31 33 93.9
branch n/a
condition n/a
subroutine 11 11 100.0
pod n/a
total 42 44 95.4


line stmt bran cond sub pod time code
1             package AnyEvent::ForkObject;
2              
3 1     1   60595 use 5.010001;
  1         4  
  1         38  
4 1     1   6 use strict;
  1         2  
  1         34  
5 1     1   6 use warnings;
  1         6  
  1         32  
6              
7 1     1   6 use Carp;
  1         1  
  1         101  
8 1     1   10 use AnyEvent;
  1         2  
  1         17  
9 1     1   966 use AnyEvent::Util;
  1         17479  
  1         111  
10 1     1   1234 use AnyEvent::Handle;
  1         9536  
  1         51  
11 1     1   13 use Scalar::Util qw(weaken blessed reftype);
  1         3  
  1         143  
12 1     1   1112 use POSIX;
  1         8710  
  1         10  
13 1     1   4845 use IO::Handle;
  1         9566  
  1         59  
14 1     1   603 use AnyEvent::Serialize qw(:all);
  0            
  0            
15             use AnyEvent::Tools qw(mutex);
16             use Devel::GlobalDestruction;
17              
18              
19             our $VERSION = '0.09';
20              
21             sub new
22             {
23             my ($class) = @_;
24              
25             my $self = bless { } => ref($class) || $class;
26             my ($s1, $s2) = portable_socketpair;
27              
28             if ($self->{pid} = fork) {
29             # parent
30             $self->{mutex} = mutex;
31             close $s2;
32             fh_nonblocking $s1, 1;
33             {
34             weaken(my $self = $self);
35             $self->{handle} = new AnyEvent::Handle
36             fh => $s1,
37             on_error => sub {
38             return unless $self;
39             return if $self->{destroyed};
40             delete $self->{handle};
41             $self->{fatal} = $!;
42             $self->{cb}(fatal => $self->{fatal}) if $self->{cb};
43             };
44             }
45             } elsif (defined $self->{pid}) {
46             # child
47             close $s1;
48             $self->{socket} = $s2;
49             $self->{object} = {};
50             $self->{no} = 0;
51             $self->_start_server;
52             } else {
53             die $!;
54             }
55              
56             return $self;
57             }
58              
59             sub do :method
60             {
61             my ($self, %opts) = @_;
62             my $method = $opts{method} || 'new';
63             my $invocant = $opts{module} || $opts{_invocant};
64             my $cb = $opts{cb} || sub { };
65             my $args = $opts{args} || [];
66             my $wantarray = $opts{wantarray};
67             my $require = $opts{require};
68             $wantarray = 0 unless exists $opts{wantarray};
69              
70             weaken $self;
71             $self->{mutex}->lock(sub {
72             my ($guard) = @_;
73             return unless $self;
74             return if $self->{destroyed};
75              
76             $self->{cb} = $cb;
77              
78             unless ($self->{handle}) {
79             $cb->(fatal => 'Child process was destroyed');
80             undef $guard;
81             return;
82             }
83              
84             if ($self->{fatal}) {
85             $cb->(fatal => $self->{fatal});
86             delete $self->{cb};
87             undef $guard;
88             return;
89             }
90              
91             serialize {
92             $require ? (r => $require) : (
93             i => $invocant,
94             m => $method,
95             a => $args,
96             wa => $wantarray
97             )
98             } => sub {
99             return unless $self;
100             return if $self->{destroyed} or $self->{fatal};
101              
102             $self->{handle}->push_write("$_[0]\n");
103             return unless $self;
104             return if $self->{destroyed} or $self->{fatal};
105              
106             $self->{handle}->push_read(line => "\n", sub {
107             deserialize $_[1] => sub {
108             return unless $self;
109             return if $self->{destroyed} or $self->{fatal};
110              
111             my ($o, $error, $tail) = @_;
112              
113             if ($error) {
114             $cb->(fatal => $error);
115             delete $self->{cb};
116             undef $guard;
117             return;
118             }
119              
120             my $status = shift @$o;
121             if ($status eq 'ok') {
122             for (@$o) {
123             if (exists $_->{obj}) {
124             $_ = bless {
125             no => "$_->{obj}",
126             fo => \$self,
127             } => 'AnyEvent::ForkObject::OneObject';
128             next;
129             }
130              
131             $_ = $_->{res};
132             }
133             $cb->(ok => @$o);
134             } else {
135             $cb->($status => @$o);
136             }
137             delete $self->{cb};
138             undef $guard;
139             };
140             return;
141             });
142              
143             return;
144             };
145             });
146              
147             return;
148             }
149              
150              
151             sub DESTROY
152             {
153             my ($self) = @_;
154             $self->{destroyed} = 1;
155             $self->{handle}->push_write("'bye'\n") if $self->{handle};
156             delete $self->{handle};
157              
158             return if in_global_destruction;
159              
160             # kill zombies
161             my $cw;
162             $cw = AE::child $self->{pid} => sub {
163             my ($pid, $code) = @_;
164             undef $cw;
165             };
166             }
167              
168             sub _start_server
169             {
170             my ($self) = @_;
171             croak "Something wrong" if $self->{pid};
172             my $err_code = 0;
173              
174             require Data::StreamSerializer;
175              
176             my $socket = $self->{socket};
177             $socket->autoflush(1);
178             while(<$socket>) {
179             my $response;
180             next unless /\S/;
181             my $cmd = eval $_;
182             if ($@) {
183             $err_code = 1;
184             last;
185             }
186              
187             unless (ref $cmd) {
188             if ($cmd eq 'bye') {
189             undef $_ for values %{ $self->{object} };
190             delete $self->{object};
191             last;
192             }
193              
194             eval $cmd;
195              
196             if ($@) {
197             $response = [ die => $@ ];
198             goto RESPONSE;
199             }
200              
201             $response = [ 'ok' ];
202             goto RESPONSE;
203             }
204              
205             # require
206             if ($cmd->{r}) {
207             eval "require $cmd->{r}";
208             if ($@) {
209             $response = [ die => $@ ];
210             goto RESPONSE;
211             }
212              
213             $response = [ 'ok' ];
214             goto RESPONSE;
215             }
216              
217              
218             my ($invocant, $method, $args, $wantarray) = @$cmd{qw(i m a wa)};
219             if ($invocant =~ /^\d+$/) {
220             if ($method eq 'DESTROY') {
221             delete $self->{object}{$invocant};
222             $response = [ 'ok' ];
223             goto RESPONSE;
224             } else {
225             $invocant = $self->{object}{$invocant}
226             }
227             }
228              
229             my @o;
230              
231             if ($method eq 'fo_attr') {
232             unless (ref $invocant) {
233             $response = [ die => 'fo_attr should be called as method' ];
234             goto RESPONSE;
235             }
236              
237             if ('ARRAY' eq reftype $invocant) {
238             $invocant->[ $args->[0] ] = $args->[1] if @$args > 1;
239             $o[0] = $invocant->[ $args->[0] ];
240             } elsif ('HASH' eq reftype $invocant) {
241             $invocant->{ $args->[0] } = $args->[1] if @$args > 1;
242             $o[0] = $invocant->{ $args->[0] };
243             } else {
244             $response = [
245             die => "fo_attr can't access on blessed " .
246             reftype $invocant
247             ];
248             goto RESPONSE;
249             }
250              
251             } else {
252             if ($wantarray) {
253             @o = eval { $invocant -> $method ( @$args ) };
254             } elsif (defined $wantarray) {
255             $o[0] = eval { $invocant -> $method ( @$args ) };
256             } else {
257             eval { $invocant -> $method ( @$args ) };
258             }
259              
260             if ($@) {
261             $response = [ die => $@ ];
262             goto RESPONSE;
263             }
264             }
265              
266             for (@o) {
267             if (ref $_ and blessed $_) {
268             my $no = ++$self->{no};
269             $self->{object}{$no} = $_;
270              
271             $_ = { obj => $no };
272             next;
273             }
274              
275             $_ = { res => $_ };
276             }
277              
278             $response = [ ok => @o ];
279              
280             RESPONSE:
281             my $sr = new Data::StreamSerializer($response);
282             while(defined(my $part = $sr->next)) {
283             print $socket $part;
284             }
285             print $socket "\n";
286             }
287              
288             # destroy internal objects
289             delete $self->{object};
290              
291             # we don't want to call any other destructors
292             POSIX::_exit($err_code);
293             }
294              
295             package AnyEvent::ForkObject::OneObject;
296             use Carp;
297             use Scalar::Util qw(blessed);
298             use Devel::GlobalDestruction;
299              
300             sub AUTOLOAD
301             {
302             our $AUTOLOAD;
303             my ($foo) = $AUTOLOAD =~ /([^:]+)$/;
304              
305             my ($self, @args) = @_;
306             my $cb = pop @args;
307             my $wantarray = 0;
308             if ('CODE' ne ref $cb) {
309             $wantarray = $cb;
310             $cb = pop @args;
311             }
312             croak "Callback is required" unless 'CODE' eq ref $cb;
313              
314             my $fo = $self->{fo};
315              
316             unless ($$fo) {
317             $cb->(fatal => 'Child process was already destroyed');
318             return;
319             }
320              
321             $$fo -> do(
322             _invocant => $self->{no},
323             method => $foo,
324             args => \@args,
325             cb => $cb,
326             wantarray => $wantarray
327             );
328             return;
329             }
330              
331             sub DESTROY
332             {
333             # You can call DESTROY by hand
334             my ($self, $cb) = @_;
335             return if in_global_destruction;
336             $cb ||= sub { };
337             my $fo = $self->{fo};
338             unless (blessed $$fo) {
339             $cb->(fatal => 'Child process was already destroyed');
340             return;
341             }
342              
343             $$fo -> do(
344             _invocant => $self->{no},
345             method => 'DESTROY',
346             cb => $cb,
347             wantarray => undef,
348             );
349             return;
350             }
351              
352             1;
353             __END__