File Coverage

blib/lib/Net/BEEP/Lite/Session.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             # $Id: Session.pm,v 1.13 2004/04/22 20:45:58 davidb Exp $
2             #
3             # Copyright (C) 2003 Verisign, Inc.
4             #
5             # This library is free software; you can redistribute it and/or
6             # modify it under the terms of the GNU Lesser General Public
7             # License as published by the Free Software Foundation; either
8             # version 2.1 of the License, or (at your option) any later version.
9             #
10             # This library is distributed in the hope that it will be useful,
11             # but WITHOUT ANY WARRANTY; without even the implied warranty of
12             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13             # Lesser General Public License for more details.
14             #
15             # You should have received a copy of the GNU Lesser General Public
16             # License along with this library; if not, write to the Free Software
17             # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18             # USA
19              
20              
21             package Net::BEEP::Lite::Session;
22              
23             =head1 NAME
24              
25             Net::BEEP::Lite::Session
26              
27             =head1 DESCRIPTION
28              
29             This is a base class for BEEP sessions. It handles core tasks common
30             to both server and client sessions. This class isn't intended to be
31             used directly. Instead, use one of its subclasses.
32              
33             Note that in reality, this is really a TCP session. It is not
34             abstracted away from the TCP transport for BEEP. In the future it is
35             possible that it will be and a new TCPSession subclass will be
36             created.
37              
38             =cut
39              
40 3     3   31957 use IO::Socket;
  3         125880  
  3         18  
41              
42 3     3   6110 use Net::BEEP::Lite::Channel;
  3         12  
  3         106  
43 3     3   1633 use Net::BEEP::Lite::MgmtProfile;
  0            
  0            
44              
45             use Carp;
46             use strict;
47             use warnings;
48              
49             =head1 CONSTRUCTOR
50              
51             =over 4
52              
53             =item new( I )
54              
55             This is the main constructor. It takes a named parameter list as its
56             argument. See the C method for a list of valid parameter
57             names.
58              
59              
60             =back
61              
62             =cut
63              
64             sub new {
65             my $this = shift;
66             my $class = ref($this) || $this;
67              
68             my $self = {};
69             bless $self, $class;
70              
71             $self->initialize(@_);
72              
73             $self;
74             }
75              
76             =head1 METHODS
77              
78             =over 4
79              
80             =item initialize( I )
81              
82             Initialize the object. This is generally called by the constructor
83             and subclasses. It takes the following named parameters:
84              
85             =over 4
86              
87             =item Socket
88              
89             the socket associated with this session.
90              
91             =item NoGreeting
92              
93             Do not send the greeting message. This can be sent later with the
94             C method. This will be true if a socket isn't
95             supplied.
96              
97             =item DefaultLocalWindow
98              
99             Set the base local TCP window to a particular value. This number
100             should be 4096 (the default) or higher.
101              
102             =item IdleTimeout
103              
104             The number of seconds to wait for a frame. Zero (the default) means
105             to wait indefinitely.
106              
107             =item Timeout
108              
109             The number of seconds to wait for a frame body to be completely read.
110             This should ususally be non-zero to prevent framing errors from
111             locking the session forever. The default is 30 seconds.
112              
113             =back
114              
115             It also takes the named parameters for C.
116              
117             =cut
118              
119             sub initialize {
120             my $self = shift;
121             my %args = @_;
122              
123             # some defaults:
124              
125             $self->{debug} = 0;
126             $self->{trace} = 0;
127              
128             # we assume the initiator role. the listener subclass should set
129             # this to 2.
130             $self->{channelno_counter} = 1;
131              
132             # create our management profile. FIXME: we may want to allow this
133             # to be passed in (and only instantiate it once for all sessions.)
134             $self->{mgmt_profile} = Net::BEEP::Lite::MgmtProfile->new(%args);
135              
136             # our local profiles.
137             $self->{profiles} = {};
138             # the remote profiles.
139             $self->{remote_profiles} = {};
140              
141             # our channels. basically, a hash of channel number to channel
142             # object.
143             $self->{channels} = {};
144              
145             # The default size of our local windows. This should be at least
146             # 4096.
147             $self->{default_local_window} = 4096;
148              
149             # assign the management profile to channel zero.
150             $self->_add_channel(0, $self->{mgmt_profile});
151             $self->channel(0)->msgno(1); # start msgno at one because of
152             # greeting RPY.
153              
154             # our general received message queue;
155             $self->{messages} = [];
156              
157             # our default idle timeout
158             $self->{idle_timeout} = 0;
159             # our default read timeout for frame bodies.
160             $self->{timeout} = 60;
161              
162             for (keys %args) {
163             my $val = $args{$_};
164              
165             /^socket/io and do {
166             $self->{sock} = $val;
167             next;
168             };
169             /^no.?greeting$/io and do {
170             $self->{_no_greeting} = $val;
171             next;
172             };
173             /^default.?local.?window$/io and do {
174             $self->{default_local_window} = $val;
175             next;
176             };
177             /^debug$/io and do {
178             $self->{debug} = $val;
179             next;
180             };
181             /^trace$/io and do {
182             $self->{trace} = $val;
183             next;
184             };
185             /^idle.?timeout/io and do {
186             $self->{idle_timeout} = $val;
187             next;
188             };
189             /^timeout$/io and do {
190             $self->{timeout} = $val;
191             next;
192             };
193             }
194              
195             # set NoGreeting to true if the socket wasn't set or isn't
196             $self->{_no_greeting} = 1 if (! $self->{sock} or
197             (ref($self->{sock}) and
198             !$self->{sock}->isa('IO::Socket')));
199             }
200              
201             =item socket()
202              
203             Returns the internal socket.
204              
205             =cut
206              
207             sub _socket {
208             my $self = shift;
209              
210             $self->{sock};
211             }
212              
213             =item _set_socket($socket)
214              
215             Change the session's internal socket to the supplied socket. Only use
216             this if you know what you are doing.
217              
218             =cut
219              
220             sub _set_socket {
221             my $self = shift;
222             my $socket = shift;
223              
224             $self->{sock} = $socket;
225             }
226              
227             =item _next_channel_number()
228              
229             This returns the next channel number to request.
230              
231             =cut
232              
233             sub _next_channel_number {
234             my $self = shift;
235              
236             my $res = $self->{channelno_counter};
237             $self->{channelno_counter} += 2;
238             $self->{channelno_counter} %= 2147483648;
239              
240             $res;
241             }
242              
243             =item _add_channel($channel_number, [$profile, [$local_window_size]])
244              
245             This is called by the management profile upon receiving a or
246             message from the peer. If $profile is provided, then this
247             will bind that profile to the channel.
248              
249             =cut
250              
251             sub _add_channel {
252             my $self = shift;
253             my $number = shift;
254             my $profile = shift;
255             my $window = shift || $self->{default_local_window};
256              
257             $self->{channels}->{$number} = new Net::BEEP::Lite::Channel
258             (Number => $number,
259             Profile => $profile,
260             Window => $window,
261             Debug => $self->{debug},
262             Trace => $self->{trace});
263             }
264              
265             =item _del_channel($channel_number)
266              
267             This is called by profiles when it needs to close a channel (either a
268             close request on the channel or a tuning reset (see
269             C<_del_all_channels>).
270              
271             =cut
272              
273             sub _del_channel {
274             my $self = shift;
275             my $number = shift;
276              
277             delete $self->{channels}->{$number};
278             }
279              
280             =item _del_all_channels()
281              
282             Close (and destroy) all current channels. This is most likely to be
283             done as part of a tuning reset. You will have to re-add channel zero
284             after this.
285              
286             =cut
287              
288             sub _del_all_channels {
289             my $self = shift;
290              
291             for my $n (keys %{$self->{channels}}) {
292             $self->_del_channel($n);
293             }
294             }
295              
296             =item add_local_profile($profile)
297              
298             This method will add a (local) profile to the session. This will be
299             advertised in the greeting message.
300              
301             =cut
302              
303             sub add_local_profile {
304             my $self = shift;
305             my $p = shift;
306              
307             $self->{profiles}->{$p->uri()} = $p;
308             }
309              
310             =item get_local_profile($uri)
311              
312             Returns the profile implementation associated with the given uri.
313              
314             =cut
315              
316             sub get_local_profile {
317             my $self = shift;
318             my $uri = shift;
319              
320             $self->{profiles}->{$uri};
321             }
322              
323             =item get_local_profile_uris()
324              
325             Returns the list of profile URIs currently being advertised by this peer.
326              
327             =cut
328              
329             sub get_local_profile_uris {
330             my $self = shift;
331             keys %{$self->{profiles}};
332             }
333              
334             =item del_local_profile($uri)
335              
336             Removes a local profile.
337              
338             =cut
339              
340             sub del_local_profile {
341             my $self = shift;
342             my $uri = shift;
343              
344             delete $self->{profiles}->{$uri};
345             }
346              
347             =item add_remote_profile($uri)
348              
349             This method is used internally when the remote peer advertises a
350             profile in the greeting message.
351              
352             =cut
353              
354             sub add_remote_profile {
355             my $self = shift;
356             my $uri = shift;
357              
358             $self->{remote_profiles}->{$uri} = 1;
359             }
360              
361             =item has_remote_profile($uri)
362              
363             This method returns true if the remote profile advertised the given
364             profile uri, false otherwise.
365              
366             =cut
367              
368             sub has_remote_profile {
369             my $self = shift;
370             my $uri = shift;
371              
372             $self->{remote_profiles}->{$uri};
373             }
374              
375             =item remote_profiles()
376              
377             This method returns a list of the peer's advertised profiles.
378              
379             =cut
380              
381             sub remote_profiles {
382             my $self = shift;
383              
384             keys %{$self->{remote_profiles}};
385             }
386              
387             =item num_open_channels()
388              
389             Return the number of open channels associated with this session. This
390             does not include channel zero.
391              
392             =cut
393              
394             sub num_open_channels {
395             my $self = shift;
396              
397             # return the number of open channels, not including channel zero.
398             (scalar keys %{$self->{channels}}) - 1;
399             }
400              
401             =item channel($channel_number)
402              
403             Returns the C object for the given channel
404             number.
405              
406             =cut
407              
408             sub channel {
409             my $self = shift;
410             my $chno = shift;
411              
412             $self->{channels}->{$chno};
413             }
414              
415             =item servername([$val])
416              
417             Returns or sets the session's server name. This is normally set when
418             the first "" message is encountered with a "serverName"
419             attribute.
420              
421             =cut
422              
423             sub servername {
424             my $self = shift;
425             my $name = shift;
426              
427             $self->{servername} = $name if $name;
428              
429             $self->{servername};
430             }
431              
432             =item _tuning_reset([$send_greeting])
433              
434             This does the full tuning reset: close all channels, delete pending
435             messages in the message queue, recreate channel zero, and (optionally)
436             re-send the greeting. This defaults to sending the greeting.
437              
438             This is normally called when a profile negotiates a security layer
439             (i.e., TLS or maybe SASL/DIGEST-MD5's encryption).
440              
441             =cut
442              
443             sub _tuning_reset {
444             my $self = shift;
445             my $send_greeting = shift || 1;
446              
447             $self->_del_all_channels();
448             $self->{messages} = [];
449             $self->{remote_profiles} = {};
450             $self->_add_channel(0, $self->{mgmt_profile});
451             $self->send_greeting() if $send_greeting;
452             }
453              
454             =item send_greeting()
455              
456             Send the greeting message to the peer, and handle the greeting coming
457             from the peer. It will advertise any profiles that have been
458             configured in the session. Normally, this method is called as part of
459             the initialization process of the subclasses of this class.
460              
461             =cut
462              
463             sub send_greeting {
464             my $self = shift;
465              
466             # send the greeting message.
467             $self->{mgmt_profile}->send_greeting_message($self);
468              
469             # handle the remote greeting.
470             my $peer_greeting = $self->_recv_message();
471             $self->{mgmt_profile}->handle_message($self, $peer_greeting);
472             }
473              
474             =item send_message($message)
475              
476             This will send a BEEP message to the peer (over the channel specified
477             in the message). This will handle possible fragmentation due to the
478             channel window size.
479              
480             =cut
481              
482             sub send_message {
483             my $self = shift;
484             my $message = shift;
485              
486             my $chno = $message->channel_number();
487             my $channel = $self->channel($chno);
488              
489             croak "send_message: message is on non-existent channel $chno"
490             if not $channel;
491              
492             # New messages should never set msgno, and we will override it here
493             # if it has. Replies (RPY, ANS, ERR, NULL) should have the same
494             # msgno and the MSG to which they are replying.
495             if ($message->type() eq 'MSG') {
496             carp "MSG messages should NOT have a pre-set message number"
497             if defined $message->msgno();
498             $message->msgno($channel->next_msgno());
499             }
500             else {
501             if (not defined $message->msgno()) {
502             carp "non-MSG message should have a pre-set message number";
503             $message->msgno($channel->next_msgno());
504             }
505             }
506              
507             while ($message->has_more_frames()) {
508             my $window = $channel->remote_window();
509              
510             # if there is no more space on this channel, switch to reading for
511             # a bit while we wait for the channel to open.
512             if ($window == 0) {
513             $self->_read_for_seq($chno);
514              
515             $window = $channel->remote_window();
516             next if $window == 0;
517             }
518              
519             my $seqno = $channel->seqno();
520             # calculate the next frame
521             my $frame = $message->next_frame($seqno, $window);
522             # actually send the frame.
523             $self->_write_frame($frame);
524             # update our current sequence number.
525             $channel->update_seqno($frame->size());
526             # and adjust the remote window.
527             $channel->remote_window($window - $frame->size());
528             }
529             }
530              
531              
532             =item _recv_message()
533              
534             This will fetch the next message from the peer, returning a message
535             object. It will handle reassembling a fragmented message. It will
536             return the first complete message received on any (existing) channel.
537             It will discard frames on non-existent channels, issuing a warning.
538              
539             This method will block. It will return undef if it is not possible to
540             read from the socket, otherwise it will return the message.
541              
542             =cut
543              
544             sub _recv_message {
545             my $self = shift;
546             my %args = @_;
547              
548             # first try and return a message off the queue.
549             my $message = $self->_dequeue_message();
550             return $message if $message;
551              
552             # otherwise, we read one from the socket.
553              
554             # This handles interleaved frames for messages on different
555             # channels, or ANS messages on the same (or different channels).
556             # The channels have slots for building messages of both types. The
557             # first time we see a completing frame, we return that message.
558              
559             while (1) {
560             # get the next frame from the socket (will block here).
561             my $frame = $self->_recv_frame(%args);
562              
563             # our frame will have already gone through SEQ processing. it
564             # will also have been added to the appropriate message building
565             # slot.
566              
567             next if $frame->type() eq 'SEQ';
568              
569             my $channel = $self->channel($frame->channel_number());
570              
571             # if we have a completing frame, we need to pull the complete
572             # message from its build slot and clear it.
573              
574             if ($frame->completes()) {
575             my $message;
576              
577             if ($frame->type() eq 'ANS') {
578             $message = $channel->ans_message($frame->ansno());
579             $channel->ans_clear_message($frame->ansno());
580             }
581             else {
582             $message = $channel->message();
583             $channel->clear_message();
584             }
585              
586             return $message;
587             }
588             }
589             }
590              
591             =item recv_message()
592              
593             This will fetch the next message (on any defined channel other than
594             zero) from the peer, returning a message object. It will handle
595             reassembling a fragmented message. This will directly handle channel
596             zero messages, so this isn't all that useful for handling replies to
597             management channel messages.
598              
599             =cut
600              
601             sub recv_message {
602             my $self = shift;
603             my %args = @_;
604              
605             my $message = undef;
606              
607             while (1) {
608             $message = $self->_recv_message(%args);
609              
610             last if $message->channel_number() != 0;
611              
612             $self->{mgmt_profile}->handle_message($self, $message);
613             return 0 if not $self->_is_connected();
614             }
615             $message;
616             }
617              
618              
619             =item close_session()
620              
621             close the entire session. Normally, this should only be called after
622             sending or receiving the "" message. It can also be used in fatal
623             error situations.
624              
625             =cut
626              
627             sub close_session {
628             my $self = shift;
629              
630             # we should have already sent or received the "ok" message by now
631             # (unless we are aborting)
632             $self->{sock}->close();
633             $self->{sock} = undef;
634             print STDERR "closed socket\n" if $self->{debug};
635             }
636              
637             sub abort {
638             my $self = shift;
639             my $message = shift;
640              
641             print STDERR "aborting: $message\n" if $message && $self->{debug};
642             confess "abort: $message\n" if $self->{trace};
643             $self->close_session();
644              
645             die "aborted\n";
646             }
647              
648              
649             =item _read_for_seq([$channel_number])
650              
651             Read frames from the socket until receiving a SEQ frame. If
652             $channel_number is provided, then read until a SEQ frame on that
653             channel has been read. Non SEQ frames read are place either in the
654             various message-building slots (see _recv_message), or place on the
655             general message queue.
656              
657             =cut
658              
659             sub _read_for_seq {
660             my $self = shift;
661             my $chno = shift;
662              
663             while (1) {
664             my $frame = $self->_recv_frame();
665              
666             if (not $frame) {
667             $self->abort("null frame detected");
668             return;
669             }
670              
671             # at this point, the SEQ has been processed. This just determines
672             # if we are done.
673             if ($frame->type() eq 'SEQ') {
674             return if not defined $chno;
675             return if $chno == $frame->channel_number();
676             next;
677             }
678              
679             # for other frames, we need to make sure that we pull completed
680             # messages off the build area and onto the queue.
681              
682             if ($frame->completes()) {
683              
684             my $channel = $self->channel($frame->channel_number());
685             my $message;
686              
687             if ($frame->type() eq 'ANS') {
688             $message = $channel->ans_message($frame->ansno());
689             $channel->ans_clear_message($frame->ansno());
690             }
691             else {
692             $message = $channel->message();
693             $channel->clear_message();
694             }
695              
696             $self->_queue_message($message);
697             }
698             }
699             }
700              
701             =item _recv_frame()
702              
703             This is an intermediate wrapper around C<_read_frame>. Essentially,
704             it reads a frame from the socket, then does a little bit of post
705             processing, finally returning that frame. The processing is: if it is
706             a SEQ frame, it updates the channels remote window size accordingly;
707             if it is some other frame, it adds it to the appropriate channel's
708             message building slot.
709              
710             It returns undef if the socket could not be read from. It returns 0
711             if the frame was on a non-existent channel.
712              
713             =cut
714              
715             sub _recv_frame {
716             my $self = shift;
717             my %args = @_;
718              
719             my $noseqs;
720             if ($args{NoSEQ}) {
721             $noseqs = 1;
722             }
723              
724             my $frame = $self->_read_frame();
725              
726             # NOTE: this should never actually happen: _read_frame should
727             # abort() instead of returning anything other than a valid frame.
728             if (! $frame) {
729             $self->abort("null frame received");
730             return;
731             }
732              
733             my $channel = $self->channel($frame->channel_number());
734              
735             if (not defined $channel) {
736             $self->abort("frame received on non-existent channel " .
737             $frame->channel_number());
738             return;
739             }
740              
741             # handle SEQ frames independently.
742             if ($frame->type() eq "SEQ") {
743             # calculate new remote window: That is the advertise window
744             # minus any bytes that we have already sent.
745             my $new_window = $frame->window() -
746             ($channel->seqno() - $frame->ackno());
747             $channel->remote_window($new_window);
748             }
749             # assemble message from (possibly) multiple (sequential) frames.
750             # the ANS collating case.
751             elsif ($frame->type() eq 'ANS') {
752             $channel->ans_message_add_frame($frame);
753             }
754             # the normal message collating case.
755             else {
756             $channel->message_add_frame($frame);
757             }
758              
759             # track our last seen seqno from the peer in the channel.
760             $channel->peer_seqno($frame->seqno() + $frame->size());
761              
762             # Emit a SEQ frame if we've actually read a frame with a payload.
763             if ($frame->size() > 0 && !$noseqs) {
764             # at the moment, we consider all frames to be "immediately"
765             # consumed, so we just emit a constant for the window size.
766             my $ackno = $channel->peer_seqno();
767              
768             $self->_send_seq($channel, $ackno);
769             }
770              
771             $frame;
772             }
773              
774             =item _send_seq($channel, $ackno)
775              
776             Send a SEQ for $channel_number to the peer. $ackno is the sequence
777             number to acknowledge. Generally this is the seqno of the frame this
778             is responding to, plus the size of the payload of that frame.
779              
780             =cut
781              
782             sub _send_seq {
783             my $self = shift;
784             my $channel = shift;
785             my $seqno = shift;
786              
787             my $seq_frame = new Net::BEEP::Lite::Frame
788             (Type => 'SEQ',
789             Channel => $channel->number(),
790             Ackno => $seqno,
791             Window => $channel->local_window(),
792             Debug => $self->{debug},
793             Trace => $self->{trace});
794              
795             $self->_write_frame($seq_frame);
796             }
797              
798             =item _read_frame()
799              
800             This is an internal method for reading a single frame from the
801             internal socket. It returns a C object.
802              
803             =cut
804              
805             sub _read_frame {
806             my $self = shift;
807              
808             my $sock = $self->{sock};
809              
810             my ($header, $read, $old_alarm_value);
811              
812             # set up an alarm handler for this method only.
813             local $SIG{ALRM} = sub { die "alarm timeout\n"; };
814              
815             # read the header.
816             eval {
817             $old_alarm_value = alarm($self->{idle_timeout});
818             $header = $sock->getline();
819             };
820             if ($@ and $@ =~ /^alarm timeout/io) {
821             $self->abort("idle timeout");
822             return;
823             } elsif ($@) {
824             die $@;
825             }
826             alarm($old_alarm_value);
827              
828             # FIXME: what does a null header mean?
829             if (!$header) {
830             $self->abort("null header detected (socket closed?)");
831             return;
832             }
833              
834             my $frame = Net::BEEP::Lite::Frame->new(Header => $header,
835             Debug => $self->{debug},
836             Trace => $self->{trace});
837              
838             # make sure the frame could be built \(i.e., known frame type, valid
839             # frame headers...\)
840             if (! $frame) {
841             $self->abort("invalid frame header: '$header'");
842             return;
843             }
844              
845             # if we have no payload (SEQ, NUL), then we are done.
846             return $frame if $frame->size() == 0 and ($frame->type() eq 'SEQ' or
847             $frame->type() eq 'NUL');
848              
849             # read the payload.
850              
851             # FIXME: the following construct is not ideal. While the loop seems
852             # necessary from a theoretical perspective (underlying read
853             # operations are not guaranteed to return with all things read), it
854             # is unknown if there is a real case where this read call would
855             # return early and yet be able to continue.
856              
857             # Also note that a timer is set (and probably should always be set)
858             # to (help) recover from cases where the frame size was incorrect
859             # and too large.
860             my $offset = 0;
861             my $buffer;
862             while (1) {
863             eval {
864             $old_alarm_value = alarm($self->{timeout});
865             $read = $sock->read($buffer, $frame->size(), $offset);
866             };
867             if ($@ and $@ =~ /^alarm timeout/) {
868             $self->abort("read operation timed out (invalid frame?)");
869             return;
870             } elsif ($@) {
871             die $@;
872             }
873             alarm($old_alarm_value);
874             last if ($read == 0 || $read == $frame->size());
875             $offset += $read;
876             }
877              
878             $frame->set_payload($buffer);
879              
880             # now read the trailer
881              
882             eval {
883             $old_alarm_value = alarm($self->{timeout});
884             $read = $sock->read($buffer, 5);
885             };
886             if ($@ and $@ =~ /^alarm timeout/) {
887             $self->abort("read operation timed out (invalid frame?)");
888             return;
889             } elsif ($@) {
890             die $@;
891             }
892             alarm($old_alarm_value);
893              
894             if ($buffer ne "END\r\n") {
895             $self->abort("invalid frame trailer for '$buffer'");
896             return;
897             }
898              
899             print STDERR "_read_frame: read frame:\n", $frame->to_string, "\n"
900             if $self->{trace};
901             $frame;
902             }
903              
904             =item _write_frame($frame)
905              
906             This is an internal routine for writing a single frame to the
907             internally held socket. $frame MUST be a C
908             object.
909              
910             =cut
911              
912             sub _write_frame {
913             my $self = shift;
914             my $frame = shift;
915              
916             my $sock = $self->{sock};
917              
918             $sock->print($frame->to_string());
919             $sock->flush();
920              
921             print STDERR "_write_frame: wrote frame:\n", $frame->to_string(), "\n"
922             if $self->{trace};
923             }
924              
925             sub _queue_message {
926             my $self = shift;
927             my $message = shift;
928              
929             push @{$self->{messages}}, $message;
930             }
931              
932             sub _dequeue_message {
933             my $self = shift;
934              
935             shift @{$self->{messages}};
936             }
937              
938             sub _is_connected {
939             my $self = shift;
940              
941             return ($self->{sock} && $self->{sock}->connected());
942             }
943              
944              
945             =pod
946              
947             =back
948              
949             =head1 SEE-ALSO
950              
951             =over 4
952              
953             =item L
954              
955             =item L
956              
957             =item L
958              
959             =item L
960              
961             =back
962              
963             =cut
964              
965             1;