File Coverage

blib/lib/IO/Lambda/Loop/Select.pm
Criterion Covered Total %
statement 117 148 79.0
branch 35 74 47.3
condition 11 24 45.8
subroutine 13 13 100.0
pod 7 8 87.5
total 183 267 68.5


line stmt bran cond sub pod time code
1             # $Id: Select.pm,v 1.18 2010/01/01 14:52:17 dk Exp $
2              
3             package IO::Lambda::Loop::Select;
4 14     14   65 use strict;
  14         22  
  14         442  
5 14     14   64 use warnings;
  14         22  
  14         517  
6 14     14   59 use Errno qw(EINTR EAGAIN);
  14         22  
  14         2282  
7 14     14   87 use IO::Lambda qw(:constants);
  14         27  
  14         2771  
8 14     14   72 use Time::HiRes qw(time);
  14         30  
  14         112  
9              
10             IO::Lambda::Loop::default('Select');
11              
12             our $DEBUG = $IO::Lambda::DEBUG{select} || 0;
13              
14             # IO::Select::select doesn't distinguish between select returning 0 and -1, don't have
15             # time to fix that. I'll just use a plain select instead, it'll be faster also.
16              
17             sub new
18             {
19 14     14 1 35 my $self = bless {} , shift;
20 14         100 $self-> {$_} = '' for qw(read write exc);
21 14         28 $self-> {items} = {};
22 14         27 $self-> {timers} = [];
23 14         54 return $self;
24             }
25              
26             sub empty
27             {
28 181     181 1 167 my $self = shift;
29             return (
30 181         237 @{$self->{timers}} +
31 181 100       186 keys(%{$self-> {items}})
  181         653  
32             ) ? 0 : 1;
33             }
34              
35             sub yield
36             {
37 89     89 1 117 my ( $self, $nonblocking ) = @_;
38              
39 89 50       155 return if $self-> empty;
40              
41 89         99 my $t;
42 89 50       157 $t = 0 if $nonblocking;
43              
44 89         111 my ($min,$max) = ( 0, -1);
45 89         204 my $ct = time;
46              
47             # timers
48 89         90 for ( @{$self-> {timers}}) {
  89         177  
49 128 100 100     754 $t = $_->[WATCH_DEADLINE]
      33        
50             if defined $_->[WATCH_DEADLINE] and
51             (!defined($t) or $t > $_-> [WATCH_DEADLINE]);
52             }
53              
54             # handles
55 89         111 my ( $R, $W, $E) = @{$self}{qw(read write exc)};
  89         221  
56              
57 89         95 while ( my ( $fileno, $bucket) = each %{ $self-> {items}} ) {
  99         288  
58 10         20 for ( @$bucket) {
59 10 50 33     36 $t = $_->[WATCH_DEADLINE]
      66        
60             if defined $_->[WATCH_DEADLINE] and
61             (!defined($t) or $t > $_-> [WATCH_DEADLINE]);
62             }
63 10 50       15 warn "select: fileno $fileno\n" if $DEBUG;
64 10 50       28 $max = $fileno if $max < $fileno;
65 10 50 33     47 $min = $fileno if !defined($min) or $min > $fileno;
66             }
67 89 100       167 if ( defined $t) {
    50          
68 87         126 $t -= $ct;
69 87 100       195 $t = 0 if $t < 0;
70 87 50       166 warn "select: timeout=$t\n" if $DEBUG;
71             } elsif ( $DEBUG) {
72 0         0 warn "select: no timeout\n";
73             }
74              
75             # do select
76 89         2859546 my $n = select( $R, $W, $E, $t);
77 89 50       358 warn "select: $n handles ready\n" if $DEBUG;
78 89 100       244 if ( $n < 0) {
79 1 50 33     114 if ( $! == EINTR or $! == EAGAIN) {
80             # ignore
81 1 50       12 warn "select: $!\n" if $DEBUG;
82             } else {
83             # find out the rogue handles
84 0 0       0 if ( $DEBUG > 1) {
85 0         0 my $h = $R | $W | $E;
86 0         0 for ( my $i = 0; $i < length($h); $i++) {
87 0         0 my $v = '';
88 0         0 for ( my $j = 0; $j < 8; $j++) {
89 0         0 my $fd = $i * 8 + $j;
90 0 0       0 next unless vec($h,$fd,1);
91 0         0 vec($v,$fd,1) = 1;
92 0 0       0 next if select($v,$v,$v,0) >= 0;
93 0         0 warn "select: bad handle #$fd\n";
94             }
95             }
96             }
97 0         0 die "select() error:$!:$^E";
98             }
99             }
100            
101             # expired timers
102 89         119 my ( @kill, @expired);
103              
104 89         204 $t = $self-> {timers};
105             @$t = grep {
106 89 100       215 ($$_[WATCH_DEADLINE] <= $ct) ? do {
  128         517  
107 45         89 push @expired, $_;
108 45         90 0;
109             } : 1;
110             } @$t;
111              
112             # handles
113 89 100       175 if ( $n > 0) {
114             # process selected handles
115 10   66     45 for ( my $i = $min; $i <= $max && $n > 0; $i++) {
116 66         77 my $what =
117             vec( $R, $i, 1) * IO_READ +
118             vec( $W, $i, 1) * IO_WRITE +
119             vec( $E, $i, 1) * IO_EXCEPTION
120             ;
121 66 100       194 next unless $what;
122              
123 10         13 my $bucket = $self-> {items}-> {$i};
124             @$bucket = grep {
125 10 50       14 ($$_[WATCH_IO_FLAGS] & $what) ? do {
  10         18  
126 10         11 $$_[WATCH_IO_FLAGS] &= $what;
127 10         10 push @expired, $_;
128 10         17 0;
129             } : 1;
130             } @$bucket;
131 10 50       27 delete $self-> {items}->{$i} unless @$bucket;
132 10         26 $n--;
133             }
134             } else {
135             # else process timeouts
136 79         93 my @kill;
137 79         94 while ( my ( $fileno, $bucket) = each %{ $self-> {items}}) {
  79         324  
138             @$bucket = grep {
139 0         0 (
140             defined($_->[WATCH_DEADLINE]) &&
141             $_->[WATCH_DEADLINE] <= $ct
142 0 0 0     0 ) ? do {
143 0         0 $$_[WATCH_IO_FLAGS] = 0;
144 0         0 push @expired, $_;
145 0         0 0;
146             } : 1;
147             } @$bucket;
148 0 0       0 push @kill, $fileno unless @$bucket;
149             }
150 79         95 delete @{$self->{items}}{@kill};
  79         143  
151             }
152 89         281 $self-> rebuild_vectors;
153            
154             # call them
155 89         438 $$_[WATCH_OBJ]-> io_handler( $_) for @expired;
156             }
157              
158             sub watch
159             {
160 10     10 1 12 my ( $self, $rec) = @_;
161 10         19 my $fileno = fileno $rec->[WATCH_IO_HANDLE];
162 10 50       29 die "Invalid filehandle" unless defined $fileno;
163 10         11 my $flags = $rec->[WATCH_IO_FLAGS];
164              
165 10 50       52 vec($self-> {read}, $fileno, 1) = 1 if $flags & IO_READ;
166 10 50       24 vec($self-> {write}, $fileno, 1) = 1 if $flags & IO_WRITE;
167 10 100       19 vec($self-> {exc}, $fileno, 1) = 1 if $flags & IO_EXCEPTION;
168              
169 10         11 push @{$self-> {items}-> {$fileno}}, $rec;
  10         31  
170             }
171              
172             sub after
173             {
174 54     54 1 69 my ( $self, $rec) = @_;
175 54         58 push @{$self-> {timers}}, $rec;
  54         144  
176             }
177              
178             sub remove
179             {
180 22     22 1 32 my ($self, $obj) = @_;
181              
182 22         29 @{$self-> {timers}} = grep {
183 5 50       24 defined($_->[WATCH_OBJ]) and $_->[WATCH_OBJ] != $obj
184 22         29 } @{$self-> {timers}};
  22         36  
185              
186 22         29 my @kill;
187 22         27 while ( my ( $fileno, $bucket) = each %{$self->{items}}) {
  22         61  
188 0 0       0 @$bucket = grep { defined($_->[WATCH_OBJ]) and $_->[WATCH_OBJ] != $obj } @$bucket;
  0         0  
189 0 0       0 next if @$bucket;
190 0         0 push @kill, $fileno;
191             }
192 22         34 delete @{$self->{items}}{@kill};
  22         36  
193              
194 22         47 $self-> rebuild_vectors;
195             }
196              
197             sub remove_event
198             {
199 8     8 1 15 my ($self, $rec) = @_;
200            
201 8         18 @{$self-> {timers}} = grep { $_ != $rec } @{$self-> {timers}};
  8         17  
  7         25  
  8         98  
202              
203 8         21 my @kill;
204 8         13 while ( my ( $fileno, $bucket) = each %{$self->{items}}) {
  8         38  
205 0         0 @$bucket = grep { $_ != $rec } @$bucket;
  0         0  
206 0 0       0 next if @$bucket;
207 0         0 push @kill, $fileno;
208             }
209 8         14 delete @{$self->{items}}{@kill};
  8         13  
210              
211 8         25 $self-> rebuild_vectors;
212              
213             }
214              
215             sub rebuild_vectors
216             {
217 119     119 0 147 my $self = $_[0];
218 119         530 $self-> {$_} = '' for qw(read write exc);
219 119         172 my $r = \ $self-> {read};
220 119         140 my $w = \ $self-> {write};
221 119         146 my $e = \ $self-> {exc};
222 119         147 while ( my ( $fileno, $bucket) = each %{$self->{items}}) {
  119         420  
223 0         0 for my $flags ( map { $_-> [WATCH_IO_FLAGS] } @$bucket) {
  0         0  
224 0 0       0 vec($$r, $fileno, 1) = 1 if $flags & IO_READ;
225 0 0       0 vec($$w, $fileno, 1) = 1 if $flags & IO_WRITE;
226 0 0       0 vec($$e, $fileno, 1) = 1 if $flags & IO_EXCEPTION;
227             }
228             }
229             }
230              
231             1;
232              
233             __DATA__