File Coverage

blib/lib/Async/Event/Interval.pm
Criterion Covered Total %
statement 29 173 16.7
branch 1 62 1.6
condition 0 15 0.0
subroutine 11 38 28.9
pod 15 15 100.0
total 56 303 18.4


line stmt bran cond sub pod time code
1             package Async::Event::Interval;
2              
3 1     1   56275 use warnings;
  1         10  
  1         27  
4 1     1   4 use strict;
  1         1  
  1         51  
5              
6             our $VERSION = '1.12';
7              
8 1     1   5 use Carp qw(croak);
  1         1  
  1         34  
9 1     1   546 use Data::Dumper;
  1         5752  
  1         51  
10 1     1   507 use IPC::Shareable;
  1         27876  
  1         40  
11 1     1   497 use Parallel::ForkManager;
  1         63531  
  1         1915  
12              
13             $SIG{CHLD} = 'IGNORE';
14             $SIG{__WARN__} = sub {
15             my $warn = shift;
16             warn $warn if $warn !~ /^child process/;
17             };
18              
19             my $id = 0;
20              
21             my %events;
22             my $shared_memory_protect_lock = _rand_shm_lock();
23              
24             my $shared_memory_segment_created;
25              
26             while (! $shared_memory_segment_created) {
27             $shared_memory_segment_created = eval {
28             tie %events, 'IPC::Shareable', {
29             key => _rand_shm_key(),
30             create => 1,
31             exclusive => 1,
32             protected => _shm_lock(),
33             mode => 0600,
34             destroy => 1
35             };
36             1;
37             };
38             }
39              
40             *restart = \&start;
41              
42             sub new {
43 0     0 1 0 my $self = bless {}, shift;
44              
45 0         0 $self->id($id);
46 0         0 $id++;
47              
48 0         0 $events{$self->id} = {};
49              
50 0         0 $self->_pm;
51 0         0 $self->_setup(@_);
52 0         0 $self->_started(0);
53              
54 0         0 return $self;
55             }
56             sub error {
57 0     0 1 0 my ($self) = @_;
58 0         0 $self->status;
59 0 0 0     0 return $self->pid && $self->pid == -99 ? 1 : 0;
60             }
61             sub errors {
62 0     0 1 0 my ($self) = @_;
63 0   0     0 return $self->_errors || 0;
64             }
65             sub error_message {
66 0     0 1 0 my ($self) = @_;
67 0         0 return $self->_error_message;
68             }
69             sub events {
70 0     0 1 0 return \%events;
71             }
72             sub id {
73 0     0 1 0 my ($self, $id) = @_;
74 0 0       0 $self->{id} = $id if defined $id;
75 0         0 return $_[0]->{id};
76             }
77             sub info {
78 0     0 1 0 my ($self) = @_;
79 0         0 return $self->events()->{$self->id};
80             }
81             sub interval {
82 0     0 1 0 my ($self, $interval) = @_;
83              
84 0 0       0 if (defined $interval) {
85 0 0 0     0 if ($interval !~ /^\d+$/ && $interval !~ /^(\d+)?\.\d+$/) {
86 0         0 croak "\$interval must be an integer or float";
87             }
88 0         0 $events{$self->id}{interval} = $interval;
89             }
90              
91 0         0 return $events{$self->id}->{interval};
92             }
93             sub pid {
94 0     0 1 0 my ($self) = @_;
95 0         0 return $self->_pid;
96             }
97             sub runs {
98 0     0 1 0 my ($self) = @_;
99 0   0     0 return $self->_runs || 0;
100             }
101             sub shared_scalar {
102 0     0 1 0 my ($self) = @_;
103              
104 0         0 my $shm_key;
105 0         0 my $unique_shm_key_found = 0;
106              
107 0         0 for (0..9) {
108 0         0 $shm_key = _rand_shm_key();
109 0 0       0 if (! exists $events{$self->id}->{shared_scalars}{$shm_key}) {
110 0         0 $unique_shm_key_found = 1;
111 0         0 last;
112             }
113             }
114              
115 0 0       0 if (! $unique_shm_key_found) {
116 0         0 croak("Could not generate a unique shared memory segment.");
117             }
118              
119 0         0 tie my $scalar, 'IPC::Shareable', $shm_key, {create => 1, destroy => 1};
120              
121 0         0 $events{$self->id}->{shared_scalars}{$shm_key} = \$scalar;
122              
123 0         0 return \$scalar;
124             }
125             sub start {
126 0     0 1 0 my ($self) = @_;
127 0 0       0 if ($self->_started){
128 0         0 warn "Event already running...\n";
129 0         0 return;
130             }
131 0         0 $self->_started(1);
132 0         0 $self->_event;
133             }
134             sub status {
135 0     0 1 0 my ($self) = @_;
136              
137 0 0       0 if ($self->_started){
138 0 0       0 if (! $self->pid){
139 0         0 croak "Event is started, but no PID can be found. This is a " .
140             "fatal error. Exiting...\n";
141             }
142 0 0       0 if ($self->pid > 0){
143 0 0       0 if (kill 0, $self->pid){
144 0         0 return $self->pid;
145             }
146             else {
147             # proc must have crashed
148 0         0 $self->_started(0);
149 0         0 $self->_pid(-99);
150 0         0 $self->error;
151             }
152             }
153             }
154 0         0 return 0;
155             }
156             sub stop {
157 0     0 1 0 my $self = shift;
158              
159 0 0       0 if ($self->pid){
160 0         0 kill 9, $self->pid;
161              
162 0         0 $self->_started(0);
163              
164             # time to ensure the proc was killed
165              
166 0         0 sleep 1;
167              
168 0 0       0 if (kill 0, $self->pid){
169 0         0 croak "Event stop was called, but the process hasn't been killed. " .
170             "This is a fatal event. Exiting...\n";
171             }
172             }
173             }
174             sub waiting {
175 0     0 1 0 my ($self) = @_;
176 0 0 0     0 return 1 if $self->error || ! $self->status;
177 0         0 return 0;
178             }
179              
180             sub _args {
181 0     0   0 my ($self, $args) = @_;
182              
183 0 0       0 if (defined $args) {
184 0         0 $self->{args} = $args;
185             }
186              
187 0         0 return $self->{args};
188             }
189             sub _cb {
190 0     0   0 my ($self, $cb) = @_;
191              
192 0 0       0 if (defined $cb) {
193 0 0       0 croak "Callback must be a code reference." if ref $cb ne 'CODE';
194 0         0 $self->{cb} = $cb;
195             }
196              
197 0         0 return $self->{cb};
198             }
199             sub _errors {
200 0     0   0 my ($self, $increment) = @_;
201 0 0       0 $events{$self->id}->{errors}++ if defined $increment;
202 0         0 return $events{$self->id}->{errors};
203             }
204             sub _error_message {
205 0     0   0 my ($self, $msg) = @_;
206 0 0       0 $events{$self->id}->{error_message} = $msg if defined $msg;
207 0         0 return $events{$self->id}->{error_message};
208             }
209             sub _event {
210 0     0   0 my ($self) = @_;
211              
212 0         0 for (0..1){
213 0         0 my $pid = $self->_pm->start;
214 0 0       0 if ($pid){
215             # this is the parent process
216 0         0 $self->_pid($pid);
217 0         0 last;
218             }
219              
220             # set the child's proc id
221              
222 0         0 $self->_pid($$);
223              
224             # if no interval, run only once
225              
226 0 0       0 if ($self->interval) {
227 0         0 while (1) {
228 0         0 select(undef, undef, undef, $self->interval);
229              
230 0         0 my $callback_success = eval {
231 0         0 $self->_cb->(@{ $self->_args });
  0         0  
232 0         0 1;
233             };
234              
235 0 0       0 if (! $callback_success) {
236 0         0 $self->_errors(1);
237 0         0 $self->_error_message($@);
238 0         0 $self->_runs(1);
239 0         0 $self->status;
240 0         0 croak $@;
241             }
242              
243 0         0 $self->_runs(1);
244 0         0 $self->status;
245             }
246             }
247             else {
248 0         0 my $callback_success = eval {
249 0         0 $self->_cb->(@{$self->_args});
  0         0  
250 0         0 1;
251             };
252              
253 0 0       0 if (! $callback_success) {
254 0         0 $self->_errors(1);
255 0         0 $self->_error_message($@);
256 0         0 $self->_runs(1);
257 0         0 $self->status;
258 0         0 croak $@;
259             }
260              
261 0         0 $self->_runs(1);
262 0         0 $self->status;
263             }
264              
265 0         0 $self->_pm->finish;
266             }
267             }
268             sub _pm {
269 0     0   0 my ($self) = @_;
270              
271 0 0       0 if (! exists $self->{pm}) {
272 0         0 $self->{pm} = Parallel::ForkManager->new(1);
273             }
274              
275 0         0 return $self->{pm};
276             }
277             sub _pid {
278 0     0   0 my ($self, $pid) = @_;
279 0 0       0 if (defined $pid) {
280 0         0 $self->{pid} = $pid;
281 0         0 $events{$self->id}->{pid} = $self->{pid};
282             }
283 0   0     0 return $self->{pid} || undef;
284             }
285             sub _rand_shm_key {
286 1     1   2 my $key_str;
287              
288 1         2 for (0..11) {
289 12         162 srand();
290 12         36 $key_str .= ('A'..'Z')[rand(26)];
291             }
292              
293 1         4 return $key_str;
294             }
295             sub _rand_shm_lock {
296             # Used for the 'protected' option in the %events hash creation
297              
298 1     1   29 srand();
299 1         6 return int(rand(1_000_000));
300             }
301             sub _runs {
302 0     0   0 my ($self, $increment) = @_;
303 0 0       0 $events{$self->id}->{runs}++ if defined $increment;
304 0         0 return $events{$self->id}->{runs};
305             }
306             sub _setup {
307 0     0   0 my ($self, $interval, $cb, @args) = @_;
308 0         0 $self->interval($interval);
309 0         0 $self->_cb($cb);
310 0         0 $self->_args(\@args);
311             }
312             sub _shm_lock {
313 2     2   15 return $shared_memory_protect_lock;
314             }
315             sub _started {
316 0     0   0 my ($self, $started) = @_;
317 0 0       0 $self->{started} = $started if defined $started;
318 0         0 return $self->{started};
319             }
320             sub DESTROY {
321 0 0   0   0 if (defined $_[0]) {
322 0 0       0 $_[0]->stop if $_[0]->pid;
323             }
324              
325             # On events with interval of zero, ForkManager runs finish(), which
326             # calls our destroy method. We only want to blow away the %events
327             # hash if we truly go out of scope
328              
329 0 0       0 return if (caller())[0] eq 'Parallel::ForkManager::Child';
330              
331 0         0 delete $events{$_[0]->id};
332             }
333             sub _end {
334 1 50   1   7 if (! keys %events) {
335 1         229 IPC::Shareable::clean_up_protected(_shm_lock());
336             }
337             }
338             END {
339 1     1   894 _end();
340             }
341       0     sub _vim{}
342              
343             1;
344              
345             __END__