File Coverage

blib/lib/POE/Event/Message.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             # -*- Perl -*-
2             #
3             # File: POE/Event/Message.pm
4             # Desc: A generic messaging protocol - to use as a starting point
5             # Date: Mon Oct 10 10:35:59 2005
6             # Stat: Prototype, Experimental
7             # Note: Fix to remove message length limitation contributed
8             # by Martin Holste, Sun 21 Feb 2010
9             #
10             package POE::Event::Message;
11 3     3   82957 use 5.006;
  3         11  
  3         141  
12 3     3   21 use strict;
  3         12  
  3         113  
13 3     3   17 use warnings;
  3         10  
  3         203  
14              
15             our $PACK = __PACKAGE__;
16             our $VERSION = '0.11';
17             ### @ISA = qw( );
18              
19             # WARN: Don't use POE or the POE::Kernel classes here. This is a
20             # generic messaging class that can also be used outside of
21             # the POE environment. Support is included for sending and
22             # receiving messages across sockets (such as between a POE-
23             # based server process and a non-POE client process), and
24             # when sending messages across file handles (as in the case
25             # of a POE-based parent process communicating with a non-POE
26             # child process).
27             #
28              
29             ### POE::Kernel; # Don't use POE here!
30 3     3   1977 use POE::Event::Message::Header; # Header class wraps attributes
  3         10  
  3         103  
31             ### POE::Event::Message::Body; # ((not yet created))
32 3     3   1641 use POE::Filter::Reference; # Perl object filtering
  0            
  0            
33             use POE::Driver::SysRW; # sysread/syswrite driver
34              
35             my $HeaderClass = "POE::Event::Message::Header";
36             ## $BodyClass = "POE::Event::Message::Body"; # ((not yet created))
37             our $Driver = new POE::Driver::SysRW;
38             our $Filter = new POE::Filter::Reference;
39              
40             #-------------------------------------------------------------------------
41             # Message Creation (new message and reply message)
42             # Envelope method to create new message with body data
43              
44             sub new
45             { my($class,$header,$body) = @_;
46              
47             # This method allows for the following usage:
48             # . Creating a new message
49             # $message = $msgClass->new();
50             # $message = $msgClass->new( undef, $msgBody ); # see 'package' method
51             #
52             # . Creating a reply to an existing message
53             # $response = $msgClass->new( $message->header() ); # header obj
54             # $response = $msgClass->new( $message ); # message obj
55             #
56             if (ref($header) and $header =~ /=HASH/ and $header->can("new") ) {
57             my $msg = $header;
58             $header = $HeaderClass->new( $msg->header() ),
59              
60             } else {
61             $header = $HeaderClass->new( $header ),
62             }
63              
64             return( bless my $self = {
65             header => $header,
66             body => $body,
67             ## body => $BodyClass->new( $body ), # ((no BodyClass yet))
68             }, ref($class)||$class
69             );
70             }
71              
72             *envelope = \&package;
73              
74             sub package
75             { my($class,$msgBody) = @_;
76             return $class->new("", $msgBody); # create new message "envelope"
77             }
78              
79             #-----------------------------------------------------------------------
80             # Pass through methods for message header object
81             # and a method to get/set the message body
82              
83             sub set { $_[0]->{header} and $_[0]->{header}->set ( $_[1], $_[2] ) }
84             sub get { $_[0]->{header} and $_[0]->{header}->get ( $_[1] ) }
85             sub del { $_[0]->{header} and $_[0]->{header}->del ( $_[1] ) }
86             sub param { $_[0]->{header} and $_[0]->{header}->param ( $_[1], $_[2] ) }
87             sub setErr { $_[0]->{header} and $_[0]->{header}->setErr( $_[1], $_[2] ) }
88             sub status { $_[0]->{header} and $_[0]->{header}->status() }
89             sub stat { $_[0]->{header} and $_[0]->{header}->stat() }
90             sub err { $_[0]->{header} and $_[0]->{header}->err() }
91              
92             sub getMode { $_[0]->{header} and $_[0]->{header}->mode() }
93             sub setMode { $_[0]->{header} and $_[0]->{header}->mode( $_[1] ) }
94             sub delRouteBack { $_[0]->{header} and $_[0]->{header}->delRouteBack() }
95             sub delRouteTo { $_[0]->{header} and $_[0]->{header}->delRouteTo() }
96              
97             *delete = \&del;
98             *reset = \&del;
99             *inResponseToId = \&r2id;
100              
101             sub hasRouting { $_[0]->{header} and $_[0]->{header}->hasRouting() }
102             sub hasRouteTo { $_[0]->{header} and $_[0]->{header}->hasRouteTo() }
103             sub hasRouteBack { $_[0]->{header} and $_[0]->{header}->hasRouteBack() }
104              
105             *getRouting = \&hasRouting;
106             *getRouteTo = \&hasRouteTo;
107             *getRouteBack = \&hasRouteBack;
108              
109             sub nextRouteType { $_[0]->{header}->nextRouteType() if $_[0]->{header}}
110             sub nextRouteIsRemote { $_[0]->{header}->nextRouteIsRemote() if $_[0]->{header}}
111             sub nextRouteIsLocal { $_[0]->{header}->nextRouteIsLocal() if $_[0]->{header}}
112             sub nextRouteIsPost { $_[0]->{header}->nextRouteIsPost() if $_[0]->{header}}
113             sub nextRouteIsCall { $_[0]->{header}->nextRouteIsCall() if $_[0]->{header}}
114              
115             sub id { $_[0]->{header}->get('id') } # unique message ID
116             sub r2id { $_[0]->{header}->get('r2id')} # orig. unique message ID
117             sub header { $_[0]->{header} } # get only
118             sub body { $_[1] ? $_[0]->{body} = $_[1] : $_[0]->{body} } # get or set
119              
120             sub addRouteTo
121             { my($self, @args) = @_;
122             return undef unless $self->{header};
123             return $self->{header}->addRouteTo( @args );
124             }
125              
126             sub addRouteBack
127             { my($self, @args) = @_;
128             return undef unless $self->{header};
129             return $self->{header}->addRouteBack( @args );
130             }
131              
132             sub addRemoteRouteTo
133             { my($self, @args) = @_;
134             return undef unless $self->{header};
135             return $self->{header}->addRemoteRouteTo( @args );
136             }
137              
138             sub addRemoteRouteBack
139             { my($self, @args) = @_;
140             return undef unless $self->{header};
141             return $self->{header}->addRemoteRouteBack( @args );
142             }
143              
144             #-----------------------------------------------------------------------
145             # Message Routing and Auto-Replies
146             # This is an experimental interface to provide a flexible,
147             # semi-automatic message routing mechanism. It provides
148             # the following features as an alternative to "postbacks"
149             # and extends the concept somewhat.
150             #
151             # . immediate routing via "post" (queued, asynchronous) - post method
152             # . immediate routing via "call" (direct, synchronous) - call method
153             #
154             # . forward routing, delayed (via "post" or "call) - addRouteTo
155             # . return routing, delayed (via "post" or "call) - addRouteBack
156             # . combinations of these two
157             #
158             # . forward routing, delayed remote (via socket "write") -addRemoteRouteTo
159             # . return routing, delayed remote (via socket "write") -addRemoteRouteBack
160             # . combinations of these two
161             #
162             # In addition, this allows the "stacking" of both forward and
163             # return destinations. This is the basis of the "semi-automatic
164             # routing" feature. The "route" method DOES need to be called
165             # on a message, but the caller DOESN'T need to know anything
166             # about the next destination(s).
167             #
168             # Some experimentation and experience will determine whether
169             # these are useful as designed, or if changes are needed.
170             # The INTENT is that intermediate destinations can be added
171             # to interrupt the original routing, TEMPORARIALY redirecting
172             # a message, while STILL allowing it to return as intended.
173              
174             #---------------------------------------------
175             # Direct message routing, via "post" or "call"
176              
177             sub post { shift->_postOrCall( "post", @_ ) }
178             sub call { shift->_postOrCall( "call", @_ ) }
179              
180             sub _postOrCall # For immediate routing
181             { my($self, $mode, $session, $event, @args) = @_;
182              
183             if (! defined $INC{'POE/Kernel.pm'}) {
184             return $self->setErr(-1, "'POE::Kernel' module is not loaded in '_postOrCall' method of '$PACK'");
185             }
186              
187             $session ||= POE::Kernel->get_active_session()->ID();
188              
189             return unless ($session and $event);
190             $mode = lc $mode; # a mode arg?
191             $mode ||= lc $self->getMode(); # use prior setting?
192             $mode = "post" unless ($mode eq "call"); # default value!
193              
194             # NOTE that this changes the default behavior of POE's post/call.
195             # Here, arguments are bundled into list references so the result
196             # is "more compatible" with POE's postback/callback mechanism.
197             #
198             # Here, we ensure that the arguments passed here are sent to the
199             # target event handler as a list reference to be received in ARG0.
200             # The "$self" var (current message) is sent as the only element in
201             # a list reference to be received in ARG1. This allows "$self" to
202             # be "received" as if it was sent by the method where post/call was
203             # invoked. In addition, this maintains a compatible result with
204             # the "addRouteTo()/addRouteBack()" method, below. Clear as mud?
205             # This is either a good idea or really cludgy. Time will tell.
206             #
207             # All message events sent via "post()", "call()" or "route()" will
208             # result in ONLY two parameters received by the target event handler.
209              
210             # issue one of post/call
211              
212             my $argRef;
213              
214             if ( ($#args == 0) and (ref($args[0]) eq "ARRAY") ) {
215             $argRef = $args[0];
216             } else {
217             $argRef = [ @args ];
218             }
219              
220             # FIX: handle 'call' return here as done in sub '_autoRouting'
221              
222             if (! POE::Kernel->$mode( $session, $event, $argRef, [ $self ] ) ) {
223              
224             my $msg = "Error(1): POE_Kernel->$mode( $session, $event, [ $argRef ], [ $self ] )";
225              
226             $self->setErr(-1, "$msg: $! in '$PACK'");
227             ## warn $msg;
228             }
229              
230             # warn "DEBUG: Route: POE_Kernel->$mode( $session, $event, @args )\n";
231              
232             return;
233             }
234              
235             #---------------------------------------------
236             # Automatic message routing, via "post" or "call"
237              
238             *autoroute = \&route;
239             *autoRoute = \&route;
240              
241             sub route # forward, if possible, or reply
242             { my($self,@newArgs) = @_;
243              
244             # routeTo() until empty, then routeBack()
245             # 'post' returns: 1=routed, 0=nowhere to route
246             # 'call' returns: whatever 'called' method returns
247              
248             my(@list) = ();
249              
250             if ( $self->hasRouteTo() ) {
251             (@list) = $self->routeTo( @newArgs );
252             } elsif ( $self->hasRouteBack() ) {
253             (@list) = $self->routeBack( @newArgs );
254             }
255              
256             return( @list );
257             }
258              
259             #---------------------------------------------
260             # Forward message routing, via "post" or "call"
261              
262             *forward = \&routeTo;
263             *routeto = \&routeTo;
264              
265             sub routeTo # activate "auto-forward", if any
266             { my($self,@newArgs) = @_;
267              
268             my(@list) = $self->_autoRouting( "delRouteTo", @newArgs );
269             return( @list );
270             }
271              
272             #---------------------------------------------
273             # Return message routing, via "post" or "call"
274              
275             *reply = \&routeBack;
276             *routeback = \&routeBack;
277              
278             sub routeBack # activate "auto-reply", if any
279             { my($self,@newArgs) = @_;
280              
281             my(@list) = $self->_autoRouting( "delRouteBack", @newArgs );
282             return( @list );
283             }
284              
285             #---------------------------------------------
286             # Generic message routing method
287              
288             sub _autoRouting # activate "auto-routing," if any
289             { my($self,$type,@newArgs) = @_;
290              
291             # This "generic" method is intended to be called by
292             # either the "routeTo" or "routeBack" method, which
293             # in turn might be invoked by the "route" method.
294              
295             return undef unless $self->{header};
296              
297             #---------------------------------------------------------------
298             # Depending on the "$type" collect a "RouteTo" or a "RouteBack"
299             # Note: "@origArgs" is now included for 'delRouteTo' type.
300              
301             $type = "delRouteBack" unless $type eq "delRouteTo";
302              
303             my($host, $port, $mode, $session, $event, @origArgs) = $self->$type();
304              
305             ##warn "($host, $port, $mode, $session, $event, @origArgs)\n";
306              
307             # Must have something to post/call and somewhere to send it!
308             # FIX: differentiate between call when "no routing"
309             # and call when "incomplete routing" attributes.
310             #
311             if (! ($session and $event) ) {
312             ## warn "DEBUG: session='$session' event='$event'";
313             return;
314             }
315              
316             #---------------------------------------------------------------
317             # FIX: ToDo: complete remote routing via '$host', '$port'
318             # using the "write" method, below. First, fix up message such
319             # that, upon being received, a "route()" will forward it on.
320             #
321             ###if ($host or $port) {
322              
323             if ($port) {
324             $host ||= 'localhost';
325              
326             my $socket = $self->createSocket( $host, $port );
327             if (! $socket) {
328             my $err = "Error: connection to '${host}:$port' refused in '$PACK'";
329              
330             # FIX: Do we overwrite the original error message here? or not??
331             # YES. This is a better (clearer) message here for the user.
332             #
333             # $self->setErr(-1, $err) unless $self->status();
334             $self->setErr(-1, $err);
335              
336             return $self;
337             }
338              
339             my($stat,$err) = $self->write( $socket );
340             if ($stat) {
341             $self->setErr( $stat, $err );
342             return $self;
343             }
344              
345             my $response;
346              
347             # With a "call" or "sync" mode we expect an immediate
348             # response message from the server. With any other
349             # mode, such as "async", only a "successful send"
350             # response is generated here--this assumes that if
351             # an actual response message will be generated, an
352             # appropriate "addRemoteRouteBack" was added earlier.
353              
354             if (($mode eq "call") or ($mode eq "sync")) {
355             $response = $self->read( $socket );
356             } else {
357             # FIX: Should this be handled differently??
358             $response = $self->new( $self );
359             $response->setErr( 0, "message sent successfully" );
360             }
361              
362             $self->shutdownSocket();
363              
364             return $response;
365             }
366              
367             #---------------------------------------------------------------
368             # Determine whether we "post" (asych) or "call" (synch)
369              
370             $mode ||= lc $self->mode(); # use prior setting?
371             $mode = "post" unless ($mode eq "call"); # default value!
372              
373             #---------------------------------------------------------------
374             # If we're going to rely on POE, make sure it's available.
375              
376             if (! defined $INC{'POE/Kernel.pm'}) {
377             $self->setErr(-1, "'POE::Kernel' module is not loaded in '_autoRouting' method of '$PACK'");
378             return undef;
379             }
380              
381             #---------------------------------------------------------------
382             # This now works as does 'post/call', when using 'routeTo'.
383             # STRANGE SYNTAX here, but it allows for reasonably compatible
384             # syntax (and a plug-compatible result) to POE's "postback"
385             # mechanism.
386             #
387             # NOTE that "$self" is prepended to the "@args" list here! This
388             # allows "$self" (current message) to be "received" as if it was
389             # sent by the method where the 'post/call/postback' was invoked.
390             # --- This obviously needs some well documented examples. ---
391              
392             my (@args);
393             if ($type eq "delRouteTo") { # issue a post/call
394             # This is kind of funky usage but, at the moment, it
395             # seems to be "the right thing." Feedback is welcome.
396             # Currently we PREPEND "$self" (current message) to
397             # any "new args" when "route()" method is called.
398             #
399             # Adding "$self" AFTER the orig args and BEFORE added args
400             # is either really good or really cludgy. Time will tell.
401             #
402             (@args) = ( [ @origArgs ], [ $self, @newArgs ] );
403              
404             } else { # emulate postback
405             # Currently, these are the same. Should the above change?
406             #
407             (@args) = ( [ @origArgs ], [ $self, @newArgs ] );
408             }
409              
410             my(@list) = POE::Kernel->$mode( $session, $event, @args );
411              
412             if (! @list ) {
413              
414             if ($!) {
415             my $err = "Error(2): POE_Kernel->$mode( $session, $event, @args )";
416              
417             $self->setErr(-1, "$err: $! in '$PACK'");
418             ## warn $err;
419             }
420             }
421              
422             ## warn "DEBUG: Route: POE_Kernel->$mode( $session, $event, @args )\n";
423              
424             # Here we return the same result as the POE::Kernel
425             # . for 'post' return a boolean indicating success
426             # . for 'call' return what the called method returns
427             #
428             return( @list ) if ($mode eq 'call');
429             return 1;
430             }
431              
432             #-----------------------------------------------------------------------
433             # Reading and Writing messages, via POE::Filter::Reference,
434             # for use with file/socket handles. Note that the "$message"
435             # argument defaults to "$self" -- this allows for nice clean
436             # syntax in most situations: $message->write( $fh );
437              
438             *send = \&write;
439             *recv = \&read;
440              
441             sub write
442             { my($self,$fh,$message) = @_;
443              
444             (defined($fh) and length($fh)) or return (1,"nowhere to write");
445              
446             # Support class OR object method
447             $message = $self if ((! $message) and ref($self));
448             return (1,"nothing to write") unless $message;
449              
450             ## warn "DEBUG: write message='$message'\n";
451              
452             my $tmp = delete $self->{_socket_} if (ref $self);
453             ## warn "DEBUG fh='$fh' in 'write()'";
454              
455             $Driver->put( $Filter->put( [ $message ] ) ); # queue the message
456             $Driver->flush( $fh ); # send the message
457              
458             $! and warn "OUCH: Driver/flush failed: $!";
459             ## $! or warn "OKAY: Driver/flush succeeded";
460              
461             $self->{_socket_} = $tmp if ($tmp and ref $self);
462             return(0,""); # return success
463             }
464              
465             sub read
466             { my($self,$fh) = @_;
467              
468             ## warn "DEBUG: fh='$fh' in 'read()'";
469              
470             (defined($fh) and length($fh)) or return (1,"nowhere to read");
471              
472             my $response = $Driver->get( $fh ); # fetch a reply
473              
474             if (! $response) {
475             $response = $self->new();
476             $response->setErr(-1, "no response from server");
477             return $response;
478             }
479              
480             #-------------------------------------------------------------------
481             # Fix to remove message length limitation contributed
482             # by Martin Holste, Sun 21 Feb 2010:
483             # Parse expected length from the beginning of the first payload
484             unless (ref($response) eq 'ARRAY' and $response->[0] =~ /^(\d+)\0/){
485             $response = $self->new();
486             $response->setErr(-1, 'Error: Did not receive expected payload length at beginning of payload');
487             return $response;
488             }
489             my $expected_length = $1 + length($1) + 1;
490             my $received = length($response->[0]);
491             READ_LOOP: while ($received < $expected_length){
492             my $buf = $Driver->get($fh);
493             foreach (@$buf){
494             push @$response, $_;
495             $received += length($_);
496             }
497             }
498             #-------------------------------------------------------------------
499             # filter the reply, or report error at this line:
500             ($response) = @{ $Filter->get( $response ) }; my $line = __LINE__;
501              
502             if (($response) and ($response =~ /=HASH/) and ($response->can( "body" ))) {
503             # response is okay
504             } else {
505             my $body = $response ||"";
506             $response = $self->new();
507             $response->body( $body );
508             my $err = "Error: unknown message type in 'read' in '$PACK' at line $line";
509             $err .= "\nmessage = '$body'";
510             $err .= " (was message too big to send?)" if (! $body);
511             $response->setErr(-1, $err );
512             }
513              
514             return $response; # return the reply
515             }
516              
517             sub createSocket
518             { my($self,$host,$port) = @_;
519              
520             $host ||= 'localhost';
521             if (! $port) {
522             $self->setErr(-1, "Error: port is undefined in 'socket' in '$PACK'" );
523             return undef;
524              
525             } elsif (! defined $INC{'IO/Socket.pm'} ) {
526             $self->setErr(-1, "'IO::Socket' module is not loaded in 'socket' method of '$PACK'");
527             return undef;
528             }
529              
530             my $socket = IO::Socket::INET->new( 'PeerAddr' => $host,
531             'PeerPort' => $port,
532             'Proto' => 'tcp', );
533             if (! $socket) {
534             $self->setErr(-1, "Error: IO_Socket_INET failed in '$PACK': $!" );
535             return undef;
536             }
537              
538             $self->{_socket_} = $socket;
539             return $socket;
540             }
541              
542             sub getOpenSocket
543             { my($self) = @_;
544             return $self->{_socket_} || undef;
545             }
546              
547             sub shutdownSocket
548             { my($self, $socket) = @_;
549              
550             $socket ||= $self->getOpenSocket();
551              
552             $socket->shutdown(2); # proper socket etiquette:
553             $socket->close(); # a shutdown then a close.
554              
555             return;
556             }
557              
558             #-----------------------------------------------------------------------
559             # Somewhat useful for debugging ... replace with a better dumper.
560              
561             sub dump {
562             my($self)= @_;
563             my($pack,$file,$line)=caller();
564             my $text = "DEBUG: ($PACK\:\:dump)\n self='$self'\n";
565             $text .= "CALLER $pack at line $line\n ($file)\n";
566             my $value;
567             foreach my $param (sort keys %$self) {
568             $value = $self->{$param};
569             $value = $self->zeroStr( $value, "" ); # handles value of "0"
570             $text .= " $param = $value\n";
571             }
572             $text .= "_" x 25 ."\n";
573              
574             if (ref($self->{header})) {
575             $text .= "header:\n";
576             $text .= $self->{header}->dump("nohead");
577             }
578             return($text);
579             }
580              
581             sub zeroStr
582             { my($self,$value,$undef) = @_;
583             return $undef unless defined $value;
584             return "0" if (length($value) and ! $value);
585             return $value;
586             }
587             #_________________________
588             1; # Required by require()
589              
590             __END__