File Coverage

blib/lib/AnyEvent/Handle/Writer.pm
Criterion Covered Total %
statement 13 127 10.2
branch 0 56 0.0
condition 0 59 0.0
subroutine 5 17 29.4
pod 5 5 100.0
total 23 264 8.7


line stmt bran cond sub pod time code
1             package AnyEvent::Handle::Writer;
2              
3 1     1   24348 use 5.8.8;
  1         4  
  1         50  
4 1     1   990 use common::sense 2;m{
  1         32  
  1         7  
5             use strict;
6             use warnings;
7             }x;
8 1     1   1682 use AnyEvent::Handle;
  1         40003  
  1         49  
9 1     1   14 use AnyEvent::Util;
  1         2  
  1         144  
10 1     1   2319 BEGIN{ push our @ISA, 'AnyEvent::Handle'; }
11              
12             =head1 NAME
13              
14             AnyEvent::Handle::Writer - Extended version of AnyEvent::Handle with additional write options
15              
16             =cut
17              
18             our $VERSION = '0.02';
19              
20             =head1 SYNOPSIS
21              
22             use AnyEvent;
23             use AnyEvent::Handle::Writer;
24              
25             my $hdl; $hdl = AnyEvent::Handle::Writer->new(
26             fh => $fh,
27             on_error => sub {
28             my ($hdl, $fatal, $msg) = @_;
29             warn "got error $msg\n";
30             $hdl->destroy;
31             }
32             );
33              
34             # Custom writer
35             $hdl->push_write(sub {
36             my $h = shift;
37             if ($have_data) {
38             $h->unshift_write($data);
39             return 1; # Work done
40             } else {
41             return 0; # Work not done, call me again
42             }
43             });
44              
45             # sendfile
46             $hdl->push_sendfile('/path/to/file', 1024);
47              
48             =head1 RATIONALE
49              
50             We have great l. But it have only raw write queue. This module extends it with
51             callbacks in write queue and adds a C call, which would be processed at correct time
52              
53             =head1 METHODS
54              
55             =cut
56              
57             =head2 push_write($data)
58              
59             L
60              
61             =head2 push_write(type => @args)
62              
63             L
64              
65             =head2 push_write($cb->($handle))
66              
67             This version of call allow to push a callback, which will be invoked when the write queue before it became empty.
68              
69             Callback should return:
70             true - when there is no work to be done with this callback.
71             it will be removed from queue and continue
72             false - when it want to be called again (i.e. not all work was done)
73             it will be kept in queue and called on next drain
74              
75             This call allow us to implement such a thing:
76              
77             $handle->push_write("HTTP/1.1 200 OK\012\015\012\015");
78             $handle->push_write(sub {
79             # Manual work on handle
80             my $h = shift;
81             my $len = syswrite($h->{fh}, $data); # Here may be also sendfile work
82             if (defined $len) {
83             diag "written $len";
84             substr $data, 0, $len, "";
85             if (length $data) {
86             return 0; # want be called again
87             } else {
88             return 1; # all done
89             }
90             } elsif (!$!{EAGAIN} and !$!{EINTR} and !$!{WSAEWOULDBLOCK}) {
91             $h->_error ($!, 1);
92             return 1; # No more requests to me, got an error
93             }
94             else { return 0; }
95             });
96             $handle->push_write("HTTP/1.1 200 OK\012\015\012\015");
97             $handle->push_write("Common response");
98              
99             =head2 unshift_write($data)
100             =head2 unshift_write(type => @args)
101             =head2 unshift_write($cb->($handle))
102              
103             Analogically to C, it unshift write data at the beginngin of queue. B
104              
105             $handle->push_write("1")
106             $handle->push_write(sub {
107             my $h = shift;
108             $h->unshift_write("2");
109             return 1;
110             });
111             $handle->push_write("3");
112            
113             # The output will be "123"
114              
115             =cut
116              
117              
118             sub new {
119 0     0 1   my $pkg = shift;
120 0           my $self;
121 0           my %args = @_;
122 0           $args{on_drain} = _shadow_on_drain(delete $args{on_drain});
123 0           $self = $pkg->AnyEvent::Handle::new(%args);
124 0           $self;
125             }
126              
127             sub _shadow_on_drain {
128 0     0     my $old = shift;
129             return sub {
130 0     0     my $h = shift;
131             #warn "on drain called";
132             #warn "Ready ".int $h;
133 0           $h->{_writer_buffer_clean} = 1;
134 0 0         if (@{ $h->{_writer_wbuf} || [] }) {
  0 0          
135 0           $h->_drain_writer_wbuf;
136             } else {
137 0 0         $old->($h) if defined $old;
138             }
139 0           };
140             }
141              
142             sub push_write {
143 0     0 1   my $self = shift;
144             #warn "push_write ";
145 0 0         if (@_ > 1) {
146 0           my $type = shift;
147              
148 0   0       @_ = ($AnyEvent::Handle::WH{$type} ||= AnyEvent::Handle::_load_func "$type\::anyevent_write_type"
149             or Carp::croak "unsupported/unloadable type '$type' passed to AnyEvent::Handle::push_write")
150             ->($self, @_);
151             }
152 0 0 0       if (ref $_[0] or @{ $self->{_writer_wbuf} }) {
  0            
153 0           push @{ $self->{_writer_wbuf} }, $_[0];
  0            
154             } else {
155 0           $self->{_writer_buffer_clean} = 0;
156 0           $self->AnyEvent::Handle::push_write(@_);
157             }
158 0           $self->_drain_writer_wbuf;
159             }
160              
161             sub unshift_write {
162 0     0 1   my $self = shift;
163 0 0         if (@_ > 1) {
164 0           my $type = shift;
165              
166 0   0       @_ = ($AnyEvent::Handle::WH{$type} ||= AnyEvent::Handle::_load_func "$type\::anyevent_write_type"
167             or Carp::croak "unsupported/unloadable type '$type' passed to AnyEvent::Handle::push_write")
168             ->($self, @_);
169             }
170 0 0         if (ref $_[0]) {
171 0           unshift @{ $self->{_writer_wbuf} }, $_[0];
  0            
172             } else {
173 0 0         if ($self->{_writer_buffer_clean}) {
174 0           $self->{_writer_buffer_clean} = 0;
175 0           $self->AnyEvent::Handle::push_write(@_);
176             } else {
177 0           unshift @{ $self->{_writer_wbuf} }, $_[0];
  0            
178             }
179             }
180 0           $self->_drain_writer_wbuf;
181             }
182              
183             sub _drain_writer_wbuf {
184             #warn "call my_drain";
185 0     0     my $self = shift;
186 0 0         if (ref $self->{_writer_wbuf}[0]) {
187 0 0         if($self->{_writer_wbuf}[0]->($self)) {
188 0           shift @{$self->{_writer_wbuf}};
  0            
189             # Write nothing but call AE::Handle logic
190             };
191 0 0 0       unless($self->{_ww} or length $self->{_wbuf} ) {
192             $self->{_writer_ww} = AE::io $self->{fh}, 1, sub {
193 0     0     delete $self->{_writer_ww};
194 0           $self->_drain_writer_wbuf;
195             }
196 0           } else {
197 0           $self->_drain_wbuf;
198             }
199             #syswrite($self->{fh},'');
200             } else {
201             #warn "Not a cb";
202 0           $self->AnyEvent::Handle::push_write(shift @{$self->{_writer_wbuf}});
  0            
203             }
204             }
205              
206             sub on_drain {
207 0     0 1   my ($self,$cb) = @_;
208 0           $cb = _shadow_on_drain($cb);
209 0           $self->AnyEvent::Handle::on_drain($cb);
210             }
211              
212             =head2 push_sendfile($filename, [$size, [$offset]]);
213              
214             Push sendfile operation into write queue. If sendfile cannot be found (L)
215             or if it fails with one of ENOSYS, ENOTSUP, EOPNOTSUPP, EAFNOSUPPORT, EPROTOTYPE or ENOTSOCK, it will be emulated with chunked read/write
216              
217             $handle->push_write("HTTP/1.0 200 OK\nContent-length: $size\n...\n\n");
218             $handle->push_sendfile($file, $size, $offset);
219              
220             =cut
221              
222             our $NO_SENDFILE;
223              
224             sub push_sendfile {
225 0     0 1   my $self = shift;
226 0           my $file = shift;
227 0           my $size = shift;
228 0           my $offset = shift;
229 0           my $do_sendfile = 0;
230 0 0 0       if (!$self->{tls} and !$NO_SENDFILE) {
231 0 0         eval {
232 0           require Sys::Sendfile;
233 0           $do_sendfile = 1;
234             } or $NO_SENDFILE = 1;
235             }
236 0           my $f;
237             my $open = sub {
238 0 0   0     if (open $f, '<:raw', $file) {
239 0   0       $size ||= (stat $f)[7];
240 0   0       $offset ||= 0;
241             #warn "Successfully opened file $file: $size | ".fileno($f);
242 0           AnyEvent::Util::fh_nonblocking $f, 1;
243 0           return 1;
244             } else {
245             #warn "open failed: $!";
246 0           $self->_error($!,1); # Fatal error, write queue became broken
247 0           return 0;
248             }
249 0           };
250             my $emulation = sub {
251 0 0 0 0     $open->() or return 1 unless $f;
252 0           my $h = $_[0];
253 0           my $buf;
254             # Here I'm assume, that reading from fs is faster, than writing to socket.
255             # So I don't create io watcher on filehandle.
256             # When we're asked to give a write data, we're trying to read it
257 0   0       my $read = sysread($f,$buf,($self->{read_size} || 8192));
258             #warn "sysread()=$read";
259 0 0 0       if (defined $read and $read >= 0) {
    0 0        
      0        
260             #warn "read $read";
261 0           $size -= $read;
262 0 0         if ($read > 0) {
    0          
263 0           $h->unshift_write($buf);
264 0           return 0;
265             }
266             elsif ($size > 0) {
267 0           return 0;
268             }
269             else { # EOF
270 0           close $f;
271 0 0         if ($size > 0) {
272 0           warn "File $file was truncated during sendfile.\n\tHave to sent $size more, but got EOF";
273             #$h->_error ($!, 1);
274             }
275 0           return 1;
276             }
277             }
278             elsif ($!{EAGAIN} or $!{EINTR} or $!{WSAEWOULDBLOCK}) {
279             # warn "retry $!";
280 0           return 0; # Call me later
281             }
282             else {
283             # warn "failed with error $!";
284 0           $h->_error ($!, 1);
285 0           close $f;
286 0           return 1; # No more requests to me, got an error
287             }
288 0           };
289             my $sendfile = sub {
290 0 0   0     goto &$emulation unless $do_sendfile;
291 0 0 0       $open->() or return 1 unless $f;
292             # warn "sendfile";
293 0           my $h = $_[0];
294 0           my $len = Sys::Sendfile::sendfile($h->{fh}, $f, $size, $offset);
295 0 0 0       if (defined $len) {
    0 0        
    0 0        
    0 0        
      0        
      0        
      0        
      0        
296             # warn "Written $len by sendfile $!";
297 0           $offset += $len;
298 0           $size -= $len;
299 0 0         if ($size > 0) {
300             # warn "want more (+$size)";
301 0           return 0; # want be called again
302             } else {
303 0           warn "all done";
304 0           close $f;
305 0           return 1; # done
306             }
307             }
308             elsif ($!{EAGAIN} or $!{EINTR} or $!{WSAEWOULDBLOCK}) {
309 0           return 0; # Call me later
310             }
311             elsif ($!{EINVAL}) {
312 0           warn "Fallback to emulation because of $!\n";
313 0           $do_sendfile = 0;
314 0           goto &$emulation;
315             }
316             elsif ( $!{EINVAL} or $!{ENOSYS} or $!{ENOTSUP} or $!{EOPNOTSUPP} or $!{EAFNOSUPPORT} or $!{EPROTOTYPE} or $!{ENOTSOCK} ) {
317 0           $do_sendfile = 0;
318 0           goto &$emulation;
319             }
320             else {
321 0           warn "sendfile: $!";
322 0           $h->_error ($!, 1);
323 0           close $f;
324 0           return 1; # No more requests to me, got an error
325             }
326 0           };
327 0 0         $self->push_write($do_sendfile ? $sendfile : $emulation);
328             }
329              
330             #sub DESTROY {
331             # my $self = shift;
332             # warn "DESTROY handle";
333             # return $self->AnyEvent::Handle::DESTROY;
334             #}
335              
336             1;
337              
338             =head1 ACKNOWLEDGEMENTS
339              
340              
341             =head1 AUTHOR
342              
343             Mons Anderson, C<< >>
344              
345             =head1 LICENSE
346              
347             This program is free software; you can redistribute it and/or modify it
348             under the terms of either: the GNU General Public License as published
349             by the Free Software Foundation; or the Artistic License.
350              
351             =cut
352              
353             1; # End of AnyEvent::Handle::Writer