File Coverage

blib/lib/Async/Event/Interval.pm
Criterion Covered Total %
statement 26 169 15.3
branch 1 62 1.6
condition 0 15 0.0
subroutine 10 37 27.0
pod 15 15 100.0
total 52 298 17.4


line stmt bran cond sub pod time code
1             package Async::Event::Interval;
2              
3 22     22   181704 use warnings;
  22         138  
  22         606  
4 22     22   97 use strict;
  22         31  
  22         755  
5              
6             our $VERSION = '1.11';
7              
8 22     22   114 use Carp qw(croak);
  22         28  
  22         1648  
9 22     22   10108 use IPC::Shareable;
  22         799726  
  22         1275  
10 22     22   23931 use Parallel::ForkManager;
  22         1533555  
  22         42749  
11              
12             $SIG{CHLD} = 'IGNORE';
13             $SIG{__WARN__} = sub {
14             my $warn = shift;
15             warn $warn if $warn !~ /^child process/;
16             };
17              
18             my $id = 0;
19              
20             my %events;
21             my $shared_memory_protect_lock = _rand_shm_lock();
22              
23             my $shared_memory_segment_created;
24              
25             while (! $shared_memory_segment_created) {
26             $shared_memory_segment_created = eval {
27             tie %events, 'IPC::Shareable', {
28             key => _rand_shm_key(),
29             create => 1,
30             exclusive => 1,
31             protected => _shm_lock(),
32             mode => 0600,
33             destroy => 1
34             };
35             1;
36             };
37             }
38              
39             *restart = \&start;
40              
41             sub new {
42 0     0 1 0 my $self = bless {}, shift;
43              
44 0         0 $self->id($id);
45 0         0 $id++;
46              
47 0         0 $events{$self->id} = {};
48              
49 0         0 $self->_pm;
50 0         0 $self->_setup(@_);
51 0         0 $self->_started(0);
52              
53 0         0 return $self;
54             }
55             sub error {
56 0     0 1 0 my ($self) = @_;
57 0         0 $self->status;
58 0 0 0     0 return $self->pid && $self->pid == -99 ? 1 : 0;
59             }
60             sub errors {
61 0     0 1 0 my ($self) = @_;
62 0   0     0 return $self->_errors || 0;
63             }
64             sub error_message {
65 0     0 1 0 my ($self) = @_;
66 0         0 return $self->_error_message;
67             }
68             sub events {
69 0     0 1 0 return \%events;
70             }
71             sub id {
72 0     0 1 0 my ($self, $id) = @_;
73 0 0       0 $self->{id} = $id if defined $id;
74 0         0 return $_[0]->{id};
75             }
76             sub info {
77 0     0 1 0 my ($self) = @_;
78 0         0 return $self->events()->{$self->id};
79             }
80             sub interval {
81 0     0 1 0 my ($self, $interval) = @_;
82              
83 0 0       0 if (defined $interval) {
84 0 0 0     0 if ($interval !~ /^\d+$/ && $interval !~ /^(\d+)?\.\d+$/) {
85 0         0 croak "\$interval must be an integer or float";
86             }
87 0         0 $events{$self->id}{interval} = $interval;
88             }
89              
90 0         0 return $events{$self->id}->{interval};
91             }
92             sub pid {
93 0     0 1 0 my ($self) = @_;
94 0         0 return $self->_pid;
95             }
96             sub runs {
97 0     0 1 0 my ($self) = @_;
98 0   0     0 return $self->_runs || 0;
99             }
100             sub shared_scalar {
101 0     0 1 0 my ($self) = @_;
102              
103 0         0 my $shm_key;
104 0         0 my $unique_shm_key_found = 0;
105              
106 0         0 for (0..9) {
107 0         0 $shm_key = _rand_shm_key();
108 0 0       0 if (! exists $events{$self->id}->{shared_scalars}{$shm_key}) {
109 0         0 $unique_shm_key_found = 1;
110 0         0 last;
111             }
112             }
113              
114 0 0       0 if (! $unique_shm_key_found) {
115 0         0 croak("Could not generate a unique shared memory segment.");
116             }
117              
118 0         0 tie my $scalar, 'IPC::Shareable', $shm_key, {create => 1, destroy => 1};
119              
120 0         0 $events{$self->id}->{shared_scalars}{$shm_key} = \$scalar;
121              
122 0         0 return \$scalar;
123             }
124             sub start {
125 0     0 1 0 my ($self) = @_;
126 0 0       0 if ($self->_started){
127 0         0 warn "Event already running...\n";
128 0         0 return;
129             }
130 0         0 $self->_started(1);
131 0         0 $self->_event;
132             }
133             sub status {
134 0     0 1 0 my ($self) = @_;
135              
136 0 0       0 if ($self->_started){
137 0 0       0 if (! $self->pid){
138 0         0 croak "Event is started, but no PID can be found. This is a " .
139             "fatal error. Exiting...\n";
140             }
141 0 0       0 if ($self->pid > 0){
142 0 0       0 if (kill 0, $self->pid){
143 0         0 return $self->pid;
144             }
145             else {
146             # proc must have crashed
147 0         0 $self->_started(0);
148 0         0 $self->_pid(-99);
149 0         0 $self->error;
150             }
151             }
152             }
153 0         0 return 0;
154             }
155             sub stop {
156 0     0 1 0 my $self = shift;
157              
158 0 0       0 if ($self->pid){
159 0         0 kill 9, $self->pid;
160              
161 0         0 $self->_started(0);
162              
163             # time to ensure the proc was killed
164              
165 0         0 sleep 1;
166              
167 0 0       0 if (kill 0, $self->pid){
168 0         0 croak "Event stop was called, but the process hasn't been killed. " .
169             "This is a fatal event. Exiting...\n";
170             }
171             }
172             }
173             sub waiting {
174 0     0 1 0 my ($self) = @_;
175 0 0 0     0 return 1 if $self->error || ! $self->status;
176 0         0 return 0;
177             }
178              
179             sub _args {
180 0     0   0 my ($self, $args) = @_;
181              
182 0 0       0 if (defined $args) {
183 0         0 $self->{args} = $args;
184             }
185              
186 0         0 return $self->{args};
187             }
188             sub _cb {
189 0     0   0 my ($self, $cb) = @_;
190              
191 0 0       0 if (defined $cb) {
192 0 0       0 croak "Callback must be a code reference." if ref $cb ne 'CODE';
193 0         0 $self->{cb} = $cb;
194             }
195              
196 0         0 return $self->{cb};
197             }
198             sub _errors {
199 0     0   0 my ($self, $increment) = @_;
200 0 0       0 $events{$self->id}->{errors}++ if defined $increment;
201 0         0 return $events{$self->id}->{errors};
202             }
203             sub _error_message {
204 0     0   0 my ($self, $msg) = @_;
205 0 0       0 $events{$self->id}->{error_message} = $msg if defined $msg;
206 0         0 return $events{$self->id}->{error_message};
207             }
208             sub _event {
209 0     0   0 my ($self) = @_;
210              
211 0         0 for (0..1){
212 0         0 my $pid = $self->_pm->start;
213 0 0       0 if ($pid){
214             # this is the parent process
215 0         0 $self->_pid($pid);
216 0         0 last;
217             }
218              
219             # set the child's proc id
220              
221 0         0 $self->_pid($$);
222              
223             # if no interval, run only once
224              
225 0 0       0 if ($self->interval) {
226 0         0 while (1) {
227 0         0 select(undef, undef, undef, $self->interval);
228              
229 0         0 my $callback_success = eval {
230 0         0 $self->_cb->(@{ $self->_args });
  0         0  
231 0         0 1;
232             };
233              
234 0 0       0 if (! $callback_success) {
235 0         0 $self->_errors(1);
236 0         0 $self->_error_message($@);
237 0         0 $self->_runs(1);
238 0         0 $self->status;
239 0         0 croak $@;
240             }
241              
242 0         0 $self->_runs(1);
243 0         0 $self->status;
244             }
245             }
246             else {
247              
248 0         0 my $callback_success = eval {
249 0         0 $self->_cb->(@{$self->_args});
  0         0  
250 0         0 1;
251             };
252              
253 0 0       0 if (! $callback_success) {
254 0         0 $self->_errors(1);
255 0         0 $self->_error_message($@);
256 0         0 $self->_runs(1);
257 0         0 $self->status;
258 0         0 croak $@;
259             }
260              
261 0         0 $self->_runs(1);
262 0         0 $self->status;
263             }
264              
265 0         0 $self->_pm->finish;
266             }
267             }
268             sub _pm {
269 0     0   0 my ($self) = @_;
270              
271 0 0       0 if (! exists $self->{pm}) {
272 0         0 $self->{pm} = Parallel::ForkManager->new(1);
273             }
274              
275 0         0 return $self->{pm};
276             }
277             sub _pid {
278 0     0   0 my ($self, $pid) = @_;
279 0 0       0 $self->{pid} = $pid if defined $pid;
280 0 0       0 $events{$self->id}->{pid} = $self->{pid} if $self->{pid};
281 0   0     0 return $self->{pid} || undef;
282             }
283             sub _rand_shm_key {
284 22     22   37 my $key_str;
285              
286 22         76 for (0..11) {
287 264         2882 srand();
288 264         914 $key_str .= ('A'..'Z')[rand(26)];
289             }
290              
291 22         91 return $key_str;
292             }
293             sub _rand_shm_lock {
294             # Used for the 'protected' option in the %events hash creation
295              
296 22     22   914 srand();
297 22         168 return int(rand(1_000_000));
298             }
299             sub _runs {
300 0     0   0 my ($self, $increment) = @_;
301 0 0       0 $events{$self->id}->{runs}++ if defined $increment;
302 0         0 return $events{$self->id}->{runs};
303             }
304             sub _setup {
305 0     0   0 my ($self, $interval, $cb, @args) = @_;
306 0         0 $self->interval($interval);
307 0         0 $self->_cb($cb);
308 0         0 $self->_args(\@args);
309             }
310             sub _shm_lock {
311 44     44   440 return $shared_memory_protect_lock;
312             }
313             sub _started {
314 0     0   0 my ($self, $started) = @_;
315 0 0       0 $self->{started} = $started if defined $started;
316 0         0 return $self->{started};
317             }
318             sub DESTROY {
319 0 0   0   0 $_[0]->stop if $_[0]->pid;
320              
321             # On events with interval of zero, ForkManager runs finish(), which
322             # calls our destroy method. We only want to blow away the %events
323             # hash if we truly go out of scope
324              
325 0 0       0 return if (caller())[0] eq 'Parallel::ForkManager::Child';
326              
327 0         0 delete $events{$_[0]->id};
328             }
329             sub _end {
330 22 50   22   365 if (keys %events) {
331 0         0 warn "The following events remain: " . join(', ', keys %events);
332             }
333              
334 22         5955 IPC::Shareable::clean_up_protected(_shm_lock());
335             }
336             END {
337 22     22   29119 _end();
338             }
339       0     sub _vim{}
340              
341             1;
342              
343             __END__