File Coverage

blib/lib/POE/Wheel/Sendfile.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package POE::Wheel::Sendfile;
2              
3 1     1   48853 use strict;
  1         3  
  1         38  
4 1     1   5 use warnings;
  1         1  
  1         44  
5              
6             our $VERSION = '0.0200';
7              
8 1     1   522 use POE;
  0            
  0            
9             use IO::File;
10             use Socket;
11              
12             sub DEBUG () { 0 }
13              
14             use base qw( POE::Wheel::ReadWrite );
15              
16             #######################################
17             BEGIN {
18             *HANDLE_OUTPUT = \&POE::Wheel::ReadWrite::HANDLE_OUTPUT;
19             *STATE_WRITE = \&POE::Wheel::ReadWrite::STATE_WRITE;
20             *EVENT_FLUSHED = \&POE::Wheel::ReadWrite::EVENT_FLUSHED;
21             *EVENT_ERROR = \&POE::Wheel::ReadWrite::EVENT_ERROR;
22             *UNIQUE_ID = \&POE::Wheel::ReadWrite::UNIQUE_ID;
23             *DRIVER_BOTH = \&POE::Wheel::ReadWrite::DRIVER_BOTH;
24             *AUTOFLUSH = \&POE::Wheel::ReadWrite::AUTOFLUSH;
25             }
26              
27             sub STATE_SENDFILE () { AUTOFLUSH+1 }
28              
29             #######################################
30             our $HAVE_SENDFILE;
31             BEGIN {
32             unless( defined $HAVE_SENDFILE ) {
33             $HAVE_SENDFILE = 0;
34             eval "use Sys::Sendfile 0.11 ();";
35             $HAVE_SENDFILE = 1 unless $@;
36             warn $@ if DEBUG and $@;
37             }
38             }
39              
40             #######################################
41             sub sendfile
42             {
43             my( $self, $fh ) = @_;
44             # Build a [SENDFILE] hash with details
45             my $S = $self->_sendfile_setup( $fh );
46             $S or return;
47             # Build a select write handler
48             $self->_sendfile_define_write( $S ) or return;
49             # Call that handler
50             return $poe_kernel->call(
51             $poe_kernel->get_active_session,
52             $self->[STATE_SENDFILE],
53             $self->[HANDLE_OUTPUT]
54             );
55             }
56              
57             #######################################
58             sub _sendfile_setup
59             {
60             my( $self, $fh ) = @_;
61             my $event_error = \$self->[EVENT_ERROR];
62             my $unique_id = \$self->[UNIQUE_ID];
63              
64             if( $self->[STATE_SENDFILE] ) {
65             $@ = "Already sending a file";
66             return;
67             }
68             my $S = {};
69             if( 'HASH' eq ref $fh ) {
70             $S = $fh;
71             $fh = delete $S->{file};
72             }
73             unless( ref $fh ) {
74             my $io = IO::File->new;
75             unless( $io->open( $fh ) ) {
76             my $me = $poe_kernel->get_active_session;
77             $$event_error && $poe_kernel->call(
78             $me, $$event_error, 'open', ($!+0), "$fh: $!", $unique_id
79             );
80             return;
81             }
82             $S->{file} = $fh;
83             $S->{fh} = $io;
84             }
85             else {
86             $S->{fh} = $fh;
87             }
88              
89             unless( $S->{offset} ) {
90             $S->{offset} = 0;
91             }
92              
93             unless( $S->{size} ) {
94             $S->{size} = (stat $S->{fh})[7] - $S->{offset};
95             }
96             else {
97             $S->{size} += $S->{offset};
98             }
99             unless( $S->{blocksize} or $HAVE_SENDFILE ) {
100             $S->{blocksize} = eval {
101             $SIG{__DIE__} = 'DEFAULT';
102             my $h = $self->[HANDLE_OUTPUT];
103             return unpack "i",
104             getsockopt($h, Socket::SOL_SOCKET(), Socket::SO_SNDBUF());
105             };
106             $S->{blocksize} ||= 7500;
107             }
108              
109             return $S;
110             }
111              
112             #######################################
113             sub _sendfile_define_write
114             {
115             my( $self, $S ) = @_;
116              
117             my @need = (
118             \$self->[EVENT_ERROR], # $event_error
119             \$self->[EVENT_FLUSHED], # $event_flush
120             \$self->[STATE_WRITE], # $state_write
121             \$self->[STATE_SENDFILE], # $state_sendfile
122             $self->[UNIQUE_ID], # $unique_id
123             $self->[DRIVER_BOTH], # $driver
124             \$S, # $sendfile
125             1 # $first
126             );
127              
128             my $state;
129             if( $HAVE_SENDFILE ) {
130             $state = _mk_sendfile( \@need );
131             }
132             else {
133             $state = _mk_fallback( \@need );
134             }
135             die unless $state;
136             $self->[STATE_SENDFILE] = ref( $self ) . " ($self->[UNIQUE_ID]) -> sendfile write",
137             $poe_kernel->state( $self->[STATE_SENDFILE], $state );
138             $poe_kernel->select_write($self->[HANDLE_OUTPUT]);
139             $poe_kernel->select_write( $self->[HANDLE_OUTPUT],
140             $self->[STATE_SENDFILE]
141             );
142             return 1;
143             }
144              
145             # This is where all the work happens
146             # We call sendfile(), check it's return, update the offset, then
147             # wait for the flushed-event to happen.
148             # If we are at the end of the file, we let the flushed-event go to the
149             # OutputEvent handler
150             sub _mk_sendfile
151             {
152             my(
153             $event_error, # \$self->[EVENT_ERROR];
154             $event_flushed, # \$self->[EVENT_FLUSHED];
155             $state_write, # \$self->[STATE_WRITE];
156             $state_sendfile, # \$self->[STATE_SENDFILE];
157             $unique_id, # $self->[UNIQUE_ID];
158             $driver, # $self->[DRIVER_BOTH];
159             $sendfile, # \$S;
160             $first # 1;
161             ) = @{ $_[0] };
162              
163             return sub {
164             0 && CRIMSON_SCOPE_HACK('<');
165             my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
166              
167             my $need = $$sendfile->{size} - $$sendfile->{offset};
168             DEBUG and warn "sendfile #$unique_id, offset=$$sendfile->{offset}, need=$need, size=$$sendfile->{size}";
169             my $rv = Sys::Sendfile::sendfile(
170             $handle, $$sendfile->{fh}, $need, $$sendfile->{offset} );
171              
172             DEBUG and warn "sendfile #$unique_id, rv=$rv";
173              
174             # sendfile(2) - Applications may wish to fall back to
175             # read(2)/write(2) in the case where sendfile() fails with EINVAL or
176             # ENOSYS.
177              
178             unless( defined $rv and $rv >= 0 ) {
179             $$event_error && $k->call(
180             $me, $$event_error, 'sendfile', ($!+0), "$!", $unique_id
181             );
182             return;
183             }
184              
185             $$sendfile->{offset} += $rv;
186             if( $rv == 0 or $$sendfile->{offset} >= $$sendfile->{size} ) {
187             DEBUG and warn "sendfile #$unique_id, done";
188             # We want the last flush to do to the session
189             $k->select_write( $handle );
190             $k->select_write( $handle, $$state_write );
191             # Remove this state
192             $k->state( $$state_sendfile );
193             # Nothing more to send
194             $$state_sendfile = undef();
195             return 1;
196             }
197              
198             if( $first ) {
199             # Turn the select on
200             $k->select_resume_write( $handle );
201             $first = 0;
202             }
203             return 1;
204             };
205             }
206              
207              
208             #
209             # Fallback to doing it by hand
210             #
211             sub _mk_fallback
212             {
213             my(
214             $event_error, # \$self->[EVENT_ERROR];
215             $event_flushed, # \$self->[EVENT_FLUSHED];
216             $state_write, # \$self->[STATE_WRITE];
217             $state_sendfile, # \$self->[STATE_SENDFILE];
218             $unique_id, # $self->[UNIQUE_ID];
219             $driver, # $self->[DRIVER_BOTH];
220             $sendfile, # \$S;
221             $first # 1;
222             ) = @{ $_[0] };
223              
224             my $rv = sysseek( $$sendfile->{fh}, $$sendfile->{offset}, 0 );
225             unless( defined $rv ) {
226             $@ = "Unable to sysseek to $$sendfile->{offset}: $!";
227             return;
228             }
229              
230             my $buffer = '';
231             return sub {
232             0 && CRIMSON_SCOPE_HACK('<');
233              
234             my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
235              
236             # Don't read to much if we only want a little
237             my $size = $$sendfile->{blocksize};
238             if( $size+$$sendfile->{offset} > $$sendfile->{size} ) {
239             $size = $$sendfile->{size} - $$sendfile->{offset};
240             }
241              
242             my $rv = sysread( $$sendfile->{fh}, $buffer, $size );
243             unless( defined $rv ) {
244             $$event_error && $k->call(
245             $me, $$event_error, 'sysread', ($!+0), $!, $unique_id
246             );
247             return;
248             }
249              
250             $$sendfile->{offset} += $rv;
251             if( $rv == 0 || $$sendfile->{offset} >= $$sendfile->{size} ) {
252             # Nothing more to send
253             $$sendfile = undef();
254             # We want the last flush to go to the session
255             $k->select_write( $handle );
256             $k->select_write( $handle, $$state_write );
257             # Remove this state
258             $k->state( $$state_sendfile );
259             # Nothing more to send
260             $$state_sendfile = undef();
261             }
262            
263             my $err = 0;
264             if( $rv != 0 ) {
265             if( $driver->put( [$buffer] ) ) {
266             $driver->flush( $handle );
267             if( $! ) {
268             $$event_error && $k->call(
269             $me, $$event_error, 'syswrite', ($!+0), $!, $unique_id
270             );
271             $err = 1;
272             }
273             }
274             if( $first and not $err ) {
275             # Turn the select on
276             $k->select_resume_write( $handle );
277             $first = 0;
278             }
279             }
280             return if $err;
281             return 1;
282             };
283             }
284              
285             1;
286              
287             __END__