File Coverage

blib/lib/App/Diskd.pm
Criterion Covered Total %
statement 70 231 30.3
branch 0 66 0.0
condition 0 3 0.0
subroutine 24 53 45.2
pod n/a
total 94 353 26.6


line stmt bran cond sub pod time code
1             package App::Diskd;
2              
3 1     1   23056 use 5.014;
  1         4  
  1         28  
4 1     1   5 use strict;
  1         1  
  1         43  
5 1     1   4 use warnings;
  1         5  
  1         45  
6              
7             our $VERSION = '0.01';
8              
9 1     1   1212 use POE;
  1         62831  
  1         8  
10              
11             sub Daemon {
12              
13 0     0     print "Starting diskd in daemon mode\n";
14              
15 0           my $info = Local::Info->new;
16              
17 0           my $blkid_session = Local::DiskWatcher->new(info => $info,);
18 0           my $usock_session = Local::UnixSocketServer->new(info => $info);
19 0           my $multi_session = Local::MulticastServer->new(info => $info, ttl=>2);
20              
21 0           POE::Kernel->run();
22              
23             }
24              
25             sub Client {
26              
27 0     0     print "Starting diskd in client mode\n";
28              
29 0           my $usock_client = Local::UnixSocketClient->new;
30              
31 0           POE::Kernel->run();
32             }
33              
34             1;
35              
36             ##
37             ## The Info package is intended to provide a central area where we can
38             ## store details of known disks and hosts. It just provides some
39             ## useful get/set interfaces that the other packages can use.
40             ##
41              
42             package Local::Info;
43              
44 1     1   98628 use POE qw(Wheel::Run);
  1         2  
  1         6  
45              
46 1     1   38014 use Sys::Hostname;
  1         3  
  1         66  
47 1     1   1037 use Net::Nslookup;
  1         1103  
  1         468  
48              
49             sub new {
50 0     0     my $class = shift;
51              
52 0           my $hostname = hostname;
53 0           my ($ip) = nslookup $hostname;
54              
55 0           return bless {
56             this_host => $hostname,
57             this_ip => $ip,
58             temp_disk_list => [],
59             disks_by_ip => { $ip => {} },
60             update_time => {},
61             }, $class;
62             }
63              
64 0     0     sub our_ip { (shift)->{this_ip} }
65              
66             # We update local disk info in two phases, first creating a list to
67             # store them, then inserting the list into the live data all at once.
68             # This is so that we can handle disks being detached from the system
69             # between runs of blkid.
70              
71             sub add_our_disk_info {
72              
73 0     0     my ($self,$uuid,$label,$device) = @_;
74              
75 0           my $listref = $self->{temp_disk_list};
76              
77             # As a mnemonic for the order below, remember that UUIDs are more
78             # unique than labels, which in turn are more unique than device
79             # filenames.
80 0           push @$listref, [$uuid,$label,$device];
81              
82             }
83              
84             sub commit_our_disk_info {
85              
86 0     0     my $self = shift;
87 0           my $ip = $self->{this_ip};
88              
89             #warn "comitting new blkid data with " . (0+ @{$self->{temp_disk_list}}) .
90             # " entries\n";
91              
92 0           $self->{update_time}->{$ip} = time();
93 0           $self->{disks_by_ip}->{$ip} = $self->{temp_disk_list};
94 0           $self->{temp_disk_list} = [];
95              
96             # TODO: update "last seen" structures for each disk with a label/uuid.
97             # for each structure, map the label/uuid to [ip, timestamp] info.
98             }
99              
100             sub known_hosts {
101 0     0     my $self = shift;
102 0           return keys %{$self->{disks_by_ip}};
  0            
103             }
104             sub disks_by_host {
105 0     0     my ($self,$host) = @_;
106              
107             #warn "looking up host $host";
108 0 0         return undef unless exists $self->{disks_by_ip}->{$host};
109 0           return $self->{disks_by_ip}->{$host};
110             }
111              
112             #
113             # The routines used to pack and unpack a list of disks for
114             # transmission could take any form, really. The key things to consider
115             # are that (a) arbitrary spoofed data can't result in us introducing
116             # security issues (so solutions that involve eval'ing the packed data
117             # are out, unless we validate that the data is in the expected form)
118             # and (b) we take into consideration quoting issues (such as not using
119             # spaces as separators, since they may appear in disk labels). As it
120             # happens, YAML can solve both of these problems for us. It may not
121             # make best use of space, but at least it's quick and easy to
122             # implement.
123             #
124              
125 1     1   924 use YAML::XS;
  1         3670  
  1         316  
126              
127             # assume that we don't need to pack any disk list except our own
128             sub pack_our_disk_list {
129 0     0     my $self = shift;
130 0           my $ip = $self->{this_ip};
131              
132 0           return Dump $self->{disks_by_ip}->{$ip};
133             }
134              
135             # unpack incoming list of lists
136             sub unpack_disk_list {
137 0     0     my ($self,$host,$yaml) = @_;
138 0           my $ip = $self->{this_ip};
139              
140             # We shouldn't get here if the calling routine is doing its job right
141 0 0         if ($host eq $ip) {
142 0           warn "Fatal: caller requested unpack disk list with our IP address";
143 0           return undef;
144             }
145              
146 0           my $objref = Load $yaml;
147              
148             # Do some basic type checking on the unpacked object. We expect an
149             # array of arrays.
150 0 0         unless (ref($objref) eq "ARRAY") {
151 0           warn "unpacked disk list is not an ARRAY";
152 0           return undef;
153              
154 0           for (@$objref) {
155 0 0         unless (ref($_) eq "ARRAY") {
156 0           warn "unpacked disk element is not an ARRAY";
157 0           return undef;
158             }
159             }
160             }
161              
162 0           $self->{update_time}->{$host} = time();
163              
164 0           return $self->{disks_by_ip}->{$host} = $objref;
165             }
166              
167             #
168             # The remaining packages are used simply to achieve a clean separation
169             # between different POE sessions and to encapsulate related methods
170             # without having to worry about namespace issues (like ensuring event
171             # names and handler routines are unique across all sessions). As a
172             # consequence of having distinct sessions for each program area, when
173             # we need to have inter-session communication, we need to use POE's
174             # post method. An alias is also used to identify each of the sessions.
175             #
176              
177              
178             ##
179             ## The DiskWatcher package sets up a session to periodically run
180             ## blkid, parse the results and store them in our Info object. Since
181             ## blkid can sometimes hang (due to expected devices or media not
182             ## being present), a timer is set and if the command hasn't completed
183             ## within that timeout, the child process is killed and the child
184             ## session garbage collected.
185             ##
186              
187             package Local::DiskWatcher;
188              
189 1     1   11 use POE qw(Wheel::Run Filter::Line);
  1         1  
  1         10  
190              
191             sub new {
192              
193 0     0     my $class = shift;
194 0           my %args = (
195             program => '/sbin/blkid',
196             frequency => 10 * 60 * 1, # seconds between runs
197             timeout => 15,
198             info => undef,
199             @_
200             );
201              
202 0 0         die "DiskWatcher needs info => ref argument\n" unless defined($args{info});
203              
204             # by using package_states, POE event names will eq package methods
205 0           my @events =
206             qw(
207             _start start_child child_timeout got_child_stdout got_child_stderr
208             child_cleanup
209             );
210 0           my $session = POE::Session->create
211             (
212             package_states => [$class => \@events],
213             args => [%args],
214             );
215              
216 0           return bless { session => $session }, $class;
217             }
218              
219              
220             # Our _start event is solely concerned with extracting args and saving
221             # them in the heap. It then queues start_child to run the actual child
222             # process and timeout watcher.
223             sub _start {
224              
225             #print "DiskWatcher: _start args: ". (join ", ", @_). "\n";
226              
227 0     0     my ($kernel, $heap, %args) = @_[KERNEL, HEAP, ARG0 .. $#_];
228              
229 0           $heap->{timeout} = $args{timeout};
230 0           $heap->{info} = $args{info};
231 0           $heap->{program} = $args{program};
232 0           $heap->{delay} = $args{frequency};
233 0           $heap->{child} = undef;
234              
235 0           $kernel->yield('start_child');
236             }
237              
238             # start_child is responsible for running the program with a timeout
239             sub start_child {
240 0     0     my ($kernel, $heap) = @_[KERNEL, HEAP];
241              
242             # Using a named timer for timeouts. Set it to undef to deactivate.
243 0           $kernel->delay(child_timeout => $heap->{timeout});
244              
245 0           $heap->{child} = POE::Wheel::Run->new(
246             Program => [$heap->{program}],
247             StdioFilter => POE::Filter::Line->new(),
248             StderrFilter => POE::Filter::Line->new(),
249             StdoutEvent => "got_child_stdout",
250             StderrEvent => "got_child_stderr",
251             CloseEvent => "child_cleanup",
252             );
253 0           $kernel->sig_child($heap->{child}->PID, "child_cleanup");
254              
255             # queue up the next run of this event
256 0           $kernel->delay(start_child => $heap->{delay});
257             }
258              
259             # if the child process didn't complete within the timeout, we kill it
260             sub child_timeout {
261 0     0     my ($heap) = $_[HEAP];
262 0           my $child = $heap->{child};
263              
264 0           warn "CHILD KILL TIMEOUT";
265 0 0         warn "diskid failed to send kill signal\n" unless $child->kill();
266              
267             # The kernel should eventually receive a SIGCHLD after this
268             }
269              
270             # For our purposes, we don't care whether the child exited by closing
271             # its output or throwing a SIGCHLD. Wrap the deletion of references to
272             # the child in if(defined()) to avoid warnings.
273             sub child_cleanup {
274              
275             #print "DiskWatcher: child_cleanup args: ". (join ", ", @_). "\n";
276              
277 0     0     my ($heap,$kernel) = @_[HEAP,KERNEL];
278              
279             # Deactivate the kill timer
280 0           $kernel->delay(child_timeout => undef);
281              
282             # We need to commit the new list of disks and recycle the child
283             # object. Both of these should only be called once, even if this
284             # routine is called twice.
285 0 0         if (defined($heap->{child})) {
286 0           my $info = $heap->{info};
287 0           $info->commit_our_disk_info;
288              
289 0           delete $heap->{child};
290             }
291             }
292              
293             # Consume a single line of output (thanks to using POE::Filter::Line)
294             sub got_child_stdout {
295 0     0     my ($heap,$_) = @_[HEAP,ARG0];
296              
297 0           my ($uuid,$label,$device) = ();
298              
299 0 0         $uuid = $1 if /UUID=\"([^\"]+)/;
300 0 0         $label = $1 if /LABEL=\"([^\"]+)/;
301 0 0         $device = $1 if /^(.*?):/;
302              
303 0 0         return unless defined($device); # we'll silently fail if blkid
304             # output format is not as expected.
305 0 0 0       return unless defined($label) or defined($uuid);
306              
307 0           my $info = $heap->{info};
308              
309             # the call to add_our_disk_info just queues the update, then when we
310             # clean up this child, we'll instruct info to "commit" the update.
311             # This is needed to take care of removing old disks that are no
312             # longer attached.
313 0           $info->add_our_disk_info($uuid,$label,$device);
314              
315             # print "STDOUT: $_\n";
316             }
317              
318             # Echo any stderr from the child
319             sub got_child_stderr {
320 0     0     my ($heap,$stderr,$wheel) = @_[HEAP, ARG0, ARG1];
321 0           my $child = $heap->{child};
322 0           my $pid = $child->PID;
323 0           warn "blkid $pid> $stderr\n";
324             }
325              
326             ##
327             ## The MountWatcher package will be responsible for periodically
328             ## running mount to determine which of the known disks are actually
329             ## mounted. It will follow pretty much the same approach as for the
330             ## DiskWatcher package.
331             ##
332              
333             package Local::MountWatcher;
334              
335 1     1   1581 use POE qw(Wheel::Run);
  1         3  
  1         5  
336              
337              
338              
339             ##
340             ## The MulticastServer package handles connection to a multicast group
341             ## and sending and receving messages across it.
342             ##
343              
344             package Local::MulticastServer;
345              
346 1     1   358 use POE;
  1         1  
  1         6  
347 1     1   10566 use IO::Socket::Multicast;
  1         28971  
  1         6  
348              
349 1     1   917 use constant DATAGRAM_MAXLEN => 1500;
  1         2  
  1         75  
350 1     1   23 use constant MCAST_PORT => 32003;
  1         2  
  1         42  
351 1     1   5 use constant MCAST_GROUP => '230.1.2.3';
  1         1  
  1         85  
352 1     1   5 use constant MCAST_DESTINATION => MCAST_GROUP . ':' . MCAST_PORT;
  1         2  
  1         792  
353              
354             sub new {
355              
356 0     0     my $class = shift;
357 0           my %opts = (
358             initial_delay => 5,
359             frequency => 10 * 60,
360             info => undef,
361             ttl => 1, # set >1 to traverse routers
362             @_
363             );
364              
365 0 0         die "UnixSocketServer::new requires info => \$var option\n"
366             unless defined($opts{info});
367              
368 0           my $session =
369             POE::Session->create(
370             inline_states => {
371             _start => \&peer_start,
372             get_datagram => \&peer_read,
373             send_something => \&send_something,
374             },
375             heap => {
376             initial_delay => $opts{initial_delay},
377             frequency => $opts{frequency},
378             info => $opts{info},
379             ttl => $opts{ttl},
380             },
381             );
382              
383 0           return bless { session => $session }, $class;
384             }
385              
386             # Set up the peer socket.
387              
388             sub peer_start {
389 0     0     my ($kernel,$heap) = @_[KERNEL, HEAP];
390              
391             # Don't specify an address.
392 0 0         my $socket = IO::Socket::Multicast->new(
393             LocalPort => MCAST_PORT,
394             ReuseAddr => 1,
395             #ReusePort => 1,
396             ) or die $!;
397              
398 0           $socket->mcast_ttl($heap->{ttl});
399              
400 0 0         $socket->mcast_add(MCAST_GROUP) or die $!;
401              
402             # Don't mcast_loopback(0). This disables multicast datagram
403             # delivery to all peers on the interface. Nobody gets data.
404              
405             # Begin watching for multicast datagrams.
406 0           $kernel->select_read($socket, "get_datagram");
407              
408             # Save socket in the heap
409 0           $heap->{socket} = $socket;
410              
411             # delay sending the first packet to give DiskWatcher a chance to complete
412 0           $kernel->delay(send_something => $heap->{initial_delay});
413              
414             # Send something once a second. Pass the socket as a continuation.
415             # $kernel->delay(send_something => $heap->{frequency}, $socket);
416             }
417              
418             # Receive a datagram when our socket sees it.
419              
420             sub peer_read {
421 0     0     my ($kernel, $heap, $socket) = @_[KERNEL, HEAP, ARG0];
422 0           my $info = $heap->{info};
423              
424 0           my $remote = recv($socket, my $message = "", DATAGRAM_MAXLEN, 0);
425              
426 0 0         if (defined $remote) {
427              
428 0           my ($peer_port, $peer_addr) = unpack_sockaddr_in($remote);
429 0           my $ip = inet_ntoa($peer_addr);
430              
431 0 0         if ($message =~ s/^diskd://) {
432             #print "Valid datagram received from $ip : $peer_port ... $message\n";
433 0 0         $info->unpack_disk_list($ip, $message) unless $ip eq $info->our_ip;
434             } else {
435 0           warn "Unexpected/malformed packet from $ip:$peer_port ... $message\n";
436             }
437              
438             } else {
439              
440 0           warn "multicast recv error (ignored) $!\n";
441             }
442             }
443              
444             # Periodically send the list of disks
445              
446             sub send_something {
447 0     0     my ($kernel, $heap) = @_[KERNEL, HEAP];
448 0           my $info = $heap->{info};
449 0           my $socket = $heap->{socket};
450 0           my $delay = $heap->{frequency};
451              
452             # my $message = "pid $$ sending at " . time() . " to " . MCAST_DESTINATION;
453 0           my $message = "diskd:" . $info->pack_our_disk_list;
454              
455 0 0         warn $! unless $socket->mcast_send($message, MCAST_DESTINATION);
456              
457 0           $kernel->delay(send_something => $delay);
458             }
459              
460              
461             ##
462             ## The UnixSocketServer package uses a Unix domain socket to provide a
463             ## local ineterface to the disk info and a means of sending commands
464             ## or messages to other nodes in our multicast network.
465             ##
466             ## This package comprises a main server package (UnixSocketServer)
467             ## that waits for connections to the socket, and and a package that's
468             ## spawned for each incoming connection (UnixSocketServer::Session).
469             ##
470              
471             package Local::UnixSocketServer;
472              
473 1     1   7 use POE qw(Wheel::SocketFactory Wheel::ReadWrite);
  1         2  
  1         8  
474 1     1   21204 use Socket; # For PF_UNIX.
  1         2  
  1         1112  
475              
476             # Start server at a particular rendezvous (ie, Unix domain socket)
477             sub new {
478 0     0     my $class = shift;
479 0           my $homedir = $ENV{HOME};
480 0           my %opts =
481             (
482             rendezvous => "$homedir/.diskd-socket",
483             info => undef,
484             @_,
485             );
486              
487             # warn "class: $class; opts: " . (join ", ", @_);
488              
489 0 0         die "UnixSocketServer::new requires info => \$var option\n"
490             unless defined($opts{info});
491              
492 0           POE::Session->create(
493             inline_states => {
494             _start => \&server_started,
495             got_client => \&server_accepted,
496             got_error => \&server_error,
497             },
498             heap => {
499             rendezvous => $opts{rendezvous},
500             info => $opts{info}
501             },
502             );
503             }
504              
505             # The server session has started. Create a socket factory that
506             # listens for UNIX socket connections and returns connected sockets.
507             # This unlinks the rendezvous socket
508             sub server_started {
509 0     0     my ($kernel, $heap) = @_[KERNEL, HEAP];
510 0 0         unlink $heap->{rendezvous} if -e $heap->{rendezvous};
511 0           $heap->{server} = POE::Wheel::SocketFactory->new(
512             SocketDomain => PF_UNIX,
513             BindAddress => $heap->{rendezvous},
514             SuccessEvent => 'got_client',
515             FailureEvent => 'got_error',
516             );
517             }
518              
519             # The server encountered an error while setting up or perhaps while
520             # accepting a connection. Register the error and shut down the server
521             # socket. This will not end the program until all clients have
522             # disconnected, but it will prevent the server from receiving new
523             # connections.
524             sub server_error {
525 0     0     my ($heap, $syscall, $errno, $error) = @_[HEAP, ARG0 .. ARG2];
526 0 0         $error = "Normal client disconnection." unless $errno;
527 0           warn "Server socket encountered $syscall error $errno: $error\n";
528 0           delete $heap->{server};
529             }
530              
531             # The server accepted a connection. Start another session to process
532             # data on it.
533             sub server_accepted {
534 0     0     my ($heap,$client_socket) = @_[HEAP, ARG0];
535 0           my $info = $heap->{info};
536 0           Local::UnixSocketServer::Session->new($client_socket, $info);
537             }
538              
539             ## A UnixSocketServer::Session instance is created for each incoming
540             ## connection.
541              
542             package Local::UnixSocketServer::Session;
543              
544 1     1   8 use POE::Session;
  1         3  
  1         7  
545              
546             # Constructor
547             sub new {
548 0     0     my ($class,$socket,$info) = @_;
549             #warn "new $class: $socket, $info";
550 0           POE::Session->create(
551             package_states => [ $class => [qw( _start session_input session_error)] ],
552             args => [$info, $socket],
553             );
554             }
555              
556             # The server session has started. Wrap the socket it's been given in
557             # a ReadWrite wheel. ReadWrite handles the tedious task of performing
558             # buffered reading and writing on an unbuffered socket.
559             sub _start {
560 0     0     my ($heap, $info, $socket) = @_[HEAP, ARG0, ARG1];
561 0           $heap->{client} = POE::Wheel::ReadWrite->new(
562             Handle => $socket,
563             InputEvent => 'session_input',
564             ErrorEvent => 'session_error',
565             # InputEvent => 'got_client_input',
566             # ErrorEvent => 'got_client_error',
567             );
568 0           $heap->{info}=$info;
569 0           $heap->{client}->put("diskd local interface awaiting commands\n");
570             }
571              
572             # The server session received some input from its attached client.
573             # Echo it back.
574             sub session_input {
575 0     0     my ($heap, $_) = @_[HEAP, ARG0];
576 0           my $info = $heap->{info};
577              
578 0           chomp;
579              
580 0 0         if (/^help\b/i) {
    0          
    0          
    0          
    0          
    0          
581 0           $heap->{client}->put
582             ("Available commands:\n" .
583             "list show disk info\n" .
584             "where show last known location of disk\n" .
585             "localhost report local hostname, IP address\n" .
586             "status show network statistics\n" .
587             "debug start monitoring notable events\n" .
588             "quit|exit exit client" # handled client-side
589             );
590             } elsif (/^list\b/i) {
591              
592 0           my $output = '';
593 0           foreach my $host ($info->known_hosts) {
594             #warn "Got host $host";
595 0           foreach my $listref (@{$info->disks_by_host($host)}) {
  0            
596             # Perl lets us use hash slices as well as array slices
597 0           my ($uuid, $label, $device) = @$listref;
598 0 0         $uuid = '' unless defined $uuid;
599 0 0         $label = '' unless defined $label;
600 0 0         $device = '' unless defined $device;
601              
602 0           $output.= sprintf("%-15s %-37s %-10s %s\n",
603             "$host:",$uuid,$label,$device);
604             }
605             }
606 0           $heap->{client}->put($output);
607              
608             } elsif (/^where\b/i) {
609 0 0         if (/^where\b\s+(\S+)/i) {
610              
611             } else {
612 0           $heap->{client}->put("'where' requires a disk label or uuid\n");
613             }
614             } elsif (/^localhost\b/i) {
615              
616             } elsif (/^status\b/i) {
617              
618             } elsif (/^debug\b/i) {
619              
620             } else {
621 0           $heap->{client}->put("unknown command: $_\n");
622             }
623             }
624              
625             # The server session received an error from the client socket. Log
626             # the error and shut down this session. The main server remains
627             # untouched by this.
628             sub session_error {
629 0     0     my ($heap, $syscall, $errno, $error) = @_[HEAP, ARG0 .. ARG2];
630 0 0         $error = "Normal disconnection." unless $errno;
631 0           warn "Server session encountered $syscall error $errno: $error\n";
632 0           delete $heap->{client};
633             }
634              
635              
636             package Local::UnixSocketClient;
637              
638             # This program is a simple unix socket client. It will connect to the
639             # UNIX socket specified by $rendezvous. This program is written to
640             # work with the UnixServer example in POE's cookbook. While it
641             # touches upon several POE modules, it is not meant to be an
642             # exhaustive example of them. Please consult "perldoc [module]" for
643             # more details.
644              
645 1     1   1029 use Socket qw(AF_UNIX);
  1         3  
  1         55  
646 1     1   7 use POE; # For base features.
  1         2  
  1         6  
647 1     1   451 use POE::Wheel::SocketFactory; # To create sockets.
  1         10  
  1         25  
648 1     1   6 use POE::Wheel::ReadWrite; # To read/write lines with sockets.
  1         3  
  1         20  
649 1     1   2835 use POE::Wheel::ReadLine; # To read/write lines on the console.
  0            
  0            
650              
651             # Specify a UNIX rendezvous to use. This is the location the client
652             # will connect to, and it should correspond to the location a server
653             # is listening to.
654             our $rendezvous;
655              
656             sub new {
657             my $class = shift;
658             my $homedir = $ENV{HOME};
659             my %opts =
660             (
661             rendezvous => "$homedir/.diskd-socket",
662             @_,
663             );
664              
665             $rendezvous = $opts{rendezvous};
666              
667             # Create the session that will pass information between the console
668             # and the server. The create() constructor maps a number of events
669             # to the functions that will be called to handle them. For example,
670             # the "sock_connected" event will cause the socket_connected()
671             # function to be called.
672             POE::Session->create(
673             inline_states => {
674             _start => \&client_init,
675             sock_connected => \&socket_connected,
676             sock_error => \&socket_error,
677             sock_input => \&socket_input,
678             cli_input => \&console_input,
679             },
680             );
681             }
682              
683             # The client_init() function is called when POE sends a "_start" event
684             # to the session. This happens automatically whenever a session is
685             # created, and its purpose is to notify your code when it can begin
686             # doing things.
687             # Here we create the SocketFactory that will connect a socket to the
688             # server. The socket factory is tightly associated with its session,
689             # so it is kept in the session's private storage space (its "heap").
690             # The socket factory is configured to emit two events: On a successful
691             # connection, it sends a "sock_connected" event containing the new
692             # socket. On a failure, it sends "sock_error" along with information
693             # about the problem.
694             sub client_init {
695             my $heap = $_[HEAP];
696             $heap->{connect_wheel} = POE::Wheel::SocketFactory->new(
697             SocketDomain => AF_UNIX,
698             RemoteAddress => $rendezvous,
699             SuccessEvent => 'sock_connected',
700             FailureEvent => 'sock_error',
701             );
702             }
703              
704             # socket_connected() is called when the session receives a
705             # "sock_connected" event. That event is generated by the session's
706             # SocketFactory object when it has connected to a server. The newly
707             # connected socket is passed in ARG0.
708             # This function discards the SocketFactory object since its purpose
709             # has been fulfilled. It then creates two new objects: a ReadWrite
710             # wheel to talk with the socket, and a ReadLine wheel to talk with the
711             # console. POE::Wheel::ReadLine was named after Term::ReadLine, by
712             # the way. Once socket_connected() has set us up the wheels, it calls
713             # ReadLine's get() method to prompt the user for input.
714             sub socket_connected {
715             my ($heap, $socket) = @_[HEAP, ARG0];
716             delete $heap->{connect_wheel};
717             $heap->{io_wheel} = POE::Wheel::ReadWrite->new(
718             Handle => $socket,
719             InputEvent => 'sock_input',
720             ErrorEvent => 'sock_error',
721             );
722             $heap->{cli_wheel} = POE::Wheel::ReadLine->new(InputEvent => 'cli_input');
723             $heap->{cli_wheel}->get("=> ");
724             }
725              
726             # socket_input() is called to handle "sock_input" events. These
727             # events are provided by the POE::Wheel::ReadWrite object that was
728             # created in socket_connected().
729             # socket_input() moves information from the socket to the console.
730             sub socket_input {
731             my ($heap, $input) = @_[HEAP, ARG0];
732             $heap->{cli_wheel}->put("$input");
733             }
734              
735             # socket_error() is called to handle "sock_error" events. These
736             # events can come from two places: The SocketFactory will send it if a
737             # connection fails, and the ReadWrite object will send it if a read or
738             # write error occurs.
739             # The most common way to handle I/O errors is to shut down the sockets
740             # having problems. Here we'll delete all our wheels so the program
741             # can shut down gracefully.
742             # ARG0 contains the name of the syscall that failed. It is often
743             # "connect" or "bind" or "read" or "write". ARG1 and ARG2 contain the
744             # numeric and descriptive contents of $! at the time of the failure.
745             sub socket_error {
746             my ($heap, $syscall, $errno, $error) = @_[HEAP, ARG0 .. ARG2];
747             $error = "Normal disconnection." unless $errno;
748             warn "Client socket encountered $syscall error $errno: $error";
749             delete $heap->{connect_wheel};
750             delete $heap->{io_wheel};
751             delete $heap->{cli_wheel};
752             }
753              
754             # Finally, the console_input() function is called to handle
755             # "cli_input" events. These events are created when
756             # POE::Wheel::ReadLine (created in socket_connected()) receives user
757             # input from the console.
758             # Plain input is registered with ReadLine's input history, echoed back
759             # to the console, and sent to the server. Exceptions, such as when
760             # the user presses Ctrl+C to interrupt the program, are also handled.
761             # POE::Wheel::ReadLine events include two parameters other than the
762             # usual KERNEL, HEAP, etc. The ARG0 parameter contains plain input.
763             # If that's undefined, then ARG1 will contain an exception.
764             sub console_input {
765             my ($heap, $input, $exception) = @_[HEAP, ARG0, ARG1];
766             if (defined $input) {
767             $heap->{cli_wheel}->addhistory($input);
768             # $heap->{cli_wheel}->put("You Said: $input");
769             $heap->{io_wheel}->put($input);
770             }
771             elsif ($exception eq 'cancel') {
772             $heap->{cli_wheel}->put("Canceled.");
773             }
774             else {
775             $heap->{cli_wheel}->put("Bye.");
776             delete $heap->{cli_wheel};
777             delete $heap->{io_wheel};
778             return;
779             }
780              
781             # Prompt for the next bit of input.
782             $heap->{cli_wheel}->get("=> ");
783             }
784              
785             1;
786              
787             __END__