line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package AnyEvent::Worker; |
2
|
|
|
|
|
|
|
|
3
|
3
|
|
|
3
|
|
196901
|
use 5.006; |
|
3
|
|
|
|
|
11
|
|
|
3
|
|
|
|
|
122
|
|
4
|
3
|
|
|
3
|
|
2641
|
use common::sense 2;m{ |
|
3
|
|
|
|
|
107
|
|
|
3
|
|
|
|
|
19
|
|
5
|
|
|
|
|
|
|
use warnings; |
6
|
|
|
|
|
|
|
use strict; |
7
|
|
|
|
|
|
|
}x; |
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
=head1 NAME |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
AnyEvent::Worker - Manage blocking task in external process |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
=head1 SYNOPSIS |
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
use AnyEvent 5; |
16
|
|
|
|
|
|
|
use AnyEvent::Worker; |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
my $worker1 = AnyEvent::Worker->new( [ 'Actual::Worker::Class' => @init_args ] ); |
19
|
|
|
|
|
|
|
my $worker2 = AnyEvent::Worker->new( sub { return "Cb 1 @_"; } ); |
20
|
|
|
|
|
|
|
my $worker3 = AnyEvent::Worker->new( { |
21
|
|
|
|
|
|
|
class => 'Actual::Worker::Class2', |
22
|
|
|
|
|
|
|
new => 'create', # alternative constructor |
23
|
|
|
|
|
|
|
args => [qw(arg1 arg2)], |
24
|
|
|
|
|
|
|
} ); |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
# Invoke method `test' on Actual::Worker::Class with arguments @args |
27
|
|
|
|
|
|
|
$worker1->do( test => @args , sub { |
28
|
|
|
|
|
|
|
return warn "Request died: $@" if $@; |
29
|
|
|
|
|
|
|
warn "Received response: @_"; |
30
|
|
|
|
|
|
|
}); |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
# Just call callback, passed to worker2 with arguments @args |
33
|
|
|
|
|
|
|
$worker2->do( @args , sub { |
34
|
|
|
|
|
|
|
return warn "Request died: $@" if $@; |
35
|
|
|
|
|
|
|
warn "Received response: @_"; |
36
|
|
|
|
|
|
|
}); |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
=head1 CONSTRUCTOR |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
=head2 new $cb->($req), %args |
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
Simple stateless worker. On any C a sub sill be invoked with C arguments |
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
=head2 new [ Class => @new_args ], %args |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
Stateful, object-based worker. After fork, Class will we C |
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
First argument to C will be interpreted as object method, rest -- as method arguments. |
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
=head2 new { class => 'Class', args => \@new_args, new => 'constructor_method' }, %args |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
Same as previous, but allow to pass optional constructor name in C arg |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
=head2 $args{on_error} = $cb->($worker,$error,$fatal,$file,$line) |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
When an unexpected error occurs (for ex: child process exited or killed) C callback will be invoked |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
=head1 METHODS |
59
|
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
=head2 do @args, $cb->($res) |
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
Only for stateless worker. |
63
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
=head2 do method => @args, $cb->($res) |
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
Only for stateful worker. |
67
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
=cut |
69
|
|
|
|
|
|
|
|
70
|
3
|
|
|
3
|
|
274
|
use Carp; |
|
3
|
|
|
|
|
10
|
|
|
3
|
|
|
|
|
183
|
|
71
|
3
|
|
|
3
|
|
934
|
use Socket (); |
|
3
|
|
|
|
|
3850
|
|
|
3
|
|
|
|
|
74
|
|
72
|
3
|
|
|
3
|
|
17
|
use Scalar::Util (); |
|
3
|
|
|
|
|
5
|
|
|
3
|
|
|
|
|
46
|
|
73
|
3
|
|
|
3
|
|
3317
|
use Storable (); |
|
3
|
|
|
|
|
11390
|
|
|
3
|
|
|
|
|
77
|
|
74
|
|
|
|
|
|
|
|
75
|
3
|
|
|
3
|
|
1738
|
use AnyEvent (); |
|
3
|
|
|
|
|
6264
|
|
|
3
|
|
|
|
|
60
|
|
76
|
3
|
|
|
3
|
|
1005
|
use AnyEvent::Util (); |
|
3
|
|
|
|
|
8797
|
|
|
3
|
|
|
|
|
54
|
|
77
|
|
|
|
|
|
|
|
78
|
3
|
|
|
3
|
|
18
|
use Errno (); |
|
3
|
|
|
|
|
8
|
|
|
3
|
|
|
|
|
9016
|
|
79
|
3
|
|
|
3
|
|
35
|
use Fcntl (); |
|
3
|
|
|
|
|
5
|
|
|
3
|
|
|
|
|
47
|
|
80
|
3
|
|
|
3
|
|
1053
|
use POSIX (); |
|
3
|
|
|
|
|
9081
|
|
|
3
|
|
|
|
|
11739
|
|
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
our $VERSION = '0.06'; |
83
|
|
|
|
|
|
|
our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; |
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
# Almost fully derived from AnyEvent::DBI |
86
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
our $WORKER; |
88
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
sub serve_fh($$) { |
90
|
0
|
|
|
0
|
0
|
0
|
my ($fh, $version) = @_; |
91
|
|
|
|
|
|
|
|
92
|
0
|
0
|
|
|
|
0
|
if ($VERSION != $version) { |
93
|
0
|
|
|
|
|
0
|
syswrite $fh, |
94
|
|
|
|
|
|
|
pack "L/a*", |
95
|
|
|
|
|
|
|
Storable::freeze |
96
|
|
|
|
|
|
|
[undef, __PACKAGE__." version mismatch ($VERSION vs. $version)"]; |
97
|
0
|
|
|
|
|
0
|
return; |
98
|
|
|
|
|
|
|
} |
99
|
|
|
|
|
|
|
|
100
|
0
|
|
|
|
|
0
|
eval { |
101
|
0
|
|
|
|
|
0
|
my $rbuf; |
102
|
0
|
0
|
|
|
|
0
|
my $name = ref $WORKER eq 'CODE' ? __PACKAGE__ : ref $WORKER; |
103
|
0
|
|
|
|
|
0
|
$0 .= ' - '.$name; |
104
|
0
|
|
|
|
|
0
|
my $O = $0; |
105
|
0
|
|
|
|
|
0
|
my $N = 0; |
106
|
0
|
|
|
|
|
0
|
while () { |
107
|
0
|
0
|
|
|
|
0
|
sysread $fh, $rbuf, 16384, length $rbuf |
108
|
|
|
|
|
|
|
or last; |
109
|
|
|
|
|
|
|
|
110
|
0
|
|
|
|
|
0
|
while () { |
111
|
0
|
|
|
|
|
0
|
my $len = unpack "L", $rbuf; |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
# full request available? |
114
|
0
|
0
|
0
|
|
|
0
|
last unless $len && $len + 4 <= length $rbuf; |
115
|
|
|
|
|
|
|
|
116
|
0
|
|
|
|
|
0
|
my $req = Storable::thaw substr $rbuf, 4; |
117
|
0
|
|
|
|
|
0
|
substr $rbuf, 0, $len + 4, ""; # remove length + request |
118
|
0
|
|
|
|
|
0
|
local $@; |
119
|
0
|
|
|
|
|
0
|
my $wbuf = eval { |
120
|
0
|
|
|
|
|
0
|
++$N; |
121
|
0
|
0
|
|
|
|
0
|
if (ref $WORKER eq 'CODE') { |
122
|
0
|
|
|
|
|
0
|
local $0 = "$O : request $N"; |
123
|
0
|
|
|
|
|
0
|
pack "L/a*", Storable::freeze [ 1, $WORKER->(@$req) ]; |
124
|
|
|
|
|
|
|
} else { |
125
|
0
|
|
|
|
|
0
|
my $method = shift @$req; |
126
|
|
|
|
|
|
|
#warn ">> request $method"; |
127
|
0
|
|
|
|
|
0
|
local $0 = "$O : request $N : $method"; |
128
|
0
|
|
|
|
|
0
|
pack "L/a*", Storable::freeze [ 1, $WORKER->$method(@$req) ]; |
129
|
|
|
|
|
|
|
} |
130
|
|
|
|
|
|
|
}; |
131
|
|
|
|
|
|
|
# warn if $@; |
132
|
0
|
|
|
|
|
0
|
$0 = "$O : idle"; |
133
|
0
|
0
|
|
|
|
0
|
$wbuf = pack "L/a*", Storable::freeze [ undef, ref $@ ? $@ : "$@" ] |
|
|
0
|
|
|
|
|
|
134
|
|
|
|
|
|
|
if $@; |
135
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
#warn "<< response"; |
137
|
0
|
|
|
|
|
0
|
for (my $ofs = 0; $ofs < length $wbuf; ) { |
138
|
0
|
|
|
|
|
0
|
my $wr = syswrite $fh, $wbuf, length($wbuf), $ofs; |
139
|
0
|
0
|
0
|
|
|
0
|
defined $wr or $!{EINTR} or die "unable to write results: $!"; |
140
|
0
|
|
|
|
|
0
|
$ofs += $wr; |
141
|
|
|
|
|
|
|
} |
142
|
|
|
|
|
|
|
} |
143
|
|
|
|
|
|
|
} |
144
|
|
|
|
|
|
|
}; |
145
|
0
|
0
|
|
|
|
0
|
warn if $@; |
146
|
|
|
|
|
|
|
} |
147
|
|
|
|
|
|
|
|
148
|
|
|
|
|
|
|
sub serve_fd($$) { |
149
|
0
|
0
|
|
0
|
0
|
0
|
open my $fh, ">>&=$_[0]" |
150
|
|
|
|
|
|
|
or die "Couldn't open server file descriptor: $!"; |
151
|
|
|
|
|
|
|
|
152
|
0
|
|
|
|
|
0
|
serve_fh $fh, $_[1]; |
153
|
|
|
|
|
|
|
} |
154
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
# stupid Storable autoloading, total loss-loss situation |
156
|
|
|
|
|
|
|
Storable::thaw Storable::freeze []; |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
=head1 METHODS |
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
=over 4 |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
=cut |
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
sub new { |
165
|
5
|
|
|
5
|
1
|
496
|
my ($class, $cb, %arg) = @_; |
166
|
|
|
|
|
|
|
|
167
|
5
|
50
|
|
|
|
69
|
my ($client, $server) = AnyEvent::Util::portable_socketpair |
168
|
|
|
|
|
|
|
or croak "unable to create Anyevent::Worker communications pipe: $!"; |
169
|
5
|
|
|
|
|
451
|
binmode $client, ':raw'; |
170
|
5
|
|
|
|
|
18
|
binmode $server, ':raw'; |
171
|
|
|
|
|
|
|
|
172
|
5
|
|
|
|
|
17
|
my $self = bless \%arg, $class; |
173
|
5
|
|
|
|
|
43
|
$self->{fh} = $client; |
174
|
|
|
|
|
|
|
|
175
|
5
|
|
|
|
|
65
|
AnyEvent::Util::fh_nonblocking $client, 1; |
176
|
|
|
|
|
|
|
|
177
|
5
|
|
|
|
|
74
|
my $rbuf; |
178
|
5
|
|
|
|
|
64
|
my @caller = (caller)[1,2]; # the "default" caller |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
{ |
181
|
5
|
|
|
|
|
8
|
Scalar::Util::weaken (my $self = $self); |
|
5
|
|
|
|
|
23
|
|
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
$self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { |
184
|
6
|
50
|
|
6
|
|
3106945
|
return unless $self; |
185
|
|
|
|
|
|
|
|
186
|
6
|
|
|
|
|
86
|
$self->{last_activity} = AnyEvent->now; |
187
|
|
|
|
|
|
|
|
188
|
6
|
|
|
|
|
275
|
my $len = sysread $client, $rbuf, 65536, length $rbuf; |
189
|
|
|
|
|
|
|
|
190
|
6
|
100
|
|
|
|
40
|
if ($len > 0) { |
|
|
50
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
191
|
|
|
|
|
|
|
# we received data, so reset the timer |
192
|
|
|
|
|
|
|
|
193
|
5
|
|
|
|
|
12
|
while () { |
194
|
11
|
|
|
|
|
33
|
my $len = unpack "L", $rbuf; |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
# full response available? |
197
|
11
|
100
|
66
|
|
|
95
|
last unless $len && $len + 4 <= length $rbuf; |
198
|
6
|
|
|
|
|
44
|
my $res = Storable::thaw substr $rbuf, 4; |
199
|
6
|
|
|
|
|
151
|
substr $rbuf, 0, $len + 4, ""; # remove length + request |
200
|
|
|
|
|
|
|
|
201
|
6
|
50
|
|
|
|
19
|
last unless $self; |
202
|
6
|
|
|
|
|
9
|
my $req = shift @{ $self->{queue} }; |
|
6
|
|
|
|
|
23
|
|
203
|
|
|
|
|
|
|
|
204
|
6
|
100
|
|
|
|
18
|
if (defined $res->[0]) { |
205
|
3
|
|
|
|
|
11
|
$res->[0] = $self; |
206
|
3
|
|
|
|
|
18
|
$req->[0](@$res); |
207
|
|
|
|
|
|
|
} else { |
208
|
3
|
|
|
|
|
7
|
my $cb = shift @$req; |
209
|
|
|
|
|
|
|
{ |
210
|
3
|
|
|
|
|
4
|
local $@ = $res->[1]; |
|
3
|
|
|
|
|
7
|
|
211
|
3
|
|
|
|
|
96
|
$@ =~ s{\n$}{}; |
212
|
3
|
|
|
|
|
14
|
$cb->($self); |
213
|
|
|
|
|
|
|
} |
214
|
|
|
|
|
|
|
} |
215
|
|
|
|
|
|
|
|
216
|
|
|
|
|
|
|
# no more queued requests, so become idle |
217
|
6
|
|
|
|
|
75
|
undef $self->{last_activity} |
218
|
6
|
100
|
66
|
|
|
10051
|
if $self && !@{ $self->{queue} }; |
219
|
|
|
|
|
|
|
} |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
} |
222
|
|
|
|
|
|
|
elsif (defined $len) { |
223
|
|
|
|
|
|
|
# todo, caller? |
224
|
1
|
|
|
|
|
13
|
$self->_error ("unexpected eof", @caller, 1); |
225
|
|
|
|
|
|
|
} |
226
|
|
|
|
|
|
|
elsif ($! != Errno::EAGAIN) { |
227
|
|
|
|
|
|
|
# todo, caller? |
228
|
0
|
|
|
|
|
0
|
$self->_error ("read error ".(0+$!).": $!", @caller, 1); |
229
|
|
|
|
|
|
|
} |
230
|
5
|
|
|
|
|
192
|
}); |
231
|
|
|
|
|
|
|
|
232
|
|
|
|
|
|
|
$self->{tw_cb} = sub { |
233
|
0
|
0
|
0
|
0
|
|
0
|
if ($self->{timeout} && $self->{last_activity}) { |
234
|
0
|
0
|
|
|
|
0
|
if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) { |
235
|
|
|
|
|
|
|
# we did time out |
236
|
0
|
|
|
|
|
0
|
my $req = $self->{queue}[0]; |
237
|
0
|
|
|
|
|
0
|
$self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal |
238
|
|
|
|
|
|
|
} else { |
239
|
|
|
|
|
|
|
# we need to re-set the timeout watcher |
240
|
0
|
|
|
|
|
0
|
$self->{tw} = AnyEvent->timer ( |
241
|
|
|
|
|
|
|
after => $self->{last_activity} + $self->{timeout} - AnyEvent->now, |
242
|
|
|
|
|
|
|
cb => $self->{tw_cb}, |
243
|
|
|
|
|
|
|
); |
244
|
0
|
|
|
|
|
0
|
Scalar::Util::weaken $self; |
245
|
|
|
|
|
|
|
} |
246
|
|
|
|
|
|
|
} else { |
247
|
|
|
|
|
|
|
# no timeout check wanted, or idle |
248
|
0
|
|
|
|
|
0
|
undef $self->{tw}; |
249
|
|
|
|
|
|
|
} |
250
|
5
|
|
|
|
|
1349
|
}; |
251
|
|
|
|
|
|
|
|
252
|
|
|
|
|
|
|
$self->{ww_cb} = sub { |
253
|
0
|
0
|
|
0
|
|
0
|
return unless $self; |
254
|
|
|
|
|
|
|
|
255
|
0
|
|
|
|
|
0
|
$self->{last_activity} = AnyEvent->now; |
256
|
|
|
|
|
|
|
|
257
|
0
|
0
|
|
|
|
0
|
my $len = syswrite $client, $self->{wbuf} |
258
|
|
|
|
|
|
|
or return delete $self->{ww}; |
259
|
|
|
|
|
|
|
|
260
|
0
|
|
|
|
|
0
|
substr $self->{wbuf}, 0, $len, ""; |
261
|
5
|
|
|
|
|
92
|
}; |
262
|
|
|
|
|
|
|
} |
263
|
|
|
|
|
|
|
|
264
|
5
|
|
|
|
|
4769
|
my $pid = fork; |
265
|
|
|
|
|
|
|
|
266
|
5
|
50
|
|
|
|
357
|
if ($pid) { |
|
|
0
|
|
|
|
|
|
267
|
|
|
|
|
|
|
# parent |
268
|
5
|
|
|
|
|
292
|
close $server; |
269
|
|
|
|
|
|
|
} |
270
|
|
|
|
|
|
|
elsif (defined $pid) { |
271
|
|
|
|
|
|
|
# child |
272
|
0
|
|
|
|
|
0
|
$SIG{INT} = 'IGNORE'; |
273
|
0
|
|
|
|
|
0
|
my $serv_fno = fileno $server; |
274
|
|
|
|
|
|
|
|
275
|
0
|
|
0
|
|
|
0
|
($_ != $serv_fno) && POSIX::close $_ for $^F+1..$FD_MAX; |
276
|
|
|
|
|
|
|
|
277
|
0
|
0
|
|
|
|
0
|
if (ref $cb eq 'CODE'){ |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
278
|
0
|
|
|
|
|
0
|
$WORKER = $cb; |
279
|
|
|
|
|
|
|
} |
280
|
|
|
|
|
|
|
elsif ( ref $cb eq 'ARRAY') { |
281
|
0
|
|
|
|
|
0
|
my ( $class,@args ) = @$cb; |
282
|
0
|
0
|
0
|
|
|
0
|
eval qq{ use $class; 1 } or croak($@) unless $class->can('new'); |
283
|
0
|
|
|
|
|
0
|
$WORKER = $class->new(@args); |
284
|
|
|
|
|
|
|
} |
285
|
|
|
|
|
|
|
elsif ( ref $cb eq 'HASH') { |
286
|
0
|
0
|
|
|
|
0
|
my $class = $cb->{class} or croak "You should define class to construct"; |
287
|
0
|
|
0
|
|
|
0
|
my $new = $cb->{new} || 'new'; |
288
|
0
|
0
|
0
|
|
|
0
|
eval qq{ use $class; 1 } or croak($@) unless $class->can($new); |
289
|
0
|
0
|
|
|
|
0
|
$WORKER = $class->$new(@{ $cb->{args} || [] }); |
|
0
|
|
|
|
|
0
|
|
290
|
|
|
|
|
|
|
} |
291
|
|
|
|
|
|
|
else { |
292
|
0
|
|
|
|
|
0
|
croak "Bad argument: $cb"; |
293
|
|
|
|
|
|
|
} |
294
|
|
|
|
|
|
|
|
295
|
0
|
|
|
|
|
0
|
serve_fh $server, $VERSION; |
296
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
# no other way on the broken windows platform, even this leaks |
298
|
|
|
|
|
|
|
# memory and might fail. |
299
|
0
|
|
|
|
|
0
|
kill 9, $$ if AnyEvent::WIN32; |
300
|
|
|
|
|
|
|
|
301
|
|
|
|
|
|
|
# and this kills the parent process on windows |
302
|
0
|
|
|
|
|
0
|
POSIX::_exit 0; |
303
|
|
|
|
|
|
|
} |
304
|
|
|
|
|
|
|
else { |
305
|
0
|
|
|
|
|
0
|
croak "fork: $!"; |
306
|
|
|
|
|
|
|
} |
307
|
5
|
|
|
|
|
660
|
$self->{child_pid} = $pid; |
308
|
5
|
|
|
|
|
753
|
$self |
309
|
|
|
|
|
|
|
} |
310
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
sub _server_pid { |
312
|
0
|
|
|
0
|
|
0
|
shift->{child_pid} |
313
|
|
|
|
|
|
|
} |
314
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
our %KIDW; |
316
|
|
|
|
|
|
|
our %TERM; |
317
|
|
|
|
|
|
|
|
318
|
|
|
|
|
|
|
sub kill_child { |
319
|
6
|
|
|
6
|
0
|
10
|
my $self = shift; |
320
|
6
|
|
|
|
|
23
|
my $child_pid = delete $self->{child_pid}; |
321
|
6
|
|
|
|
|
10
|
my $GD = 0; |
322
|
|
|
|
|
|
|
{ |
323
|
6
|
50
|
|
6
|
|
12
|
local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ }; |
|
6
|
|
|
|
|
77
|
|
|
6
|
|
|
|
|
58
|
|
324
|
6
|
|
|
|
|
62
|
warn 'test'; |
325
|
|
|
|
|
|
|
} |
326
|
|
|
|
|
|
|
#print STDERR "killing $child_pid / $GD\n"; |
327
|
6
|
100
|
|
|
|
218
|
if ($child_pid) { |
328
|
|
|
|
|
|
|
# send SIGKILL in two seconds |
329
|
5
|
|
|
|
|
24
|
$TERM{$child_pid}++; |
330
|
5
|
50
|
33
|
|
|
247
|
kill 0 => $child_pid and |
|
|
|
33
|
|
|
|
|
331
|
|
|
|
|
|
|
kill TERM => $child_pid or $!{ESRCH} or warn "kill $child_pid: $!"; |
332
|
5
|
50
|
|
|
|
18
|
return if $GD; |
333
|
|
|
|
|
|
|
# MAYBE: kill timer |
334
|
|
|
|
|
|
|
#my $murder_timer = AnyEvent->timer ( |
335
|
|
|
|
|
|
|
# after => 2, |
336
|
|
|
|
|
|
|
# cb => sub { |
337
|
|
|
|
|
|
|
# kill 9, $child_pid |
338
|
|
|
|
|
|
|
# and delete $TERM{$child_pid}; |
339
|
|
|
|
|
|
|
# }, |
340
|
|
|
|
|
|
|
#); |
341
|
|
|
|
|
|
|
|
342
|
|
|
|
|
|
|
# reap process |
343
|
|
|
|
|
|
|
#print STDERR "start reaper $child_pid\n"; |
344
|
|
|
|
|
|
|
$KIDW{$child_pid} = AnyEvent->child ( |
345
|
|
|
|
|
|
|
pid => $child_pid, |
346
|
|
|
|
|
|
|
cb => sub { |
347
|
|
|
|
|
|
|
# just hold on to this so it won't go away |
348
|
|
|
|
|
|
|
#print STDERR "reaped $child_pid\n"; |
349
|
2
|
|
|
2
|
|
12740
|
delete $TERM{$child_pid}; |
350
|
2
|
|
|
|
|
10
|
delete $KIDW{$child_pid}; |
351
|
|
|
|
|
|
|
# cancel SIGKILL |
352
|
|
|
|
|
|
|
#undef $murder_timer; |
353
|
|
|
|
|
|
|
}, |
354
|
5
|
|
|
|
|
173
|
); |
355
|
|
|
|
|
|
|
|
356
|
5
|
|
|
|
|
322
|
close $self->{fh}; |
357
|
|
|
|
|
|
|
} |
358
|
|
|
|
|
|
|
} |
359
|
|
|
|
|
|
|
sub END { |
360
|
3
|
|
|
3
|
|
629
|
my $GD = 0; |
361
|
|
|
|
|
|
|
{ |
362
|
3
|
50
|
|
|
|
6
|
local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ }; |
|
3
|
|
|
|
|
27
|
|
|
3
|
|
|
|
|
36
|
|
363
|
3
|
|
|
|
|
26
|
warn 'test'; |
364
|
|
|
|
|
|
|
} |
365
|
|
|
|
|
|
|
#print STDERR "END $!/$? GD=$GD\n"; |
366
|
3
|
|
|
|
|
14
|
for (keys %TERM) { |
367
|
3
|
|
|
|
|
1348
|
delete $KIDW{$_}; |
368
|
|
|
|
|
|
|
#print STDERR "END kill $_\n"; |
369
|
3
|
50
|
|
|
|
347
|
kill 0 => $_ and do { |
370
|
3
|
50
|
|
|
|
82
|
kill KILL => $_ or warn "kill $_ failed: $!"; |
371
|
|
|
|
|
|
|
#print STDERR "END waitpid $_\n"; |
372
|
3
|
|
|
|
|
4926
|
my $wp = waitpid $_,0; |
373
|
|
|
|
|
|
|
#print STDERR "END waitpid $_ = $wp\n"; |
374
|
|
|
|
|
|
|
}; |
375
|
|
|
|
|
|
|
#print STDERR "END $_ ($!/$?/${^CHILD_ERROR_NATIVE})\n"; |
376
|
|
|
|
|
|
|
} |
377
|
3
|
|
|
|
|
12
|
undef $!;undef $?; |
|
3
|
|
|
|
|
89
|
|
378
|
|
|
|
|
|
|
} |
379
|
|
|
|
|
|
|
|
380
|
|
|
|
|
|
|
sub DESTROY { |
381
|
5
|
|
|
5
|
|
2357
|
shift->kill_child; |
382
|
|
|
|
|
|
|
} |
383
|
|
|
|
|
|
|
|
384
|
|
|
|
|
|
|
sub _error { |
385
|
1
|
|
|
1
|
|
8
|
my ($self, $error, $filename, $line, $fatal) = @_; |
386
|
1
|
|
|
|
|
10
|
my $caller = ''; |
387
|
1
|
|
|
|
|
3
|
my @caller = ($filename,$line); |
388
|
1
|
50
|
|
|
|
8
|
if ($fatal) { |
389
|
1
|
|
|
|
|
3
|
delete $self->{tw}; |
390
|
1
|
|
|
|
|
17
|
delete $self->{rw}; |
391
|
1
|
|
|
|
|
22
|
delete $self->{ww}; |
392
|
1
|
|
|
|
|
3
|
delete $self->{fh}; |
393
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
# for fatal errors call all enqueued callbacks with error |
395
|
1
|
|
|
|
|
2
|
while (my $req = shift @{$self->{queue}}) { |
|
3
|
|
|
|
|
3904
|
|
396
|
2
|
100
|
|
|
|
10
|
@caller = ($req->[1],$req->[2]) unless $caller; |
397
|
2
|
|
66
|
|
|
20
|
$caller ||= " after $req->[1] line $req->[2],"; |
398
|
2
|
|
|
|
|
9
|
local $@ = "$error at $req->[1] line $req->[2].\n"; |
399
|
2
|
|
|
|
|
11
|
$req->[0]->($self); |
400
|
|
|
|
|
|
|
} |
401
|
1
|
|
|
|
|
6
|
$self->kill_child; |
402
|
|
|
|
|
|
|
} |
403
|
|
|
|
|
|
|
|
404
|
1
|
|
|
|
|
3
|
local $@ = $error; |
405
|
|
|
|
|
|
|
|
406
|
1
|
50
|
|
|
|
7
|
if ($self->{on_error}) { |
407
|
1
|
|
|
|
|
10
|
$self->{on_error}($self, $error, $fatal, @caller); |
408
|
|
|
|
|
|
|
} |
409
|
|
|
|
|
|
|
else { |
410
|
0
|
|
|
|
|
0
|
my $e = "$error$caller"; |
411
|
0
|
0
|
|
|
|
0
|
if ($fatal) { |
412
|
0
|
|
|
|
|
0
|
die "$e at $filename, line $line\n"; |
413
|
|
|
|
|
|
|
} else { |
414
|
0
|
|
|
|
|
0
|
warn "$e at $filename, line $line\n"; |
415
|
|
|
|
|
|
|
} |
416
|
|
|
|
|
|
|
} |
417
|
|
|
|
|
|
|
} |
418
|
|
|
|
|
|
|
|
419
|
|
|
|
|
|
|
=item $worker->on_error ($cb->($worker, $filename, $line, $fatal)) |
420
|
|
|
|
|
|
|
|
421
|
|
|
|
|
|
|
Sets (or clears, with C) the C handler. |
422
|
|
|
|
|
|
|
|
423
|
|
|
|
|
|
|
=cut |
424
|
|
|
|
|
|
|
|
425
|
|
|
|
|
|
|
sub on_error { |
426
|
0
|
|
|
0
|
1
|
0
|
$_[0]{on_error} = $_[1]; |
427
|
|
|
|
|
|
|
} |
428
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
=item $worker->timeout ($seconds) |
430
|
|
|
|
|
|
|
|
431
|
|
|
|
|
|
|
Sets (or clears, with C) the database timeout. Useful to extend the |
432
|
|
|
|
|
|
|
timeout when you are about to make a really long query. |
433
|
|
|
|
|
|
|
|
434
|
|
|
|
|
|
|
=cut |
435
|
|
|
|
|
|
|
|
436
|
|
|
|
|
|
|
sub timeout { |
437
|
0
|
|
|
0
|
1
|
0
|
my ($self, $timeout) = @_; |
438
|
|
|
|
|
|
|
|
439
|
0
|
|
|
|
|
0
|
$self->{timeout} = $timeout; |
440
|
|
|
|
|
|
|
|
441
|
|
|
|
|
|
|
# reschedule timer if one was running |
442
|
0
|
|
|
|
|
0
|
$self->{tw_cb}->(); |
443
|
|
|
|
|
|
|
} |
444
|
|
|
|
|
|
|
|
445
|
|
|
|
|
|
|
=item $worker->do ( @args, $cb->( $worker, @response ) ) |
446
|
|
|
|
|
|
|
|
447
|
|
|
|
|
|
|
Executes worker code and execure the callback, when response is ready |
448
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
=cut |
450
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
sub do { |
452
|
8
|
|
|
8
|
1
|
2252
|
my $self = shift; |
453
|
8
|
|
|
|
|
16
|
my $cb = pop; |
454
|
8
|
|
|
|
|
60
|
my ($filename,$line) = (caller)[1,2]; |
455
|
|
|
|
|
|
|
|
456
|
8
|
50
|
|
|
|
116
|
unless ($self->{fh}) { |
457
|
0
|
|
|
|
|
0
|
local $@ = my $err = 'no worker connection'; |
458
|
0
|
|
|
|
|
0
|
$cb->($self); |
459
|
0
|
|
|
|
|
0
|
$self->_error ($err, $filename, $line, 1); |
460
|
0
|
|
|
|
|
0
|
return; |
461
|
|
|
|
|
|
|
} |
462
|
|
|
|
|
|
|
|
463
|
8
|
|
|
|
|
28
|
push @{ $self->{queue} }, [$cb, $filename, $line]; |
|
8
|
|
|
|
|
52
|
|
464
|
|
|
|
|
|
|
|
465
|
|
|
|
|
|
|
# re-start timeout if necessary |
466
|
8
|
50
|
33
|
|
|
39
|
if ($self->{timeout} && !$self->{tw}) { |
467
|
0
|
|
|
|
|
0
|
$self->{last_activity} = AnyEvent->now; |
468
|
0
|
|
|
|
|
0
|
$self->{tw_cb}->(); |
469
|
|
|
|
|
|
|
} |
470
|
|
|
|
|
|
|
|
471
|
8
|
|
|
|
|
91
|
$self->{wbuf} .= pack "L/a*", Storable::freeze \@_; |
472
|
|
|
|
|
|
|
|
473
|
8
|
50
|
|
|
|
514
|
unless ($self->{ww}) { |
474
|
8
|
|
|
|
|
73
|
my $len = syswrite $self->{fh}, $self->{wbuf}; |
475
|
8
|
|
|
|
|
43
|
substr $self->{wbuf}, 0, $len, ""; |
476
|
|
|
|
|
|
|
|
477
|
|
|
|
|
|
|
# still any left? then install a write watcher |
478
|
8
|
50
|
|
|
|
70
|
$self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) |
479
|
|
|
|
|
|
|
if length $self->{wbuf}; |
480
|
|
|
|
|
|
|
} |
481
|
|
|
|
|
|
|
} |
482
|
|
|
|
|
|
|
|
483
|
|
|
|
|
|
|
=back |
484
|
|
|
|
|
|
|
|
485
|
|
|
|
|
|
|
=head1 AUTHOR |
486
|
|
|
|
|
|
|
|
487
|
|
|
|
|
|
|
Mons Anderson, C<< >> |
488
|
|
|
|
|
|
|
|
489
|
|
|
|
|
|
|
=head1 ACKNOWLEDGEMENTS |
490
|
|
|
|
|
|
|
|
491
|
|
|
|
|
|
|
This module based on Marc Lehmann's L |
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
Thanks to Vladimir Timofeev C<< >> for bugfixes and useful notes |
494
|
|
|
|
|
|
|
|
495
|
|
|
|
|
|
|
=cut |
496
|
|
|
|
|
|
|
|
497
|
|
|
|
|
|
|
1; # End of AnyEvent::Worker |