File Coverage

blib/lib/IO/Multiplex.pm
Criterion Covered Total %
statement 185 289 64.0
branch 43 140 30.7
condition 16 101 15.8
subroutine 34 46 73.9
pod 17 20 85.0
total 295 596 49.5


line stmt bran cond sub pod time code
1             package IO::Multiplex;
2              
3 5     5   71200 use strict;
  5         8  
  5         129  
4 5     5   19 use warnings;
  5         5  
  5         326  
5              
6             our $VERSION = '1.16';
7              
8             =head1 NAME
9              
10             IO::Multiplex - Manage IO on many file handles
11              
12             =head1 SYNOPSIS
13              
14             use IO::Multiplex;
15              
16             my $mux = new IO::Multiplex;
17             $mux->add($fh1);
18             $mux->add(\*FH2);
19             $mux->set_callback_object(...);
20             $mux->listen($server_socket);
21             $mux->loop;
22              
23             sub mux_input { ... }
24              
25             C<IO::Multiplex> is designed to take the effort out of managing
26             multiple file handles. It is essentially a really fancy front end to
27             the C<select> system call. In addition to maintaining the C<select>
28             loop, it buffers all input and output to/from the file handles. It
29             can also accept incoming connections on one or more listen sockets.
30              
31             =head1 DESCRIPTION
32              
33             It is object oriented in design, and will notify you of significant events
34             by calling methods on an object that you supply. If you are not using
35             objects, you can simply supply C<__PACKAGE__> instead of an object reference.
36              
37             You may have one callback object registered for each file handle, or
38             one global one. Possibly both -- the per-file handle callback object
39             will be used instead of the global one.
40              
41             Each file handle may also have a timer associated with it. A callback
42             function is called when the timer expires.
43              
44             =head2 Handling input on descriptors
45              
46             When input arrives on a file handle, the C<mux_input> method is called
47             on the appropriate callback object. This method is passed three
48             arguments (in addition to the object reference itself of course):
49              
50             =over 4
51              
52             =item 1
53              
54             a reference to the mux,
55              
56             =item 2
57              
58             A reference to the file handle, and
59              
60             =item 3
61              
62             a reference to the input buffer for the file handle.
63              
64             =back
65              
66             The method should remove the data that it has consumed from the
67             reference supplied. It may leave unconsumed data in the input buffer.
68              
69             =head2 Handling output to descriptors
70              
71             If C<IO::Multiplex> did not handle output to the file handles as well
72             as input from them, then there is a chance that the program could
73             block while attempting to write. If you let the multiplexer buffer
74             the output, it will write the data only when the file handle is
75             capable of receiving it.
76              
77             The basic method for handing output to the multiplexer is the C<write>
78             method, which simply takes a file descriptor and the data to be
79             written, like this:
80              
81             $mux->write($fh, "Some data");
82              
83             For convenience, when the file handle is C<add>ed to the multiplexer, it
84             is tied to a special class which intercepts all attempts to write to the
85             file handle. Thus, you can use print and printf to send output to the
86             handle in a normal manner:
87              
88             printf $fh "%s%d%X", $foo, $bar, $baz
89              
90             Unfortunately, Perl support for tied file handles is incomplete, and
91             functions such as C<send> cannot be supported.
92              
93             Also, file handle object methods such as the C<send> method of
94             C<IO::Socket> cannot be intercepted.
95              
96             =head1 EXAMPLES
97              
98             =head2 Simple Example
99              
100             This is a simple telnet-like program, which demonstrates the concepts
101             covered so far. It does not really work too well against a telnet
102             server, but it does OK against the sample server presented further down.
103              
104             use IO::Socket;
105             use IO::Multiplex;
106              
107             # Create a multiplex object
108             my $mux = new IO::Multiplex;
109             # Connect to the host/port specified on the command line,
110             # or localhost:23
111             my $sock = new IO::Socket::INET(Proto => 'tcp',
112             PeerAddr => shift || 'localhost',
113             PeerPort => shift || 23)
114             or die "socket: $@";
115              
116             # add the relevant file handles to the mux
117             $mux->add($sock);
118             $mux->add(\*STDIN);
119             # We want to buffer output to the terminal. This prevents the program
120             # from blocking if the user hits CTRL-S for example.
121             $mux->add(\*STDOUT);
122              
123             # We're not object oriented, so just request callbacks to the
124             # current package
125             $mux->set_callback_object(__PACKAGE__);
126              
127             # Enter the main mux loop.
128             $mux->loop;
129              
130             # mux_input is called when input is available on one of
131             # the descriptors.
132             sub mux_input {
133             my $package = shift;
134             my $mux = shift;
135             my $fh = shift;
136             my $input = shift;
137              
138             # Figure out whence the input came, and send it on to the
139             # other place.
140             if ($fh == $sock) {
141             print STDOUT $$input;
142             } else {
143             print $sock $$input;
144             }
145             # Remove the input from the input buffer.
146             $$input = '';
147             }
148              
149             # This gets called if the other end closes the connection.
150             sub mux_close {
151             print STDERR "Connection Closed\n";
152             exit;
153             }
154              
155             =head2 A server example
156              
157             Servers are just as simple to write. We just register a listen socket
158             with the multiplex object C<listen> method. It will automatically
159             accept connections on it and add them to its list of active file handles.
160              
161             This example is a simple chat server.
162              
163             use IO::Socket;
164             use IO::Multiplex;
165              
166             my $mux = new IO::Multiplex;
167              
168             # Create a listening socket
169             my $sock = new IO::Socket::INET(Proto => 'tcp',
170             LocalPort => shift || 2300,
171             Listen => 4)
172             or die "socket: $@";
173              
174             # We use the listen method instead of the add method.
175             $mux->listen($sock);
176              
177             $mux->set_callback_object(__PACKAGE__);
178             $mux->loop;
179              
180             sub mux_input {
181             my $package = shift;
182             my $mux = shift;
183             my $fh = shift;
184             my $input = shift;
185              
186             # The handles method returns a list of references to handles which
187             # we have registered, except for listen sockets.
188             foreach $c ($mux->handles) {
189             print $c $$input;
190             }
191             $$input = '';
192             }
193              
194             =head2 A more complex server example
195              
196             Let us take a look at the beginnings of a multi-user game server. We will
197             have a Player object for each player.
198              
199             # Paste the above example in here, up to but not including the
200             # mux_input subroutine.
201              
202             # mux_connection is called when a new connection is accepted.
203             sub mux_connection {
204             my $package = shift;
205             my $mux = shift;
206             my $fh = shift;
207              
208             # Construct a new player object
209             Player->new($mux, $fh);
210             }
211              
212             package Player;
213              
214             my %players = ();
215              
216             sub new {
217             my $package = shift;
218             my $self = bless { mux => shift,
219             fh => shift } => $package;
220              
221             # Register the new player object as the callback specifically for
222             # this file handle.
223              
224             $self->{mux}->set_callback_object($self, $self->{fh});
225             print $self->{fh}
226             "Greetings, Professor. Would you like to play a game?\n";
227              
228             # Register this player object in the main list of players
229             $players{$self} = $self;
230             $mux->set_timeout($self->{fh}, 1);
231             }
232              
233             sub players { return values %players; }
234              
235             sub mux_input {
236             my $self = shift;
237             shift; shift; # These two args are boring
238             my $input = shift; # Scalar reference to the input
239              
240             # Process each line in the input, leaving partial lines
241             # in the input buffer
242             while ($$input =~ s/^(.*?)\n//) {
243             $self->process_command($1);
244             }
245             }
246              
247             sub mux_close {
248             my $self = shift;
249              
250             # Player disconnected;
251             # [Notify other players or something...]
252             delete $players{$self};
253             }
254             # This gets called every second to update player info, etc...
255             sub mux_timeout {
256             my $self = shift;
257             my $mux = shift;
258              
259             $self->heartbeat;
260             $mux->set_timeout($self->{fh}, 1);
261             }
262              
263             =head1 METHODS
264              
265             =cut
266              
267 5     5   1999 use POSIX qw(errno_h BUFSIZ);
  5         23110  
  5         22  
268 5     5   5242 use Socket;
  5         2578  
  5         1797  
269 5     5   2076 use FileHandle qw(autoflush);
  5         16444  
  5         20  
270 5     5   1348 use IO::Handle;
  5         7  
  5         147  
271 5     5   15 use Fcntl;
  5         6  
  5         1377  
272 5     5   20 use Carp qw(carp);
  5         5  
  5         204  
273 5     5   18 use constant IsWin => ($^O eq 'MSWin32');
  5         5  
  5         389  
274              
275              
276             BEGIN {
277 5     5   37 eval {
278             # Can optionally use Hi Res timers if available
279 5         2167 require Time::HiRes;
280 5         5987 Time::HiRes->import('time');
281             };
282             }
283              
284             # This is what you want. Trust me.
285             $SIG{PIPE} = 'IGNORE';
286              
287 5     5   742 { no warnings;
  5         4  
  5         12630  
288             if(IsWin) { *EWOULDBLOCK = sub() {10035} }
289             }
290              
291             =head2 new
292              
293             Construct a new C<IO::Multiplex> object.
294              
295             $mux = new IO::Multiplex;
296              
297             =cut
298              
299             sub new
300             {
301 4     4 1 6398 my $package = shift;
302 4         84 my $self = bless { _readers => '',
303             _writers => '',
304             _fhs => {},
305             _handles => {},
306             _timerkeys => {},
307             _timers => [],
308             _listen => {} } => $package;
309 4         18 return $self;
310             }
311              
312             =head2 listen
313              
314             Add a socket to be listened on. The socket should have had the
315             C<bind> and C<listen> system calls already applied to it. The C<IO::Socket>
316             module will do this for you.
317              
318             $socket = new IO::Socket::INET(Listen => ..., LocalAddr => ...);
319             $mux->listen($socket);
320              
321             Connections will be automatically accepted and C<add>ed to the multiplex
322             object. The C<mux_connection> callback method will also be called.
323              
324             =cut
325              
326             sub listen
327             {
328 2     2 1 640 my $self = shift;
329 2         4 my $fh = shift;
330              
331 2         6 $self->add($fh);
332 2         6 $self->{_fhs}{"$fh"}{listen} = 1;
333 2         2 $fh;
334             }
335              
336             =head2 add
337              
338             Add a file handle to the multiplexer.
339              
340             $mux->add($fh);
341              
342             As a side effect, this sets non-blocking mode on the handle, and disables
343             STDIO buffering. It also ties it to intercept output to the handle.
344              
345             =cut
346              
347             sub add
348             {
349 7     7 1 37 my $self = shift;
350 7         51 my $fh = shift;
351              
352 7 50       61 return if $self->{_fhs}{"$fh"};
353              
354 7         30 nonblock($fh);
355 7         38 autoflush($fh, 1);
356 7         378 fd_set($self->{_readers}, $fh, 1);
357              
358 7         47 my $sockopt = getsockopt $fh, SOL_SOCKET, SO_TYPE;
359 7 100 66     71 $self->{_fhs}{"$fh"}{udp_true} = 1
360             if defined $sockopt && SOCK_DGRAM == unpack "i", $sockopt;
361              
362 7         31 $self->{_fhs}{"$fh"}{inbuffer} = '';
363 7         17 $self->{_fhs}{"$fh"}{outbuffer} = '';
364 7         26 $self->{_fhs}{"$fh"}{fileno} = fileno($fh);
365 7         13 $self->{_handles}{"$fh"} = $fh;
366 7         51 tie *$fh, "IO::Multiplex::Handle", $self, $fh;
367 7         33 return $fh;
368             }
369              
370             =head2 remove
371              
372             Removes a file handle from the multiplexer. This also unties the
373             handle. It does not currently turn STDIO buffering back on, or turn
374             off non-blocking mode.
375              
376             $mux->remove($fh);
377              
378             =cut
379              
380             sub remove
381             {
382 0     0 1 0 my $self = shift;
383 0         0 my $fh = shift;
384 0         0 fd_set($self->{_writers}, $fh, 0);
385 0         0 fd_set($self->{_readers}, $fh, 0);
386 0         0 delete $self->{_fhs}{"$fh"};
387 0         0 delete $self->{_handles}{"$fh"};
388 0         0 $self->_removeTimer($fh);
389 0         0 untie *$fh;
390 0         0 return 1;
391             }
392              
393             =head2 set_callback_object
394              
395             Set the object on which callbacks are made. If you are not using objects,
396             you can specify the name of the package into which the method calls are
397             to be made.
398              
399             If a file handle is supplied, the callback object is specific for that
400             handle:
401              
402             $mux->set_callback_object($object, $fh);
403              
404             Otherwise, it is considered a default callback object, and is used when
405             events occur on a file handle that does not have its own callback object.
406              
407             $mux->set_callback_object(__PACKAGE__);
408              
409             The previously registered object (if any) is returned.
410              
411             See also the CALLBACK INTERFACE section.
412              
413             =cut
414              
415             sub set_callback_object
416             {
417 4     4 1 21 my $self = shift;
418 4         7 my $obj = shift;
419 4         5 my $fh = shift;
420 4 50 33     22 return if $fh && !exists($self->{_fhs}{"$fh"});
421              
422 4 50       15 my $old = $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object};
423              
424 4 50       26 $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object} = $obj;
425 4         6 return $old;
426             }
427              
428             =head2 kill_output
429              
430             Remove any pending output on a file descriptor.
431              
432             $mux->kill_output($fh);
433              
434             =cut
435              
436             sub kill_output
437             {
438 0     0 1 0 my $self = shift;
439 0         0 my $fh = shift;
440 0 0 0     0 return unless $fh && exists($self->{_fhs}{"$fh"});
441              
442 0         0 $self->{_fhs}{"$fh"}{outbuffer} = '';
443 0         0 fd_set($self->{_writers}, $fh, 0);
444             }
445              
446             =head2 outbuffer
447              
448             Return or set the output buffer for a descriptor
449              
450             $output = $mux->outbuffer($fh);
451             $mux->outbuffer($fh, $output);
452              
453             =cut
454              
455             sub outbuffer
456             {
457 0     0 1 0 my $self = shift;
458 0         0 my $fh = shift;
459 0 0 0     0 return unless $fh && exists($self->{_fhs}{"$fh"});
460              
461 0 0       0 if (@_) {
462 0 0       0 $self->{_fhs}{"$fh"}{outbuffer} = $_[0] if @_;
463 0 0       0 fd_set($self->{_writers}, $fh, 0) if !$_[0];
464             }
465              
466 0         0 $self->{_fhs}{"$fh"}{outbuffer};
467             }
468              
469             =head2 inbuffer
470              
471             Return or set the input buffer for a descriptor
472              
473             $input = $mux->inbuffer($fh);
474             $mux->inbuffer($fh, $input);
475              
476             =cut
477              
478             sub inbuffer
479             {
480 0     0 1 0 my $self = shift;
481 0         0 my $fh = shift;
482 0 0 0     0 return unless $fh && exists($self->{_fhs}{"$fh"});
483              
484 0 0       0 if (@_) {
485 0 0       0 $self->{_fhs}{"$fh"}{inbuffer} = $_[0] if @_;
486             }
487              
488 0         0 return $self->{_fhs}{"$fh"}{inbuffer};
489             }
490              
491             =head2 set_timeout
492              
493             Set the timer for a file handle. The timeout value is a certain number of
494             seconds in the future, after which the C<mux_timeout> callback is called.
495              
496             If the C<Time::HiRes> module is installed, the timers may be specified in
497             fractions of a second.
498              
499             Timers are not reset automatically.
500              
501             $mux->set_timeout($fh, 23.6);
502              
503             Use C<$mux-E<gt>set_timeout($fh, undef)> to cancel a timer.
504              
505             =cut
506              
507             sub set_timeout
508             {
509 4     4 1 439 my $self = shift;
510 4         6 my $fh = shift;
511 4         5 my $timeout = shift;
512 4 50 33     35 return unless $fh && exists($self->{_fhs}{"$fh"});
513              
514 4 50       10 if (defined $timeout) {
515 4         82 $self->_addTimer($fh, $timeout + time);
516             } else {
517 0         0 $self->_removeTimer($fh);
518             }
519             }
520              
521             =head2 handles
522              
523             Returns a list of handles that the C<IO::Multiplex> object knows about,
524             excluding listen sockets.
525              
526             @handles = $mux->handles;
527              
528             =cut
529              
530             sub handles
531             {
532 0     0 1 0 my $self = shift;
533              
534 0         0 return grep(!$self->{_fhs}{"$_"}{listen}, values %{$self->{_handles}});
  0         0  
535             }
536              
537             sub _addTimer {
538 4     4   6 my $self = shift;
539 4         5 my $fh = shift;
540 4         8 my $time = shift;
541              
542             # Set a key so that we can quickly tell if a given $fh has
543             # a timer set
544 4         13 $self->{_timerkeys}{"$fh"} = 1;
545              
546             # Store the timeout in an array, and resort it
547 4         5 @{$self->{_timers}} = sort { $a->[1] <=> $b->[1] } (@{$self->{_timers}}, [ $fh, $time ] );
  4         20  
  0         0  
  4         15  
548             }
549              
550             sub _removeTimer {
551 4     4   7 my $self = shift;
552 4         7 my $fh = shift;
553              
554             # Return quickly if no timer is set
555 4 50       22 return unless exists $self->{_timerkeys}{"$fh"};
556              
557             # Remove the timeout from the sorted array
558 4         8 @{$self->{_timers}} = grep { $_->[0] ne $fh } @{$self->{_timers}};
  4         10  
  4         20  
  4         13  
559              
560             # Get rid of the key
561 4         22 delete $self->{_timerkeys}{"$fh"};
562             }
563              
564              
565             =head2 loop
566              
567             Enter the main loop and start processing IO events.
568              
569             $mux->loop;
570              
571             =cut
572              
573             sub loop
574             {
575 4     4 1 26 my $self = shift;
576 4         5 my $heartbeat = shift;
577 4         8 $self->{_endloop} = 0;
578              
579 4   66     21 while (!$self->{_endloop} && keys %{$self->{_fhs}}) {
  14         3935  
580 14         22 my $rv;
581             my $data;
582 14         24 my $rdready = "";
583 14         20 my $wrready = "";
584 14         16 my $timeout = undef;
585              
586 14         18 foreach my $fh (values %{$self->{_handles}}) {
  14         46  
587 21 0 33     115 fd_set($rdready, $fh, 1) if
      33        
588             ref($fh) =~ /SSL/ &&
589             $fh->can("pending") &&
590             $fh->pending;
591             }
592              
593 14 50       44 if (!length $rdready) {
594 14 100       12 if (@{$self->{_timers}}) {
  14         42  
595 4         14 $timeout = $self->{_timers}[0][1] - time;
596             }
597              
598 14         20022625 my $numready = select($rdready=$self->{_readers},
599             $wrready=$self->{_writers},
600             undef,
601             $timeout);
602              
603 14 50       112 unless(defined($numready)) {
604 0 0 0     0 if ($! == EINTR || $! == EAGAIN) {
605 0         0 next;
606             } else {
607 0         0 last;
608             }
609             }
610             }
611              
612 14 50       44 &{ $heartbeat } ($rdready, $wrready) if $heartbeat;
  0         0  
613              
614 14         30 foreach my $k (keys %{$self->{_handles}}) {
  14         88  
615 20 50       80 my $fh = $self->{_handles}->{$k} or next;
616              
617             # Avoid creating a permanent empty hash ref for "$fh"
618             # by attempting to access its {object} element
619             # if it has already been closed.
620 20 50       124 next unless exists $self->{_fhs}{"$fh"};
621              
622             # It is not easy to replace $self->{_fhs}{"$fh"} with a
623             # variable, because some mux_* routines may remove it as
624             # side-effect.
625              
626             # Get the callback object.
627 20   33     151 my $obj = $self->{_fhs}{"$fh"}{object} ||
628             $self->{_object};
629              
630             # Is this descriptor ready for reading?
631 20 100       76 if (fd_isset($rdready, $fh))
632             {
633 10 100       44 if ($self->{_fhs}{"$fh"}{listen}) {
634             # It's a server socket, so a new connection is
635             # waiting to be accepted
636 2         18 my $client = $fh->accept;
637 2 50       214 next unless ($client);
638 2         10 $self->add($client);
639 2 50 33     27 $obj->mux_connection($self, $client)
640             if $obj && $obj->can("mux_connection");
641             } else {
642 8 100       55 if ($self->is_udp($fh)) {
643 6         108 $rv = recv($fh, $data, BUFSIZ, 0);
644 6 50       22 if (defined $rv) {
645             # Remember where the last UDP packet came from
646 6         29 $self->{_fhs}{"$fh"}{udp_peer} = $rv;
647             }
648             } else {
649 2         7 $rv = &POSIX::read(fileno($fh), $data, BUFSIZ);
650             }
651              
652 8 50 33     70 if (defined($rv) && length($data)) {
653             # Append the data to the client's receive buffer,
654             # and call process_input to see if anything needs to
655             # be done.
656 8         32 $self->{_fhs}{"$fh"}{inbuffer} .= $data;
657 8 50 33     163 $obj->mux_input($self, $fh,
658             \$self->{_fhs}{"$fh"}{inbuffer})
659             if $obj && $obj->can("mux_input");
660             } else {
661 0 0       0 unless (defined $rv) {
662             next if
663 0 0 0     0 $! == EINTR ||
      0        
664             $! == EAGAIN ||
665             $! == EWOULDBLOCK;
666 0 0       0 warn "IO::Multiplex read error: $!"
667             if $! != ECONNRESET;
668             }
669             # There's an error, or we received EOF. If
670             # there's pending data to be written, we leave
671             # the connection open so it can be sent. If
672             # the other end is closed for writing, the
673             # send will error and we close down there.
674             # Either way, we remove it from _readers as
675             # we're no longer interested in reading from
676             # it.
677 0         0 fd_set($self->{_readers}, $fh, 0);
678 0 0 0     0 $obj->mux_eof($self, $fh,
679             \$self->{_fhs}{"$fh"}{inbuffer})
680             if $obj && $obj->can("mux_eof");
681              
682 0 0       0 if (exists $self->{_fhs}{"$fh"}) {
683 0         0 $self->{_fhs}{"$fh"}{inbuffer} = '';
684             # The mux_eof handler could have responded
685             # with a shutdown for writing.
686 0 0 0     0 $self->close($fh)
687             unless exists $self->{_fhs}{"$fh"}
688             && length $self->{_fhs}{"$fh"}{outbuffer};
689             }
690 0         0 next;
691             }
692             }
693             } # end if readable
694 18 50       2239 next unless exists $self->{_fhs}{"$fh"};
695              
696 18 50       42 if (fd_isset($wrready, $fh)) {
697 0 0       0 unless (length $self->{_fhs}{"$fh"}{outbuffer}) {
698 0         0 fd_set($self->{_writers}, $fh, 0);
699 0 0 0     0 $obj->mux_outbuffer_empty($self, $fh)
700             if ($obj && $obj->can("mux_outbuffer_empty"));
701 0         0 next;
702             }
703 0         0 $rv = &POSIX::write(fileno($fh),
704             $self->{_fhs}{"$fh"}{outbuffer},
705             length($self->{_fhs}{"$fh"}{outbuffer}));
706 0 0       0 unless (defined($rv)) {
707             # We got an error writing to it. If it's
708             # EWOULDBLOCK (shouldn't happen if select told us
709             # we can write) or EAGAIN, or EINTR we don't worry
710             # about it. otherwise, close it down.
711 0 0 0     0 unless ($! == EWOULDBLOCK ||
      0        
712             $! == EINTR ||
713             $! == EAGAIN) {
714 0 0       0 if ($! == EPIPE) {
715 0 0 0     0 $obj->mux_epipe($self, $fh)
716             if $obj && $obj->can("mux_epipe");
717             } else {
718 0         0 warn "IO::Multiplex: write error: $!\n";
719             }
720 0         0 $self->close($fh);
721             }
722 0         0 next;
723             }
724 0         0 substr($self->{_fhs}{"$fh"}{outbuffer}, 0, $rv) = '';
725 0 0       0 unless (length $self->{_fhs}{"$fh"}{outbuffer}) {
726             # Mark us as not writable if there's nothing more to
727             # write
728 0         0 fd_set($self->{_writers}, $fh, 0);
729 0 0 0     0 $obj->mux_outbuffer_empty($self, $fh)
730             if ($obj && $obj->can("mux_outbuffer_empty"));
731              
732 0 0 0     0 if ( $self->{_fhs}{"$fh"}
733             && $self->{_fhs}{"$fh"}{shutdown}) {
734             # If we've been marked for shutdown after write
735             # do it.
736 0         0 shutdown($fh, 1);
737 0         0 $self->{_fhs}{"$fh"}{outbuffer} = '';
738 0 0       0 unless (length $self->{_fhs}{"$fh"}{inbuffer}) {
739             # We'd previously been shutdown for reading
740             # also, so close out completely
741 0         0 $self->close($fh);
742 0         0 next;
743             }
744             }
745             }
746             } # End if writeable
747              
748 18 50       88 next unless exists $self->{_fhs}{"$fh"};
749              
750             } # End foreach $fh (...)
751              
752 12 100       22 $self->_checkTimeouts() if @{$self->{_timers}};
  12         85  
753              
754             } # End while(loop)
755             }
756              
757             sub _checkTimeouts {
758 5     5   9 my $self = shift;
759              
760             # Get the current time
761 5         25 my $time = time;
762              
763             # Copy all of the timers that should go off into
764             # a temporary array. This allows us to modify the
765             # real array as we process the timers, without
766             # interfering with the loop.
767              
768 5         14 my @timers = ();
769 5         12 foreach my $timer (@{$self->{_timers}}) {
  5         17  
770             # If the timer is in the future, we can stop
771 5 100       28 last if $timer->[1] > $time;
772 4         12 push @timers, $timer;
773             }
774              
775 5         16 foreach my $timer (@timers) {
776 4         29 my $fh = $timer->[0];
777 4         15 $self->_removeTimer($fh);
778              
779 4 50       16 next unless exists $self->{_fhs}{"$fh"};
780              
781 4   33     38 my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
782 4 50 33     230 $obj->mux_timeout($self, $fh) if $obj && $obj->can("mux_timeout");
783             }
784             }
785              
786              
787             =head2 endloop
788              
789             Prematurely terminate the loop. The loop will automatically terminate
790             when there are no remaining descriptors to be watched.
791              
792             $mux->endloop;
793              
794             =cut
795              
796             sub endloop
797             {
798 2     2 1 296 my $self = shift;
799 2         8 $self->{_endloop} = 1;
800             }
801              
802             =head2 udp_peer
803              
804             Get peer endpoint of where the last udp packet originated.
805              
806             $saddr = $mux->udp_peer($fh);
807              
808             =cut
809              
810             sub udp_peer {
811 6     6 1 20 my $self = shift;
812 6         12 my $fh = shift;
813 6         44 return $self->{_fhs}{"$fh"}{udp_peer};
814             }
815              
816             =head2 is_udp
817              
818             Sometimes UDP packets require special attention.
819             This method will tell if a file handle is of type UDP.
820              
821             $is_udp = $mux->is_udp($fh);
822              
823             =cut
824              
825             sub is_udp {
826 14     14 1 24 my $self = shift;
827 14         16 my $fh = shift;
828 14         60 return $self->{_fhs}{"$fh"}{udp_true};
829             }
830              
831             =head2 write
832              
833             Send output to a file handle.
834              
835             $mux->write($fh, "'ere I am, JH!\n");
836              
837             =cut
838              
839             sub write
840             {
841 6     6 1 16 my $self = shift;
842 6         9 my $fh = shift;
843 6         14 my $data = shift;
844 6 50 33     63 return unless $fh && exists($self->{_fhs}{"$fh"});
845              
846 6 50       25 if ($self->{_fhs}{"$fh"}{shutdown}) {
847 0         0 $! = EPIPE;
848 0         0 return undef;
849             }
850 6 50       14 if ($self->is_udp($fh)) {
851 6 100       21 if (my $udp_peer = $self->udp_peer($fh)) {
852             # Send the packet back to the last peer that said something
853 4         173 return send($fh, $data, 0, $udp_peer);
854             } else {
855             # No udp_peer yet?
856             # This better be a connect()ed UDP socket
857             # or else this will fail with ENOTCONN
858 2         162 return send($fh, $data, 0);
859             }
860             }
861 0         0 $self->{_fhs}{"$fh"}{outbuffer} .= $data;
862 0         0 fd_set($self->{_writers}, $fh, 1);
863 0         0 return length($data);
864             }
865              
866             =head2 shutdown
867              
868             Shut down a socket for reading or writing or both. See the C<shutdown>
869             Perl documentation for further details.
870              
871             If the shutdown is for reading, it happens immediately. However,
872             shutdowns for writing are delayed until any pending output has been
873             successfully written to the socket.
874              
875             $mux->shutdown($socket, 1);
876              
877             =cut
878              
879             sub shutdown
880             {
881 0     0 1 0 my $self = shift;
882 0         0 my $fh = shift;
883 0         0 my $which = shift;
884 0 0 0     0 return unless $fh && exists($self->{_fhs}{"$fh"});
885              
886 0 0 0     0 if ($which == 0 || $which == 2) {
887             # Shutdown for reading. We can do this now.
888 0         0 shutdown($fh, 0);
889             # The mux_eof hook must be run from the main loop to consume
890             # the rest of the inbuffer if there is anything left.
891             # It will also remove $fh from _readers.
892             }
893              
894 0 0 0     0 if ($which == 1 || $which == 2) {
895             # Shutdown for writing. Only do this now if there is no pending
896             # data.
897 0 0       0 if(length $self->{_fhs}{"$fh"}{outbuffer}) {
898 0         0 $self->{_fhs}{"$fh"}{shutdown} = 1;
899             } else {
900 0         0 shutdown($fh, 1);
901 0         0 $self->{_fhs}{"$fh"}{outbuffer} = '';
902             }
903             }
904             # Delete the descriptor if it's totally gone.
905 0 0 0     0 unless (length $self->{_fhs}{"$fh"}{inbuffer} ||
906             length $self->{_fhs}{"$fh"}{outbuffer}) {
907 0         0 $self->close($fh);
908             }
909             }
910              
911             =head2 close
912              
913             Close a handle. Always use this method to close a handle that is being
914             watched by the multiplexer.
915              
916             $mux->close($fh);
917              
918             =cut
919              
920             sub close
921             {
922 0     0 1 0 my $self = shift;
923 0         0 my $fh = shift;
924 0 0       0 return unless exists $self->{_fhs}{"$fh"};
925              
926 0   0     0 my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
927 0 0       0 warn "closing with read buffer" if length $self->{_fhs}{"$fh"}{inbuffer};
928 0 0       0 warn "closing with write buffer" if length $self->{_fhs}{"$fh"}{outbuffer};
929              
930 0         0 fd_set($self->{_readers}, $fh, 0);
931 0         0 fd_set($self->{_writers}, $fh, 0);
932              
933 0         0 delete $self->{_fhs}{"$fh"};
934 0         0 delete $self->{_handles}{"$fh"};
935 0         0 untie *$fh;
936 0         0 close $fh;
937 0 0 0     0 $obj->mux_close($self, $fh) if $obj && $obj->can("mux_close");
938             }
939              
940             # We set non-blocking mode on all descriptors. If we don't, then send
941             # might block if the data is larger than the kernel can accept all at once,
942             # even though select told us we can write. With non-blocking mode, we
943             # get a partial write in those circumstances, which is what we want.
944              
945             sub nonblock
946 7     7 0 139 { my $fh = shift;
947              
948 7         17 if(IsWin)
949             { ioctl($fh, 0x8004667e, pack("L!", 1));
950             }
951             else
952 7 50       44 { my $flags = fcntl($fh, F_GETFL, 0)
953             or die "fcntl F_GETFL: $!\n";
954 7 50       52 fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
955             or die "fcntl F_SETFL $!\n";
956             }
957             }
958              
959             sub fd_set
960             {
961 7     7 0 37 vec($_[0], fileno($_[1]), 1) = $_[2];
962             }
963              
964             sub fd_isset
965             {
966 38     38 0 156 return vec($_[0], fileno($_[1]), 1);
967             }
968              
969             # We tie handles into this package to handle write buffering.
970              
971             package IO::Multiplex::Handle;
972              
973 5     5   28 use strict;
  5         7  
  5         121  
974 5     5   2067 use Tie::Handle;
  5         6471  
  5         92  
975 5     5   31 use Carp;
  5         7  
  5         218  
976 5     5   19 use vars qw(@ISA);
  5         6  
  5         959  
977             @ISA = qw(Tie::Handle);
978              
979             sub FILENO
980             {
981 56     56   112 my $self = shift;
982 56         348 return ($self->{_mux}->{_fhs}->{"$self->{_fh}"}->{fileno});
983             }
984              
985              
986             sub TIEHANDLE
987             {
988 7     7   17 my $package = shift;
989 7         14 my $mux = shift;
990 7         6 my $fh = shift;
991              
992 7         33 my $self = bless { _mux => $mux,
993             _fh => $fh } => $package;
994 7         31 return $self;
995             }
996              
997             sub WRITE
998             {
999 6     6   1537 my $self = shift;
1000 6         16 my ($msg, $len, $offset) = @_;
1001 6   50     37 $offset ||= 0;
1002 6         46 return $self->{_mux}->write($self->{_fh}, substr($msg, $offset, $len));
1003             }
1004              
1005             sub CLOSE
1006             {
1007 0     0     my $self = shift;
1008 0           return $self->{_mux}->shutdown($self->{_fh}, 2);
1009             }
1010              
1011             sub READ
1012             {
1013 0     0     carp "Do not read from a muxed file handle";
1014             }
1015              
1016             sub READLINE
1017             {
1018 0     0     carp "Do not read from a muxed file handle";
1019             }
1020              
1021             sub FETCH
1022             {
1023 0     0     return "Fnord";
1024             }
1025              
1026 0     0     sub UNTIE {}
1027              
1028             1;
1029              
1030             __END__