File Coverage

blib/lib/Async/Event/Interval.pm
Criterion Covered Total %
statement 136 169 80.4
branch 41 62 66.1
condition 13 15 86.6
subroutine 36 37 97.3
pod 15 15 100.0
total 241 298 80.8


line stmt bran cond sub pod time code
1             package Async::Event::Interval;
2              
3 22     22   198424 use warnings;
  22         144  
  22         588  
4 22     22   103 use strict;
  22         35  
  22         775  
5              
6             our $VERSION = '1.10';
7              
8 22     22   106 use Carp qw(croak);
  22         44  
  22         1562  
9 22     22   9880 use IPC::Shareable;
  22         732851  
  22         1126  
10 22     22   11462 use Parallel::ForkManager;
  22         1458171  
  22         42018  
11              
12             $SIG{CHLD} = 'IGNORE';
13             $SIG{__WARN__} = sub {
14             my $warn = shift;
15             warn $warn if $warn !~ /^child process/;
16             };
17              
18             my $id = 0;
19              
20             my %events;
21             my $shared_memory_protect_lock = _rand_shm_lock();
22              
23             my $shared_memory_segment_created;
24              
25             while (! $shared_memory_segment_created) {
26             $shared_memory_segment_created = eval {
27             tie %events, 'IPC::Shareable', {
28             key => _rand_shm_key(),
29             create => 1,
30             exclusive => 1,
31             protected => _shm_lock(),
32             mode => 0600,
33             destroy => 1
34             };
35             1;
36             };
37             }
38              
39             *restart = \&start;
40              
41             sub new {
42 29     29 1 19701 my $self = bless {}, shift;
43              
44 29         269 $self->id($id);
45 29         53 $id++;
46              
47 29         94 $events{$self->id} = {};
48              
49 29         38122 $self->_pm;
50 29         156 $self->_setup(@_);
51 29         131 $self->_started(0);
52              
53 29         102 return $self;
54             }
55             sub error {
56 24     24 1 69 my ($self) = @_;
57 24         124 $self->status;
58 24 100 100     65 return $self->pid && $self->pid == -99 ? 1 : 0;
59             }
60             sub errors {
61 3     3 1 2202 my ($self) = @_;
62 3   50     52 return $self->_errors || 0;
63             }
64             sub error_message {
65 2     2 1 2133 my ($self) = @_;
66 2         27 return $self->_error_message;
67             }
68             sub events {
69 32     32 1 2204605 return \%events;
70             }
71             sub id {
72 478     478 1 5002 my ($self, $id) = @_;
73 478 100       1485 $self->{id} = $id if defined $id;
74 478         5515 return $_[0]->{id};
75             }
76             sub info {
77 21     21 1 77960 my ($self) = @_;
78 21         85 return $self->events()->{$self->id};
79             }
80             sub interval {
81 30     30 1 2087 my ($self, $interval) = @_;
82              
83 30 50       117 if (defined $interval) {
84 30 50 66     416 if ($interval !~ /^\d+$/ && $interval !~ /^(\d+)?\.\d+$/) {
85 0         0 croak "\$interval must be an integer or float";
86             }
87 30         130 $events{$self->id}{interval} = $interval;
88             }
89              
90 30         23831 return $events{$self->id}->{interval};
91             }
92             sub pid {
93 240     240 1 730 my ($self) = @_;
94 240         793 return $self->_pid;
95             }
96             sub runs {
97 8     8 1 2105239 my ($self) = @_;
98 8   100     70 return $self->_runs || 0;
99             }
100             sub shared_scalar {
101 9     9 1 16025 my ($self) = @_;
102              
103 9         16 my $shm_key;
104 9         15 my $unique_shm_key_found = 0;
105              
106 9         32 for (0..9) {
107 18         7492 $shm_key = _rand_shm_key();
108 18 100       184 if (! exists $events{$self->id}->{shared_scalars}{$shm_key}) {
109 8         13204 $unique_shm_key_found = 1;
110 8         29 last;
111             }
112             }
113              
114 9 100       904 if (! $unique_shm_key_found) {
115 1         191 croak("Could not generate a unique shared memory segment.");
116             }
117              
118 8         64 tie my $scalar, 'IPC::Shareable', $shm_key, {create => 1, destroy => 1};
119              
120 8         3551 $events{$self->id}->{shared_scalars}{$shm_key} = \$scalar;
121              
122 8         8312 return \$scalar;
123             }
124             sub start {
125 26     26 1 17398 my ($self) = @_;
126 26 100       109 if ($self->_started){
127 2         131 warn "Event already running...\n";
128 2         20 return;
129             }
130 24         89 $self->_started(1);
131 24         97 $self->_event;
132             }
133             sub status {
134 40     40 1 2288 my ($self) = @_;
135              
136 40 100       191 if ($self->_started){
137 18 50       100 if (! $self->pid){
138 0         0 croak "Event is started, but no PID can be found. This is a " .
139             "fatal error. Exiting...\n";
140             }
141 18 50       70 if ($self->pid > 0){
142 18 100       83 if (kill 0, $self->pid){
143 11         67 return $self->pid;
144             }
145             else {
146             # proc must have crashed
147 7         77 $self->_started(0);
148 7         60 $self->_pid(-99);
149 7         80 $self->error;
150             }
151             }
152             }
153 29         148 return 0;
154             }
155             sub stop {
156 33     33 1 9407935 my $self = shift;
157              
158 33 50       211 if ($self->pid){
159 33         185 kill 9, $self->pid;
160              
161 33         330 $self->_started(0);
162              
163             # time to ensure the proc was killed
164              
165 33         33007145 sleep 1;
166              
167 33 50       784 if (kill 0, $self->pid){
168 0         0 croak "Event stop was called, but the process hasn't been killed. " .
169             "This is a fatal event. Exiting...\n";
170             }
171             }
172             }
173             sub waiting {
174 7     7 1 341 my ($self) = @_;
175 7 100 100     33 return 1 if $self->error || ! $self->status;
176 1         29 return 0;
177             }
178              
179             sub _args {
180 29     29   80 my ($self, $args) = @_;
181              
182 29 50       89 if (defined $args) {
183 29         79 $self->{args} = $args;
184             }
185              
186 29         79 return $self->{args};
187             }
188             sub _cb {
189 29     29   95 my ($self, $cb) = @_;
190              
191 29 50       123 if (defined $cb) {
192 29 50       138 croak "Callback must be a code reference." if ref $cb ne 'CODE';
193 29         80 $self->{cb} = $cb;
194             }
195              
196 29         54 return $self->{cb};
197             }
198             sub _errors {
199 3     3   14 my ($self, $increment) = @_;
200 3 50       22 $events{$self->id}->{errors}++ if defined $increment;
201 3         15 return $events{$self->id}->{errors};
202             }
203             sub _error_message {
204 2     2   8 my ($self, $msg) = @_;
205 2 50       7 $events{$self->id}->{error_message} = $msg if defined $msg;
206 2         9 return $events{$self->id}->{error_message};
207             }
208             sub _event {
209 24     24   66 my ($self) = @_;
210              
211 24         107 for (0..1){
212 24         89 my $pid = $self->_pm->start;
213 24 50       36380 if ($pid){
214             # this is the parent process
215 24         929 $self->_pid($pid);
216 24         299 last;
217             }
218              
219             # set the child's proc id
220              
221 0         0 $self->_pid($$);
222              
223             # if no interval, run only once
224              
225 0 0       0 if ($self->interval) {
226 0         0 while (1) {
227 0         0 select(undef, undef, undef, $self->interval);
228              
229 0         0 my $callback_success = eval {
230 0         0 $self->_cb->(@{ $self->_args });
  0         0  
231 0         0 1;
232             };
233              
234 0 0       0 if (! $callback_success) {
235 0         0 $self->_errors(1);
236 0         0 $self->_error_message($@);
237 0         0 $self->_runs(1);
238 0         0 $self->status;
239 0         0 croak $@;
240             }
241              
242 0         0 $self->_runs(1);
243 0         0 $self->status;
244             }
245             }
246             else {
247              
248 0         0 my $callback_success = eval {
249 0         0 $self->_cb->(@{$self->_args});
  0         0  
250 0         0 1;
251             };
252              
253 0 0       0 if (! $callback_success) {
254 0         0 $self->_errors(1);
255 0         0 $self->_error_message($@);
256 0         0 $self->_runs(1);
257 0         0 $self->status;
258 0         0 croak $@;
259             }
260              
261 0         0 $self->_runs(1);
262 0         0 $self->status;
263             }
264              
265 0         0 $self->_pm->finish;
266             }
267             }
268             sub _pm {
269 53     53   142 my ($self) = @_;
270              
271 53 100       227 if (! exists $self->{pm}) {
272 29         580 $self->{pm} = Parallel::ForkManager->new(1);
273             }
274              
275 53         82287 return $self->{pm};
276             }
277             sub _pid {
278 273     273   936 my ($self, $pid) = @_;
279 273 100       1255 $self->{pid} = $pid if defined $pid;
280 273 100       1739 $events{$self->id}->{pid} = $self->{pid} if $self->{pid};
281 273   100     211826 return $self->{pid} || undef;
282             }
283             sub _rand_shm_key {
284 29     29   59 my $key_str;
285              
286 29         108 for (0..11) {
287 348         4001 srand();
288 348         1104 $key_str .= ('A'..'Z')[rand(26)];
289             }
290              
291 29         127 return $key_str;
292             }
293             sub _rand_shm_lock {
294             # Used for the 'protected' option in the %events hash creation
295              
296 22     22   786 srand();
297 22         166 return int(rand(1_000_000));
298             }
299             sub _runs {
300 8     8   36 my ($self, $increment) = @_;
301 8 50       48 $events{$self->id}->{runs}++ if defined $increment;
302 8         58 return $events{$self->id}->{runs};
303             }
304             sub _setup {
305 29     29   159 my ($self, $interval, $cb, @args) = @_;
306 29         125 $self->interval($interval);
307 29         14992 $self->_cb($cb);
308 29         116 $self->_args(\@args);
309             }
310             sub _shm_lock {
311 48     48   590 return $shared_memory_protect_lock;
312             }
313             sub _started {
314 159     159   387 my ($self, $started) = @_;
315 159 100       495 $self->{started} = $started if defined $started;
316 159         379 return $self->{started};
317             }
318             sub DESTROY {
319 29 100   29   1050366 $_[0]->stop if $_[0]->pid;
320              
321             # On events with interval of zero, ForkManager runs finish(), which
322             # calls our destroy method. We only want to blow away the %events
323             # hash if we truly go out of scope
324              
325 29 50       319 return if (caller())[0] eq 'Parallel::ForkManager::Child';
326              
327 29         214 delete $events{$_[0]->id};
328             }
329             sub _end {
330 25 50   25   4225 if (keys %events) {
331 0         0 warn "The following events remain: " . join(', ', keys %events);
332             }
333              
334 25         5198 IPC::Shareable::clean_up_protected(_shm_lock());
335             }
336             END {
337 22     22   70526 _end();
338             }
339       0     sub _vim{}
340              
341             1;
342              
343             __END__