File Coverage

blib/lib/IO/Events.pm
Criterion Covered Total %
statement 406 574 70.7
branch 140 326 42.9
condition 24 65 36.9
subroutine 67 89 75.2
pod n/a
total 637 1054 60.4


line stmt bran cond sub pod time code
1             #
2             # Copyright (c) 2004 catpipe Systems ApS
3             # All rights reserved.
4             #
5             # Redistribution and use in source and binary forms, with or without
6             # modification, are permitted provided that the following conditions
7             # are met:
8             # 1. Redistributions of source code must retain the above copyright
9             # notice, this list of conditions and the following disclaimer.
10             # 2. Redistributions in binary form must reproduce the above copyright
11             # notice, this list of conditions and the following disclaimer in the
12             # documentation and/or other materials provided with the distribution.
13             #
14             # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15             # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16             # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17             # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18             # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19             # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20             # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21             # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22             # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23             # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24             # SUCH DAMAGE.
25             #
26              
27             # $Id: Events.pm,v 1.32 2007/03/28 08:01:59 dk Exp $
28 1     1   766 use strict;
  1         2  
  1         50  
29              
30             package IO::Events;
31 1     1   5 use vars qw($VERSION $FORK_MODE @loops);
  1         1  
  1         94  
32             $VERSION=0.6;
33              
34             # Master loop object
35             package IO::Events::Loop;
36 1     1   5 use vars qw(@ISA);
  1         5  
  1         39  
37              
38 1     1   1257 use IO::Handle;
  1         17143  
  1         68  
39 1     1   911 use Errno qw(EAGAIN);
  1         1725  
  1         141  
40 1     1   7767 use POSIX qw(sys_wait_h exit);
  1         10741  
  1         8  
41 1     1   4730 use Time::HiRes qw(time);
  1         2357  
  1         6  
42              
43             sub new
44             {
45 1     1   52 my $class = shift;
46 1         20 my $obj = bless {
47             # options
48             debug => 0,
49             timeout => 50, # seconds
50             waitpid => 1,
51             @_,
52              
53             # private fields
54             read => '',
55             write => '',
56             exc => '',
57             processes => {},
58             filenos => {},
59             ids => {},
60             timers => [],
61             }, $class;
62 1         4 push @IO::Events::loops, $obj;
63 1         3 return $obj;
64             }
65              
66             sub yield
67             {
68 7     7   18 my ( $self, %profile) = @_;
69              
70 7 50       36 my ( $ir, $iw, $ie) = (
    50          
71             $self->{read},
72             $profile{block_write} ? '' : $self-> {write},
73             $profile{block_exc} ? '' : $self-> {exc}
74             );
75              
76 7 50       25 my $timeout = exists $profile{timeout} ? $profile{timeout} : $self->{timeout};
77 7 100       8 if ( @{$self->{timers}}) {
  7         22  
78 1         4 my $time = time;
79 1         3 for my $timer (@{$self->{timers}}) {
  1         11  
80 1 50       5 next unless $timer->{active};
81 1         10 my $sleep = $timer->{alert} - $time;
82 1 50       6 $timeout = $sleep if $timeout > $sleep;
83             }
84 1 50       5 $timeout = 0 if $timeout < 0;
85             }
86 7         11524 my $n = select( $ir, $iw, $ie, $timeout);
87              
88 7 100       13 if ( @{$self->{timers}}) {
  7         35  
89 1         17 my $time = time;
90 1         2 for my $timer (@{$self->{timers}}) {
  1         7  
91 1 50 33     47 next if not $timer->{active} or $time < $timer->{alert};
92 1         19 $timer-> notify;
93             }
94             }
95            
96 7 100       25 unless ( $n > 0) {
97 1 50       7 if ( $self->{debug}) {
98 0         0 print STDERR "IO::Events: empty select";
99 0 0       0 if ( $n < 0) {
100 0         0 print STDERR " error:$!";
101             }
102 0         0 print STDERR "\n";
103             }
104 1         13 goto WAITPID;
105             }
106              
107 6         8 my $i;
108 6         18 my $lnx = (sort { $a <=> $b } map { length } ( $ir, $iw, $ie))[-1] * 8;
  12         26  
  18         63  
109 6         17 for ( $i = 0; $i < $lnx; $i++) {
110 80         227 my ( $r, $w, $e) = ( vec( $ir, $i, 1), vec( $iw, $i, 1), vec( $ie, $i, 1));
111 80 100 100     1476 next unless $r || $w || $e;
      66        
112 9         12 my $task;
113 9 50 33     74 if ( exists $self-> {filenos}-> {$i} &&
114             exists $self->{ids}->{$self-> {filenos}-> {$i}}) {
115 9         27 $task = $self->{ids}->{$self-> {filenos}-> {$i}};
116             } else {
117 0 0       0 print STDERR "IO::Events: runaway handle $i/$self->{filenos}->{$i}\n"
118             if $self->{debug};
119 0         0 $self-> error( undef, 'select');
120 0         0 next;
121             }
122 9 100 66     40 if ( $task-> {callback} and not $task-> {dead} ) {
123 2         7 $task-> {callback}-> ( $task, $r, $w, $e);
124             }
125              
126 9 100 66     56 if ( $r and not $task-> {dead}) {
127 6         9 my $nbytes;
128 6 100       17 if ( $task-> {read} > -1) {
129 3 50       74 $nbytes = sysread( $task->{handle}, $task->{read_buffer},
130             $profile{block_read} ? 0 : 65536, length ($task->{read_buffer}));
131             }
132 6 100       19 if ( $task->{read} > 0) {
133 3 50       9 print STDERR "IO::Events: # $i read $nbytes bytes\n"
134             if $self->{debug};
135 3 50       8 unless ( defined $nbytes) {
136 0 0       0 $self-> error( $task, 'read') unless $! == EAGAIN;
137 0         0 next;
138             }
139             } else {
140 3 50       15 $nbytes = 1 unless defined $nbytes; # simulate read
141 3 50       25 print STDERR "IO::Events: # $i simulated read $nbytes\n"
142             if $self->{debug};
143             }
144 6 50       15 next if $profile{block_read};
145 6 50       13 if ( $nbytes > 0) {
146 6         17 $task-> notify('on_read');
147 6         18 next;
148             }
149 0 0       0 $task-> destroy unless $task-> {pid};
150             }
151              
152 3 50 33     30 if ( $w and not $task-> {dead}) {
153 3 50       24 unless ( length $task->{write_buffer}) {
154 3         11 vec( $self->{write}, $task-> {fileno}, 1) = 0;
155 3         10 $task-> notify('on_write');
156 3         10 next;
157             }
158 0         0 my $nbytes = syswrite( $task->{handle}, $task->{write_buffer});
159 0 0       0 print STDERR "IO::Events: # $i wrote $nbytes bytes\n"
160             if $self->{debug};
161 0 0       0 unless ( defined $nbytes) {
162 0 0       0 $self-> error( $task, 'write') unless $! == EAGAIN;
163 0         0 next;
164             }
165 0 0       0 if ( $nbytes > 0) {
166 0         0 substr( $task->{write_buffer}, 0, $nbytes) = '';
167 0 0       0 unless ( length $task->{write_buffer}) {
168 0         0 vec( $self->{write}, $task-> {fileno}, 1) = 0;
169 0         0 $task-> notify('on_write');
170             }
171 0         0 next;
172             }
173             }
174            
175 0 0 0     0 if ( $e and not $task-> {dead}) {
176 0 0       0 print STDERR "IO::Events: exception $i\n" if $self->{debug};
177 0         0 $task-> notify('on_exception');
178             }
179             }
180              
181             # close processes
182             WAITPID:
183 7 50       23 if ( $self-> {waitpid}) {
184 7         63 while (($_ = waitpid(-1,WNOHANG)) > 0) {
185 0 0       0 next unless $self->{processes}->{$_};
186 0         0 my @tasks = map { $self-> {ids}-> {$_}} @{$self->{processes}-> {$_}};
  0         0  
  0         0  
187             # read leftovers
188 0         0 for my $task ( @tasks) {
189 0 0 0     0 if ( $task-> can_read && $task-> {read} > 0) {
190 0         0 my $notify;
191 0         0 while ( 1) {
192 0         0 my $nbytes = sysread( $task->{handle},
193             $task->{read_buffer}, 65536,
194             length ($task->{read_buffer}));
195 0 0       0 unless ( defined $nbytes) {
196 0 0       0 $self-> error( $task, 'read')
197             unless $! == EAGAIN;
198 0         0 last;
199             }
200 0         0 $notify += $nbytes;
201 0 0       0 last unless $nbytes;
202             }
203 0 0       0 $task-> notify('on_read') if $notify;
204             }
205             # XXX if $task-> can_exception ... read URG bytes?
206 0         0 $task->{exitcode} = $?;
207 0         0 $task->{finished} = 1;
208             }
209 0         0 for my $task ( @tasks) {
210 0         0 $task-> destroy;
211             }
212             }
213             }
214              
215 7         25 return $n;
216             }
217              
218             sub handles
219             {
220 0     0   0 return scalar(keys %{$_[0]->{ids}});
  0         0  
221             }
222              
223             sub flush
224             {
225 0     0   0 shift-> yield( block_read => 1, block_exc => 1, timeout => 0);
226             }
227              
228             sub error
229             {
230 0     0   0 my ( $self, $task, $condition) = @_;
231 0 0       0 $task-> notify('on_error', $condition, $!) if $task;
232             }
233              
234             sub on_fork
235             {
236 0     0   0 $IO::Events::FORK_MODE = 1;
237 0         0 shift-> DESTROY;
238 0         0 $IO::Events::FORK_MODE = undef;
239             }
240              
241             sub DESTROY
242             {
243 1     1   2 my $self = $_[0];
244 1 50       6 return if $self->{dead};
245 1         3 for ( values %{$self->{ids}}) {
  1         15  
246 10 100       29 next unless $_;
247 9 50       21 $_->{dead} = 1 if $IO::Events::FORK_MODE;
248 9         58 $_-> destroy;
249             }
250 1         5 for ( @{$self->{timers}}) {
  1         5  
251 1 50       6 next unless $_;
252 1         7 $_->{dead} = 1;
253             }
254 1         5 @IO::Events::loops = grep { $self != $_ } @IO::Event::IPC::loops;
  0         0  
255 1         5 $self-> {dead} = 1;
256             }
257              
258             END
259             {
260 1     1   275 for ( @IO::Events::loops) {
261 1         3 eval { $_->DESTROY };
  1         7  
262 1 50       7 warn "$@" if $@;
263             }
264 1         18 @IO::Events::loops = ();
265             }
266              
267             # Single task
268             package IO::Events::Handle;
269 1     1   2534 use vars qw(@ISA %events);
  1         2  
  1         83  
270              
271 1     1   7 use Errno qw(EAGAIN);
  1         1  
  1         58  
272              
273 1     1   5 use constant SINGLE => 1;
  1         3  
  1         91  
274 1     1   5 use constant MULTIPLE => 2;
  1         2  
  1         112  
275              
276             %events = (
277             on_read => SINGLE,
278             on_write => SINGLE,
279             on_exception => SINGLE,
280             on_close => MULTIPLE,
281             on_create => MULTIPLE,
282             on_error => MULTIPLE,
283             );
284              
285 1     1   6 use Fcntl;
  1         2  
  1         2177  
286              
287             sub new
288             {
289 10     10   41 my $class = shift;
290 10         193 my $self = bless {
291             auto_close => 1,
292             finished => 0,
293             exitcode => 0,
294             read_buffer => '',
295             write_buffer => '',
296             write => 0,
297             read => 0,
298             exception => 0,
299             pid => undef,
300             @_,
301             }, $class;
302 10 50       239 $self->{handle} = IO::Handle-> new() unless defined $self->{handle};
303 10         43 for ( qw(owner)) {
304 10 50       49 die "No `$_' field" unless defined $self-> {$_};
305             }
306 10 100       174 $self-> {id} = "$self" unless defined $self-> {id};
307 10         16 my $owner = $self->{owner};
308 10 50       234 die "Id `$self->{id}` already present" if exists $owner->{ids}->{$self->{id}};
309 10         30 my $fno = fileno( $self->{handle});
310 10 50       59 die "Cannot read fileno() from handle" unless defined $fno;
311 10         28 $self-> {fileno} = $fno;
312 10 50       44 unless ( $self-> {nonblock}) {
313 10         13 my $fl;
314 10         98 $fl = fcntl( $self->{handle}, F_GETFL, 0);
315 10 50       23 die "$!" unless defined $fl;
316 10 50       160 fcntl( $self->{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!";
317             }
318 10 100       26 if ($self-> {write}) {
319 1         4 vec( $owner-> {write}, $fno, 1) = 1;
320             #print "write\n";
321             }
322 10 100       29 if ($self-> {read}) {
323 7         49 vec( $owner-> {read}, $fno, 1) = 1;
324             #print "read\n";
325             }
326 10 50       44 if ($self-> {exception}) {
327 0         0 vec( $owner-> {exc}, $fno, 1) = 1;
328             }
329 10         100 $owner-> {filenos}-> {$fno} = $self-> {id};
330 10 100       28 push @{$owner-> {processes}-> {$self->{pid}}}, $self-> {id} if defined $self->{pid};
  2         15  
331 10         38 $owner-> {ids}-> {$self->{id}} = $self;
332 10         176 $self-> notify('on_create');
333 10         49 return $self;
334             }
335              
336             sub can_read
337             {
338 0 0   0   0 return vec( $_[0]->{owner}->{read}, $_[0]-> {fileno}, 1) unless $#_;
339 0         0 vec( $_[0]->{owner}->{read}, $_[0]-> {fileno}, 1) = $_[1];
340 0         0 $_[0]-> {read} = $_[1];
341             }
342              
343             sub can_write
344             {
345 0 0   0   0 return vec( $_[0]->{owner}->{write}, $_[0]-> {fileno}, 1) unless $#_;
346 0         0 vec( $_[0]->{owner}->{write}, $_[0]-> {fileno}, 1) = $_[1];
347 0         0 $_[0]-> {write} = $_[1];
348             }
349              
350             sub can_exception
351             {
352 0 0   0   0 return vec( $_[0]->{owner}->{exc}, $_[0]-> {fileno}, 1) unless $#_;
353 0         0 vec( $_[0]->{owner}->{exc}, $_[0]-> {fileno}, 1) = $_[1];
354 0         0 $_[0]-> {exception} = $_[1];
355             }
356              
357             sub DESTROY
358             {
359 19     19   49 my $self = $_[0];
360 19 100       136 return if $self->{dead};
361 10         30 $self->{dead} = 1;
362 10         56 $self-> flush;
363 10         28 $self-> notify('on_close');
364 10 50 33     118 $self-> {handle}-> close
365             if defined $self->{handle} && $self->{auto_close};
366 10 50       455 if ( defined $self->{owner}) {
367 10 50       25 if ( defined $self->{fileno}) {
368 10         44 vec( $self-> {owner}-> {exc}, $self->{fileno}, 1) = 0;
369 10         33 vec( $self-> {owner}-> {write}, $self->{fileno}, 1) = 0;
370 10         29 vec( $self-> {owner}-> {read}, $self->{fileno}, 1) = 0;
371 10         43 delete $self-> {owner}-> {filenos}-> {$self->{fileno}};
372             }
373 10 100       49 if (defined $self->{pid}) {
374 2         7 my $p = $self-> {owner}-> {processes}-> {$self->{pid}};
375 2         7 @$p = grep { $_ ne $self->{id}} @$p;
  3         15  
376 2 100       22 delete $self-> {owner}-> {processes}-> {$self->{pid}} unless @$p;
377             }
378 10         33 delete $self-> {owner}-> {ids}-> {$self->{id}};
379             }
380 10         20 delete $self->{fileno};
381 10         181 delete $self->{id};
382             }
383              
384             sub readline
385             {
386 7 100   7   277 return $1 if $_[0]-> {read_buffer} =~ s/^([^\n]*\n)//;
387 3         8 return undef;
388             }
389              
390             sub read
391             {
392 0     0   0 my $c = $_[0]-> {read_buffer};
393 0         0 substr( $_[0]-> {read_buffer}, 0) = '';
394 0         0 return $c;
395             }
396              
397             sub write
398             {
399 3     3   6 my ( $self, $data) = @_;
400 3         9 $self-> {write_buffer} .= $data;
401 3 50 33     43 vec( $self->{owner}->{write}, $self-> {fileno}, 1) = 1 if $self->{owner} and defined $self->{fileno};
402              
403 3         100 my $nbytes = syswrite( $self->{handle}, $self->{write_buffer});
404 3 50       24 unless ( defined $nbytes) {
    50          
405 0 0 0     0 $self-> {owner}-> error( $self, 'write') if $self->{owner} && $! != EAGAIN;
406 0 0       0 $nbytes = 0 if $! == EAGAIN;
407             } elsif ( $nbytes > 0) {
408 3         8 substr( $self->{write_buffer}, 0, $nbytes) = '';
409             }
410 3         16 $nbytes;
411             }
412              
413             sub flush
414             {
415 10     10   17 my ( $self, $discard) = @_;
416 10 50       24 if ( $discard) {
417 0         0 $self-> {write_buffer} = '';
418             } else {
419 10         39 while ( length $self-> {write_buffer}) {
420 0 0       0 return undef unless defined $self-> write('');
421             }
422             }
423 10         14 return 1;
424             }
425              
426 9     9   43 sub destroy { shift-> DESTROY }
427              
428             sub notify
429             {
430 31     31   126 my ( $self, $event, @params) = @_;
431 31 50       86 die( "Unexistent event `$event'") unless $events{$event};
432            
433 31         65 $self-> {event_flag} = 0;
434 31 100       82 if ( exists $self->{$event}) {
435 8         68 $self->{$event}->($self,@params);
436 8 100 66     60 return if $events{$event} == SINGLE || $self->{event_flag};
437             }
438 24 100       575 $self-> $event(@params) if $self-> can($event);
439             }
440              
441             sub on_error
442             {
443 0     0   0 my ( $self, $condition, $errno) = @_;
444 0 0       0 if ( $self) {
445 0         0 $condition .= ' '.ref($self);
446 0 0       0 $condition .= ",#$self->{fileno}" if defined $self->{fileno};
447 0 0       0 $condition .= " pid $self->{pid}" if defined $self->{pid};
448 0 0       0 $condition .= " ($self->{process})" if defined $self->{process};
449             }
450 0         0 warn "Error on $condition: $errno\n";
451 0         0 $_[0]-> destroy;
452             }
453              
454              
455             # external writer process
456             package IO::Events::Process::Write;
457 1     1   7 use vars qw(@ISA);
  1         2  
  1         207  
458             @ISA = qw(IO::Events::Handle);
459              
460             sub new
461             {
462 0     0   0 my ( $self, %profile) = @_;
463 0 0       0 die "No `process'" unless defined $profile{process};
464 0         0 my $handle = IO::Handle-> new();
465 0         0 $handle-> autoflush(1);
466 0         0 my $pid = open( $handle, "|$profile{process}");
467 0 0       0 die("Cannot fork:$!") unless defined $pid;
468              
469 0         0 $self = $self-> SUPER::new(
470             write => 1,
471             %profile,
472             handle => $handle,
473             pid => $pid,
474             );
475 0         0 return $self;
476             }
477              
478             # external reader process
479             package IO::Events::Process::Read;
480 1     1   20 use vars qw(@ISA);
  1         1  
  1         205  
481             @ISA = qw(IO::Events::Handle);
482              
483             sub new
484             {
485 0     0   0 my ( $self, %profile) = @_;
486 0 0       0 die "No `process'" unless defined $profile{process};
487 0         0 my $handle = IO::Handle-> new();
488 0         0 $handle-> autoflush(1);
489 0         0 my $pid = open( $handle, "$profile{process}|");
490 0 0       0 die("Cannot fork:$!") unless defined $pid;
491              
492 0         0 $self = $self-> SUPER::new(
493             read => 1,
494             %profile,
495             handle => $handle,
496             pid => $pid,
497             );
498 0         0 return $self;
499             }
500              
501             # internal reader process
502             package IO::Events::Fork::Read;
503 1     1   5 use vars qw(@ISA);
  1         2  
  1         214  
504             @ISA = qw(IO::Events::Handle);
505              
506             sub new
507             {
508 0     0   0 my ( $self, %profile) = @_;
509 0         0 my $handle = IO::Handle-> new();
510 0         0 $handle-> autoflush(1);
511 0         0 my $pid = open( $handle, "-|");
512 0 0       0 die("Cannot fork:$!") unless defined $pid;
513 0 0       0 unless ( $pid) {
514             # $profile{owner}->on_fork();
515 0         0 $|=1;
516 0   0     0 my $on_fork = $profile{on_fork} || $self->can('on_fork');
517 0 0       0 $on_fork->(\%profile) if $on_fork;
518 0         0 POSIX::_exit(0);
519             }
520              
521 0         0 $self = $self-> SUPER::new(
522             read => 1,
523             %profile,
524             handle => $handle,
525             pid => $pid,
526             );
527 0         0 return $self;
528             }
529              
530             # internal writer process
531             package IO::Events::Fork::Write;
532 1     1   5 use vars qw(@ISA);
  1         2  
  1         214  
533             @ISA = qw(IO::Events::Handle);
534              
535             sub new
536             {
537 0     0   0 my ( $self, %profile) = @_;
538 0         0 my $handle = IO::Handle-> new();
539 0         0 $handle-> autoflush(1);
540 0         0 my $pid = open( $handle, "|-");
541 0 0       0 die("Cannot fork:$!") unless defined $pid;
542 0 0       0 unless ( $pid) {
543             # $profile{owner}->on_fork();
544 0   0     0 my $on_fork = $profile{on_fork} || $self->can('on_fork');
545 0 0       0 $on_fork->(\%profile) if $on_fork;
546 0         0 POSIX::_exit(0);
547             }
548              
549 0         0 $self = $self-> SUPER::new(
550             write => 1,
551             %profile,
552             handle => $handle,
553             pid => $pid,
554             );
555 0         0 return $self;
556             }
557              
558             package IO::Events::internal::Shadow;
559 1     1   5 use vars qw(@ISA);
  1         2  
  1         203  
560             @ISA = qw(IO::Events::Handle);
561              
562             sub new
563             {
564 1     1   27 my ( $self, %profile) = @_;
565 1         11 $profile{shadow_task} = $profile{owner}->{ids}->{$profile{id}};
566 1         7 $profile{id} = "shadow:$profile{id}";
567 1         27 my $ret = $self-> SUPER::new(%profile);
568 1         5 return $ret;
569             }
570              
571             sub on_close
572             {
573 1     1   5 undef $_[0]->{shadow_task}-> {shadow};
574             }
575              
576             sub on_error
577             {
578 0     0   0 my ( $self, $condition, $errno) = @_;
579 0         0 $self-> {shadow_task}-> notify('on_error', $condition, $errno);
580             }
581              
582              
583             # internal bidirectional process
584             package IO::Events::Fork::ReadWrite;
585 1     1   4 use vars qw(@ISA);
  1         2  
  1         651  
586             @ISA = qw(IO::Events::Handle);
587              
588             sub new
589             {
590 1     1   53 my ( $self, %profile) = @_;
591              
592             # reader
593 1         10 my $handle1 = IO::Handle-> new();
594 1         114 $handle1-> autoflush(1);
595            
596             # writer
597 1         72 my $handle2 = IO::Handle-> new();
598 1         16 $handle2-> autoflush(1);
599              
600             # fork & pipes
601 1         67 pipe(READER, $handle2);
602 1         18 pipe($handle1, WRITER);
603 1         10 WRITER->autoflush(1);
604            
605 1         1161 my $pid = fork();
606 1 50       1418 die("Cannot fork:$!") unless defined $pid;
607            
608 1 50       13 unless ( $pid) {
609 0         0 close $handle1;
610 0         0 close $handle2;
611 0         0 open STDOUT, ">&WRITER";
612 0         0 open STDIN, "<&READER";
613              
614             # $profile{owner}->on_fork();
615 0         0 $|=1;
616 0   0     0 my $on_fork = $profile{on_fork} || $self->can('on_fork');
617 0 0       0 $on_fork->(\%profile) if $on_fork;
618 0         0 POSIX::_exit(0);
619             }
620              
621 1         107 close WRITER;
622 1         12 close READER;
623              
624             # create objects
625 1         1092 $self = $self-> SUPER::new(
626             read => 1,
627             %profile,
628             handle => $handle1,
629             pid => $pid,
630             );
631              
632 1         36 $self-> {shadow} = IO::Events::internal::Shadow-> new(
633             write => 1,
634             %profile,
635             id => $self-> {id},
636             handle => $handle2,
637             pid => $pid,
638             on_write => \&shadow_write,
639             on_close => \&shadow_close,
640             );
641              
642 1         33 return $self;
643             }
644              
645             sub shadow_write
646             {
647 1     1   14 shift-> {shadow_task}-> notify('on_write');
648             }
649              
650             sub shadow_close
651             {
652 1     1   3 my $shadow = shift;
653 1         4 $shadow-> {shadow_task}->{finished} = $shadow-> {finished};
654 1         3 $shadow-> {shadow_task}->{exitcode} = $shadow-> {exitcode};
655 1         6 $shadow-> {shadow_task}-> notify('on_close', 1);
656             }
657              
658             sub shutdown
659             {
660 0     0   0 my ( $self, @cmd) = @_;
661 0         0 for ( @cmd) {
662 0 0       0 if ( $_ eq 'read') {
    0          
663 0         0 $self-> SUPER::DESTROY;
664             } elsif ( $_ eq 'write') {
665 0 0       0 $self-> {shadow}-> DESTROY if $self-> {shadow};
666             }
667             }
668             }
669              
670             sub DESTROY
671             {
672 2 100   2   31 return if $_[0]->{dead};
673 1 50       20 $_[0]->{shadow}->DESTROY if $_[0]->{shadow};
674 1         10 $_[0]->SUPER::DESTROY;
675 1         6 $_[0]->{dead} = 1;
676             }
677              
678             sub write {
679 1     1   3 my $self = shift;
680 1 50       4 return unless $self->{shadow};
681 1         23 $self-> {shadow}-> write( @_)
682             }
683              
684             # external bidirectional process
685             package IO::Events::Process::ReadWrite;
686 1     1   5 use vars qw(@ISA);
  1         2  
  1         95  
687             @ISA = qw(IO::Events::Fork::ReadWrite);
688              
689             sub on_fork
690             {
691 0 0   0   0 exec( $_[0]->{process}) or POSIX::_exit(127);
692             }
693              
694             package IO::Events::stdin;
695 1     1   5 use vars qw(@ISA);
  1         7  
  1         138  
696             @ISA=qw(IO::Events::Handle);
697              
698             sub new
699             {
700 0     0   0 my ( $self, %profile) = @_;
701 0         0 $profile{id} = "stdin";
702 0         0 $profile{handle} = \*STDIN;
703 0         0 $profile{read} = 1;
704 0         0 $profile{auto_close} = 0;
705 0         0 return $self-> SUPER::new(%profile);
706             }
707              
708             package IO::Events::stdout;
709 1     1   6 use vars qw(@ISA);
  1         1  
  1         164  
710             @ISA=qw(IO::Events::Handle);
711              
712             sub new
713             {
714 0     0   0 my ( $self, %profile) = @_;
715 0         0 $profile{id} = "stdout";
716 0         0 $profile{handle} = \*STDOUT;
717 0         0 $profile{write} = 1;
718 0         0 $profile{auto_close} = 0;
719 0         0 return $self-> SUPER::new(%profile);
720             }
721              
722             package IO::Events::stderr;
723 1     1   5 use vars qw(@ISA);
  1         5  
  1         120  
724             @ISA=qw(IO::Events::Handle);
725              
726             sub new
727             {
728 0     0   0 my ( $self, %profile) = @_;
729 0         0 $profile{id} = "stderr";
730 0         0 $profile{handle} = \*STDERR;
731 0         0 $profile{write} = 1;
732 0         0 $profile{auto_close} = 0;
733 0         0 return $self-> SUPER::new(%profile);
734             }
735              
736             package IO::Events::Socket;
737 1     1   5 use vars qw(@ISA);
  1         3  
  1         46  
738             @ISA=qw(IO::Events::Handle);
739              
740 1     1   1033 use Socket;
  1         7503  
  1         1320  
741              
742             sub accept
743             {
744 2     2   39 my ( $self, %profile) = @_;
745 2         10 my $handle = IO::Handle-> new;
746 2 50       87 accept( $handle, $self-> {handle}) or die "accept() error:$!";
747 2         14 return IO::Events::Handle-> new(
748             owner => $self-> {owner},
749             handle => $handle,
750             %profile,
751             );
752             }
753              
754             sub connect
755             {
756 2     2   12 $_[0]-> {callback} = \&socket_connect_error_checker;
757             }
758              
759             sub socket_connect_error_checker
760             {
761 2     2   3 my ( $self, $r, $w, $e) = @_;
762 2         5 delete $self-> {callback};
763              
764 2         35 local $! = unpack('i', getsockopt($self-> {handle}, SOL_SOCKET, SO_ERROR));
765 2 50       11 if ( $!) {
766 0 0       0 $self-> {owner}-> error( $self, 'connect') if $self->{owner};
767             }
768             }
769              
770             package IO::Events::Socket::TCP;
771 1     1   12 use vars qw(@ISA);
  1         2  
  1         56  
772             @ISA=qw(IO::Events::Socket);
773              
774 1     1   6 use strict;
  1         1  
  1         39  
775 1     1   5 use Socket;
  1         1  
  1         602  
776 1     1   6 use Fcntl;
  1         2  
  1         332  
777 1     1   6 use Errno qw(EWOULDBLOCK EINPROGRESS);
  1         2  
  1         912  
778              
779             sub new
780             {
781 2     2   48 my ( $self, %profile) = @_;
782              
783 2 50       52 $profile{handle} = IO::Handle-> new unless $profile{handle};
784 2 50       2821 die "Cannot create socket: $!" unless
785             socket( $profile{handle}, PF_INET, SOCK_STREAM, getprotobyname('tcp'));
786              
787 2 50       12 unless ( $profile{nonblock}) {
788 2         3 my $fl;
789 2         13 $fl = fcntl( $profile{handle}, F_GETFL, 0);
790 2 50       13 die "$!" unless defined $fl;
791 2 50       24 fcntl( $profile{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!";
792             }
793              
794 2 100       11 if ( defined $profile{connect}) {
    50          
795 1         2 my $iaddr;
796 1 50       8 die "Cannot resolve host '$profile{connect}'" unless
797             $iaddr = inet_aton( $profile{connect});
798 1         7 my $ok = connect( $profile{handle}, sockaddr_in( $profile{port}, $iaddr));
799 1 50 33     209 $ok = 1 if !$ok and ( $! == EWOULDBLOCK || $! == EINPROGRESS);
      33        
800 1 50       4 die "Connect error: $!" unless $ok;
801             } elsif ( exists $profile{listen}) {
802 1 50       14 setsockopt( $profile{handle}, SOL_SOCKET, SO_REUSEADDR, pack("l", 1)) or
803             die "Error in setsockopt(SOL_SOCKET,SO_REUSEADDR,1):$!";
804 1   50     12 my $addr = $profile{addr} || '0.0.0.0';
805 1         19 my $inet = inet_aton( $addr);
806 1 50       11 die "Cannot resolve host '$addr'" unless defined $inet;
807 1 50       22 bind( $profile{handle}, sockaddr_in( $profile{port}, $inet)) or
808             die "Error in bind($profile{port}, $addr):$!";
809 1         47 listen( $profile{handle}, SOMAXCONN);
810 1         5 $profile{read} = -1;
811             }
812              
813 2         45 my $this = $self-> SUPER::new(%profile);
814 2 100       21 $this-> SUPER::connect() if $profile{connect};
815 2         16 return $this;
816             }
817              
818             sub accept
819             {
820 1     1   10 my ( $self, %profile) = @_;
821 1         25 my $client = $self-> SUPER::accept( %profile);
822 1         15 my ($port, $ipaddr) = unpack_sockaddr_in( getpeername( $client->{handle}));
823 1         19 $client-> {remote_addr} = inet_ntoa($ipaddr);
824 1         3 $client-> {remote_port} = $port;
825 1         3 return $client;
826             }
827              
828             package IO::Events::Socket::UDP;
829 1     1   7 use vars qw(@ISA);
  1         3  
  1         58  
830             @ISA=qw(IO::Events::Socket);
831              
832 1     1   6 use strict;
  1         3  
  1         32  
833 1     1   5 use Socket;
  1         2  
  1         765  
834 1     1   14 use Fcntl;
  1         1  
  1         1203  
835              
836             sub new
837             {
838 2     2   10 my ( $self, %profile) = @_;
839              
840 2 50       15 $profile{handle} = IO::Handle-> new unless $profile{handle};
841 2 50       301 die "Cannot create socket: $!" unless
842             socket( $profile{handle}, PF_INET, SOCK_DGRAM, getprotobyname('udp'));
843              
844 2 50       9 unless ( $profile{nonblock}) {
845 2         4 my $fl;
846 2         10 $fl = fcntl( $profile{handle}, F_GETFL, 0);
847 2 50       7 die "$!" unless defined $fl;
848 2 50       13 fcntl( $profile{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!";
849             }
850            
851 2   50     33 my $addr = $profile{addr} || '0.0.0.0';
852 2         10 my $inet = inet_aton( $addr);
853 2 50       7 die "Cannot resolve host '$addr'" unless defined $inet;
854            
855 2 50       7 if ( $profile{broadcast}) {
856 0 0       0 setsockopt( $profile{handle}, SOL_SOCKET, SO_BROADCAST, pack("l", 1)) or
857             die "Error in setsockopt(SOL_SOCKET,SO_BROADCAST,1):$!";
858             }
859 2         3 $profile{read} = -2;
860            
861 2 50 100     31 bind( $profile{handle}, sockaddr_in( $profile{port} || 0, $inet)) or
862             die "Error in bind($profile{port}, $addr):$!";
863              
864 2         70 return $self-> SUPER::new(%profile);
865             }
866              
867             sub recv
868             {
869 1     1   8 my ( $self, %profile) = @_;
870              
871 1 50       29 $profile{maxlen} = 32768 unless defined $profile{maxlen};
872              
873 1         4 my $flags = MSG_DONTWAIT;
874 1 50       5 $flags |= MSG_OOB if $profile{oob};
875 1 50       4 $flags |= MSG_PEEK if $profile{peek};
876 1 50       4 $flags |= MSG_WAITALL if $profile{waitall};
877 1 50 33     7 $flags &= ~MSG_DONTWAIT if defined($profile{nonblock}) and $profile{nonblock} == 0;
878            
879 1         2 my ( $port, $host);
880 1         2 my $data = '';
881 1         26 $host = recv( $self-> {handle}, $data, $profile{maxlen}, $flags);
882 1 50       4 unless ( defined $host) {
883 0         0 $self-> error( 'recv');
884 0         0 return undef;
885             }
886 1         6 ( $port, $host) = sockaddr_in( $host);
887 1         26 $self-> {remote_port} = $port;
888 1         136 $self-> {remote_host} = gethostbyaddr( $host, AF_INET);
889              
890 1         6 return $data;
891             }
892              
893             sub send
894             {
895 1     1   10 my ( $self, $addr, $port, $data, %profile) = @_;
896              
897 1         2 my $flags = 0;
898 1 50       3 $flags |= MSG_OOB if $profile{oob};
899 1 50       4 $flags |= MSG_DONTROUTE if $profile{dontroute};
900 1 50       4 $flags |= MSG_EOR if $profile{eor};
901 1 50       3 $flags |= MSG_EOF if $profile{eof};
902            
903 1   50     381 my $inet = inet_aton($addr) || die "unknown host '$addr'\n";
904 1         5 $inet = sockaddr_in( $port, $inet);
905              
906 1         135 my $ret = send( $self-> {handle}, $data, $flags, $inet);
907 1 50       12 unless ( defined $ret) {
908 0         0 $self-> error( 'recv');
909 0         0 return undef;
910             }
911 1         4 return $ret;
912             }
913              
914             package IO::Events::Socket::UNIX;
915 1     1   5 use vars qw(@ISA);
  1         2  
  1         44  
916             @ISA=qw(IO::Events::Socket);
917              
918 1     1   5 use Socket;
  1         24  
  1         979  
919 1     1   7 use Fcntl;
  1         2  
  1         635  
920              
921             sub new
922             {
923 2     2   8 my ( $self, %profile) = @_;
924              
925 2 50       13 $profile{handle} = IO::Handle-> new unless $profile{handle};
926 2 50       363 die "Cannot create socket: $!" unless
927             socket( $profile{handle}, PF_UNIX, SOCK_STREAM, 0);
928              
929 2 50       7 unless ( $profile{nonblock}) {
930 2         3 my $fl;
931 2         10 $fl = fcntl( $profile{handle}, F_GETFL, 0);
932 2 50       13 die "$!" unless defined $fl;
933 2 50       12 fcntl( $profile{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!";
934             }
935              
936 2 100       9 if ( defined $profile{connect}) {
    50          
937 1 50       24 connect( $profile{handle}, pack_sockaddr_un($profile{connect})) or
938             die "connect($profile{connect}) error: $!";
939             } elsif ( exists $profile{listen}) {
940 1 50       96 bind( $profile{handle}, pack_sockaddr_un($profile{listen})) or
941             die "Error in bind($profile{listen}):$!";
942 1         6 listen( $profile{handle}, SOMAXCONN);
943 1         4 $profile{read} = -1;
944             }
945              
946 2         66 my $this = $self-> SUPER::new(%profile);
947 2 100       141 $this-> SUPER::connect() if $profile{connect};
948 2         15 return $this;
949             }
950              
951             package IO::Events::Timer;
952 1     1   8 use Time::HiRes qw(time);
  1         2  
  1         12  
953              
954             sub new
955             {
956 1     1   4 my $class = shift;
957              
958 1         30 my $self = bless {
959             timeout => 10000,
960             repetitive => 0,
961             active => 0,
962             @_,
963             }, $class;
964            
965 1         4 for ( qw(owner)) {
966 1 50       16 die "No `$_' field" unless defined $self-> {$_};
967             }
968              
969 1         1 push @{$self-> {owner}-> {timers}}, $self;
  1         5  
970              
971 1 50       8 $self-> start if $self-> {active};
972              
973 1         3 return $self;
974             }
975              
976             sub DESTROY
977             {
978 0     0   0 my $self = $_[0];
979 0 0       0 return if $self-> {dead};
980 0         0 @{$self-> {owner}-> {timers}} = grep { $_ != $self } @{$self-> {owner}-> {timers}};
  0         0  
  0         0  
  0         0  
981             }
982              
983             sub start
984             {
985 1     1   2 my $self = $_[0];
986 1         8 $self-> {alert} = time + $self-> {timeout};
987 1         2 $self-> {active} = 1;
988             }
989              
990 0     0   0 sub stop { $_[0]-> {active} = 0 }
991              
992             sub active
993             {
994 0     0   0 my ( $self, $active) = @_;
995 0 0       0 return if $active == $self-> {active}; # to avoid restarts
996 0 0       0 $active ? $self-> start : $self-> stop;
997             }
998              
999             sub notify
1000             {
1001 1     1   5 my $self = $_[0];
1002              
1003 1 50       10 if ( $self-> {repetitive}) {
1004 0         0 my $time = time;
1005             # eat up late events
1006 0         0 $self-> {alert} += $self-> {timeout} while $self-> {alert} < $time;
1007             } else {
1008 1         3 $self-> {active} = 0;
1009             }
1010            
1011 1         4 $self-> {event_flag} = 0;
1012 1 50       6 if ( defined $self->{on_tick}) {
1013 1         10 $self->{on_tick}->($self);
1014 1 50       10 return if $self->{event_flag};
1015             }
1016 1 50       47 $self-> on_tick() if $self-> can('on_tick');
1017             }
1018              
1019              
1020             1;
1021              
1022             __DATA__